How to Integrate ClickHouse with Apache Airflow

Galaxy Glossary

How do I integrate ClickHouse with Apache Airflow?

Configure Airflow to run ClickHouse SQL, transfer data, and orchestrate pipelines.


Why use Airflow with ClickHouse?

Airflow lets you schedule, parameterize, and monitor ClickHouse queries and ETL steps, turning ad-hoc SQL into repeatable data pipelines.

Which Airflow provider supports ClickHouse?

There is no official Apache Airflow provider for ClickHouse. The community-maintained airflow-clickhouse-plugin package ships a ClickHouseHook and ClickHouseOperator built on the clickhouse-driver native client; install it with pip install airflow-clickhouse-plugin. The clickhouse-connect HTTP driver is a good alternative inside plain Python tasks.

How do I create the Airflow connection?

In the Airflow UI (Admin → Connections), add a connection with Conn Id clickhouse_default, filling in host, port (9000 for the native protocol), login, password, and schema (the default database).
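
If you prefer code over clicking through the UI, the same connection can be registered programmatically. A minimal sketch against Airflow's metadata database, with a placeholder host and credentials:

```python
# Minimal sketch: register the ClickHouse connection in Airflow's
# metadata DB from code. Host and credentials are placeholders.
from airflow.models import Connection
from airflow.settings import Session

conn = Connection(
    conn_id="clickhouse_default",
    conn_type="clickhouse",
    host="clickhouse.internal",  # hypothetical host
    port=9000,                   # ClickHouse native protocol port
    login="airflow",
    password="secret",
    schema="analytics",          # default database
)

session = Session()
session.add(conn)
session.commit()
```

Equivalently, Airflow reads connections from environment variables, so exporting AIRFLOW_CONN_CLICKHOUSE_DEFAULT="clickhouse://airflow:secret@clickhouse.internal:9000/analytics" achieves the same without touching the metadata DB.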

What does a minimal DAG look like?

Use ClickHouseOperator to run SQL inline or from a file, or call ClickHouseHook inside a Python task for dynamic queries.
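
For the hook route, here is a sketch of a TaskFlow task running a parameter-bound query. The import path assumes airflow-clickhouse-plugin 1.x, and Orders is an illustrative table; verify the execute() signature against your installed plugin version:

```python
from airflow.decorators import task


@task
def count_orders_for_day(ds=None):
    # Imported inside the task to keep DAG-file parsing light.
    from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

    hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default")
    # execute() forwards to clickhouse_driver.Client.execute and returns rows.
    rows = hook.execute(
        "SELECT count() FROM Orders WHERE order_date = %(day)s",
        {"day": ds},
    )
    return rows[0][0]
```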

Example: daily revenue aggregation

The DAG in the Syntax section below sums daily Orders.total_amount and inserts the result into the sales_per_day reporting table.

How do I pass parameters safely?

Use Jinja templating with {{ ds }} or {{ params }}. The operator's parameters argument binds values to named placeholders (clickhouse-driver's %(name)s style), so values are never string-spliced into the SQL, which avoids injection.
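
A sketch combining both, assuming parameters is a templated field in your plugin version (if not, render {{ ds }} directly in the SQL string):

```python
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

orders_for_day = ClickHouseOperator(
    task_id="orders_for_day",
    clickhouse_conn_id="clickhouse_default",
    # {{ ds }} is rendered by Jinja at runtime; %(day)s is bound by the
    # driver, so the value never gets concatenated into the query text.
    sql="SELECT count() FROM Orders WHERE order_date = %(day)s",
    parameters={"day": "{{ ds }}"},
)
```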

Can I load CSVs from S3 into ClickHouse?

There is no stock S3ToClickHouseOperator in the official providers, but the pattern is easy to assemble: after an S3 sensor, chain a Python task that loads via clickhouse_connect, or a ClickHouseOperator that lets ClickHouse pull the file itself with its s3() table function. Airflow handles dependencies and retries.
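
A sketch of the sensor-then-load pattern, with a hypothetical bucket and staging table; for private buckets, the s3() function also accepts credentials as extra arguments:

```python
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

wait_for_csv = S3KeySensor(
    task_id="wait_for_csv",
    bucket_name="my-bucket",            # hypothetical bucket
    bucket_key="exports/orders.csv",
    aws_conn_id="aws_default",
)

load_csv = ClickHouseOperator(
    task_id="load_csv",
    clickhouse_conn_id="clickhouse_default",
    # ClickHouse reads the file server-side via its s3() table function.
    sql="""
    INSERT INTO staging_orders
    SELECT *
    FROM s3(
        'https://my-bucket.s3.amazonaws.com/exports/orders.csv',
        'CSVWithNames'
    );
    """,
)

wait_for_csv >> load_csv
```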

How do I test locally?

Spin up Airflow and ClickHouse via Docker Compose. Export AIRFLOW_CONN_CLICKHOUSE_DEFAULT (the connection URI shown earlier) and run airflow tasks test daily_clickhouse_sales aggregate_sales 2024-01-01 to execute a single task without the scheduler.
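
A connectivity smoke test from a plain Python shell can save a debugging round-trip. A sketch assuming a local Docker Compose ClickHouse with default credentials on the native port:

```python
import os

# Hypothetical local setup: ClickHouse listening on localhost:9000.
os.environ.setdefault(
    "AIRFLOW_CONN_CLICKHOUSE_DEFAULT",
    "clickhouse://default:@localhost:9000/default",
)

from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default")
print(hook.execute("SELECT version()"))
```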

Best practices

• Set retries and retry_delay to handle transient ClickHouse errors.
• Batch large inserts with INSERT ... SELECT instead of row-by-row.
• Store SQL in repo files for version control.


How to Integrate ClickHouse with Apache Airflow Example Usage


```sql
-- Verify the daily aggregation
SELECT sales_date, total_sales
FROM sales_per_day
ORDER BY sales_date DESC
LIMIT 7;
```

How to Integrate ClickHouse with Apache Airflow Syntax


```python
from datetime import datetime, timedelta

from airflow import DAG
# Import path assumes the community airflow-clickhouse-plugin package.
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

with DAG(
    dag_id="daily_clickhouse_sales",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args={"retries": 2, "retry_delay": timedelta(minutes=5)},
) as dag:
    aggregate_sales = ClickHouseOperator(
        task_id="aggregate_sales",
        clickhouse_conn_id="clickhouse_default",
        # {{ ds }} renders to the task's logical date, e.g. '2024-01-01'.
        sql="""
        INSERT INTO sales_per_day (sales_date, total_sales)
        SELECT toDate('{{ ds }}'), sum(total_amount)
        FROM Orders
        WHERE order_date = toDate('{{ ds }}');
        """,
    )
```


Frequently Asked Questions (FAQs)

Do I need a special Airflow version?

Any Airflow ≥ 2.3 works. Make sure the airflow-clickhouse-plugin release you install supports your Airflow core version to avoid dependency conflicts.

Can I execute multiple statements?

Yes, but not as one semicolon-separated string: the native protocol executes one statement per query. Pass a list of statements to a single operator (the plugin runs them in order), or chain several ClickHouseOperators, as sketched below, for clearer per-statement logs.
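
For example, creating the target table before running the aggregation task from the Syntax section (the DDL is illustrative):

```python
from airflow_clickhouse_plugin.operators.clickhouse import ClickHouseOperator

create_target = ClickHouseOperator(
    task_id="create_target",
    clickhouse_conn_id="clickhouse_default",
    sql="""
    CREATE TABLE IF NOT EXISTS sales_per_day (
        sales_date Date,
        total_sales Float64
    )
    ENGINE = SummingMergeTree
    ORDER BY sales_date;
    """,
)

# Run the DDL before the aggregation task defined in the Syntax section.
create_target >> aggregate_sales
```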

How do I fetch query results in Python?

Use ClickHouseHook inside a PythonOperator or @task. hook.execute(sql) returns rows as tuples; for a pandas DataFrame, call hook.get_conn().query_dataframe(sql) (clickhouse-driver with pandas installed) or use clickhouse-connect's client.query_df(sql).
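
A sketch of the DataFrame route, reusing the reporting table from earlier (method names are those of clickhouse-driver; verify against your installed versions):

```python
def fetch_sales_df():
    # Imported inside the callable to keep DAG-file parsing light.
    from airflow_clickhouse_plugin.hooks.clickhouse import ClickHouseHook

    hook = ClickHouseHook(clickhouse_conn_id="clickhouse_default")
    client = hook.get_conn()  # a clickhouse_driver.Client
    df = client.query_dataframe(
        "SELECT sales_date, total_sales FROM sales_per_day ORDER BY sales_date"
    )
    print(df.tail(7))
```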
