How to Integrate BigQuery with Apache Spark

Galaxy Glossary

How do I connect Apache Spark to Google BigQuery and move data efficiently?

Load and save BigQuery tables directly from Apache Spark jobs using the Spark-BigQuery connector.


Description

Why integrate BigQuery with Spark?

Move data between Google BigQuery and Spark to enrich analytics, offload processing, or build ML pipelines without complex ETL.

Which connector should I use?

Use the open-source spark-bigquery connector (com.google.cloud.spark:spark-bigquery). It supports batch and streaming, push-down predicates, and write-disposition options.

How do I add the connector dependency?

Add --packages com.google.cloud.spark:spark-bigquery_2.12:0.36.1 (Scala 2.12) to spark-submit, declare the same coordinate in your build.sbt, or set it through the spark.jars.packages configuration for PySpark jobs.
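If you start Spark from PySpark rather than spark-submit, the same coordinate can be supplied when building the session. A minimal sketch (the app name is a placeholder):

from pyspark.sql import SparkSession

# Pull the BigQuery connector at session startup; the artifact suffix (_2.12)
# must match the Scala version of your Spark build.
spark = (SparkSession.builder
         .appName("bigquery-integration")
         .config("spark.jars.packages",
                 "com.google.cloud.spark:spark-bigquery_2.12:0.36.1")
         .getOrCreate())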

What authentication works best?

Service Account JSON is simplest. Set GOOGLE_APPLICATION_CREDENTIALS or pass credentialsFile in the Spark options.
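For example, either approach in PySpark (the key-file path is a placeholder and an existing spark session is assumed):

import os

# Option 1: point the whole process at a service-account key file.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"

# Option 2: pass the key file explicitly for a single read.
orders = (spark.read.format("bigquery")
          .option("credentialsFile", "/path/to/service-account.json")
          .option("table", "shop.analytics.Orders")
          .load())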

How to read a BigQuery table into Spark?

Use spark.read.format("bigquery") with the fully-qualified table name project.dataset.table.Optionally filter columns or rows to minimize data scanned.

Example – Load Orders

val ordersDF = spark.read.format("bigquery")
  .option("table", "shop.analytics.Orders")
  .load()
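A roughly equivalent PySpark read that also applies the column and row pruning mentioned above (the filter predicate and column names are illustrative):

# Read only the rows and columns needed; the connector prunes columns and can
# push the filter predicate down to BigQuery, reducing bytes scanned.
orders_df = (spark.read.format("bigquery")
             .option("table", "shop.analytics.Orders")
             .option("filter", "order_date >= '2024-01-01'")
             .load()
             .select("order_id", "customer_id", "total_amount"))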

How to write a Spark DataFrame back to BigQuery?

Call dataframe.write.format("bigquery") and set writeMethod (direct or indirect) plus writeDisposition (WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY).

Example – Save daily revenue

dailyRevenueDF.write.format("bigquery")
  .option("table", "shop.analytics.daily_revenue")
  .option("writeDisposition", "WRITE_TRUNCATE")
  .save()
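The same write in PySpark with the write method made explicit. This is a sketch: daily_revenue_df stands in for the DataFrame above, and the GCS bucket name is a placeholder that is only needed for indirect writes.

# Direct write: streams rows through the BigQuery Storage Write API,
# no staging bucket required.
(daily_revenue_df.write.format("bigquery")
 .option("table", "shop.analytics.daily_revenue")
 .option("writeMethod", "direct")
 .option("writeDisposition", "WRITE_TRUNCATE")
 .save())

# Indirect write: stages files in GCS first, so a temporary bucket is required.
(daily_revenue_df.write.format("bigquery")
 .option("table", "shop.analytics.daily_revenue")
 .option("writeMethod", "indirect")
 .option("temporaryGcsBucket", "my-temp-bucket")  # placeholder bucket
 .option("writeDisposition", "WRITE_APPEND")
 .save())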

Can I push SQL filters to BigQuery?

Yes. Simple filters and column projections on tables are pushed down automatically. To read BigQuery views as well, set spark.conf.set("viewsEnabled","true") and spark.conf.set("materializationDataset","temp_ds"). The connector translates eligible filters into BigQuery SQL, reducing data transfer.
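A short PySpark sketch of these settings (temp_ds must be an existing dataset the job can write to):

# Needed only when reading BigQuery views: the connector materializes the view
# into a temporary table inside temp_ds before Spark reads it.
spark.conf.set("viewsEnabled", "true")
spark.conf.set("materializationDataset", "temp_ds")

# Simple filters and projections like these are translated into BigQuery SQL,
# so only matching rows and columns leave BigQuery.
recent = (spark.read.format("bigquery")
          .option("table", "shop.analytics.Orders")
          .load()
          .filter("order_date >= '2024-01-01'")
          .select("order_id", "total_amount"))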

Best practices

• Partition & cluster BigQuery tables to speed reads.
• Select only needed columns.
• Prefer writeMethod=direct for small-to-medium writes; use indirect (with a temporary GCS bucket) for very large datasets.
• Monitor Dataproc/Spark job and BigQuery slot usage.

Common mistakes

• Using a connector build that does not match your Scala version (e.g., _2.12 vs _2.13).
• Omitting temporaryGcsBucket when writing with the indirect method, which causes the write to fail.
• Reading entire tables instead of selecting only the columns and rows you need, which inflates bytes scanned and cost.


How to Integrate BigQuery with Apache Spark Example Usage


# PySpark example: compute top-selling products over the last 7 days and write to BigQuery
from pyspark.sql import functions as F

orders = (spark.read.format("bigquery")
          .option("table", "shop.analytics.Orders")
          .load()
          .filter(F.col("order_date") >= F.date_sub(F.current_date(), 7)))

items = spark.read.format("bigquery") \
        .option("table", "shop.analytics.OrderItems") \
        .load()

products = spark.read.format("bigquery") \
           .option("table", "shop.catalog.Products") \
           .load()

top = (items.join(orders, "order_id")
            .groupBy("product_id")
            .agg(F.sum("quantity").alias("units"))
            .join(products, "product_id")
            .orderBy(F.desc("units"))
            .limit(10))

top.write.format("bigquery") \
    .option("table", "shop.analytics.top_products_7d") \
    .option("writeDisposition", "WRITE_TRUNCATE") \
    .save()

How to Integrate BigQuery with Apache Spark Syntax


Read:

spark.read.format("bigquery")
  .option("table", "<project>.<dataset>.<table>")
  [.option("filter", "<predicate>")]
  [.option("schema", "<jsonSchema>")]
  [.option("credentialsFile", "/path/key.json")]
  .load()

Write:

dataframe.write.format("bigquery")
  .option("table", "<project>.<dataset>.<table>")
  [.option("writeMethod", "direct|indirect")]
  [.option("writeDisposition", "WRITE_TRUNCATE|WRITE_APPEND|WRITE_EMPTY")]
  [.option("createDisposition", "CREATE_IF_NEEDED|CREATE_NEVER")]
  [.option("temporaryGcsBucket", "<gcs_bucket>")]
  .save()

Example (e-commerce – join Customers + Orders → high_value_customers):

val hvDF = spark.read.format("bigquery")
  .option("table", "shop.analytics.Orders")
  .load()
  .filter("total_amount > 500")
  .join(
    spark.read.format("bigquery").option("table", "shop.crm.Customers").load(),
    "customer_id")

hvDF.write.format("bigquery")
  .option("table", "shop.analytics.high_value_customers")
  .option("writeDisposition", "WRITE_TRUNCATE")
  .save()


Frequently Asked Questions (FAQs)

Is the connector free?

The connector itself is open-source and free. You pay for BigQuery storage/compute and any Dataproc or Spark cluster costs.

Can I use Dataproc Serverless?

Yes. Pass the same --packages flag in your job configuration. Serverless automatically scales Spark executors.

Does predicate push-down always happen?

Push-down works for simple filters, projections, and limits. Complex UDFs or non-deterministic expressions disable push-down.
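As an illustration, a sketch assuming the Orders table from earlier (is_vip is a hypothetical UDF):

from pyspark.sql import functions as F

orders = (spark.read.format("bigquery")
          .option("table", "shop.analytics.Orders")
          .load())

# Pushed down: a simple comparison the connector can express in BigQuery SQL.
small = orders.filter(F.col("total_amount") < 100).select("order_id")

# Not pushed down: a Python UDF is opaque to the connector, so Spark fetches
# the rows first and evaluates the function itself.
is_vip = F.udf(lambda amount: amount is not None and amount > 1000, "boolean")
vip = orders.filter(is_vip(F.col("total_amount")))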

