
Inserting data into multiple tables in Google Cloud Spanner via the Apache Beam Python SDK has the benefit of fast distributed processing but comes with restrictions. When working with the Apache Beam Python SDK and Google Cloud Spanner, the `SpannerInsert` module requires a specific input format, a `NamedTuple`. This can pose a challenge when dealing with multiple tables with varying schemas, especially if those schemas are not known beforehand. This post explores how to dynamically create `NamedTuple` classes to overcome this hurdle, allowing for seamless insertion into multiple Spanner tables regardless of their structure.
The `SpannerInsert` module expects a `NamedTuple` where the field names correspond to the column names in the target Spanner table and the values correspond to the data to be inserted. When dealing with many tables or schemas that are not known until runtime, manually defining `NamedTuple` classes for each table is impractical.
What if, instead of manually defining a NamedTuple for each table, our Python script could connect to a source database or a schema file in a Google Cloud Storage bucket, understand the schema, and generate those NamedTuples for us to use within a Beam pipeline?
To do this, we first need to understand the `SpannerInsert` and `NamedTuple` classes.
The `SpannerInsert` class is included in the `apache_beam.io.gcp.spanner` module. `SpannerInsert` is a PTransform that writes insert mutations to the specified Spanner table. This transform receives rows defined as NamedTuples.
The `NamedTuple` class from Python's `typing` module has an extra attribute, `__annotations__`, which is a dict mapping each field name to its field type.
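For instance, a small illustrative class (the name `Point` here is just a placeholder) shows the mapping that `__annotations__` exposes:
from typing import NamedTuple

class Point(NamedTuple):
    x: int
    label: str

print(Point.__annotations__)  # {'x': <class 'int'>, 'label': <class 'str'>}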
Below is a simple example of how to use SpannerInsert and NamedTuple together in an Apache Beam pipeline.
import apache_beam as beam
from apache_beam import Pipeline, coders
from apache_beam.io.gcp.spanner import SpannerInsert
from typing import NamedTuple

class ExampleRow(NamedTuple):
    id: int
    name: str

coders.registry.register_coder(ExampleRow, coders.RowCoder)

num_rows = 100  # number of example rows to generate

with Pipeline() as p:
    _ = (
        p
        | 'Impulse' >> beam.Impulse()
        | 'Generate' >> beam.FlatMap(lambda x: range(num_rows))
        | 'To row' >> beam.Map(lambda n: ExampleRow(n, str(n))).with_output_types(ExampleRow)
        | 'Write to Spanner' >> SpannerInsert(
            instance_id='your_instance',
            database_id='existing_database',
            project_id='your_project_id',
            table='your_table'))
This example only shows data for one table being inserted into Spanner. If we have many tables, we face the following challenges:
- Manually defining many tables is time-consuming and becomes inefficient
- We need to properly assign each row of data to the corresponding NamedTuple
- Because SpannerInsert requires a table argument, we need to loop through the tables
Let's go through each of these challenges.
Manually defining many tables is time-consuming and becomes inefficient. For this reason we want to dynamically create NamedTuples and write them to a new .py file. A NamedTuple definition requires each column name and column type. To get these definitions you must extract the schema from your source database; depending on your source database, the schema will look different. Parse the schema and store the column names and data types in a Python dictionary, with the key being the table name and the value being a nested dictionary of column_name:data_type.
tables_and_columns = {
    "Order": {"orderId": "Int", "order_status": "String"},
    "Product": {"productId": "Int", "product_name": "String"},
    "Supplier": {"supplierId": "Int", "supplier_name": "String"}
}
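As an illustration only, if your schema is exported as a JSON document in a Cloud Storage bucket (the bucket name, file path, and JSON layout below are assumptions, not part of the original pipeline), you could build this dictionary with a small helper like the following:
import json
from google.cloud import storage

def load_schema_from_gcs(bucket_name, blob_path):
    # Download the exported schema file and parse it into
    # {table_name: {column_name: data_type}} form.
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_path)
    raw_schema = json.loads(blob.download_as_text())
    tables_and_columns = {}
    for table in raw_schema["tables"]:  # assumed layout of the export
        tables_and_columns[table["name"]] = {
            col["name"]: col["type"] for col in table["columns"]
        }
    return tables_and_columns

tables_and_columns = load_schema_from_gcs("your-schema-bucket", "exports/schema.json")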
You may need a data type conversion function to convert the data types to be compatible with Python. Below is an example:
def convert_column_type(string_type):
    type_mapping = {
        "String": 'str',
        "Int": 'int',
        "Long": 'int',
        "Date": 'str',
        "StringArray": 'List[str]',
        "Double": 'float',
        "Boolean": 'bool',
        None: 'str'
    }
    return type_mapping.get(string_type, string_type)
Now that you have your column names and data types in a dictionary, you can 1) create a new Python file, 2) loop through your dictionary, 3) convert the data type definitions, and 4) write to the new file.
import importlib
import logging
from apache_beam import coders

pk_map = {
    "Category": "categoryId",
    "Supplier": "supplierId",
    "Customer": "customerId",
    "Order": "orderId",
    "Product": "productId"
}

generated_named_tuple_file = "./generated_named_tuples.py"

# Create the file and write the import statements
with open(generated_named_tuple_file, 'w') as f:
    f.write("from apache_beam import coders\n")
    f.write("from typing import NamedTuple, List, Optional\n")
    f.write("import datetime\n\n")
    # Loop through the tables_and_columns dict
    for table_name, column_defs in tables_and_columns.items():
        named_tuple_string = []
        # NamedTuple fields without defaults (the non-null primary key) must come first
        pk = pk_map.get(table_name)
        pk_type = column_defs.get(pk)
        converted_pk_type = convert_column_type(pk_type)
        named_tuple_string.append(f"{pk}: {converted_pk_type}")
        # Write the remaining columns as Optional with a default of None
        for column_name, column_type in column_defs.items():
            if column_name != pk:
                converted_column_type = convert_column_type(column_type)
                named_tuple_string.append(f"{column_name}: Optional[{converted_column_type}] = None")
        field_block = "\n    ".join(named_tuple_string)
        generated_string = f"class SpannerRow_{table_name}(NamedTuple):\n    {field_block}\n"
        f.write(generated_string + "\n")

# Import (or reload) the newly generated file
import generated_named_tuples
importlib.reload(generated_named_tuples)

# Register the NamedTuple row coders
for table_name in tables_and_columns:
    class_name = f"SpannerRow_{table_name}"
    target_class = getattr(generated_named_tuples, class_name)
    coders.registry.register_coder(target_class, coders.RowCoder)
    logging.info(f"REGISTERED ROWCODER FOR {class_name}")
Here is roughly what the generated generated_named_tuples.py file looks like for the tables_and_columns and pk_map shown above:
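from apache_beam import coders
from typing import NamedTuple, List, Optional
import datetime

class SpannerRow_Order(NamedTuple):
    orderId: int
    order_status: Optional[str] = None

class SpannerRow_Product(NamedTuple):
    productId: int
    product_name: Optional[str] = None

class SpannerRow_Supplier(NamedTuple):
    supplierId: int
    supplier_name: Optional[str] = None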
Once you have your schema parsed into a dictionary similar to `tables_and_columns` shown above, you can also dynamically create the Spanner tables themselves. This can be done inside or outside of your Beam pipeline using the Python Spanner client library. Below is an example class to do this.
import logging
import apache_beam as beam
from google.cloud import spanner
from google.api_core import exceptions

class CreateSpannerTables(beam.DoFn):
    # Can be called directly or applied inside a pipeline via beam.ParDo
    def __init__(self, spanner_instance_id, spanner_database_id, project_id, pk_map):
        self.spanner_instance_id = spanner_instance_id
        self.spanner_database_id = spanner_database_id
        self.pk_map = pk_map
        self.spanner_client = None
        self.database = None
        self.project_id = project_id

    def _convert_type(self, neo4j_type):
        # Map source (Neo4j) data types to Spanner column types
        type_mapping = {
            "String": "STRING(MAX)",
            "Long": "INT64",
            "Integer": "INT64",
            "Double": "FLOAT64",
            "Float": "FLOAT64",
            "Boolean": "BOOL",
            "Date": "STRING(MAX)",
            "DateTime": "TIMESTAMP",
            "Point": "STRING(MAX)",
            "StringArray": "ARRAY<STRING(MAX)>",
            "LongArray": "ARRAY<INT64>",
            "null": "STRING(MAX)"
        }
        spanner_type = type_mapping.get(neo4j_type)
        if spanner_type is None:
            return "STRING(MAX)"
        return spanner_type

    def process(self, tables_and_columns):
        self.spanner_client = spanner.Client(project=self.project_id)
        instance = self.spanner_client.instance(self.spanner_instance_id)
        self.database = instance.database(self.spanner_database_id)
        for table_name, columns in tables_and_columns.items():
            pk_column = self.pk_map[table_name]
            column_defs = []
            for column_name, column_data_type in columns.items():
                spanner_type = self._convert_type(column_data_type)
                column_defs.append(f"{column_name} {spanner_type}")
            ddl_statement = f"CREATE TABLE `{table_name}` ({', '.join(column_defs)}) PRIMARY KEY (`{pk_column}`)"
            try:
                operation = self.database.update_ddl([ddl_statement])
                operation.result()
                logging.info(f"CreateSpannerTables: Created table '{table_name}' with primary key '{pk_column}'.")
            except exceptions.AlreadyExists:
                logging.info(f"CreateSpannerTables: Table '{table_name}' already exists.")
            except exceptions.FailedPrecondition as e:
                if "Duplicate name in schema" in str(e):
                    logging.warning(f"CreateSpannerTables: Table '{table_name}' already exists.")
                else:
                    raise e
            except Exception as ddl_error:
                logging.error(f"CreateSpannerTables: Error creating table '{table_name}': {ddl_error}")
                raise ddl_error
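As a minimal sketch (the IDs below are placeholders), the class can be invoked directly before the write pipeline runs, or applied inside a pipeline with beam.ParDo to a single-element PCollection containing tables_and_columns:
# Direct invocation outside a pipeline
table_creator = CreateSpannerTables(
    spanner_instance_id="your_instance",
    spanner_database_id="existing_database",
    project_id="your_project_id",
    pk_map=pk_map,
)
table_creator.process(tables_and_columns)

# Or inside a pipeline:
# _ = (p | beam.Create([tables_and_columns])
#        | "Create Spanner tables" >> beam.ParDo(table_creator))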
Now that we have our NamedTuple classes defined, we can start our Beam pipeline and use the classes to transform row data. In this example, dump data is stored in JSONL-formatted files in a Cloud Storage bucket, and each file represents a separate table. My pipeline reads those files, reads through each JSON line, and transforms each JSON line into a NamedTuple depending on which table the file was dumped from. We need to properly assign each row of data to the corresponding NamedTuple, and because SpannerInsert requires a table argument, we also need to loop through the tables.
from apache_beam.io.gcp.spanner import SpannerInsertOrUpdate

with beam.Pipeline(options=pipeline_options) as p:
    # Loop through multiple tables
    for table in tables_and_columns.keys():
        # Step 1: Read the dump file. Yields each JSON line
        read_dump = p | f"Read JSON dump file for {table}" >> beam.io.ReadFromText(
            f'{known_args.output_dir}{table}.json-*',
            strip_trailing_newlines=True,
            validate=False)
        # Step 2: Clean and transform data. Ex: remove empty lines, convert data types, etc.
        # Ensure these steps yield a dictionary {column_name: value_to_insert}.
        # See example of RemoveAndExtract below
        dump_data = read_dump | f"Remove empty lines and extract values for {table}" >> beam.ParDo(RemoveAndExtract())
        # Step 3: Logging
        dump_data | f"Log dump data for {table}" >> beam.Map(lambda x: logging.info(f'DUMP DATA IN PIPELINE:{x}'))
        # Step 4: Convert rows to the appropriate class
        class_name = f"SpannerRow_{table}"
        target_class = getattr(generated_named_tuples, class_name)
        data_rows = dump_data | f"Convert to {table} row class" >> beam.Map(
            lambda x, target_class=target_class: target_class(**x)).with_output_types(target_class)
        # Step 5: Insert to Spanner
        data_rows | f"Write to Spanner for {table}" >> SpannerInsertOrUpdate(
            project_id=project_id,
            instance_id=spanner_instance_id,
            database_id=spanner_database_id,
            table=table
        )
import json
import logging
import apache_beam as beam

# Example JSONL line:
# {"table_name": "Order", "column_data": {"orderId": 123, "order_status": "Shipped"}, "last_modified": "05-14-2025"}
class RemoveAndExtract(beam.DoFn):
    def process(self, element):
        # Skip empty lines
        if not element or not element.strip():
            return
        element = json.loads(element)
        column_data = element["column_data"]
        yield column_data
And that’s it! As we loop through the tables, each line of data gets transformed into the appropriate NamedTuple class and inserted into the appropriate Spanner table.
This dynamic approach can be integrated into a Beam pipeline that reads data from a source (e.g., Pub/Sub, file system), determines the table schema, and then creates the appropriate `NamedTuple` class before using `SpannerInsert`.
- Read Data: Fetch data from the source.
- Schema Discovery: Determine the schema of the target Spanner table. This could involve fetching the schema from Spanner’s metadata or from the data itself (see the sketch after this list).
- Dynamic Tuple Creation: Create a function to generate a `NamedTuple` class based on the discovered schema.
- Table Creation: Create empty tables in Spanner to hold the migrated data.
- Data Transformation: Transform the incoming data into instances of the dynamically created `NamedTuple`.
- Spanner Insert: Use `SpannerInsert` (or `SpannerInsertOrUpdate`) to write the data to the respective tables.
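As a hedged sketch of the schema-discovery step referenced above (the project, instance, and database IDs are placeholders), the Spanner client library can read column metadata from Spanner's own INFORMATION_SCHEMA tables:
from google.cloud import spanner

def discover_spanner_schema(project_id, instance_id, database_id):
    # Build a {table_name: {column_name: spanner_type}} dictionary
    # from Spanner's metadata tables.
    client = spanner.Client(project=project_id)
    database = client.instance(instance_id).database(database_id)
    query = (
        "SELECT table_name, column_name, spanner_type "
        "FROM information_schema.columns "
        "WHERE table_catalog = '' AND table_schema = ''"
    )
    schema = {}
    with database.snapshot() as snapshot:
        for table_name, column_name, spanner_type in snapshot.execute_sql(query):
            schema.setdefault(table_name, {})[column_name] = spanner_type
    return schema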
This approach offers several benefits:
- Flexibility: Handles various tables with different schemas without hardcoding `NamedTuple` definitions.
- Scalability: Simplifies pipelines dealing with numerous tables.
- Reduced Maintenance: Avoids the need to update code when table schemas change.
There are also a few considerations to keep in mind:
- Error Handling: Ensure proper error handling for schema mismatches or data transformation failures (see the sketch after this list).
- Performance: Dynamically creating classes might have a minor performance overhead, although negligible in most use cases.
- Schema Retrieval: Implement robust schema retrieval mechanisms to ensure accuracy and prevent data insertion errors.
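For the error-handling point above, one possible pattern (a sketch, not part of the original pipeline; ToRowWithDeadLetter is a hypothetical helper) is to route rows that fail NamedTuple conversion to a dead-letter output instead of failing the whole bundle:
import logging
import apache_beam as beam

class ToRowWithDeadLetter(beam.DoFn):
    # Convert a dict into the given NamedTuple class, sending rows that
    # do not match the schema to a 'failed' output for later inspection.
    def __init__(self, target_class):
        self.target_class = target_class

    def process(self, element):
        try:
            yield self.target_class(**element)
        except TypeError as e:
            logging.warning(f"Row failed conversion: {element} ({e})")
            yield beam.pvalue.TaggedOutput("failed", element)

# Usage inside the per-table loop shown earlier:
# results = dump_data | f"Convert {table}" >> beam.ParDo(
#     ToRowWithDeadLetter(target_class)).with_outputs("failed", main="rows")
# data_rows, failed_rows = results.rows, results.failed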
Dynamically creating `NamedTuple` classes provides a powerful solution for working with the `SpannerInsert` Beam module and multiple tables with dynamic schemas. This approach enhances the flexibility and scalability of Beam pipelines, making them suitable for complex data processing workflows.
Code Repo: https://github.com/shaynahpatel/insert_multiple_tables_to_spanner/tree/main
Source Credit: https://medium.com/google-cloud/dynamically-creating-namedtuple-classes-for-spannerinsert-beam-python-module-0f5dd8e4a874