


Google DeepMind has just launched Gemini 2.5, and I couldn't resist putting it to the test on my day-to-day tasks. I decided to compare it with Gemini 2.0 by having both generate code for a Spark job, to see how much the latest model surpasses its predecessor.
After thorough testing, I have documented my findings below.

Prompt (given to both models): Give me a spark code to migrate an avro file from gcs bucket to bigquery adaptive to the target schema. Make the file path as well as target schema mapped to file contents configurable
Snippet 1:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr


def migrate_avro_to_bigquery(
    spark: SparkSession,
    gcs_avro_path: str,
    bigquery_table: str,
    schema_mapping_config: dict,
    temp_gcs_bucket: str,
    write_mode: str = "append",  # or "overwrite"
    **bigquery_options,
) -> None:
    """
    Migrates an Avro file from a Google Cloud Storage (GCS) bucket to a BigQuery table,
    adapting to the target schema based on a provided schema mapping configuration.

    Args:
        spark: SparkSession instance.
        gcs_avro_path: Path to the Avro file in GCS (e.g., "gs://your-bucket/path/to/file.avro").
        bigquery_table: BigQuery table name in the format "project.dataset.table".
        schema_mapping_config: Dictionary defining the mapping between Avro schema and BigQuery schema.
            Example:
                {
                    "field1_avro": "field1_bigquery",  # Simple rename
                    "field2_avro": "CAST(field2_avro AS STRING) AS field2_bigquery",  # Transformation
                    "field3_avro": None,  # Omit this field in BigQuery
                    "new_field_bigquery": "'default_value' AS new_field_bigquery"  # Add new field with default
                }
        temp_gcs_bucket: GCS bucket to use as a temporary location for BigQuery write operations
            (required by Spark for large writes to BigQuery). Example: "gs://your-temp-bucket"
        write_mode: "append" (default) to add data to the table or "overwrite" to replace the table
            (Note: "overwrite" will truncate the table).
        bigquery_options: Additional options to pass to the BigQuery connector (keyword arguments).
            Example: {"partitionField": "date_column", "partitionType": "DAY"}

    Raises:
        ValueError: If gcs_avro_path or bigquery_table are empty or None.
        ValueError: If schema_mapping_config is not a dictionary.
        ValueError: If temp_gcs_bucket is empty or None.
    """
    if not gcs_avro_path:
        raise ValueError("gcs_avro_path cannot be empty or None.")
    if not bigquery_table:
        raise ValueError("bigquery_table cannot be empty or None.")
    if not isinstance(schema_mapping_config, dict):
        raise ValueError("schema_mapping_config must be a dictionary.")
    if not temp_gcs_bucket:
        raise ValueError("temp_gcs_bucket cannot be empty or None.")

    try:
        # 1. Read Avro data from GCS
        df = spark.read.format("avro").load(gcs_avro_path)

        # 2. Apply schema mapping & transformations
        select_expressions = []
        for avro_field, bq_expression in schema_mapping_config.items():
            if bq_expression is None:
                # Omit the field
                continue
            elif avro_field in df.columns:
                # Rename or transform existing field
                select_expressions.append(expr(bq_expression))
            else:
                # Field not in the source: the expression defines a new column
                select_expressions.append(expr(bq_expression))
        df_transformed = df.select(*select_expressions)

        # 3. Write to BigQuery
        writer = df_transformed.write.format("bigquery") \
            .option("table", bigquery_table) \
            .option("temporaryGcsBucket", temp_gcs_bucket) \
            .mode(write_mode)
        for option, value in bigquery_options.items():
            writer = writer.option(option, value)
        writer.save()  # Triggers the BigQuery write
        print(f"Successfully migrated data from {gcs_avro_path} to {bigquery_table} with schema mapping.")
    except Exception as e:
        print(f"An error occurred during migration: {e}")
        raise


if __name__ == "__main__":
    # Example usage (replace with your actual values)

    # 1. Configure Spark session
    spark = SparkSession.builder \
        .appName("AvroToBigQueryMigration") \
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.4.1,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.34.0") \
        .getOrCreate()

    # 2. Define input/output parameters
    gcs_avro_path = "gs://your-bucket/input/your_avro_file.avro"
    bigquery_table = "your-project.your_dataset.your_table"
    temp_gcs_bucket = "gs://your-temp-bucket"
    write_mode = "append"  # or "overwrite"

    # 3. Define schema mapping configuration
    schema_mapping_config = {
        "user_id": "CAST(user_id AS STRING) AS user_id",  # Cast to string for BigQuery
        "username": "username",  # Simple pass-through
        "email": "email",
        "registration_date": "CAST(registration_date AS STRING) AS registration_date",
        "city": None,  # Omit this field.
        "country": "'USA' AS country"  # Add a column named country with value USA.
    }

    # 4. BigQuery write options (optional)
    bigquery_options = {
        "partitionField": "registration_date",  # If applicable
        "partitionType": "DAY"
    }

    # 5. Run the migration
    try:
        migrate_avro_to_bigquery(
            spark=spark,
            gcs_avro_path=gcs_avro_path,
            bigquery_table=bigquery_table,
            schema_mapping_config=schema_mapping_config,
            temp_gcs_bucket=temp_gcs_bucket,
            write_mode=write_mode,
            **bigquery_options
        )
    except Exception as e:
        print(f"Migration failed: {e}")
    finally:
        spark.stop()
Snippet 2:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
import argparse
import json


def migrate_avro_to_bigquery(
    spark: SparkSession,
    gcs_avro_path: str,
    bigquery_table: str,
    schema_mapping_config: dict,
    temp_gcs_bucket: str,
    write_mode: str = "append",
    **bigquery_options,
) -> None:
    """
    Migrates Avro data from GCS to BigQuery, applying schema transformations.

    Args:
        spark: Active SparkSession.
        gcs_avro_path: GCS path to the source Avro file(s) (e.g., "gs://bucket/path/*.avro").
            Can be a single file or a pattern.
        bigquery_table: Target BigQuery table in "project.dataset.table" format.
        schema_mapping_config: Dictionary defining the mapping and transformations.
            Keys are source Avro field names (or arbitrary names for new fields).
            Values are Spark SQL expressions defining the target BigQuery column.
            Use None as a value to drop a source field.
            Example:
                {
                    "avro_col1": "bq_col1",  # Rename
                    "avro_col2": "CAST(avro_col2 AS STRING) AS bq_col2",  # Transform
                    "avro_col3": None,  # Drop
                    "new_bq_col": "current_timestamp() AS ingestion_time"  # Add new
                }
        temp_gcs_bucket: GCS bucket for temporary BigQuery write data
            (e.g., "your-temp-bucket-name"). Must be in the same region as the BigQuery dataset.
        write_mode: Spark write mode ('append', 'overwrite', 'errorifexists', 'ignore').
            Defaults to 'append'. Use 'overwrite' with caution as it replaces table data.
        bigquery_options: Additional options for the BigQuery Spark connector (passed as kwargs).
            Example: partitionField="date_col", clusteredFields="id_col"
    """
    print(f"Starting migration for Avro path: {gcs_avro_path}")
    print(f"Target BigQuery table: {bigquery_table}")
    print(f"Write mode: {write_mode}")
    print(f"Temporary GCS bucket: {temp_gcs_bucket}")
    print(f"Schema mapping config: {schema_mapping_config}")
    print(f"Additional BigQuery options: {bigquery_options}")

    if not all([gcs_avro_path, bigquery_table, temp_gcs_bucket]):
        raise ValueError("GCS path, BigQuery table, and temporary GCS bucket must be provided.")
    if not isinstance(schema_mapping_config, dict):
        raise ValueError("schema_mapping_config must be a dictionary.")

    try:
        # 1. Read Avro data from GCS
        print(f"Reading Avro data from {gcs_avro_path}…")
        df_avro = spark.read.format("avro").load(gcs_avro_path)
        print("Avro schema:")
        df_avro.printSchema()
        print(f"Read {df_avro.count()} records.")

        # 2. Prepare SELECT expressions based on schema mapping config
        select_exprs = []
        avro_source_columns = set(df_avro.columns)  # Get actual columns from the Avro file
        for source_key, target_expression in schema_mapping_config.items():
            if target_expression is None:  # Explicitly marked to be dropped
                print(f"Dropping source key/column: {source_key}")
                continue
            elif not isinstance(target_expression, str):
                raise ValueError(
                    f"Invalid expression type for key '{source_key}': {type(target_expression)}. Expected string or None."
                )
            elif source_key not in avro_source_columns and not target_expression.strip().lower().startswith(
                ("'", '"', "lit(", "current_", "cast(", "struct(", "[")
            ):
                # Warn if source key doesn't exist AND expression doesn't look like literal/function
                # Helps catch typos in source field names when intending to transform them.
                print(
                    f"Warning: Source key '{source_key}' not found in Avro columns {avro_source_columns}. "
                    f"Assuming '{target_expression}' defines a new column."
                )
            # Add the expression (handles renaming, transformation, and new columns)
            select_exprs.append(expr(target_expression))

        if not select_exprs:
            raise ValueError("Schema mapping resulted in no columns to select. Check your configuration.")

        # 3. Apply transformations and select target columns
        print("Applying schema mapping and transformations…")
        df_transformed = df_avro.select(select_exprs)
        print("Transformed schema for BigQuery:")
        df_transformed.printSchema()

        # 4. Configure BigQuery writer
        writer = df_transformed.write.format("bigquery") \
            .option("table", bigquery_table) \
            .option("temporaryGcsBucket", temp_gcs_bucket) \
            .mode(write_mode)
        # Apply any additional BigQuery options
        for key, value in bigquery_options.items():
            writer = writer.option(key, value)

        # 5. Write to BigQuery
        print(f"Writing data to BigQuery table {bigquery_table}…")
        writer.save()
        print(f"Successfully migrated data from {gcs_avro_path} to {bigquery_table}.")
    except Exception as e:
        print(f"Error during migration: {e}")
        raise  # Re-raise the exception to signal failure


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Migrate Avro from GCS to BigQuery with schema mapping.")
    parser.add_argument("--gcs-avro-path", required=True, help="GCS path to source Avro file(s) (e.g., gs://bucket/path/*.avro)")
    parser.add_argument("--bigquery-table", required=True, help="Target BigQuery table (project.dataset.table)")
    parser.add_argument("--schema-mapping-json", required=True, help="JSON string or path to JSON file defining the schema mapping")
    parser.add_argument("--temp-gcs-bucket", required=True, help="GCS bucket for BQ temporary writes")
    parser.add_argument("--write-mode", default="append", choices=['append', 'overwrite', 'errorifexists', 'ignore'], help="Spark write mode")
    parser.add_argument("--partition-field", help="BigQuery partition field name")
    parser.add_argument("--partition-type", default="DAY", help="BigQuery partition type (DAY, HOUR, MONTH, YEAR)")
    parser.add_argument("--clustered-fields", help="Comma-separated list of BigQuery clustering fields")
    args = parser.parse_args()

    print("Initializing Spark Session…")
    spark = SparkSession.builder \
        .appName("GCSAvroToBigQueryMigration") \
        .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.4.1,com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.34.0") \
        .getOrCreate()

    try:
        schema_mapping_input = args.schema_mapping_json
        if schema_mapping_input.strip().startswith("{"):
            print("Parsing schema mapping from JSON string argument…")
            schema_mapping_config = json.loads(schema_mapping_input)
        else:
            print(f"Reading schema mapping from file: {schema_mapping_input}…")
            schema_content = spark.sparkContext.textFile(schema_mapping_input).collect()
            schema_mapping_config = json.loads("\n".join(schema_content))

        bq_options = {}
        if args.partition_field:
            bq_options["partitionField"] = args.partition_field
            bq_options["partitionType"] = args.partition_type
        if args.clustered_fields:
            bq_options["clusteredFields"] = args.clustered_fields

        migrate_avro_to_bigquery(
            spark=spark,
            gcs_avro_path=args.gcs_avro_path,
            bigquery_table=args.bigquery_table,
            schema_mapping_config=schema_mapping_config,
            temp_gcs_bucket=args.temp_gcs_bucket,
            write_mode=args.write_mode,
            **bq_options
        )
    except Exception as e:
        print(f"Migration script failed: {e}")
        # Consider exiting with a non-zero status code in a production script
        # import sys
        # sys.exit(1)
    finally:
        print("Stopping Spark Session.")
        spark.stop()
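Since the original prompt asked for the target schema mapping to be driven by file contents, here is a hypothetical example of a mapping file that Snippet 2's --schema-mapping-json flag could point to. The file name and field names are assumptions for illustration; once written, the file could be uploaded to GCS and referenced by its gs:// path.

# Hypothetical example of the mapping file consumed by Snippet 2's --schema-mapping-json flag.
# File name and field names are placeholders, not part of the generated code.
import json

schema_mapping = {
    "user_id": "CAST(user_id AS STRING) AS user_id",          # transform
    "username": "username",                                    # pass-through
    "registration_date": "CAST(registration_date AS STRING) AS registration_date",
    "city": None,                                              # becomes JSON null, which the script treats as "drop"
    "country": "'USA' AS country",                             # add a literal column
}

with open("schema_mapping.json", "w") as f:
    json.dump(schema_mapping, f, indent=2)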
Now let's compare the two snippets and see which one wins.

Winner: Snippet 2.

- Snippet 2 is fully CLI-driven, which makes it easier to integrate into pipelines or cron jobs (see the sketch below).
- Snippet 2 is more likely to avoid silent errors: it anticipates schema drift and surfaces problems in the schema mapping (type checks, warnings for unknown source fields) instead of failing quietly.
- Snippet 2 gives data engineers better debugging support and clarity during the transformation step (schema printouts, progress logging).
- Snippet 2 is written with production in mind (think: Airflow job, Dataproc, CI/CD pipeline, etc.).
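To make the pipeline-friendly point concrete, here is a minimal sketch of how Snippet 2 could be driven from a thin Python wrapper, for example one called by cron or an Airflow task. The file name avro_to_bq.py, the bucket and table names, and the mapping-file location are assumptions of mine, not part of the generated code, and spark-submit is assumed to be on the PATH.

# Minimal sketch: invoking Snippet 2 (saved as avro_to_bq.py, an assumed file name) from a scheduler.
import subprocess

SPARK_PACKAGES = (
    "org.apache.spark:spark-avro_2.12:3.4.1,"
    "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.34.0"
)

cmd = [
    "spark-submit",
    "--packages", SPARK_PACKAGES,
    "avro_to_bq.py",
    "--gcs-avro-path", "gs://your-bucket/input/*.avro",
    "--bigquery-table", "your-project.your_dataset.your_table",
    "--schema-mapping-json", "gs://your-bucket/config/schema_mapping.json",
    "--temp-gcs-bucket", "your-temp-bucket",
    "--write-mode", "append",
    "--partition-field", "registration_date",
]

# check=True makes the wrapper fail loudly if the Spark job exits non-zero,
# which is what a cron job or an Airflow task relies on to detect failure.
subprocess.run(cmd, check=True)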
Gemini 2.5's chain of thought (CoT)
This is the CoT Gemini 2.5 produced for the prompt, and it reflects very strong engineering thinking: from a fairly vague prompt it picks up the core requirements and the tech stack, identifies the logical implementation steps, anticipates real-world concerns, and applies good coding practices. It reads like an instructor guiding an implementation:
- Here's what we're trying to do.
- Here's the tech stack we need.
- Here's how you break it down into pieces.
- Here's how you handle the ugly bits like schema mismatch and missing fields.
- Here's how to make it solid and reusable.
Understand the Core Requirement: The goal is to read an Avro file from GCS and write it to BigQuery using PySpark. The key challenges are:
- Making the GCS path configurable.
- Making the target BigQuery table configurable.
- Handling potential differences between the Avro schema and the desired BigQuery schema (schema adaptation).
- Making this schema adaptation configurable.
Identify Necessary Tools/Libraries:
- PySpark: The core processing engine.
- Spark Avro Connector: To read Avro files (org.apache.spark:spark-avro_2.12).
- Spark BigQuery Connector: To write to BigQuery (com.google.cloud.spark:spark-bigquery-with-dependencies_2.12).
- Google Cloud Storage (GCS): The source location.
- BigQuery: The target destination.
Outline the Steps:
- Initialize a Spark Session. This needs the necessary connector JARs configured.
- Define input parameters: GCS path, BigQuery table, temporary GCS bucket (required by the BQ connector).
- Define the schema mapping mechanism. How will the user specify the desired transformations? A dictionary seems like a good, flexible choice. Keys could be Avro field names, and values could be how they map to BigQuery (simple rename, transformation expression, or None to drop).
- Read the Avro file from GCS using Spark's Avro reader.
- Apply the schema mapping. This involves selecting and potentially transforming columns based on the configuration dictionary.
- Write the transformed DataFrame to BigQuery using the Spark BigQuery connector. This requires specifying the table, temporary bucket, and write mode (append or overwrite).
- Handle potential errors gracefully.
- Provide an example of how to use the code.
- Ensure the Spark Session is stopped properly.

Flesh out the Spark Session Initialization:
- Use SparkSession.builder.
- Set appName.
- Crucially, add .config("spark.jars.packages", "…") with the correct Maven coordinates for the Avro and BigQuery connectors. Self-correction: Initially might forget the Scala version suffix (_2.12), but it's important for compatibility. Need to mention that the user should adjust versions as needed.
Design the Schema Mapping Configuration:
A dictionary schema_mapping_config is intuitive. Example mappings:
- "avro_col": "bq_col" (rename)
- "avro_col": "CAST(avro_col AS STRING) AS bq_col" (transform + rename)
- "avro_col": None (drop)
- "new_bq_col": "'some_value' AS new_bq_col" (add new column with literal/expression)

Consider edge cases: What if a key in the mapping doesn't exist in the Avro file? The code should handle this: treat it as a new column definition. What if a value is just a rename? Just use the value as the alias. What if it's a transformation? Use expr().
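As a quick illustration of why expr() covers all of these cases, here is a minimal, self-contained sketch of the approach described above, using local Spark and toy data. The column names and values are invented for the example.

# Minimal sketch: applying a schema-mapping dict with expr(), per the CoT's design.
# Runs in local mode on toy data; column names here are placeholders for illustration.
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.master("local[1]").appName("mapping-demo").getOrCreate()

df = spark.createDataFrame([(1, "alice", "2024-01-01")], ["user_id", "username", "registration_date"])

mapping = {
    "user_id": "CAST(user_id AS STRING) AS user_id",   # transform
    "username": "username AS user_name",               # rename
    "registration_date": None,                         # drop
    "country": "'USA' AS country",                     # add a literal column
}

# None means "drop"; every other value is a Spark SQL expression handled by expr().
select_exprs = [expr(e) for e in mapping.values() if e is not None]
df_out = df.select(*select_exprs)
df_out.printSchema()  # user_id: string, user_name: string, country: string

spark.stop()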
Implement the Core Logic (Reading, Transforming, Writing):
- Read: spark.read.format("avro").load(gcs_avro_path) is straightforward.
- Transform:
  - Iterate through the schema_mapping_config dictionary and build a list of Spark SQL expressions (select_expressions). For each (avro_field, bq_expression) pair:
    - If bq_expression is None, skip (drop the column).
    - Check if avro_field exists in the input DataFrame's columns (df.columns). This helps differentiate between transforming an existing field and adding a new one based solely on the bq_expression. Correction: The initial thought might be simpler, just using the bq_expression. But explicitly checking avro_field in df.columns might make the intent clearer, though expr() handles both cases well. Let's stick with iterating through the config and using expr(bq_expression) for simplicity and power, as expr can handle literals, column references, and functions.
    - Append expr(bq_expression) to the select_expressions list.
  - Apply the selection: df_transformed = df.select(*select_expressions)
- Write:
  - df_transformed.write.format("bigquery").option("table", bigquery_table).option("temporaryGcsBucket", temp_gcs_bucket).mode(write_mode)
  - Add flexibility for other BigQuery options using **kwargs or a dedicated dictionary (bigquery_options). Passing **bigquery_options to the main function and iterating through them to set .option() is clean.
  - .save() triggers the write.

Add Robustness:
- Function Definition: Encapsulate the logic in a function (migrate_avro_to_bigquery) for reusability.
- Type Hinting: Add type hints for clarity (spark: SparkSession, gcs_avro_path: str, etc.).
- Docstrings: Write clear docstrings explaining the function's purpose, arguments, and potential exceptions.
- Input Validation: Check for empty/None paths and tables. Check if the schema config is a dictionary. Add ValueError exceptions.
- Error Handling: Use a try…except…finally block. Catch general Exception, print an informative message, and re-raise it if needed. Use finally to ensure spark.stop().
Create an Example Usage (if __name__ == "__main__":):
- Instantiate SparkSession.
- Define sample values for paths, table, bucket, and the schema mapping config. Include examples of renaming, casting, dropping, and adding columns.
- Define sample bigquery_options.
- Call the migration function within a try…except block.
- Stop the Spark Session in a finally block.
Review and Refine:
- Read through the code. Is it clear? Is it logical?
- Are the comments helpful?
- Are the variable names descriptive?
- Does the schema mapping logic correctly handle all cases (rename, transform, drop, add)? The use of expr() seems robust enough.
- Are the necessary dependencies mentioned? (Both pip packages and Spark JARs.)
- Are authentication and permissions mentioned as prerequisites? Yes, crucial for GCP interactions.
- Is the temporary GCS bucket requirement clearly explained? Yes.
- Is the write_mode explanation clear (especially the impact of overwrite)? Yes.
This thought process moves from the high-level requirement to specific implementation details, considering potential issues, adding robustness, and ensuring usability through clear examples and documentation. The use of the schema mapping dictionary and expr() is central to achieving the configurable schema adaptation.
In conclusion, Gemini 2.5 excels at code generation. Based on my observations, it may very well be Google's best model yet, and I won't be surprised if benchmark results across the board tell the same story.
Source Credit: https://medium.com/google-cloud/gemini-2-0-vs-gemini-2-5-43ab0123cd3c?source=rss—-e52cf94d98af—4