
Building a real-time evaluation pipeline is crucial for the continuous health and improvement of your data agent. It’s not enough to simply deploy an agent; you need immediate feedback on its performance to ensure it’s always providing accurate, relevant, and helpful responses. Think of it like a continuous health check for your AI.
One of the primary reasons for this pipeline is ensuring accuracy. In the dynamic world of AI, models can “drift” over time. User language evolves, new trends emerge, and even subtle changes in data can impact your agent’s understanding and response quality. A real-time evaluation pipeline allows you to instantly detect when the agent starts to misinterpret requests, provide incorrect information, or “hallucinate” nonsensical answers. By catching these issues as they happen, you can proactively intervene, retrain your models, or adjust prompt engineering strategies before they impact a large number of users.
Furthermore, a real-time pipeline is essential for identifying patterns and anomalies. Are certain types of queries consistently leading to inaccurate responses? Is the agent frequently struggling with a new feature or a particular user segment? By collecting and analyzing interaction data as it flows through the pipeline, you can uncover these patterns. This is where Apache Beam’s stateful DoFn becomes incredibly powerful. It allows you to maintain state across elements within a window, meaning you can aggregate data, count occurrences, and even store examples of problematic interactions in real-time. This capability is vital for generating immediate reports and alerts, so you can see if there are any problems and address them without delay. Without this continuous feedback loop, you’d be operating in the dark, potentially leading to a degraded user experience and a less effective agent over time.
Now, you might wonder why use a framework like Apache Beam instead of building all this logic from scratch. The answer lies in scalability, complexity management, and efficiency.
Developing a custom, distributed data processing system from the ground up is an incredibly complex undertaking. You’d be responsible for meticulously managing worker nodes, sharding data, implementing fault tolerance mechanisms, optimizing data movement, and handling concurrency — all of which are prone to errors and require deep expertise in distributed systems.
Apache Beam provides a unified programming model that abstracts away much of this complexity. You define your data transformations once, and Beam handles the heavy lifting of executing them across a distributed environment like Google Cloud Dataflow. This brings several key advantages, for example:
- Built-in Robustness: Beam comes with battle-tested features for fault tolerance, error handling, and data consistency, which are notoriously difficult to implement reliably in custom code.
- Optimized Performance Features: For machine learning inference, Beam’s RunInference transform, for example, automatically handles batching of elements. This is a critical optimization for efficiency: instead of sending individual requests to your ML model, RunInference intelligently groups them into larger batches, significantly improving throughput and reducing latency. Building this kind of sophisticated batching logic, considering factors like batch size, latency thresholds, and resource utilization, would be a substantial engineering effort in a custom solution.
- Simplified Infrastructure: With Beam, you don’t need to deploy and manage separate infrastructure components, such as a dedicated database for handling stateful interactions (like tracking counts or collecting bad examples over time). Beam’s built-in state management capabilities handle this automatically, reducing operational overhead and complexity.
By leveraging a framework, you can focus on the unique business logic of your evaluation pipeline — like defining what constitutes a “good” or “bad” interaction — rather than getting bogged down in the intricacies of distributed systems infrastructure. It allows you to build sophisticated, scalable pipelines much faster and with greater reliability.
Google Cloud Dataflow significantly abstracts away the complexities of GPU & CPU usage, model memory management, and scalability for your evaluation pipelines. When you define your machine learning inference steps using Beam’s RunInference transform, Dataflow handles the underlying infrastructure intricacies. This means you don’t need to manually configure GPU instances, optimize memory allocation for your models, or manage distributed scaling across multiple machines. Dataflow automatically provisions and scales the necessary resources, including GPUs, and intelligently batches inference requests to maximize throughput and efficiency. This abstraction allows you to focus on the logic of your evaluation and the performance of your models, rather than getting bogged down in the operational overhead of managing specialized hardware and distributed systems.
The first step is to ingest the data. While your production system will undoubtedly rely on live message streams from services like Pub/Sub, the development phase often benefits from a more agile approach. This is where test_mode truly shines. Instead of continuously publishing messages to a Pub/Sub topic and waiting for them to propagate, a test mode allows your pipeline to read predefined, in-memory sample data directly. This means you can execute your pipeline locally, instantly observing its behavior and debugging any issues without the complexities of network communication or external dependencies. This immediate feedback loop dramatically compresses the time needed for testing and validation, enabling a much faster development cycle. You can see the implementation in the following code snippet.
with beam.Pipeline(options=pipeline_options) as p:
if known_args.test_mode:
logging.info("Running in test mode with in-memory data.")
parsed_elements = p | 'CreateTestData' >> beam.Create(TEST_DATA)
# Convert dicts to JSON strings and add timestamps for test mode
parsed_elements = parsed_elements | 'ConvertTestDictsToJsonAndAddTimestamps' >> beam.Map(
lambda x: beam.window.TimestampedValue(json.dumps(x), x['timestamp'])
)
else:
logging.info(f"Reading from Pub/Sub topic: {known_args.input_topic}")
parsed_elements = (
p
| 'ReadFromPubSub' >> beam.io.ReadFromPubSub(
topic=known_args.input_topic
).with_output_types(bytes)
| 'DecodeBytes' >> beam.Map(lambda b: b.decode('utf-8')) # Output is JSON string
# Extract timestamp from JSON string for Pub/Sub messages
| 'AddTimestampsFromParsedJson' >> beam.Map(lambda s: beam.window.TimestampedValue(s, json.loads(s)['timestamp']))
)
Beam’s RunInference transform offers significant flexibility through custom ModelHandlers, allowing you to abstract away boilerplate code and integrate your own models seamlessly. This is particularly useful when working with specialized or proprietary models that don’t fit standard frameworks. This handler would encapsulate all the necessary logic to communicate with the Gemini API on Vertex AI, allowing your pipeline to interact with Gemini as if it were a local model. This abstraction simplifies your pipeline code, promotes reusability, and ensures that the complexities of interacting with external AI services are managed in a single, well-defined component.
To call an API, such as Gemini on VertexAI, instead of returning a model in the load_model function you can return a Client. The Client object is then used in the subsequent run_inference function you provide.
class GeminiAccuracyModelHandler(ModelHandler[str, PredictionResult, genai.Client]):
def __init__(self, project: str, location: str, model_name: str, model_kwargs: Optional[Dict[str, Any]] = None, **kwargs):
super().__init__(**kwargs)
self._project = project
self._location = location
self._model_name = model_name
self._model_kwargs = model_kwargs or {}def load_model(self) -> genai.Client:
"""Loads and initializes a model for processing."""
client = genai.Client(
vertexai=True,
project=self._project,
location=self._location,
)
return client
def run_inference(
self,
batch: Sequence[str], # Each item is a JSON string
model: genai.Client,
inference_args: Optional[Dict[str, Any]] = None
) -> Iterable[PredictionResult]:
"""Runs inference on a batch of JSON strings to verify accuracy."""
for json_string in batch:
try:
element_dict = json.loads(json_string)
original_prompt = element_dict.get('original_prompt', '')
original_text = element_dict.get('original_text', '')
if not original_prompt or not original_text:
logging.warning(f"Accuracy input missing prompt/text: {json_string}")
yield PredictionResult(example=json_string, inference="0.0 - ERROR_ACCURACY_INPUT")
continue
prompt_for_accuracy = f"""
You are an AI assistant evaluating the accuracy of another AI agent's response to a given prompt.
Score the accuracy of the 'text' in fulfilling the 'prompt' from 0.0 to 1.0 (float).
0.0 is very bad, 1.0 is excellent.
Example of very bad, score of 0:
prompt: Give me the SQL for test_Table
text: SUre, here's a picture of a dog
Example of very good score of 1:
prompt: generate a sql statement to select all fields from test_table
text: SELECT * from test_table;
Your response should be ONLY the float score, followed by a brief explanation of why.
For example: "0.8 - The response was mostly accurate but missed a minor detail."
Prompt: "{original_prompt}"
Text: "{original_text}"
Score and Explanation:
"""
gemini_response = model.models.generate_content_stream(model=self._model_name, contents=[prompt_for_accuracy], config=self._model_kwargs)
gemini_response_text = ""
for chunk in gemini_response:
if chunk.text is not None:
gemini_response_text+=chunk.text
yield PredictionResult(example=json_string, inference=gemini_response_text)
except Exception as e:
logging.error(f"Error during Gemini accuracy inference for input {json_string}: {e}")
yield PredictionResult(example=json_string, inference="0.0 - ERROR_INFERENCE")
You are also not limited to one model, you can have multiple models. If using Dataflow, the memory management will be handled for you.
If your evaluation needs extend beyond element-wise transformations and involve more intricate logic — such as tracking cumulative metrics, performing aggregations over time, or managing complex session-based data — then Apache Beam’s stateful capabilities become indispensable. Building these kinds of sophisticated operations from scratch in a distributed environment is notoriously challenging, often leading to brittle and inefficient solutions. Beam’s built-in support for states and timers elegantly addresses these complexities, providing a robust and scalable framework to handle even the most demanding real-time evaluation scenarios.
Stateful operations allow you to maintain and update information across elements within a window or even globally. This is crucial for scenarios where you need to track cumulative metrics, detect patterns over time, or trigger actions based on temporal conditions. States enable you to store arbitrary data associated with a key, allowing you to build aggregations, counts, or even complex data structures. Timers, on the other hand, provide a mechanism to schedule future computations, which is invaluable for scenarios like handling late-arriving data, processing event time windows, or triggering periodic reports. Together, states and timers unlock a powerful dimension of data processing, enabling sophisticated real-time analytics and intelligent automation within your pipelines.
At their core, Beam’s states provide a mechanism for DoFn instances to persistently store and retrieve data between processing elements that share the same key. This allows you to build sophisticated processing logic that depends on historical information or aggregates data over time. Think of it as a small, keyed database accessible directly within your DoFn.
There are different types of state, each suited for different use cases, in this example you will use BagState and ValueState.
- BagState: BagState allows you to append multiple values for a given key. It’s like a list where you can keep adding items, and you can read all the items currently in the bag. This is useful when you need to collect a set of related data points for a key over a period, such as all the problematic interactions associated with a specific user ID within a window, or all the events that contribute to a cumulative metric.
- ValueState: ValueState stores a single, mutable value for a given key. You can read the current value, update it, or clear it. It’s ideal for maintaining a running total, a configuration setting, or the latest observed value for a key. For example, you might use ValueState to keep a count of successful inferences for a particular model, or to track the last timestamp an event was processed for a specific session.
class StatefulCountFn(beam.DoFn):
"""A stateful DoFn to count elements per key within a window."""
# Define a state specification for a combining value.
# This will store the running sum for each key.
# The coder is specified for efficiency.
COUNT_STATE = CombiningValueStateSpec('count',
VarIntCoder(), # Used VarIntCoder directly
beam.transforms.combiners.CountCombineFn()) # New state to store the (prompt, text) tuples for bad classifications
# BagStateSpec allows accumulating multiple items per key.
BAD_PROMPTS_STATE = beam.transforms.userstate.BagStateSpec(
'bad_prompts', coder=beam.coders.TupleCoder([beam.coders.StrUtf8Coder(), beam.coders.StrUtf8Coder()])
)
# Define a timer to fire at the end of the window, using WATERMARK as per blog example.
WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)
def process(
self,
element: Tuple[str, Tuple[int, Tuple[str, str]]], # (key, (count_val, (prompt, text)))
key=beam.DoFn.KeyParam,
count_state=beam.DoFn.StateParam(COUNT_STATE),
bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE), # New state param
window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
window=beam.DoFn.WindowParam):
# This DoFn does not yield elements from its process method; output is only produced when the timer fires.
if key == 'bad': # Only count 'bad' elements
count_state.add(element[1][0]) # Add the count (which is 1)
bad_prompts_state.add(element[1][1]) # Add the (prompt, text) tuple
window_timer.set(window.end) # Set timer to fire at window end
@on_timer(WINDOW_TIMER)
def on_window_timer(self, key=beam.DoFn.KeyParam, count_state=beam.DoFn.StateParam(COUNT_STATE), bad_prompts_state=beam.DoFn.StateParam(BAD_PROMPTS_STATE)):
final_count = count_state.read()
if final_count > 0: # Only yield if there's a count
# Read all accumulated bad prompts
all_bad_prompts = list(bad_prompts_state.read())
# Clear the state for the next window to avoid carrying over data.
count_state.clear()
bad_prompts_state.clear()
yield (key, final_count, all_bad_prompts) # Yield count and list of prompts
You can test out this pattern yourself.
git clone https://github.com/GoogleCloudPlatform/devrel-demos
cd devrel-demos/data-analytics/beam_as_eval
This repository contains all the necessary code, including the Beam pipeline definition, custom ModelHandler for Gemini, and the stateful DoFn for aggregating metrics.
Once you have cloned the repository, you can run the pipeline. The provided main.py script includes command-line arguments to switch between test_mode (using in-memory sample data) and reading from a Pub/Sub topic. This allows for flexible development and deployment. Further instructions on setting up your environment and running the pipeline will be provided in the repository’s README.
# **1. Install Dependencies:**
pip install "apache-beam[gcp]" google-cloud-aiplatform# **2. Authenticate with Google Cloud:**
gcloud auth application-default login
gcloud config set project YOUR_GCP_PROJECT_ID
# **3. Run in Test Mode (Local):**
# This mode uses in-memory data and is useful for quick local testing without Pub/Sub or Dataflow.
python Gemini_Beam_Eval.py \
- project YOUR_GCP_PROJECT_ID \
- location us-central1 \
- model_name gemini-2.5-flash \
- test_mode \
- runner DirectRunner
Alternatively you can also use the codelab to learn more. Now you can evaluate your AI and ML easily with customized pipelines!
Source Credit: https://medium.com/google-cloud/ai-evaluation-with-apache-beam-and-dataflow-323e06c34717?source=rss—-e52cf94d98af—4