How to integrate MariaDB with Airflow

How do I connect Airflow to MariaDB and run scheduled SQL?

MariaDB-Airflow integration lets DAG tasks read and write MariaDB tables through Airflow’s MySqlHook/MySqlOperator.

What problem does MariaDB integration with Airflow solve?

It schedules, orchestrates, and monitors SQL jobs against MariaDB, replacing ad-hoc scripts with version-controlled, retry-aware DAGs.

How do I add a MariaDB connection in Airflow?

Use the Airflow UI, the CLI, or an environment variable. Provide host, port (3306 by default), user, password, and schema, and choose the “mysql” conn_type: MariaDB speaks the MySQL wire protocol, so the MySQL provider works against it.

Airflow CLI example

# The mysql:// scheme sets conn_type to "mysql"; MariaDB is wire-compatible
airflow connections add mariadb_ecom \
    --conn-uri 'mysql://analytics:SECRETPW@db.prod.local:3306/ecommerce'
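Passwords containing characters such as @, / or : must be percent-encoded before they go into a connection URI. The sketch below builds such a URI with Python's standard urllib.parse and derives the environment-variable name Airflow reads for a connection (AIRFLOW_CONN_ followed by the upper-cased conn_id). The helper names build_conn_uri and conn_env_var are hypothetical, not part of Airflow.

```python
from urllib.parse import quote


def build_conn_uri(user: str, password: str, host: str, port: int, schema: str) -> str:
    """Build a mysql:// connection URI with percent-encoded parts (hypothetical helper)."""
    # safe="" also encodes "/", so no credential character is mistaken for URI structure
    return (
        f"mysql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


def conn_env_var(conn_id: str) -> str:
    """Name of the environment variable Airflow checks for this conn_id."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


if __name__ == "__main__":
    uri = build_conn_uri("analytics", "p@ss/w:rd", "db.prod.local", 3306, "ecommerce")
    # Emit a shell line you could paste into a .env file or CI secret
    print(f"export {conn_env_var('mariadb_ecom')}='{uri}'")
```

Airflow percent-decodes the login and password when it parses the URI, so the stored credentials match the originals.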

What syntax does the MySqlOperator support?

MySqlOperator(sql="…", mysql_conn_id="mariadb_ecom", parameters=None, autocommit=False, database=None, **kwargs) executes a SQL string, a list of statements, or a path to a .sql file. Both sql and parameters are Jinja-templated; parameters supplies bind values to the driver.

Templated example

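from airflow.providers.mysql.operators.mysql import MySqlOperator

agg_daily_orders = MySqlOperator(
    task_id="agg_daily_orders",
    sql="sql/agg_daily_orders.sql",  # file contents are rendered with Jinja
    mysql_conn_id="mariadb_ecom",
    parameters={"ds": "{{ ds }}"},   # rendered first, then bound by the driver
)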
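The contents of sql/agg_daily_orders.sql are not shown here. As a hypothetical sketch, assuming a target table daily_order_totals that this guide does not define, the file could reference the bound value with a %(ds)s placeholder, which mysqlclient fills from the parameters dict. The SQL is stored as a Python string for illustration, and missing_parameters is a hypothetical plain-Python check (no Airflow needed) that flags placeholders with no matching key before the task runs.

```python
import re

# Hypothetical contents of sql/agg_daily_orders.sql (daily_order_totals is assumed)
AGG_DAILY_ORDERS_SQL = """
INSERT INTO daily_order_totals (order_date, orders)
SELECT DATE(order_date), COUNT(*)
FROM Orders
WHERE order_date >= %(ds)s
  AND order_date < %(ds)s + INTERVAL 1 DAY
GROUP BY DATE(order_date)
"""

# Matches pyformat placeholders such as %(ds)s and captures the key name
PLACEHOLDER = re.compile(r"%\((\w+)\)s")


def missing_parameters(sql: str, parameters: dict) -> set:
    """Return placeholder names in sql that have no key in parameters."""
    return set(PLACEHOLDER.findall(sql)) - set(parameters)
```

A unit test in CI could call missing_parameters(AGG_DAILY_ORDERS_SQL, {"ds": "{{ ds }}"}) and expect an empty set.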

How do I read data inside a Python task?

MySqlHook offers get_records(), get_pandas_df(), and run(). get_records() returns a list of row tuples, get_pandas_df() returns a pandas DataFrame, and run() executes statements, typically writes.

PythonOperator example

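import logging

from airflow.providers.mysql.hooks.mysql import MySqlHook


def stale_customers(**ctx):
    hook = MySqlHook(mysql_conn_id="mariadb_ecom")
    # get_records returns a list of tuples: [(id, email), ...]
    rows = hook.get_records("""
        SELECT id, email
        FROM Customers
        WHERE created_at < DATE_SUB(CURDATE(), INTERVAL 1 YEAR)
    """)
    logging.info("Found %d stale customers", len(rows))
    # send email or write to log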
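CURDATE() is evaluated when the task runs, so a retry or backfill on a later day selects a different set of customers. One option, sketched here as an assumption rather than part of the original example, is to compute the cutoff from the run's ds in Python and bind it. The hypothetical helper one_year_before mirrors DATE_SUB(date, INTERVAL 1 YEAR), which clamps February 29 to February 28.

```python
from datetime import date


def one_year_before(ds: str) -> str:
    """Return ds minus one year as YYYY-MM-DD, clamping Feb 29 to Feb 28
    the way MariaDB's DATE_SUB(date, INTERVAL 1 YEAR) does."""
    day = date.fromisoformat(ds)
    try:
        cutoff = day.replace(year=day.year - 1)
    except ValueError:  # Feb 29 has no counterpart in the previous year
        cutoff = day.replace(year=day.year - 1, day=28)
    return cutoff.isoformat()
```

Inside stale_customers, the query could then use WHERE created_at < %(cutoff)s and be called as hook.get_records(sql, parameters={"cutoff": one_year_before(ctx["ds"])}), so reruns of the same logical date see the same cutoff.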

When should I enable autocommit?

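Leave autocommit=False (the default) when a task's statements must succeed or fail together: they run in one transaction that is committed only after the last statement succeeds. Set autocommit=True for DDL, which MariaDB commits implicitly anyway, and for long batches of large inserts where committing each statement is preferable to holding one long transaction open.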
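To see what rollback safety means in practice, the sketch below uses Python's built-in sqlite3 module as a stand-in for MariaDB (an assumption for illustration only; SQLite's locking and DDL behavior differ). It runs two inserts where the second violates the primary key. Committed once at the end, nothing survives; in autocommit mode, the first row does. The commit-once-at-the-end shape corresponds to autocommit=False on a MySqlOperator task.

```python
import sqlite3


def run_statements(statements: list, autocommit: bool) -> list:
    """Run statements against an in-memory SQLite database (stand-in for MariaDB)
    and return the rows left in table t afterwards."""
    # isolation_level=None is sqlite3's autocommit mode; "DEFERRED" opens a
    # transaction implicitly before the first INSERT and keeps it open.
    conn = sqlite3.connect(":memory:", isolation_level=None if autocommit else "DEFERRED")
    try:
        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY)")
        try:
            for stmt in statements:
                conn.execute(stmt)
            conn.commit()  # single commit after the last statement
        except sqlite3.IntegrityError:
            conn.rollback()  # discard everything not yet committed
        return conn.execute("SELECT id FROM t ORDER BY id").fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    dup = ["INSERT INTO t VALUES (1)", "INSERT INTO t VALUES (1)"]
    print(run_statements(dup, autocommit=False))  # [] : the first insert was rolled back
    print(run_statements(dup, autocommit=True))   # [(1,)] : the first insert was already committed
```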

Best practices for production DAGs

  • Store SQL in version-controlled files, not inline strings.
  • Template dates with Airflow macros ({{ ds }}, {{ ds_nodash }}).
  • Use dedicated service accounts with least-privilege grants.
  • Set retries and retry_delay so transient failures, such as dropped connections, are retried automatically.
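Two of the practices above, sketched in plain Python: a default_args dict carrying retries and retry_delay (passed as DAG(default_args=...) so every task inherits it), and a helper showing what the {{ ds }} and {{ ds_nodash }} macros render to for a given logical date. The owner value and the render_ds helper are illustrative assumptions.

```python
from datetime import datetime, timedelta

# Passed as DAG(default_args=default_args); tasks inherit these unless they override them
default_args = {
    "owner": "analytics",                 # hypothetical team name
    "retries": 3,                         # retry transient failures
    "retry_delay": timedelta(minutes=5),  # wait between attempts
}


def render_ds(logical_date: datetime) -> dict:
    """What {{ ds }} and {{ ds_nodash }} render to for a logical date (illustrative)."""
    return {
        "ds": logical_date.strftime("%Y-%m-%d"),
        "ds_nodash": logical_date.strftime("%Y%m%d"),
    }
```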

Common mistake: missing schema in connection URI

Omitting the database name forces you to fully qualify every table (for example ecommerce.Orders). Append /ecommerce to the URI to set the default schema.
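A quick way to catch this before deployment is to parse the URI and check that its path names a database. The sketch uses urllib.parse from the standard library and takes the schema from the URI path after the leading slash; default_schema is a hypothetical helper.

```python
from typing import Optional
from urllib.parse import unquote, urlsplit


def default_schema(conn_uri: str) -> Optional[str]:
    """Return the database name encoded in a connection URI, or None if it is missing."""
    # The schema is the URI path without its leading "/", percent-decoded;
    # query-string extras such as ?charset=utf8mb4 are not part of the path
    schema = unquote(urlsplit(conn_uri).path[1:])
    return schema or None
```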

Common mistake: forgetting autocommit on multi-statement SQL

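Running several bulk INSERT … SELECT statements in one task with autocommit=False keeps them in a single long transaction, which grows the undo log and holds row locks until the final commit. Enable autocommit so each statement commits on its own, or split the job into smaller batches.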
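Splitting the job can be as simple as running the bulk insert once per date window. The hypothetical date_batches helper below yields half-open [start, end) windows; each window would become one statement (committed on its own with autocommit=True) or one task, keeping every transaction short.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple


def date_batches(start: date, end: date, days: int) -> Iterator[Tuple[date, date]]:
    """Yield half-open [batch_start, batch_end) windows that cover [start, end)."""
    if days <= 0:
        raise ValueError("days must be positive")
    current = start
    while current < end:
        batch_end = min(current + timedelta(days=days), end)
        yield current, batch_end
        current = batch_end


if __name__ == "__main__":
    for lo, hi in date_batches(date(2024, 1, 1), date(2024, 1, 10), days=4):
        # Each window would bind to something like
        # WHERE order_date >= %(lo)s AND order_date < %(hi)s
        print(lo.isoformat(), hi.isoformat())
```

Half-open windows mean no row is counted twice at a boundary, so reruns of a single window stay idempotent if the target rows for that window are deleted first.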

How do I test MariaDB tasks locally?

Launch a MariaDB Docker container, export a connection URI pointing at it as AIRFLOW_CONN_MARIADB_ECOM (for example in a .env file), and run the DAG once with airflow dags test daily_ecommerce_metrics 2023-01-01.
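A freshly started MariaDB container can need a few seconds before it accepts connections, so triggering the DAG immediately may fail on the first task. The sketch below polls the container's TCP port with the standard socket module before you run airflow dags test; wait_for_port is a hypothetical helper, and an open port is a reasonable but not perfect readiness signal (the server may still be finishing initialization).

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0) -> bool:
    """Poll until a TCP connection to host:port succeeds or timeout seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)  # back off briefly before the next attempt


if __name__ == "__main__":
    # Assumes the container publishes MariaDB on localhost:3306
    print("MariaDB reachable:", wait_for_port("127.0.0.1", 3306, timeout=60))
```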

Example DAG overview

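from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

# stale_customers is the callable defined in the PythonOperator example above

with DAG(
    dag_id="daily_ecommerce_metrics",
    schedule_interval="0 2 * * *",
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    agg_orders = MySqlOperator(
        task_id="agg_daily_orders",
        sql="sql/agg_daily_orders.sql",
        mysql_conn_id="mariadb_ecom",
        autocommit=True,
    )

    mark_stale = PythonOperator(
        task_id="flag_stale_customers",
        python_callable=stale_customers,
    )

    agg_orders >> mark_stale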

Example usage


SELECT c.name, SUM(oi.quantity) AS items
FROM Customers c
JOIN Orders o      ON o.customer_id = c.id
JOIN OrderItems oi ON oi.order_id   = o.id
WHERE o.order_date >= DATE_SUB(CURDATE(), INTERVAL 7 DAY)
GROUP BY c.id, c.name
ORDER BY items DESC;

Syntax reference


airflow connections add CONNECTION_ID \
    --conn-uri 'mysql://USER:PASSWORD@HOST:PORT/SCHEMA?charset=utf8mb4'

MySqlOperator(
    task_id="TASK_NAME",
    sql="SQL_STRING_OR_FILE",
    mysql_conn_id="CONNECTION_ID",
    parameters={"key": "value"},  # optional bind parameters (Jinja-templated)
    autocommit=False,             # True to commit each statement on its own
    database=None,                # override the connection's schema
    dag=DAG_OBJECT,
)

MySqlHook(mysql_conn_id="CONNECTION_ID").get_records("SELECT …")

Frequently Asked Questions (FAQs)

Can I use MariaDB 10.11 with Airflow 2.6?

Yes. The mysqlclient Python driver, which the apache-airflow-providers-mysql package depends on, supports MariaDB ≥10.3. Ensure the Airflow image has mysqlclient >=2.1 installed.

Does Airflow support transactional rollbacks?

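With autocommit=False (the default), MySqlOperator runs its statements in one transaction and commits after the last one succeeds. If a statement fails, the task errors out before the commit and the uncommitted changes are rolled back. DDL statements commit implicitly in MariaDB, so they are not covered by this rollback.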

How do I migrate from PostgresHook to MySqlHook?

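Replace the import path (airflow.providers.postgres.hooks.postgres.PostgresHook becomes airflow.providers.mysql.hooks.mysql.MySqlHook) and the conn_id argument (postgres_conn_id becomes mysql_conn_id), then adjust PostgreSQL-specific SQL such as ILIKE, :: casts, and double-quoted identifiers. Both drivers use %s (or %(name)s) placeholders, so parameter binding usually carries over unchanged.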
