
The Temporal workflow orchestrates a streamlined 4-step process with human approval gates:
# Pattern: HITL Multi-Agent Workflow Orchestration
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class HITLProcessingWorkflow:
"""
Orchestrates a 4-step processing pipeline with human approval gates.
Each step includes agent execution followed by human review.
"""@workflow.run
async def run(self, input_data: dict) -> dict:
"""Execute the HITL workflow with durable state and approval gates"""
task_id = input_data["task_id"]
input_sources = input_data["input_sources"]
# Initialize workflow context
workflow_context = {
"task_id": task_id,
"workflow_id": workflow.info().workflow_id,
"start_time": workflow.now(),
"results": {},
"approvals": {}
}
# Step 1: Analysis Agent + Human Review
analysis_result = await workflow.execute_activity(
call_analysis_agent_activity,
args=[input_sources, task_id, workflow_context],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(minutes=1),
maximum_attempts=3
)
)
# Human approval gate for analysis
analysis_approval = await workflow.execute_activity(
request_human_approval_activity,
args=["analysis", analysis_result, workflow_context],
start_to_close_timeout=timedelta(hours=24) # Allow 24h for human review
)
if not analysis_approval["approved"]:
return {"status": "rejected_at_analysis", "reason": analysis_approval["reason"]}
workflow_context["results"]["analysis"] = analysis_result
workflow_context["approvals"]["analysis"] = analysis_approval
# Step 2: Strategy Agent + Human Review
strategy_result = await workflow.execute_activity(
call_strategy_agent_activity,
args=[analysis_result, task_id, workflow_context],
start_to_close_timeout=timedelta(minutes=15)
)
strategy_approval = await workflow.execute_activity(
request_human_approval_activity,
args=["strategy", strategy_result, workflow_context],
start_to_close_timeout=timedelta(hours=24)
)
if not strategy_approval["approved"]:
return {"status": "rejected_at_strategy", "reason": strategy_approval["reason"]}
# Steps 3-4 follow the same pattern: Agent execution + Human approval
return {
"status": "completed",
"task_id": task_id,
"workflow_id": workflow.info().workflow_id,
"duration": (workflow.now() - workflow_context["start_time"]).total_seconds(),
"results": workflow_context["results"],
"approvals": workflow_context["approvals"]
}
Activities encapsulate agent calls and human approval processes, providing error handling and data transformation:
@activity.defn
async def call_analysis_agent_activity(
input_sources: str,
task_id: str,
workflow_context: dict
) -> dict:
"""Activity that calls the Analysis Agent via A2A"""try:
# Prepare input for the agent
input_data = {
"prompt": f"Analyze data sources: {input_sources}",
"task_id": task_id,
"workflow_id": workflow_context["workflow_id"],
"analysis_type": "comprehensive",
"include_metrics": True
}
# Call agent via A2A protocol
client = A2AAgentClient()
result = await client.call_agent("analysis_agent", input_data)
# Store large results externally to avoid workflow size limits
if len(json.dumps(result)) > 100_000: # 100KB threshold
storage = PayloadStorage()
payload_id = await storage.store_payload(result)
return {"payload_ref": payload_id, "status": "success"}
return result
except Exception as e:
# Activity failures trigger Temporal's retry mechanism
raise ActivityError(f"Analysis failed: {str(e)}")
@activity.defn
async def request_human_approval_activity(
step_name: str,
agent_result: dict,
workflow_context: dict
) -> dict:
"""Activity that requests human approval via web UI"""try:
# Create approval request in database
approval_request = {
"workflow_id": workflow_context["workflow_id"],
"task_id": workflow_context["task_id"],
"step_name": step_name,
"agent_result": agent_result,
"status": "pending_approval",
"created_at": datetime.now().isoformat(),
"timeout_at": (datetime.now() + timedelta(hours=24)).isoformat()
}
# Store approval request for UI to display
approval_storage = ApprovalStorage()
approval_id = await approval_storage.create_approval_request(approval_request)
# Send notification to approvers (email, Slack, etc.)
notification_service = NotificationService()
await notification_service.notify_approvers(
f"Approval needed for {step_name} in workflow {workflow_context['workflow_id']}",
approval_id,
step_name
)
# Wait for human decision (polling or signal-based)
while True:
approval_status = await approval_storage.get_approval_status(approval_id)
if approval_status["status"] == "approved":
return {
"approved": True,
"approval_id": approval_id,
"approver": approval_status["approver"],
"approved_at": approval_status["approved_at"],
"comments": approval_status.get("comments", "")
}
elif approval_status["status"] == "rejected":
return {
"approved": False,
"approval_id": approval_id,
"approver": approval_status["approver"],
"rejected_at": approval_status["rejected_at"],
"reason": approval_status.get("reason", "No reason provided")
}
# Wait 30 seconds before checking again
await asyncio.sleep(30)
except Exception as e:
raise ActivityError(f"Human approval request failed: {str(e)}")
The human approval system integrates with a web-based dashboard that presents agent results for review:
# Pattern: Web-Based Approval Dashboard
class ApprovalUIService:
"""Handles the web UI for human approvals"""async def get_pending_approvals(self, user_id: str) -> List[dict]:
"""Get all pending approval requests for a user"""
approval_storage = ApprovalStorage()
pending_requests = await approval_storage.get_pending_for_user(user_id)
# Enrich with agent result summaries
enriched_requests = []
for request in pending_requests:
summary = await self.generate_result_summary(request["agent_result"])
enriched_requests.append({
**request,
"summary": summary,
"time_remaining": self.calculate_time_remaining(request["timeout_at"])
})
return enriched_requests
async def submit_approval_decision(
self,
approval_id: str,
user_id: str,
decision: str,
comments: str = ""
) -> dict:
"""Submit an approval decision (approve/reject)"""
approval_storage = ApprovalStorage()
decision_data = {
"approval_id": approval_id,
"approver": user_id,
"decision": decision, # "approved" or "rejected"
"comments": comments,
"decided_at": datetime.now().isoformat()
}
# Update approval status
await approval_storage.update_approval_decision(decision_data)
# Log the decision for audit trail
audit_logger.info("approval_decision", extra={
"approval_id": approval_id,
"approver": user_id,
"decision": decision,
"workflow_context": await approval_storage.get_workflow_context(approval_id)
})
return {"status": "success", "decision_recorded": True}
The HITL approach provides several critical advantages for production AI systems:
1. Quality Assurance
Human experts review AI outputs before proceeding to next stages, catching edge cases and errors that automated testing might miss.
2. Risk Mitigation
Prevents cascading failures from early-stage errors and allows for course correction before significant resources are invested.
3. Continuous Learning
Human feedback can be used to improve agent performance over time, creating valuable training data for future model improvements.
4. Stakeholder Confidence
Maintains human oversight for critical business processes and provides transparency into AI decision-making.
Temporal workflows have payload size limits (typically 2MB). When processing large datasets, agents generate extensive reports that can exceed these limits. Enter the payload storage pattern:
# Pattern: External Payload Storage
class PayloadStorage:
"""Manages large data payloads outside of Temporal workflows"""def __init__(self):
self.storage_dir = Path("external_storage/payloads")
self.storage_dir.mkdir(parents=True, exist_ok=True)
async def store_payload(self, data: Any, reference_key: str = None) -> str:
"""Store large payload and return reference ID"""
# Generate unique ID if not provided
if reference_key is None:
reference_key = self._generate_payload_id(data)
file_path = self.storage_dir / f"{reference_key}.json"
# Store with metadata
payload_data = {
"metadata": {
"id": reference_key,
"timestamp": datetime.now().isoformat(),
"size_bytes": len(json.dumps(data).encode()),
"checksum": hashlib.sha256(
json.dumps(data, sort_keys=True).encode()
).hexdigest()
},
"data": data
}
# Async write for performance
async with aiofiles.open(file_path, 'w') as f:
await f.write(json.dumps(payload_data, indent=2, default=str))
return reference_key
async def resolve_payload(self, reference: Any) -> Any:
"""Resolve data from reference or return as-is"""
# If not a reference, return the data directly
if not isinstance(reference, dict) or "payload_ref" not in reference:
return reference
# Load from storage
file_path = self.storage_dir / f"{reference['payload_ref']}.json"
async with aiofiles.open(file_path, 'r') as f:
content = await f.read()
payload_data = json.loads(content)
# Verify integrity
stored_checksum = payload_data["metadata"]["checksum"]
computed_checksum = hashlib.sha256(
json.dumps(payload_data["data"], sort_keys=True).encode()
).hexdigest()
if stored_checksum != computed_checksum:
raise ValueError(f"Payload integrity check failed for {reference['payload_ref']}")
return payload_data["data"]
Every production system needs comprehensive monitoring. Our implementation includes multi-layer health checks:
# Pattern: Comprehensive Health Monitoring
class A2AAgentClient:async def check_all_agents_health(self) -> dict:
"""Comprehensive health check for distributed agent fleet"""
health_status = {
"timestamp": datetime.now().isoformat(),
"agents": {},
"overall_status": "healthy"
}
async with aiohttp.ClientSession() as session:
for agent_name, endpoint in self.agent_registry.items():
try:
# Check basic connectivity
async with session.get(
f"{endpoint}/health",
timeout=aiohttp.ClientTimeout(total=5)
) as response:
if response.status == 200:
# Verify agent card is accessible
async with session.get(
f"{endpoint}/.well-known/agent.json",
timeout=aiohttp.ClientTimeout(total=5)
) as card_response:
if card_response.status == 200:
agent_card = await card_response.json()
health_status["agents"][agent_name] = {
"status": "healthy",
"version": agent_card.get("version", "unknown"),
"capabilities": agent_card.get("capabilities", [])
}
else:
health_status["agents"][agent_name] = {
"status": "degraded",
"reason": "Agent discovery unavailable"
}
else:
health_status["agents"][agent_name] = {
"status": "unhealthy",
"http_status": response.status
}
health_status["overall_status"] = "degraded"
except asyncio.TimeoutError:
health_status["agents"][agent_name] = {
"status": "unhealthy",
"reason": "Connection timeout"
}
health_status["overall_status"] = "degraded"
except Exception as e:
health_status["agents"][agent_name] = {
"status": "unhealthy",
"reason": str(e)
}
health_status["overall_status"] = "degraded"
return health_status
Production systems must handle partial failures gracefully:
# Pattern: Resilient Agent Communication with Fallbacks
@activity.defn
async def call_agent_with_fallback(
agent_name: str,
input_data: dict,
fallback_data: dict = None
) -> dict:
"""Call agent with automatic fallback strategies"""try:
# Attempt real agent call
client = A2AAgentClient()
# Check agent health first
health = await client.check_agent_health(agent_name)
if health["status"] != "healthy":
raise Exception(f"Agent {agent_name} is {health['status']}")
# Call the agent
result = await client.call_agent(agent_name, input_data)
result["execution_mode"] = "live_agent"
return result
except Exception as e:
# Log the failure for monitoring and debugging
logger.error(f"Agent {agent_name} failed: {e}")
# Use fallback data if available
if fallback_data:
return {
"execution_mode": "fallback",
"fallback_reason": str(e),
"data": fallback_data
}
# Generate synthetic response as last resort
return {
"execution_mode": "synthetic",
"status": "completed_with_synthetic_data",
"message": f"Agent {agent_name} unavailable, using synthetic response",
"data": generate_synthetic_response(agent_name, input_data)
}
While this example uses local deployment, production systems require robust security:
# Pattern: Production Security for A2A Communication
class SecureA2AClient:def __init__(self):
self.auth_token = os.environ.get("A2A_AUTH_TOKEN")
self.tls_cert = os.environ.get("A2A_TLS_CERT")
async def call_agent_secure(self, agent_name: str, input_data: dict) -> dict:
"""Secure agent communication with authentication and encryption"""
headers = {
"Authorization": f"Bearer {self.auth_token}",
"X-Request-ID": str(uuid.uuid4()),
"X-Workflow-ID": input_data.get("workflow_id", "unknown")
}
# Use HTTPS with certificate verification
ssl_context = ssl.create_default_context()
if self.tls_cert:
ssl_context.load_cert_chain(self.tls_cert)
connector = aiohttp.TCPConnector(ssl=ssl_context)
async with aiohttp.ClientSession(connector=connector) as session:
# Call agent with secure connection
async with session.post(
f"https://{agent_name}.agents.internal/process",
json=self._encrypt_payload(input_data),
headers=headers
) as response:
encrypted_result = await response.json()
return self._decrypt_payload(encrypted_result)
Before deploying a single agent, we instrumented comprehensive logging and metrics:
# Structured logging for every agent interaction
logger.info("agent_call", extra={
"agent": agent_name,
"workflow_id": workflow_id,
"step_number": step_number,
"input_size": len(json.dumps(input_data)),
"duration_ms": duration,
"status": result.get("status"),
"execution_mode": result.get("execution_mode")
})
In a distributed agent system, assume at least one service will be degraded at any time. Design workflows to continue even when agents fail, marking sections as “completed_with_fallback” rather than failing entirely.
Different agents require different model configurations based on their tasks:
- Data Analysis: Uses fast models (Gemini 2.5 Flash) for speed
- Strategic Planning: Uses advanced models (Gemini 2.5 Pro) for complex reasoning
- Pattern Extraction: Uses low temperature (0.1) for consistency
- Creative Generation: Uses higher temperature (0.7) for variety
Agent responses are often deterministic for the same input. Implement caching at multiple levels:
# Pattern: Multi-level Caching Strategy
context_cache_key = hashlib.sha256(
f"{task_id}:{input_hash}:{analysis_type}".encode()
).hexdigest()
if context_cache_key in cache:
return cache[context_cache_key]
Agents evolve. We version agent definitions, prompts, and even workflow definitions:
# Pattern: Comprehensive Agent Versioning
{
"name": "domain-analysis-agent",
"version": "2.3.1",
"api_version": "1.0",
"model_version": "gemini-2.5-flash-002",
"prompt_version": "2024-11-15",
"capabilities": [...],
"changelog": "Enhanced pattern recognition algorithms"
}
The integration of Google ADK, A2A protocol, and Temporal isn’t just a technical achievement — it’s a paradigm shift in how we approach complex AI systems. By combining:
- ADK’s agent intelligence — Sophisticated AI capabilities with tool integration. I will talk more about the “Moat” agents need in the coming post.
- A2A’s communication standards — Seamless inter-agent collaboration, Security
- Temporal’s durable execution — Enterprise-grade reliability and fault tolerance
We’ve created architectural patterns that can tackle complex, multi-step problems that require coordinated intelligence across specialized domains.
The patterns we’ve explored — from payload storage to health monitoring, from graceful degradation to version management — aren’t domain-specific. They’re blueprints for any complex multi-agent system that needs to operate reliably at scale.
As we move into an era where AI agents become the primary interface for complex system interactions, the foundations we lay today with frameworks like ADK, protocols like A2A, and orchestrators like Temporal will determine whether these systems are fragile experiments or robust production solutions.
The concepts are proven, the patterns are battle-tested, and the future is collaborative. Welcome to the age of orchestrated intelligence. I will publish a saample repository later when I am done scrubbing the business code out 🙂
Source Credit: https://medium.com/google-cloud/orchestrating-multi-agent-systems-a-deep-dive-into-google-adk-a2a-protocol-and-temporal-b13a18638890?source=rss—-e52cf94d98af—4