Postgres-Airflow integration lets you schedule, orchestrate, and monitor PostgreSQL queries from Apache Airflow DAGs by using built-in hooks and operators.
Integrating Postgres with Airflow automates recurring queries, transforms data, and keeps analytics pipelines reproducible. Airflow’s scheduler triggers DAGs, while PostgresHook manages connections and credentials securely.
Open Admin → Connections, click + Add, and choose Postgres. Enter conn_id (e.g., postgres_ecom), host, port, database, user, and password. Save; Airflow encrypts credentials in its metadata DB.
Install the Postgres extra: pip install apache-airflow[postgres]. This pulls in psycopg2-binary and enables PostgresHook and PostgresOperator.
Use PostgresOperator. Provide task_id, postgres_conn_id, and an inline or file-based SQL query. Airflow handles transactions and logging automatically.
from airflow.providers.postgres.operators.postgres import PostgresOperator

load_customers = PostgresOperator(
    task_id="load_customers",
    postgres_conn_id="postgres_ecom",
    sql="""
        COPY Customers(id, name, email)
        FROM '/tmp/customers.csv'
        DELIMITER ',' CSV;
    """,
)
Use PostgresHook inside a PythonOperator. The hook's get_records() or get_pandas_df() method returns data for in-memory processing.
from airflow.providers.postgres.hooks.postgres import PostgresHook

def top_products():
    hook = PostgresHook(postgres_conn_id="postgres_ecom")
    sql = """
        SELECT p.name, SUM(oi.quantity) AS units
        FROM Products p
        JOIN OrderItems oi ON oi.product_id = p.id
        GROUP BY p.name
        ORDER BY units DESC
        LIMIT 10;
    """
    return hook.get_records(sql)
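To run this inside a DAG, wrap the function in a PythonOperator; the task name below is illustrative:

from airflow.operators.python import PythonOperator

top_products_task = PythonOperator(
    task_id="top_products",        # illustrative task name
    python_callable=top_products,  # the function defined above
)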
Prefer Jinja parameters or psycopg2 placeholders. Airflow renders templates at runtime, letting you pass execution dates or config values without string concatenation.
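As a sketch, the task below combines both approaches: {{ ds }} is a built-in Airflow template variable rendered at runtime, while %(status)s is a psycopg2 placeholder bound via parameters. The Orders table and its columns are assumed for illustration:

daily_revenue = PostgresOperator(
    task_id="daily_revenue",
    postgres_conn_id="postgres_ecom",
    # {{ ds }} renders to the task's logical date; %(status)s is bound
    # by psycopg2, so no string concatenation is needed.
    sql="""
        SELECT SUM(total) FROM Orders
        WHERE order_date = '{{ ds }}' AND status = %(status)s;
    """,
    parameters={"status": "shipped"},
)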
PostgresOperator wraps SQL in a transaction by default. Set autocommit=True when running statements that cannot execute inside a transaction block, such as CREATE DATABASE or VACUUM, to avoid "cannot run inside a transaction block" errors.
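For example, a maintenance task along these lines (reusing the Customers table from the earlier example) needs autocommit:

vacuum_customers = PostgresOperator(
    task_id="vacuum_customers",
    postgres_conn_id="postgres_ecom",
    sql="VACUUM ANALYZE Customers;",  # VACUUM refuses to run inside a transaction block
    autocommit=True,
)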
Set email_on_failure=True and retries for robustness. Ensure postgres_conn_id exactly matches the Airflow connection ID. Always install the Postgres extra; a missing driver causes ModuleNotFoundError: No module named 'psycopg2'.
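A minimal sketch of these settings as DAG-level default_args; the alert address is a placeholder:

from datetime import timedelta

default_args = {
    "retries": 2,                         # re-run a failed task twice
    "retry_delay": timedelta(minutes=5),  # wait between attempts
    "email_on_failure": True,
    "email": ["alerts@example.com"],      # placeholder address
}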
Yes. Pass a list of file paths to sql=["query1.sql", "query2.sql"]. Airflow executes them sequentially in one transaction unless autocommit is true.
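A sketch of that pattern; template_searchpath (an assumed directory here) tells Airflow where to resolve the .sql files, and the DAG name is illustrative:

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="nightly_sql",                    # illustrative DAG name
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    template_searchpath="/opt/airflow/sql",  # assumed location of the .sql files
) as dag:
    run_scripts = PostgresOperator(
        task_id="run_scripts",
        postgres_conn_id="postgres_ecom",
        sql=["query1.sql", "query2.sql"],    # executed in order
    )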
Yes. Chain multiple PostgresOperator tasks or invoke tools like Flyway via BashOperator. Use autocommit=True for schema changes.
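For instance, a Flyway step can be chained ahead of the load task from earlier; the JDBC URL and user are placeholders, and Flyway is assumed to be installed on the worker:

from airflow.operators.bash import BashOperator

flyway_migrate = BashOperator(
    task_id="flyway_migrate",
    # placeholder connection details; Flyway picks up migrations
    # from its configured locations
    bash_command="flyway -url=jdbc:postgresql://db:5432/ecom -user=airflow migrate",
)

flyway_migrate >> load_customers  # run migrations before loading data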
Store the connection in AWS Secrets Manager, HashiCorp Vault, or Kubernetes Secrets and configure an Airflow secrets backend to fetch it at runtime.
Airflow opens and closes connections per task. Use PgBouncer in front of Postgres to pool and reuse connections without code changes.