
We’ve all been there: you’ve built a fantastic machine learning model in a notebook. You’ve engineered features, tuned hyperparameters, and the AUC looks great. But a model living in a notebook doesn’t drive business value. To make an impact, it needs to be accessible — ideally, via a scalable, serverless API.
In this post, we’ll explore how to take a Logistic Regression model trained with Apache Spark, deploy it as a containerized inference service on Google Cloud Run, and finally front it with a natural language agent.
Step 1: Serialize the Model to Cloud Storage
Say you’ve already trained your model on a dataset using a tool like Google Cloud Serverless for Apache Spark (you may have even taken advantage of the new Lightning Engine accelerator for faster performance).
This sample uses Logistic Regression to predict whether a user will make a purchase on an eCommerce website. The model is trained on a BigQuery Public Dataset, theLook eCommerce.
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.functions import array_to_vector
spark = SparkSession.builder \
    .appName("ModelTrainer") \
    .getOrCreate()
users = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.users").load()
order_items = spark.read.format("bigquery").option("table", "bigquery-public-data.thelook_ecommerce.order_items").load()
# Identify distinct purchasers
purchasers = order_items.select("user_id").distinct()
# Left Join Users to Purchasers
features = (
    users
    .join(purchasers, users.id == purchasers.user_id, "left")
    .select(
        F.col("age").cast("double"),
        F.hash("country").cast("double").alias("country_hash"),
        F.hash("gender").cast("double").alias("gender_hash"),
        F.hash("traffic_source").cast("double").alias("traffic_source_hash"),
        # If the join found a match (user_id is not null), the label is 1, else 0
        F.when(F.col("user_id").isNotNull(), 1).otherwise(0).alias("label")
    )
)
# Split train and test data (80:20)
train_data, test_data = features.randomSplit([0.8, 0.2])
# Initialize VectorAssembler with the feature columns selected above
assembler = VectorAssembler(
    inputCols=["age", "country_hash", "gender_hash", "traffic_source_hash"],
    outputCol="assembled_features"
)
# Initialize StandardScaler
scaler = StandardScaler(inputCol="assembled_features", outputCol="scaled_features")
# Initialize Logistic Regression model
lr = LogisticRegression(
    maxIter=100,
    regParam=0.2,
    threshold=0.8,
    featuresCol="scaled_features",
    labelCol="label"
)
# Define pipeline
pipeline = Pipeline(stages=[assembler, scaler, lr])
# Fit the model
pipeline_model = pipeline.fit(train_data)
# Transform the dataset using the trained model
transformed_dataset = pipeline_model.transform(test_data)
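The `threshold=0.8` setting above means the model predicts a purchase only when the estimated probability of label 1 exceeds 0.8, rather than the default 0.5. The following is a minimal pure-Python sketch of that decision rule; the weights, intercept, and feature row are hypothetical values chosen for illustration, not the coefficients of the trained model.

```python
import math


def sigmoid(z: float) -> float:
    """Logistic function: maps a raw margin to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict_label(features, weights, intercept, threshold=0.5):
    """Return (probability of label 1, predicted label) for one feature row."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    p1 = sigmoid(margin)
    # Predict 1 only when the probability is strictly above the threshold
    return p1, (1.0 if p1 > threshold else 0.0)


# Hypothetical coefficients and scaled feature values, for illustration only
weights = [0.8, -0.3, 0.1, 0.5]
intercept = 0.2
row = [1.0, 0.5, -1.0, 1.0]

p_default, label_default = predict_label(row, weights, intercept, threshold=0.5)
p_strict, label_strict = predict_label(row, weights, intercept, threshold=0.8)
print(f"p={p_default:.3f} default={label_default} strict={label_strict}")
```

With these made-up numbers the same row is a positive prediction at 0.5 but a negative one at 0.8, which is the trade-off a higher threshold makes: fewer false positives at the cost of more missed purchasers.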
Spark MLlib models are portable. In your PySpark workflow, once your PipelineModel is fitted, you simply save it to a bucket.
# Save the model to GCS
pipeline_model.save("gs://your-bucket/models/purchase_prediction_v1")
This saves the metadata and parquet data required to reconstruct the model in a different environment, including the feature engineering steps defined in the Pipeline object.
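To see what was written, you can copy the saved directory locally and read each stage's metadata. The sketch below assumes the usual Spark ML on-disk layout (a `stages/` folder with one subdirectory per stage, each holding a JSON `metadata/part-*` file and, for fitted stages, a `data/` folder of parquet files). The helper name `describe_saved_pipeline` is hypothetical, and the layout may differ between Spark versions.

```python
import json
from pathlib import Path


def describe_saved_pipeline(model_dir):
    """Summarize each stage of a locally copied Spark ML pipeline directory."""
    stages_dir = Path(model_dir) / "stages"
    summary = []
    for stage_dir in sorted(p for p in stages_dir.iterdir() if p.is_dir()):
        # Each stage stores its parameters as one JSON document per part file
        meta_files = sorted((stage_dir / "metadata").glob("part-*"))
        if not meta_files:
            continue
        meta = json.loads(meta_files[0].read_text().splitlines()[0])
        summary.append({
            "stage": stage_dir.name,
            "class": meta.get("class"),
            # Fitted stages (scaler, regression) keep learned values under data/
            "has_data": (stage_dir / "data").is_dir(),
        })
    return summary
```

Calling `describe_saved_pipeline("/tmp/purchase_prediction_v1")` on a local copy (the path is a placeholder) should list the assembler, scaler, and logistic regression stages in pipeline order.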
Step 2: The Inference Container (Flask + Spark)
To serve this model on Cloud Run, we need a container that can:
- Listen for HTTP requests.
- Run a local Spark session to load the model.
- Perform inference.
We use Flask to handle web requests. The trick here is configuring the Dockerfile to support both Python and the Java dependencies required by PySpark, even though we aren’t running a full cluster.
The Dockerfile
The Dockerfile contains all of the necessary dependencies. In our case, this includes Python, Java, and PySpark. It also installs Python libraries defined in a requirements.txt file.
flask==3.0.3
google-cloud-storage==2.16.0
gunicorn==23.0.0
pyspark==4.0.1
Werkzeug==3.1.4
pandas
numpy
The Dockerfile kicks off the service using gunicorn.
FROM python:3.12-slim
# Install OpenJDK-21 (Required for Spark)
RUN apt-get update && \
    apt-get install -y openjdk-21-jre-headless procps && \
    rm -rf /var/lib/apt/lists/*
ENV JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
# app.py exposes the Flask object as `app`, so the Gunicorn target is app:app
CMD ["gunicorn", "--bind", "0.0.0.0:8080", "--workers", "1", "--threads", "8", "--timeout", "0", "app:app"]
The Application Logic
Your Flask app (app.py) initializes a local Spark session on startup to reduce latency for individual requests:
import os
from flask import Flask, request, jsonify
from pyspark.sql import SparkSession
# Import Spark's hash explicitly; otherwise Python's built-in hash() would be called
from pyspark.sql.functions import col, hash
from pyspark.ml import PipelineModel
app = Flask(__name__)
# Initialize local Spark Session
spark = SparkSession.builder \
    .appName("CloudRunInference") \
    .master("local[*]") \
    .getOrCreate()
# Load the model from GCS on startup
model = PipelineModel.load("gs://your-bucket/models/purchase_prediction_v1")
@app.route('/predict', methods=['POST'])
def predict():
    data = request.get_json()
    # Convert JSON to Spark DataFrame
    input_df = spark.createDataFrame([data])
    # Apply the same feature transformations that were used at training time
    input_df = input_df.select(
        col("age").cast("DOUBLE").alias("age"),
        (hash(col("country")).cast("BIGINT") * 1.0).alias("country_hash"),
        (hash(col("gender")).cast("BIGINT") * 1.0).alias("gender_hash"),
        (hash(col("traffic_source")).cast("BIGINT") * 1.0).alias("traffic_source_hash")
    )
    # Run Inference
    predictions = model.transform(input_df)
    # Return result
    result = predictions.select("prediction", "probability").collect()[0]
    # The probability column is an ML vector, so convert it to a plain list for JSON
    return jsonify({"will_buy": result.prediction, "confidence": result.probability.toArray().tolist()})
# Gunicorn entry point uses 'app' from this file
if __name__ == '__main__':
    # Local testing only: Cloud Run uses Gunicorn/CMD command
    app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
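Loading directly from a `gs://` path assumes the local Spark session can reach Cloud Storage, which generally requires the GCS connector on Spark's classpath. One possible alternative, sketched here under the assumption that you would rather use the `google-cloud-storage` client already listed in requirements.txt, is to download the model directory to local disk at startup and load it from there. The helper names and the `/tmp/model` destination are hypothetical.

```python
import os


def local_path_for_blob(blob_name, prefix, dest_dir):
    """Map a GCS object name under `prefix` to a file path under `dest_dir`."""
    prefix = prefix.rstrip("/") + "/"
    relative = blob_name[len(prefix):]
    return os.path.join(dest_dir, *relative.split("/"))


def download_model_dir(bucket_name, prefix, dest_dir):
    """Copy every object under gs://<bucket_name>/<prefix>/ into dest_dir."""
    # Imported lazily so this module can be loaded without the client installed
    from google.cloud import storage

    client = storage.Client()
    listing_prefix = prefix.rstrip("/") + "/"
    for blob in client.list_blobs(bucket_name, prefix=listing_prefix):
        if blob.name.endswith("/"):
            continue  # skip zero-byte "folder" placeholder objects
        target = local_path_for_blob(blob.name, prefix, dest_dir)
        os.makedirs(os.path.dirname(target), exist_ok=True)
        blob.download_to_filename(target)
    return dest_dir


# Possible usage in app.py (bucket and paths are placeholders):
# local_dir = download_model_dir("your-bucket", "models/purchase_prediction_v1", "/tmp/model")
# model = PipelineModel.load("file://" + local_dir)
```

Either approach keeps model loading at startup, so individual requests do not pay the download cost.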
Step 3: Deploying to Cloud Run
With the Dockerfile and app.py ready, deployment is handled via the Google Cloud SDK. Cloud Run abstracts away the infrastructure, meaning the service will scale down to zero when not in use and scale up automatically during high traffic.
gcloud run deploy spark-inference \
    --source . \
    --region us-central1 \
    --port 8080 \
    --memory "2Gi" \
    --allow-unauthenticated
You now have a REST endpoint capable of accepting user attributes and returning purchase probabilities, powered by the robust feature engineering of Spark ML.
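As a quick check, a client could call the endpoint like this. This is a minimal sketch using only the standard library; the service URL is a placeholder for whatever `gcloud run deploy` prints, and the sample field values are illustrative and should match the encoding used in the training data.

```python
import json
import urllib.request


def build_predict_request(service_url, user):
    """Build a JSON POST request for the /predict route defined in app.py."""
    return urllib.request.Request(
        url=service_url.rstrip("/") + "/predict",
        data=json.dumps(user).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def predict(service_url, user, timeout=30):
    """Send the request and return the decoded JSON response."""
    req = build_predict_request(service_url, user)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Placeholder URL: replace with the URL of your deployed service
SERVICE_URL = "https://spark-inference-example.a.run.app"
sample_user = {
    "age": 25,
    "country": "United States",
    "gender": "M",
    "traffic_source": "Search",
}
prediction_request = build_predict_request(SERVICE_URL, sample_user)
# result = predict(SERVICE_URL, sample_user)  # uncomment once the service is live
```

The response should contain the `will_buy` and `confidence` keys returned by the Flask handler.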
Going Further: Natural Language Queries with Agent Engine
An API is great for applications, but what if business stakeholders want to query the model?
We can take this deployment a step further using Vertex AI Agent Engine and the Agent Development Kit (ADK). Instead of writing curl commands, we can configure an Agent to interact with our Cloud Run inference server.
By defining the Cloud Run endpoint as a tool for the agent, a user can ask:
“Is a 25 year old Male from the United States who found us via Search likely to make a purchase?”
The agent interprets the intent, queries the Cloud Run service, and synthesizes the JSON response into a natural language answer.
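One way this could look with ADK is to wrap the endpoint in a plain Python function and pass it to the agent as a tool. The sketch below is an assumption-laden outline rather than the Codelab's implementation: the `INFERENCE_URL` environment variable, the function and agent names, the instruction text, and the model name are all hypothetical choices.

```python
import json
import os
import urllib.request

# Hypothetical environment variable holding the Cloud Run service URL
SERVICE_URL = os.environ.get("INFERENCE_URL", "http://localhost:8080")


def build_payload(age, country, gender, traffic_source):
    """Shape the tool arguments into the JSON body that /predict expects."""
    return {
        "age": age,
        "country": country,
        "gender": gender,
        "traffic_source": traffic_source,
    }


def predict_purchase(age: int, country: str, gender: str, traffic_source: str) -> dict:
    """Predict whether a user with these attributes is likely to make a purchase.

    Returns the inference service's JSON response as a dictionary.
    """
    req = urllib.request.Request(
        SERVICE_URL.rstrip("/") + "/predict",
        data=json.dumps(build_payload(age, country, gender, traffic_source)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode("utf-8"))


try:
    from google.adk.agents import Agent

    root_agent = Agent(
        name="purchase_predictor",
        model="gemini-2.0-flash",  # assumed model name; use one available to you
        instruction=(
            "Answer questions about purchase likelihood by calling "
            "predict_purchase and explaining the result in plain language."
        ),
        tools=[predict_purchase],
    )
except ImportError:
    # ADK is not installed; the tool function can still be tested on its own
    root_agent = None
```

The function's signature and docstring describe the tool to the agent, so keeping the parameter names aligned with the fields app.py expects avoids a translation step.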
Summary
By utilizing Cloud Run and Vertex AI Agent Engine, we bridged the gap between data engineering and production microservices. We moved from raw data to a scalable, containerized AI application without managing a single server.
Ready to try it yourself? Check out the full end-to-end Codelab here: Data Science with Spark Agents
Deploy Apache Spark MLlib models to Google Cloud Run was originally published in Google Cloud – Community on Medium.
Source Credit: https://medium.com/google-cloud/deploy-apache-spark-mllib-models-to-google-cloud-run-e3c6e21b5268?source=rss—-e52cf94d98af—4
