
In this post, I build a data pipeline in Python to ingest US Census data into BigQuery tables, using Apache Airflow and BigQuery DataFrames (aka BigFrames). I’ll demonstrate how using Google Cloud Platform (GCP) operators and BigFrames can significantly reduce the load on Airflow’s orchestration nodes and enable robust data validation for quality assurance.
BigFrames is an open source Python library from Google that scales Python data processing by transpiling common Python data science APIs to BigQuery SQL. You can read more in the official introduction to BigFrames and in the public BigFrames git repository.
Data pipelines are an essential component of modern analytics, machine learning, and AI. Apache Airflow is a popular open source platform to author, schedule, and monitor workflows, and it is particularly useful for creating production data engineering pipelines. These pipelines are made up of nodes that depend on each other and form Directed Acyclic Graphs (DAGs for short). Because Airflow is open source, there are many options for deploying it. In these examples, I use Google Cloud Composer, but the sample DAGs should work in any Airflow environment where you have configured a Google Cloud connection.
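If you haven’t written a DAG before, the operators shown throughout this post attach to a DAG definition along these lines. This is only a minimal sketch; the DAG ID, schedule, and start date are placeholders I chose, not part of the samples.
import datetime

from airflow import models

with models.DAG(
    dag_id="us_census_to_bigquery",  # Placeholder DAG ID.
    schedule="@once",  # Placeholder; on Airflow versions before 2.4, use schedule_interval instead.
    start_date=datetime.datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Operators defined inside this block are attached to the DAG, and
    # dependencies between them are declared with the >> operator.
    ...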
Unfortunately, common data engineering practices such as using pandas to transform data can overload the Airflow orchestration nodes, causing slower pipelines and delays or even out-of-memory crashes. BigFrames is “drop-in compatible” with pandas in many workloads by just changing the imports to bigframes.pandas. This keeps the data and the associated processing (transformations, joins, aggregations, ML training, and validations) in the scalable BigQuery engine and out of the Airflow orchestration nodes where it causes problems.
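As a quick illustration of what that drop-in compatibility looks like (the bucket and file names here are placeholders, not part of the pipeline below), switching a script from pandas to BigFrames is often just an import change:
# Before: pandas reads the whole file into the Airflow worker's memory.
# import pandas as pd
# df = pd.read_csv("gs://your-bucket-id/some-file.csv")

# After: the same pandas-style API, but the processing runs in BigQuery.
import bigframes.pandas as bpd

df = bpd.read_csv("gs://your-bucket-id/some-file.csv", engine="bigquery")
print(df.head())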
This post will focus on the typical data engineering scenario of (1) getting data from a source (in this case, the US Census), (2) cleaning it up, (3) validating that the data is of good quality, and (4) loading it into the final destination in your data warehouse / data lake. Because the pipeline technically loads data into BigQuery as temporary resources before transforming it, it is what’s called an ELT pipeline rather than an ETL pipeline.
The combination of Airflow and BigFrames is very flexible. For example, my teammate Shobhit has written about how BigFrames can be used to train an ML model. You can combine his sample with the Airflow techniques I describe in this post to build a pipeline that trains an ML model, validates it, and then deploys it to production or uses it for batch prediction to enrich a dataset.
In this post, I’ll focus specifically on the steps needed to run BigFrames. For complete, runnable DAG examples, please refer to the following samples in my code snippets GitHub repository:
The US Census publishes many different datasets. In this pipeline, you’ll load the County Population by Characteristics: 2020–2024 dataset, which is provided as CSV files over HTTPS.
BigQuery cannot read directly from HTTPS, so the first node in the DAG downloads the CSV file and uploads it to GCS. I use the BashOperator for simplicity.
from airflow.operators import bash

GCS_BUCKET = "your-bucket-id"
GCS_LOCATION = f"gs://{GCS_BUCKET}/us-census/cc-est2024-agesex-all.csv"

download_upload = bash.BashOperator(
    task_id="download_upload",
    bash_command=f"""
    wget https://www2.census.gov/programs-surveys/popest/datasets/2020-2024/counties/asrh/cc-est2024-agesex-all.csv -P ~;
    gcloud storage cp ~/cc-est2024-agesex-all.csv {GCS_LOCATION}
    """,
)
For larger datasets, there are many options for getting data into Google Cloud Storage. For example, the Cloud Storage Transfer Service can copy files from public HTTPS URLs to a Cloud Storage bucket, and there are Apache Airflow operators for running a transfer service job, as sketched below.
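As a rough sketch of that operator-based approach (not part of the sample DAGs in this post): the job body fields and the list URL below are my assumptions based on the Storage Transfer Service API, so check the provider documentation before relying on them.
from airflow.providers.google.cloud.operators.cloud_storage_transfer_service import (
    CloudDataTransferServiceCreateJobOperator,
)

create_transfer_job = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job",
    body={
        # Hypothetical job configuration: the listUrl must point to a TSV file
        # that enumerates the HTTPS objects to copy.
        "description": "Copy US Census CSVs to GCS",
        "status": "ENABLED",
        "projectId": "your-gcp-project",
        "transferSpec": {
            "httpDataSource": {"listUrl": "https://example.com/census-urls.tsv"},
            "gcsDataSink": {"bucketName": GCS_BUCKET},
        },
    },
)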
BigFrames can run anywhere that can run Python and has access to Google Cloud resources. This includes the KubernetesPodOperator, DockerOperator, ExternalPythonOperator, PythonVirtualenvOperator, and PythonOperator. The KubernetesPodOperator is especially useful to provide an isolated environment running separately from the orchestration nodes, but this comes with additional complexity. Since this post uses bigframes, there is less reason to isolate from the orchestration nodes because the data processing happens in the BigQuery engine.
To simplify deployment, I recommend using the PythonOperator if you are able to install additional Python libraries, such as the bigframes package, into your Airflow environment. On the servers where Airflow is running, install bigframes:
pip install --upgrade bigframes==2.11.0
In Cloud Composer, configure the packages on the Composer product page in the Google Cloud console.
Use the PythonVirtualenvOperator if you need to install the bigframes package in an isolated environment.
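As a minimal sketch (the task ID and callable are placeholders of my choosing, with the version pin carried over from above), the requirements argument installs the bigframes package into a fresh virtual environment for just this task:
from airflow.operators.python import PythonVirtualenvOperator


def callable_virtualenv():
    # Imports must happen inside the callable because it runs in a
    # separate virtual environment from the Airflow worker process.
    import bigframes.pandas as bpd

    # BigFrames logic goes here.
    ...


bf_in_virtualenv = PythonVirtualenvOperator(
    task_id="bf_in_virtualenv",
    python_callable=callable_virtualenv,
    requirements=["bigframes==2.11.0"],
    system_site_packages=False,
)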
Once BigFrames is installed, you can use its pandas-compatible API. Import the bigframes.pandas module in the Python operator you’ve chosen:
from airflow.operators.python import PythonOperator


def callable_python():
    import bigframes.pandas as bpd

    # Operator logic will go here.
    ...


bf_to_gbq = PythonOperator(
    task_id="bf_to_gbq",
    python_callable=callable_python,
)
I recommend first configuring a few BigFrames options to ensure the best performance:
- bpd.options.compute.maximum_result_rows: This setting acts as a safeguard, preventing inadvertent large downloads of data from BigQuery to your Airflow worker. If a BigFrames operation would result in more rows than this specified limit being downloaded, the operation will fail, thereby protecting your worker’s memory resources.
- bpd.options.bigquery.ordering_mode = "partial": Activating this mode generally yields the best performance for BigFrames operations by avoiding the pandas feature with the highest performance cost: a default, sequential ordering and index.
- bpd.options.bigquery.project: Depending on how you’ve configured your Google Cloud credentials, you may need to set this option so that BigFrames knows which project to bill for the BigQuery queries it runs.
BigFrames uses a BigQuery session to store temporary resources. A BigQuery session automatically closes after 24 hours of inactivity, but I recommend wrapping your logic in a try/finally block that calls bpd.close_session(), so the session is closed explicitly and you save on storage costs for the temporary resources BigFrames creates.
bpd.options.bigquery.ordering_mode = "partial"
bpd.options.bigquery.project = "my-project-id"
bpd.options.compute.maximum_result_rows = 10_000

try:
    # Your logic here.
    ...
finally:
    bpd.close_session()
BigFrames provides several pandas-compatible methods to read data. Where possible, I recommend setting engine="bigquery". Loading with the BigQuery engine avoids reading the file into the Airflow worker’s memory; instead, BigQuery parses the file(s) and loads them into a temporary table.
# Inside the `try` block above.
GCS_BUCKET = "your-bucket-id"
GCS_LOCATION = f"gs://{GCS_BUCKET}/us-census/cc-est2024-agesex-all.csv"

df = bpd.read_csv(GCS_LOCATION, engine="bigquery")
The Census data has a “YEAR” column, but it’s not clear which date each coded value refers to. You can use pandas-compatible methods to preprocess the data before writing it to the final destination. This can help clean up the data to make it more useful and easier to understand.
import datetime

# Perform preprocessing. For example, you can map some coded data
# into a form that is easier to understand.
df_dates = df.assign(
    ESTIMATE_DATE=df["YEAR"].case_when(
        caselist=[
            (df["YEAR"].eq(1), datetime.date(2020, 4, 1)),
            (df["YEAR"].eq(2), datetime.date(2020, 7, 1)),
            (df["YEAR"].eq(3), datetime.date(2021, 7, 1)),
            (df["YEAR"].eq(4), datetime.date(2022, 7, 1)),
            (df["YEAR"].eq(5), datetime.date(2023, 7, 1)),
            (df["YEAR"].eq(6), datetime.date(2024, 7, 1)),
            (True, None),
        ]
    ),
).drop(columns=["YEAR"])

# TODO(developer): Add additional processing and cleanup as needed.
One of the benefits of using BigQuery DataFrames in your operators is that it makes it easy to perform data validations before committing to write the data to its final destination.
# Note: cache() is optional, but if any of the preprocessing above is
# complicated, it hints to BigQuery DataFrames to run those steps first and
# avoid duplicating work.
df_dates.cache()

row_count, column_count = df_dates.shape
assert row_count > 0
assert column_count > 0
assert not df_dates["ESTIMATE_DATE"].hasnans

# TODO(developer): Add additional validations as needed.
# For example, consider comparing the distribution of data in the DataFrame
# with the distribution of the data in the destination and alerting if there
# is a significant shift or unknown outliers.
Writing the data to its final destination is as easy as calling to_gbq(). Alternatively, you can use other pandas-compatible I/O methods, such as to_parquet(), to write to GCS instead.
BIGQUERY_DESTINATION = "your-gcp-project.airflow_demo.us_census_by_county2020_to_present"

# Now that you have validated the data, it should be safe to write
# to the final destination table.
df_dates.to_gbq(
    BIGQUERY_DESTINATION,
    if_exists="replace",
    clustering_columns=["ESTIMATE_DATE", "STATE", "COUNTY"],
)
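If Cloud Storage is the destination you want instead, a rough equivalent is shown below. The output path is a placeholder of mine, and I’m assuming the wildcard form that lets the export be sharded across files; check the to_parquet() documentation for the exact path requirements.
# Alternative: export the validated data to Cloud Storage as Parquet files.
# The destination URI is a placeholder; the wildcard lets the export be
# sharded across multiple files.
df_dates.to_parquet(f"gs://{GCS_BUCKET}/us-census/processed/*.parquet")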
In this sample, I put all of the BigFrames code in a single operator. This has the benefit that all temporary resources get cleaned up at the same time, but it gives up Airflow’s ability to retry failed operations separately, which can make for a more robust pipeline.
To prevent the data from being cleaned up when the operator finishes, call to_gbq(). If you don’t supply a destination table name, one will be automatically assigned. You can then use Airflow’s XCom feature to pass the table ID along to the next task. Make sure to create a node that cleans up this table when the DAG finishes, or else it may live for up to 7 days before being automatically cleaned up by BigQuery.
def preprocess(task_instance):
    # Imports and settings go here.
    try:
        df = bpd.read_csv(GCS_LOCATION, engine="bigquery")
        # Preprocessing code goes here.
        task_instance.xcom_push(key="census_preprocessed_table", value=df.to_gbq())
    finally:
        bpd.close_session()


def validate_and_write(task_instance):
    # Imports and settings go here.
    try:
        # Get the table ID from the previous step.
        bigquery_source = task_instance.xcom_pull(
            task_ids="bf_preprocess",
            key="census_preprocessed_table",
        )
        df = bpd.read_gbq(bigquery_source)
        # Validation code goes here.
        df.to_gbq(BIGQUERY_DESTINATION, if_exists="replace")
    finally:
        bpd.close_session()
from airflow.providers.google.cloud.operators.bigquery import BigQueryDeleteTableOperator

bf_preprocess = PythonOperator(
    task_id="bf_preprocess",
    python_callable=preprocess,
)
bf_validate_and_write = PythonOperator(
    task_id="bf_validate_and_write",
    python_callable=validate_and_write,
)
cleanup_preprocess_table = BigQueryDeleteTableOperator(
    task_id="cleanup_preprocess_table",
    deletion_dataset_table="{{ task_instance.xcom_pull(task_ids='bf_preprocess', key='census_preprocessed_table') }}",
    # Always execute, even if the previous task failed.
    # https://stackoverflow.com/a/44441890/101923
    trigger_rule="all_done",
)
download_upload >> bf_preprocess >> bf_validate_and_write >> cleanup_preprocess_table
bf_preprocess >> cleanup_preprocess_table
For a detailed example, check out my code snippets GitHub repository:
The integration of BigQuery DataFrames with Apache Airflow provides data engineers with the means to construct more robust, scalable, and efficient data pipelines. By performing data transformations and validations directly within BigQuery, you gain several significant advantages:
- Enhanced Safety: Minimize the risk of memory overloads on Airflow workers and execute crucial data validations before any impact on production systems.
- Greater Flexibility: Harness the extensive capabilities of BigQuery for complex transformations, unburdened by the memory constraints of individual Airflow workers.
- Familiar Workflow: Data engineers and analysts already proficient with pandas will find BigFrames a natural and intuitive extension for interacting with BigQuery data.
This approach effectively delegates the intensive computational tasks to your data warehouse, allowing Airflow to focus on its core strength: precise and reliable workflow orchestration. Happy pipelining!
The BigFrames team would love to hear from you. If you would like to reach out, please send an email to bigframes-feedback@google.com or file an issue at the open source BigFrames repository. To receive updates about BigFrames, subscribe to the BigFrames email list.