How to Integrate Snowflake with Airflow

Galaxy Glossary

How do I integrate Snowflake with Apache Airflow?

Snowflake-Airflow integration lets you schedule and monitor Snowflake SQL jobs through Python DAGs.


Description


How does Airflow connect to Snowflake?

Airflow connects through the apache-airflow-providers-snowflake package: the SnowflakeOperator references a snowflake_conn_id, a connection you define in the Airflow UI (Admin → Connections), an environment variable, or a secrets backend. The operator authenticates with username/password, key-pair, or OAuth credentials and then sends SQL to Snowflake.
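
If you prefer to manage connections as code, here is a minimal sketch of such a connection expressed with Airflow's Connection model. The account, user, and credential values are placeholders; key-pair or OAuth settings would go into the connection extras instead of a password.

import json

from airflow.models.connection import Connection

# Placeholder values - swap in your own account identifier, user, and defaults.
conn = Connection(
    conn_id="snowflake_default",
    conn_type="snowflake",
    login="AIRFLOW_USER",
    password="********",  # or configure key-pair / OAuth via the extras instead
    extra=json.dumps(
        {
            "account": "xy12345.us-east-1",
            "warehouse": "LOAD_WH",
            "database": "E_COMMERCE",
            "role": "LOADER_ROLE",
        }
    ),
)

# Airflow also reads connections from environment variables named AIRFLOW_CONN_<CONN_ID>,
# so the generated URI can be exported instead of clicking through the UI.
print(f"AIRFLOW_CONN_{conn.conn_id.upper()}='{conn.get_uri()}'")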

What are the main steps?

1. Create a Snowflake user and role for Airflow.
2. Add a Snowflake connection in Airflow.
3. Install apache-airflow-providers-snowflake.
4. Define a DAG with SnowflakeOperator.
5. Trigger or schedule the DAG.
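
After installing the provider (pip install apache-airflow-providers-snowflake) and adding the connection, a quick way to verify steps 2 and 3 before writing a full DAG is a one-off query through the provider's hook. This is a sketch that assumes a connection named snowflake_default:

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Assumes the "snowflake_default" connection already exists in Airflow.
hook = SnowflakeHook(snowflake_conn_id="snowflake_default")

# get_first runs the statement and returns the first row - a cheap connectivity check.
print(hook.get_first("SELECT CURRENT_ACCOUNT(), CURRENT_WAREHOUSE(), CURRENT_ROLE()"))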

How to write a SnowflakeOperator task?

Pass a SQL string, a list of statements, or a path to a templated .sql file; optionally set the warehouse, database, schema, role, and query parameters. Each task becomes a node in your DAG, just like any other operator.
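
For example, here is a sketch of a task that binds a templated parameter to an inline statement; the table name and deletion logic are illustrative only, and recent provider versions template the parameters field (with older versions, inline {{ ds }} directly in the SQL instead).

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# {{ ds }} is rendered by Airflow's Jinja templating at run time, and
# %(batch_date)s is bound by the Snowflake connector when the query executes.
clear_todays_orders = SnowflakeOperator(
    task_id="clear_todays_orders",
    snowflake_conn_id="snowflake_default",
    sql="DELETE FROM Orders WHERE order_date = %(batch_date)s",
    parameters={"batch_date": "{{ ds }}"},
    warehouse="LOAD_WH",
    database="E_COMMERCE",
)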

How to load ecommerce data?

Use the operator to execute DDL, DML, or COPY statements against tables such as Customers, Orders, Products, and OrderItems. You can chain tasks for staging, transformation, and quality checks.
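
A sketch of such a chain, to be placed inside a with DAG(...) block, might look like the following. The stage and table names are placeholders, and the final task uses the common SQL provider's SQLCheckOperator as one way to fail the run when the load produced no rows.

from airflow.providers.common.sql.operators.sql import SQLCheckOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# Staging: bulk-load raw files from a stage.
stage_orders = SnowflakeOperator(
    task_id="stage_orders",
    snowflake_conn_id="snowflake_default",
    sql="COPY INTO Orders FROM @stage/orders FILE_FORMAT = (TYPE = 'CSV')",
    warehouse="LOAD_WH",
    database="E_COMMERCE",
)

# Transformation: reshape staged rows into the final table.
transform_order_items = SnowflakeOperator(
    task_id="transform_order_items",
    snowflake_conn_id="snowflake_default",
    sql="INSERT INTO OrderItems SELECT * FROM stg_order_items",
    warehouse="LOAD_WH",
    database="E_COMMERCE",
)

# Quality check: fails the task if the first row evaluates to a falsy value (COUNT = 0).
check_order_items = SQLCheckOperator(
    task_id="check_order_items",
    conn_id="snowflake_default",
    sql="SELECT COUNT(*) FROM OrderItems",
)

stage_orders >> transform_order_items >> check_order_items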

Best practices for production DAGs

Store SQL in version-controlled files, set autocommit=True so statements commit as they run, set depends_on_past=True for incremental loads, and add an on_failure_callback to alert on errors.
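
For instance, a sketch of wiring the retry, dependency, and alerting settings into default_args; the notification function is a placeholder, so swap in Slack, PagerDuty, or email as appropriate.

# Placeholder alerting hook - replace the print with your team's notification channel.
def notify_failure(context):
    ti = context["task_instance"]
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed for {context['ds']}")

default_args = {
    "owner": "data_eng",
    "retries": 1,
    "depends_on_past": True,            # incremental loads wait for the previous run to succeed
    "on_failure_callback": notify_failure,
}

Note that autocommit=True is set on each SnowflakeOperator (see the syntax section below) rather than in default_args.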

What are common pitfalls?

Incorrect connection IDs and missing warehouses cause most failures. Always test the connection in the Airflow UI and specify the warehouse in each task or connection.

How to monitor Snowflake tasks?

Airflow’s Grid (formerly Tree), Graph, and Gantt views, plus the task logs, show near-real-time status. Inside Snowflake, query QUERY_HISTORY to cross-check execution times and credit usage.
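
The cross-check can even be scripted with the same hook, as in the sketch below; the user and database names are placeholders for the service account and database your DAGs use.

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Lists recent queries issued by the Airflow service user so their runtimes can be
# compared against the task durations Airflow reports.
hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
rows = hook.get_records(
    """
    SELECT query_text, warehouse_name, total_elapsed_time / 1000 AS seconds
    FROM TABLE(E_COMMERCE.INFORMATION_SCHEMA.QUERY_HISTORY())
    WHERE user_name = 'AIRFLOW_USER'
    ORDER BY start_time DESC
    LIMIT 20
    """
)
for query_text, warehouse_name, seconds in rows:
    print(f"{seconds:>8.2f}s  {warehouse_name}  {query_text[:80]}")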


How to Integrate Snowflake with Airflow Example Usage


from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from datetime import datetime

default_args = {"owner": "data_eng", "retries": 1}
with DAG(
    dag_id="daily_snowflake_etl",
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 2 * * *",
    default_args=default_args,
    catchup=False,
) as dag:

    load_customers = SnowflakeOperator(
        task_id="load_customers",
        snowflake_conn_id="snowflake_default",
        sql="""
            COPY INTO Customers
            FROM @stage/customers
            FILE_FORMAT = (TYPE = 'CSV' FIELD_OPTIONALLY_ENCLOSED_BY='"');
        """,
        warehouse="LOAD_WH",
        database="E_COMMERCE",
    )

    calc_customer_ltv = SnowflakeOperator(
        task_id="calc_customer_ltv",
        snowflake_conn_id="snowflake_default",
        sql="sql/calc_customer_ltv.sql",
        warehouse="ANALYTICS_WH",
        database="E_COMMERCE",
    )

    load_customers >> calc_customer_ltv

How to Integrate Snowflake with Airflow Syntax


SnowflakeOperator(
    task_id="load_customers",
    snowflake_conn_id="snowflake_default",  # Airflow connection id
    sql="sql/load_customers.sql",           # str, list of str, or templated .sql file
    autocommit=True,                        # bool, default False
    parameters={"batch_date": "{{ ds }}"},  # dict of templated query parameters
    warehouse="LOAD_WH",                    # optional, overrides the connection
    database="E_COMMERCE",                  # optional
    schema="PUBLIC",                        # optional
    role="LOADER_ROLE",                     # optional
)


Frequently Asked Questions (FAQs)

Is the SnowflakeOperator the only option?

No. You can also use the S3ToSnowflakeOperator, the SnowflakeSqlApiOperator, or a generic PythonOperator with the Snowflake Python connector for advanced logic.
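
If you go the PythonOperator route, a sketch like the one below reuses the Airflow connection through SnowflakeHook instead of hard-coding credentials; the archiving logic is purely illustrative, and the task would live inside your DAG definition.

from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


def archive_old_orders():
    # Count stale rows first, then decide in Python whether to act on them.
    hook = SnowflakeHook(snowflake_conn_id="snowflake_default")
    stale = hook.get_first(
        "SELECT COUNT(*) FROM Orders WHERE order_date < DATEADD(year, -2, CURRENT_DATE)"
    )[0]
    if stale:
        hook.run("DELETE FROM Orders WHERE order_date < DATEADD(year, -2, CURRENT_DATE)")


archive_orders = PythonOperator(
    task_id="archive_orders",
    python_callable=archive_old_orders,
)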

How do I secure credentials?

Store passwords or key-pair secrets in an Airflow secrets backend (environment variables, AWS Secrets Manager, HashiCorp Vault). Reference them from the Snowflake connection; never embed credentials in DAG code.

Can I run tasks in parallel warehouses?

Yes. Define separate tasks with different warehouse values or create multiple Airflow connections, each pointing to a distinct warehouse to isolate workloads.
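
As a sketch, two tasks with no dependency between them and different warehouse values will run side by side (the names below are placeholders):

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

# No >> edge between these tasks, so the scheduler can run them concurrently,
# each on its own warehouse.
load_products = SnowflakeOperator(
    task_id="load_products",
    snowflake_conn_id="snowflake_default",
    sql="COPY INTO Products FROM @stage/products FILE_FORMAT = (TYPE = 'CSV')",
    warehouse="LOAD_WH",
)

rebuild_product_stats = SnowflakeOperator(
    task_id="rebuild_product_stats",
    snowflake_conn_id="snowflake_default",
    sql="sql/rebuild_product_stats.sql",
    warehouse="ANALYTICS_WH",
)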
