
Here’s a high-level view of the operator’s responsibilities:
class CloudStorageTransferCheckRunJobOperator(BaseOperator):
...
def execute(self, context):
# Check if the job exists, create if necessary
# Run the job and poll for completion
# Create an external table once done
We designed our transfer jobs to be fully configurable via JSON files, allowing maximum flexibility.
Sample JSON file:
{
"storagetransfer_config": {
"projectId": "yourbigqueryprojectname",
"description": "yourbigqueryprojectname-yourdatasetname-ext_azure_yourtablename",
"status": "ENABLED",
"transferSpec": {
"azureBlobStorageDataSource": {
"storageAccount": "xxx",
"container": "xxx",
"path": "deltalake/yourtablefoldername/",
"azureCredentials": {
"sasToken": "xxx"
}
},
"gcsDataSink": {
"bucketName": "yourgcsbucketname",
"path": "azure/xxx/xxx/deltalake/yourtablename/"
},
"transferOptions": {
"metadataOptions": {
"storageClass": "STORAGE_CLASS_DESTINATION_BUCKET_DEFAULT",
"timeCreated": "TIME_CREATED_SKIP"
},
"overwriteWhen": "DIFFERENT"
}
}
},
"labels": {
"department": "xxx",
"repo": "xxx",
"type": "incremental"
}
}
gcloud transfer jobs describe 11111111111111111111 --format=json
bq update --set_label department:xxx --set_label repo:xxx --set_label type:incremental yourbigqueryprojectname:bronzelayer.ext_azure_yourtablename
Using the custom operator, a task in Airflow can be created easily:
transfer_table = create_storagetransfer_task(
dag,
job_config['storagetransfer_config'],
f"{table_name}",
priority,
schedule_time,
transfer_task_group
)
This ensures that every table transfer is managed through a clean, repeatable Airflow task.
import time as time_module
import json
import os
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from google.cloud import bigquery
from googleapiclient.discovery import build
from functools import partial
class CloudStorageTransferCheckRunJobOperator(BaseOperator):
@apply_defaults
def __init__(self, project_id, body, *args, **kwargs):
super().__init__(*args, **kwargs)
self.project_id = project_id
self.body = body
def _poll_job_status(self, service, operation_name, poll_interval=30):
self.log.info(f"Polling was started for Job Status. Operation: {operation_name}")
while True:
operation_status = service.transferOperations().get(name=operation_name).execute()
if operation_status.get('done'):
if 'error' in operation_status:
self.log.error(f"Job is over with an error: {operation_status['error']}")
raise Exception(f"Job is over with an error: {operation_status['error']}")
self.log.info("Job was completed successfully.")
return operation_status
else:
self.log.info("Job is still ongoing, expected...")
time_module.sleep(poll_interval) def create_external_table(self):
client = bigquery.Client(project=self.project_id)
dataset = "bronzelayer"
transfer_config = self.body.get('storagetransfer_config', self.body)
bucket_name = transfer_config['transferSpec']['gcsDataSink']['bucketName']
path = transfer_config['transferSpec']['gcsDataSink']['path'].rstrip('/')
gcs_uri = f"gs://{bucket_name}/{path}"
table_name = path.split('/')[-1]
table_ref = f"{self.project_id}.{dataset}.ext_azure_{table_name}"
query = f"""
CREATE OR REPLACE EXTERNAL TABLE `{table_ref}`
OPTIONS (
format = "DELTA_LAKE",
uris = ["{gcs_uri}"]
)
"""
self.log.info(f"Creating an external table: {table_ref}")
self.log.info(f"Query: {query}")
query_job = client.query(query)
query_job.result()
self.log.info(f"External table created successfully: {table_ref}") def execute(self, context):
target_description = self.body.get('description')
if not target_description:
raise ValueError("Job body must contain 'description' field")
if 'projectId' not in self.body:
self.body['projectId'] = self.project_id
service = build('storagetransfer', 'v1')
filter_str = json.dumps({"projectId": self.project_id})
response = service.transferJobs().list(filter=filter_str).execute()
transfer_jobs = response.get('transferJobs', [])
for job in transfer_jobs:
if job.get('description') == target_description:
job_id = job.get('name')
self.log.info(f"Current job found: {job_id}. Job is running.")
run_response = service.transferJobs().run(
jobName=job_id,
body={"projectId": self.project_id}
).execute()
operation_name = run_response.get('name')
self._poll_job_status(service, operation_name)
self.create_external_table()
return run_response
self.log.info("Matching job not found. Creating a new job...")
create_response = service.transferJobs().create(body=self.body).execute()
new_job_id = create_response.get('name')
self.log.info(f"A new job has been created: {new_job_id}. Job is running.")
run_response = service.transferJobs().run(
jobName=new_job_id,
body={"projectId": self.project_id}
).execute()
operation_name = run_response.get('name')
self._poll_job_status(service, operation_name)
self.create_external_table()
return run_responsedef create_storagetransfer_task(dag, body, task_id, priority=8, schedule_time=None, task_group=None):
return CloudStorageTransferCheckRunJobOperator(
task_id=task_id,
project_id=dag.params['PROJECT_ID'],
body=body,
dag=dag,
priority_weight=priority,
task_group=task_group,
on_failure_callback=partial(notify_failure_callback)
)
During the development and deployment of this system, we encountered several challenges:
- Polling Transfer Job Status:
The Storage Transfer Service API doesn’t provide instant callbacks. We had to design a safe polling mechanism with retries and exponential backoff in case of API rate limits. - External Table Consistency:
If a transfer failed midway, we needed a way to ensure that no invalid external tables were created. We added safeguards to only create tables after successful transfers. - Handling Job Idempotency:
Storage Transfer Service (STS) jobs are created with system-generated random IDs, which makes it difficult to ensure uniqueness based on the job ID alone.
To address this, we enforced uniqueness by using thedescription
field as an identifier.
Our custom operator first checks for an existing job by matching the description before attempting to create a new one, ensuring that duplicate jobs are not created.
- Use Descriptive Job Names:
Always use structured and meaningful descriptions for transfer jobs, making automation easier and safer. - Plan for Partial Failures:
Design your Airflow DAGs to be resilient. Even if a transfer partially fails, your pipeline should recover or alert you clearly. - Leverage JSON Configurations:
Keeping transfer jobs fully JSON-configurable allowed us to scale rapidly and adapt to new tables with minimal code changes.
By combining the flexibility of Apache Airflow, the scalability of Google Cloud Storage, and the reliability of Storage Transfer Service, we successfully built a modern, fully automated data ingestion pipeline that continuously pulls Delta Lake data from Azure into Google Cloud.
Source Credit: https://medium.com/google-cloud/building-a-modern-data-pipeline-with-storage-transfer-service-and-apache-airflow-on-google-cloud-d08e2aef2091?source=rss—-e52cf94d98af—4