How to Integrate BigQuery with Airflow

Galaxy Glossary

How do I integrate BigQuery with Airflow to automate SQL and load jobs?

BigQuery integration with Airflow lets you orchestrate, schedule, and monitor BigQuery SQL jobs directly from Apache Airflow DAGs.


Description

Why orchestrate BigQuery with Airflow?

Running BigQuery tasks in Airflow centralizes scheduling, retries, and alerting while keeping SQL logic version-controlled alongside other data pipelines.

How do I configure the BigQuery connection?

Create a service account with BigQuery permissions and download its JSON key, add a Google Cloud connection in Airflow (Conn Id: google_cloud_default), and point the connection's key-path extra at the JSON key file. The sketch below shows the same setup defined in code instead of the UI.
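
A minimal sketch, assuming Airflow 2.3+ (which accepts JSON-serialized connections in environment variables) and a recent Google provider; the key path and project ID are placeholders, and the extra field names ("key_path", "project") can differ between provider versions.

import json
import os

# Airflow picks up any connection defined in a variable named AIRFLOW_CONN_<CONN_ID>.
# Here the connection is google_cloud_default, which the BigQuery operators use by default.
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"] = json.dumps(
    {
        "conn_type": "google_cloud_platform",
        "extra": {
            "key_path": "/opt/airflow/keys/bq-service-account.json",  # placeholder path
            "project": "my-proj",                                     # placeholder project ID
        },
    }
)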

What is the syntax of BigQueryInsertJobOperator?

Use the operator to submit a query or load job. Key arguments: task_id, configuration (the BigQuery job resource as JSON), location, and an optional gcp_conn_id (defaults to google_cloud_default). A minimal skeleton follows; the Syntax section below shows a complete DAG.
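
A minimal sketch of the call shape (the SQL and location are placeholders):

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

run_sql = BigQueryInsertJobOperator(
    task_id="run_sql",
    configuration={
        "query": {
            "query": "SELECT 1",          # placeholder SQL
            "useLegacySql": False,
        }
    },
    location="US",                        # region of the datasets the job touches
    gcp_conn_id="google_cloud_default",   # optional; this is the default value
)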

How can I query the Orders table nightly?

Embed your SQL in the query section of the operator's job configuration. Airflow handles retries, backfills, and Jinja templating of the SQL, as in the sketch below.
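
A short sketch of a nightly query that uses templating, assuming an order_date column on ecom.Orders; {{ ds }} is rendered by Airflow to the run's logical date, so backfills re-run the correct day's slice.

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

with DAG(
    "nightly_orders",
    schedule_interval="@daily",
    default_args={"start_date": days_ago(1)},
) as dag:
    count_orders = BigQueryInsertJobOperator(
        task_id="count_orders",
        configuration={
            "query": {
                # {{ ds }} becomes the logical date (YYYY-MM-DD) at runtime.
                "query": "SELECT COUNT(*) AS n FROM `ecom.Orders` WHERE order_date = '{{ ds }}'",
                "useLegacySql": False,
            }
        },
        location="US",
    )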

How do I load CSVs from GCS into OrderItems?

Use BigQueryInsertJobOperator with a load configuration that points at the GCS URI, schema fields, and destination table; the Example Usage section below shows this end to end.

Best practices for production DAGs

Parameterize dataset and table names, use "writeDisposition": "WRITE_TRUNCATE" with care (it replaces the destination table on every run), store SQL in external files, and monitor job cost via the Cloud Billing export. A sketch of the file-based, parameterized approach follows.
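
One way to keep SQL in an external file and parameterize the dataset name, sketched under the assumption that .sql files live in a directory on the DAG's template_searchpath; the file name orders_summary.sql and the paths are hypothetical.

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

with DAG(
    "orders_summary_parameterized",
    schedule_interval="@daily",
    default_args={"start_date": days_ago(1)},
    template_searchpath=["/opt/airflow/dags/sql"],   # where external .sql files live (assumption)
    params={"dataset": "ecom_reporting"},            # override per environment
) as dag:
    summarize = BigQueryInsertJobOperator(
        task_id="summarize",
        configuration={
            "query": {
                # The included file can reference {{ params.dataset }} and {{ ds }}
                # like any other templated field.
                "query": "{% include 'orders_summary.sql' %}",
                "useLegacySql": False,
            }
        },
        location="US",
    )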

Common errors and quick fixes

Missing credentials raise google.auth.exceptions.DefaultCredentialsError; verify the service-account key path on the connection. Long-running queries can exceed the task timeout configured for your deployment; raise the operator's execution_timeout (see the sketch below) if needed.
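
A sketch of raising the per-task timeout and adding retries on the operator; the limits below are illustrative, not recommendations.

from datetime import timedelta

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

long_query = BigQueryInsertJobOperator(
    task_id="long_query",
    configuration={"query": {"query": "SELECT ...", "useLegacySql": False}},  # placeholder SQL
    location="US",
    execution_timeout=timedelta(hours=3),   # allow up to 3 hours for this task
    retries=2,                              # retry transient failures
    retry_delay=timedelta(minutes=5),
)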

Why integrating BigQuery with Airflow is important

Orchestrating BigQuery from Airflow puts scheduling, dependencies, retries, and alerting for your warehouse jobs in one place, with the SQL version-controlled alongside the rest of your data pipelines.

How to Integrate BigQuery with Airflow Example Usage


from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}

with DAG("gcs_to_bq_orderitems", schedule_interval="@daily", default_args=default_args) as dag:
    # Load job: ingest staged CSV files from GCS into ecom.OrderItems.
    load_items = BigQueryInsertJobOperator(
        task_id="load_items",
        configuration={
            "load": {
                "sourceUris": ["gs://ecom-data/stage/order_items/*.csv"],
                "destinationTable": {
                    "projectId": "my-proj",
                    "datasetId": "ecom",
                    "tableId": "OrderItems"
                },
                "schema": {
                    "fields": [
                        {"name": "id", "type": "INTEGER"},
                        {"name": "order_id", "type": "INTEGER"},
                        {"name": "product_id", "type": "INTEGER"},
                        {"name": "quantity", "type": "INTEGER"}
                    ]
                },
                "writeDisposition": "WRITE_APPEND",
                "skipLeadingRows": 1
            }
        },
        location="US"
    )

How to Integrate BigQuery with Airflow Syntax


from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

default_args = {"start_date": days_ago(1)}

with DAG("bq_orders_summary", schedule_interval="0 2 * * *", default_args=default_args) as dag:
    # Query job: compute 30-day spend per customer and overwrite the reporting table.
    summarize_orders = BigQueryInsertJobOperator(
        task_id="summarize_orders",
        configuration={
            "query": {
                "query": """
                SELECT customer_id,
                       COUNT(*)   AS order_count,
                       SUM(total_amount) AS total_spent
                FROM `ecom.Orders`
                WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
                GROUP BY customer_id
                """,
                "destinationTable": {
                    "projectId": "my_proj",
                    "datasetId": "ecom_reporting",
                    "tableId": "customer_30d_spend"
                },
                "writeDisposition": "WRITE_TRUNCATE",
                "useLegacySql": False
            }
        },
        location="US"
    )


Frequently Asked Questions (FAQs)

Do I need the BigQueryHook directly?

For most use cases, BigQueryInsertJobOperator is sufficient. The hook is only required for advanced interactions such as polling job status outside an operator.
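
A hedged sketch of that advanced case: using the hook to reach the underlying BigQuery client and poll a job started elsewhere; the project ID and job ID are hypothetical.

from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook(gcp_conn_id="google_cloud_default")
client = hook.get_client(project_id="my-proj", location="US")  # google.cloud.bigquery.Client

job = client.get_job("existing_job_id")   # hypothetical job ID
print(job.state)                          # e.g. "PENDING", "RUNNING", or "DONE"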

Can I run multiple BigQuery tasks in parallel?

Yes. Airflow executes tasks concurrently as long as you respect your GCP project’s job quota and Airflow’s max_active_tasks.
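
A minimal sketch of capping concurrency at the DAG level (the limit is illustrative); max_active_tasks is the Airflow 2.2+ name for the older concurrency argument.

from airflow import DAG
from airflow.utils.dates import days_ago

with DAG(
    "parallel_bq_tasks",
    schedule_interval="@daily",
    default_args={"start_date": days_ago(1)},
    max_active_tasks=4,   # at most 4 tasks from this DAG run at once
) as dag:
    ...  # define parallel BigQueryInsertJobOperator tasks here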

How do I pass runtime parameters to SQL?

Embed Jinja templates such as {{ ds }} inside the query. Airflow renders them at runtime, which enables date-partition filters and dynamic table names, as in the sketch below.
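
Because the whole configuration is templated, the same mechanism can direct each run's output to a date-suffixed table; the project, dataset, and table names below are hypothetical.

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

daily_snapshot = BigQueryInsertJobOperator(
    task_id="daily_snapshot",
    configuration={
        "query": {
            "query": "SELECT * FROM `ecom.Orders` WHERE order_date = '{{ ds }}'",
            "destinationTable": {
                "projectId": "my-proj",
                "datasetId": "ecom_snapshots",
                "tableId": "orders_{{ ds_nodash }}",   # e.g. orders_20240101
            },
            "writeDisposition": "WRITE_TRUNCATE",
            "useLegacySql": False,
        }
    },
    location="US",
)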
