How to Integrate PostgreSQL with Airflow

Galaxy Glossary

How do I integrate PostgreSQL with Apache Airflow?

Postgres-Airflow integration lets you schedule, orchestrate, and monitor PostgreSQL queries from Apache Airflow DAGs by using built-in hooks and operators.



Why integrate PostgreSQL with Airflow?

Integrating Postgres with Airflow automates recurring queries and transformations and keeps analytics pipelines reproducible. Airflow’s scheduler triggers DAGs on a defined cadence, while PostgresHook manages connections and credentials securely.

How to add a PostgreSQL connection in the Airflow UI?

Open Admin → Connections, click + Add, and choose the Postgres connection type. Enter a conn_id (e.g., postgres_ecom), host, port, database, user, and password. Save; Airflow encrypts the credentials in its metadata database.
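Connections can also be defined outside the UI through an AIRFLOW_CONN_<CONN_ID> environment variable holding a connection URI. A minimal sketch; the host, user, and password below are placeholders:

import os

# Airflow parses AIRFLOW_CONN_* variables into connections at runtime.
# In practice this is exported in the deployment environment, not set in code.
os.environ["AIRFLOW_CONN_POSTGRES_ECOM"] = (
    "postgres://etl_user:s3cret@db.example.com:5432/ecommerce"
)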

What Python packages are required?

Install the Postgres extra: pip install 'apache-airflow[postgres]' (on Airflow 2 this is equivalent to pip install apache-airflow-providers-postgres). It pulls in psycopg2-binary and enables PostgresHook and PostgresOperator.
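To verify the install, both the provider and the driver should import cleanly; a quick check:

# Both imports raise ModuleNotFoundError if the extra is missing
from airflow.providers.postgres.hooks.postgres import PostgresHook
import psycopg2

print(psycopg2.__version__)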

How do I run a single SQL statement?

Use PostgresOperator. Provide task_id, postgres_conn_id, and an inline or file-based SQL query. Airflow handles transactions and logging automatically.

Example

from airflow.providers.postgres.operators.postgres import PostgresOperator

load_customers = PostgresOperator(
    task_id="load_customers",
    postgres_conn_id="postgres_ecom",
    sql="""
        -- COPY reads /tmp/customers.csv from the Postgres server's filesystem
        COPY Customers(id, name, email)
        FROM '/tmp/customers.csv'
        DELIMITER ',' CSV;
    """,
)

How to fetch query results inside a Python task?

Use PostgresHook inside a PythonOperator. The hook’s get_records() or get_pandas_df() returns data for in-memory processing.

Example

from airflow.providers.postgres.hooks.postgres import PostgresHook

def top_products():
    hook = PostgresHook(postgres_conn_id="postgres_ecom")
    sql = """
        SELECT p.name, SUM(oi.quantity) AS units
        FROM Products p
        JOIN OrderItems oi ON oi.product_id = p.id
        GROUP BY p.name
        ORDER BY units DESC
        LIMIT 10;
    """
    return hook.get_records(sql)  # list of (name, units) tuples

How to parameterize SQL safely?

Prefer Jinja parameters or psycopg2 placeholders. Airflow renders templates at runtime, letting you pass execution dates or config values without string concatenation.
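Both styles in one sketch, reusing the article’s postgres_ecom connection; the table and column names are illustrative:

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Jinja: {{ ds }} renders to the run's logical date at execution time
archive_old_orders = PostgresOperator(
    task_id="archive_old_orders",
    postgres_conn_id="postgres_ecom",
    sql="DELETE FROM Orders WHERE order_date < DATE '{{ ds }}' - 90;",
)

# psycopg2 placeholders (inside a PythonOperator callable): the driver
# binds the value, so no string concatenation is involved
def shipped_orders():
    hook = PostgresHook(postgres_conn_id="postgres_ecom")
    return hook.get_records(
        "SELECT id FROM Orders WHERE status = %(status)s;",
        parameters={"status": "shipped"},
    )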

How to manage transactions?

PostgresOperator wraps SQL in a transaction and commits on success by default. Set autocommit=True for statements that cannot run inside a transaction block, such as VACUUM, CREATE INDEX CONCURRENTLY, or REFRESH MATERIALIZED VIEW CONCURRENTLY, to avoid “cannot run inside a transaction block” errors.
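For example, VACUUM must run outside a transaction, so autocommit is required; a minimal sketch using the article’s connection:

vacuum_orders = PostgresOperator(
    task_id="vacuum_orders",
    postgres_conn_id="postgres_ecom",
    sql="VACUUM ANALYZE Orders;",  # rejected inside a transaction block
    autocommit=True,
)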

Best practices for production DAGs

  • Use a dedicated service account with least-privilege rights.
  • Store connection secrets in an Airflow secrets backend (e.g., HashiCorp Vault, AWS Secrets Manager, or Kubernetes Secrets).
  • Split heavy transformations into multiple tasks to enable retries and lineage.
  • Add retries and email_on_failure=True via default_args for robustness (see the sketch below).
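A minimal default_args sketch; the alert address is a placeholder:

from datetime import datetime, timedelta
from airflow import DAG

default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
    "email": ["data-alerts@example.com"],  # placeholder address
}

with DAG(
    dag_id="ecom_reporting",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,  # applied to every task in the DAG
    catchup=False,
) as dag:
    ...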

What common mistakes should I avoid?

Ensure postgres_conn_id exactly matches the Airflow connection ID. Always install the Postgres extra; a missing driver raises ModuleNotFoundError: No module named 'psycopg2'.

Can I run multiple SQL files in one task?

Yes. Pass a list of file paths: sql=["query1.sql", "query2.sql"]. Airflow executes them sequentially on one connection and commits once at the end unless autocommit=True; paths are resolved via the DAG’s template_searchpath.
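A sketch with hypothetical file names, resolved via template_searchpath:

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="nightly_maintenance",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    template_searchpath="/opt/airflow/sql",  # directory containing the .sql files
    catchup=False,
) as dag:
    run_scripts = PostgresOperator(
        task_id="run_scripts",
        postgres_conn_id="postgres_ecom",
        sql=["001_create_tables.sql", "002_seed_data.sql"],  # hypothetical files
    )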


How to Integrate PostgreSQL with Airflow Example Usage


from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime

def calc_daily_revenue(ds, **_):
    hook = PostgresHook(postgres_conn_id="postgres_ecom")
    sql = """
        SELECT SUM(total_amount) AS revenue
        FROM Orders
        WHERE order_date::date = %s::date;
    """
    row = hook.get_first(sql, parameters=[ds])
    revenue = (row[0] or 0) if row else 0  # SUM returns NULL when no rows match
    print(f"Daily revenue: ${revenue:,.2f}")

with DAG(
    dag_id="ecom_reporting",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:

    refresh_materialized_view = PostgresOperator(
        task_id="refresh_sales_mv",
        postgres_conn_id="postgres_ecom",
        sql="REFRESH MATERIALIZED VIEW CONCURRENTLY sales_summary;",
        autocommit=True,
    )

    compute_revenue = PythonOperator(
        task_id="daily_revenue",
        python_callable=calc_daily_revenue,
    )

    refresh_materialized_view >> compute_revenue

How to Integrate PostgreSQL with Airflow Syntax


PostgresOperator(
    task_id="task_name",
    postgres_conn_id="postgres_ecom",   # Connection ID set in the Airflow UI
    sql="""<SQL string or file path>""",  # Query, list of queries/files, or template
    parameters={"key": "value"},        # Optional bind parameters for psycopg2
    autocommit=False,                   # True for statements that cannot run in a transaction
    database="ecommerce",               # Optional override of the connection's database
    dag=dag,
)

hook = PostgresHook(postgres_conn_id="postgres_ecom")
hook.get_records(sql, parameters=None)  # Fetch all rows as a list of tuples
hook.run(sql, autocommit=False)         # Execute statements without returning rows


Frequently Asked Questions (FAQs)

Can Airflow handle Postgres migrations?

Yes. Chain multiple PostgresOperator tasks or invoke tools like Flyway via BashOperator. Enable autocommit only for statements that cannot run inside a transaction block.
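A sketch of the BashOperator route; the Flyway flags and connection details are illustrative:

from airflow.operators.bash import BashOperator

run_flyway = BashOperator(
    task_id="flyway_migrate",
    bash_command=(
        "flyway migrate "
        "-url=jdbc:postgresql://db.example.com:5432/ecommerce "
        "-user=etl_user -password=$FLYWAY_PASSWORD"  # password from the environment
    ),
)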

How do I secure credentials?

Store the connection in AWS Secrets Manager, HashiCorp Vault, or Kubernetes Secrets and configure an Airflow secrets backend to fetch it at runtime.
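A sketch of pointing Airflow at the Vault backend; these values normally live in airflow.cfg or the deployment environment, and the URL and paths here are placeholders:

import os

os.environ["AIRFLOW__SECRETS__BACKEND"] = (
    "airflow.providers.hashicorp.secrets.vault.VaultBackend"
)
os.environ["AIRFLOW__SECRETS__BACKEND_KWARGS"] = (
    '{"connections_path": "connections", "url": "http://vault.example.com:8200"}'
)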

Is connection pooling supported?

Airflow opens and closes connections per task. Use PgBouncer in front of Postgres to pool and reuse connections without code changes.
