How to Integrate MySQL with Airflow

Galaxy Glossary

How do I connect MySQL to Airflow and run automated SQL tasks?

Airflow integrates with MySQL through connections, hooks, and operators to execute SQL and orchestrate data pipelines.

Why integrate MySQL with Airflow?

Airflow automates recurring MySQL queries, extracts, and transformations so you no longer rely on cron jobs or manual execution. DAG scheduling, retry logic, and alerting make pipelines reliable and observable.

How to declare a MySQL connection in Airflow?

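Add a connection in the Airflow UI (Admin > Connections) or through an environment variable named AIRFLOW_CONN_MYSQL_ECOM. Set the Conn Id to mysql_ecom and fill in the host, port (3306), user, password, and schema ecommerce. Connections created in the UI are stored in the metadata database; connections defined by environment variable are read at runtime and are never written there. Either way, hooks and operators look up the credentials by Conn Id.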
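As a rough sketch of the environment-variable route: Airflow reads connections from variables named AIRFLOW_CONN_ followed by the upper-cased Conn Id, and the value can be a connection URI. The snippet below builds such a URI with the standard library only. build_mysql_conn_uri is a hypothetical helper (not an Airflow function), and the host, user, and password are placeholder values.

```python
from urllib.parse import quote


def build_mysql_conn_uri(user, password, host, port=3306, schema=""):
    """Return a mysql:// URI with the user and password percent-encoded."""
    return (
        f"mysql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{schema}"
    )


# Placeholder credentials, for illustration only.
uri = build_mysql_conn_uri(
    "etl_user", "p@ss/word", "db.example.internal", schema="ecommerce"
)
env_name = "AIRFLOW_CONN_" + "mysql_ecom".upper()

# Print a shell line that could be added to the scheduler/worker environment.
print(f"export {env_name}='{uri}'")
```

Percent-encoding matters because characters such as @ and / in a password would otherwise be parsed as URI separators.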

What is the syntax of MySqlOperator?

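MySqlOperator is the workhorse for running SQL. Pass task_id, mysql_conn_id, and sql, which may be an inline SQL string or a path to a .sql file. Both forms are rendered as Jinja templates, so you can inject runtime values such as the logical date and keep DAGs dynamic. Recent releases of the MySQL provider deprecate MySqlOperator in favor of SQLExecuteQueryOperator from the common SQL provider, so check which provider version you have installed.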

How to run a MySQL SELECT task?

Define a DAG file, import MySqlOperator, and create a task that pulls yesterday’s orders. Push results to XCom for downstream tasks.

How to pass data between tasks?

Set do_xcom_push=True in MySqlOperator to store query results. Subsequent tasks access them with {{ ti.xcom_pull(task_ids='task_name') }}. Limit large result sets to avoid metadatabase bloat.
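A downstream task usually wants to do something with the pulled rows. The sketch below is one possible shape for that step: top_customers is a hypothetical helper, and the row layout (customer_id, daily_total) is assumed from the example query in this article. XCom values are serialized, so rows may arrive as lists and totals as strings or numbers depending on the backend; the helper tolerates both.

```python
def top_customers(rows, limit=3):
    """Return the `limit` highest (customer_id, daily_total) pairs."""
    # Rows may be lists rather than tuples after XCom serialization,
    # and xcom_pull returns None when nothing was pushed.
    pairs = [(row[0], float(row[1])) for row in rows or []]
    return sorted(pairs, key=lambda pair: pair[1], reverse=True)[:limit]


# In a DAG this could be wired up roughly as follows (task names assumed):
#   PythonOperator(
#       task_id="rank_customers",
#       python_callable=lambda ti: top_customers(
#           ti.xcom_pull(task_ids="fetch_yesterday_orders")
#       ),
#   )

sample = [[101, "25.50"], [102, "310.00"], [103, "99.99"]]
print(top_customers(sample, limit=2))
```

Keeping the logic in a plain function makes it easy to unit test without a running scheduler.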

What are best practices?

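Use parameterized SQL to avoid SQL injection, keep transactions short to minimize lock time, and store large extracts in object storage rather than XCom. Set autocommit=True for standalone statements, or group related DML in an explicit BEGIN…COMMIT transaction. Note that MySQL commits implicitly around DDL statements such as CREATE TABLE and ALTER TABLE, so DDL cannot be rolled back as part of a transaction.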
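To make the parameterization advice concrete, here is a minimal sketch that keeps values out of the SQL text. It assumes the pyformat placeholder style (%(name)s) accepted by the mysqlclient driver; orders_on_date_query is a hypothetical helper, and its return values are what you would pass as the operator's sql and parameters arguments.

```python
def orders_on_date_query(order_date, min_total):
    """Return (sql, parameters); the driver binds the values at execution time."""
    sql = (
        "SELECT id, customer_id, total_amount "
        "FROM Orders "
        "WHERE order_date = %(order_date)s AND total_amount >= %(min_total)s"
    )
    parameters = {"order_date": order_date, "min_total": min_total}
    return sql, parameters


# A hostile value stays inside `parameters`; the SQL text never changes.
sql, parameters = orders_on_date_query("2024-01-01'; DROP TABLE Orders; --", 100)
print(sql)
print(parameters)
```

Because the statement text is constant, the same query can also be reviewed, logged, and cached without exposing user-supplied values.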

Common mistakes and fixes

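Bad Conn Id: A typo in the connection id makes the task fail with AirflowNotFoundException because the conn_id isn't defined. Verify that the Conn Id matches mysql_conn_id in your operator.
Missing client libs: The MySQL provider needs a driver, mysqlclient by default or mysql-connector-python. Install with pip install 'apache-airflow[mysql]'.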

FAQ

Do I need a separate MySQL server for Airflow metadata?

No. Airflow's metadata database is independent of this integration and can run on PostgreSQL or MySQL. The connection discussed here points at your source/target business database, not at the metadata database.

Can I load CSVs into MySQL with Airflow?

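Yes. Use BashOperator to call mysqlimport, or a Python task that runs LOAD DATA LOCAL INFILE (for example through MySqlHook.bulk_load). Loading a file from the client side requires local_infile to be enabled on both the server and the connection.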
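Before a bulk load, the data has to be in a layout MySQL can read. By default LOAD DATA expects tab-separated fields and newline-terminated lines, so a Python task might first write its rows out as shown below. This is a sketch using only the standard library; write_tsv is a hypothetical helper, and the sample rows are made up for illustration.

```python
import csv
import os
import tempfile


def write_tsv(rows, path):
    """Write rows as tab-separated, newline-terminated lines."""
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.writer(
            handle,
            delimiter="\t",
            lineterminator="\n",
            quoting=csv.QUOTE_NONE,  # no quote characters around fields
            escapechar="\\",         # escape any tab inside a value
        )
        writer.writerows(rows)


rows = [(1, 101, "25.50"), (2, 102, "310.00")]
tmp_path = os.path.join(tempfile.mkdtemp(), "orders.tsv")
write_tsv(rows, tmp_path)

with open(tmp_path, encoding="utf-8") as handle:
    content = handle.read()
print(content)
```

The resulting file path is what you would hand to mysqlimport or to a LOAD DATA statement in the load task.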

Example usage


from datetime import datetime

from airflow import DAG
from airflow.providers.mysql.operators.mysql import MySqlOperator

default_args = {"retries": 2}  # retry each failed task twice

with DAG(
    "daily_order_summary",
    start_date=datetime(2024, 1, 1),  # example fixed date; days_ago() is deprecated
    schedule_interval="0 2 * * *",    # run every day at 02:00
    catchup=False,                    # do not backfill runs before today
    default_args=default_args,
) as dag:
    # Sum yesterday's order totals per customer and push the rows to XCom.
    fetch_orders = MySqlOperator(
        task_id="fetch_yesterday_orders",
        mysql_conn_id="mysql_ecom",
        sql="""
            SELECT customer_id, SUM(total_amount) AS daily_total
            FROM Orders
            WHERE order_date = CURDATE() - INTERVAL 1 DAY
            GROUP BY customer_id;
        """,
        do_xcom_push=True,
    )

Syntax


MySqlOperator(
    task_id="task_fetch_orders",
    mysql_conn_id="mysql_ecom",    # Conn Id configured in Airflow
    sql="""
        SELECT id, customer_id, total_amount
        FROM Orders
        WHERE order_date = CURDATE() - INTERVAL 1 DAY;
    """,
    do_xcom_push=True,             # store results in XCom
    autocommit=True,               # commit each statement as soon as it runs
    parameters=None,               # optional dict or tuple for parameterized SQL
)

Frequently Asked Questions (FAQs)

How do I install MySQL client libraries for Airflow?

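Run pip install "apache-airflow[mysql]". This installs the MySQL provider package together with its client drivers. The default driver, mysqlclient, is a C extension, so building it may also require the MySQL client development headers and a compiler on the host.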

Can I execute multiple statements in one task?

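Yes. Pass a list of SQL strings to sql; the statements run in order on one connection. With autocommit=False they are committed together at the end, so DML statements succeed or fail as a unit. A single string with semicolon-separated statements depends on the driver's support for multi-statement execution, so the list form is the safer choice.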

Is MySqlOperator templated?

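Yes. sql is a templated field, so Jinja expressions are rendered at runtime. For example, {{ ds }} resolves to the run's logical (execution) date in YYYY-MM-DD format. Files with a .sql extension are rendered the same way.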
