Real-Time Updates
This document describes the real-time update infrastructure in Automatos AI, including Redis Pub/Sub messaging, SSE streaming to the frontend, and live progress tracking for workflow and recipe executions.
Scope: This page covers the event publishing, message routing, and frontend consumption of execution updates. For workflow execution logic, see Execution Layer. For recipe execution details, see Recipe Execution. For frontend state management, see State Management.
Architecture Overview
Automatos AI uses a dual-path real-time update system: Redis Pub/Sub for backend event logging and monitoring, and SSE (Server-Sent Events) for direct streaming to the frontend UI.
Sources: orchestrator/core/redis/client.py, orchestrator/api/workflows.py:37-136, frontend/components/workflows/execution-kitchen.tsx:1-50
Redis Pub/Sub System
The RedisClient class provides a connection pool-based pub/sub infrastructure for workflow execution events.
RedisClient Implementation
Core Methods:
publish(channel, message)- Publish JSON message to Redis channelpublish_workflow_event(workflow_id, execution_id, event_type, data)- Typed workflow event publisherget_async_pubsub(channel)- Get async Redis pubsub for non-blocking streamingget_redis()- Get connection from pool
Configuration:
Lazy Initialization Pattern: The global get_redis_client() function supports both REDIS_URL (Railway/Heroku) and individual environment variables, returning None if Redis is unconfigured (optional service).
Sources: orchestrator/core/redis/client.py:14-198
Channel Naming Convention
Events are published to channels following the pattern:
Example: workflow:42:execution:1337
This allows frontend clients to subscribe to specific execution streams without receiving unrelated events.
Sources: orchestrator/core/redis/client.py:110-119
Message Structure
All Redis messages follow a consistent JSON structure:
Sources: orchestrator/core/redis/client.py:91-119
SSE Streaming to Frontend
The backend has transitioned from WebSockets to SSE (Server-Sent Events) for real-time updates, using a stream_manager for broadcast operations.
WorkflowStageTracker SSE Integration
The WorkflowStageTracker class emits events during 9-stage workflow execution:
Key Implementation Details:
start_stage()andcomplete_stage()both publish to SSE and RedisSSE provides instant UI updates via
stream_manager.broadcast_event()Redis provides logging/monitoring via
redis.publish_workflow_event()Stage duration calculated via
stage_start_timesdictNon-blocking error handling with
try/exceptwrappers
Sources: orchestrator/api/workflows.py:37-136
9-Stage Workflow Events
The STAGES dict defines the workflow pipeline stages:
1
Task Decomposition
Break down complex task into subtasks
2
Agent Selection
Choose agents for each subtask
3
Context Engineering
Build optimized context for agents
4
Agent Execution
Execute agents with tools
5
Result Aggregation
Combine outputs from agents
6
Learning Update
Update learning models
7
Quality Assessment
Score execution quality
8
Memory Storage
Persist learnings
9
Response Generation
Format final response
Sources: orchestrator/api/workflows.py:38-49, frontend/components/workflows/execution-kitchen.tsx:74-84
Legacy WebSocket Removal
Comments throughout the codebase indicate WebSocket support was removed in favor of SSE:
This appears at workflow update endpoints, delete endpoints, and creation endpoints where WebSocket broadcasts were previously emitted.
Sources: orchestrator/api/workflows.py:394-396, orchestrator/api/workflows.py:442-443, orchestrator/api/workflows.py:598-599
Recipe Step Progress Updates
Recipe executions use a different update pattern optimized for sequential step-by-step workflows.
Direct Execution with Persistence
The execute_recipe_direct() function executes recipes step-by-step and persists progress in the database:
Key Data Structures:
step_results - Flat JSONB array persisted to database:
step_outputs - In-memory keyed dict for inter-step data passing:
Sources: orchestrator/api/recipe_executor.py:313-555
Step Result Persistence
The _persist_step_results() helper updates the execution record after each step:
This ensures that partial progress is saved even if the recipe fails mid-execution, enabling resume/retry logic and frontend progress display.
Sources: orchestrator/api/recipe_executor.py:578-583
Event Types and Payloads
The system emits various event types throughout execution:
Workflow Stage Events
stage_start:
stage_complete:
Sources: orchestrator/api/workflows.py:58-93, orchestrator/api/workflows.py:95-135
Agent Execution Events
Events generated during agent task execution (typically in Stage 4):
agent_spawn:
task_progress:
task_complete:
task_error:
Sources: frontend/components/workflows/execution-kitchen.tsx:61-71
Frontend Integration
Frontend components consume real-time updates via SSE streaming and HTTP polling.
ExecutionKitchen Component
The ExecutionKitchen component provides a theater-style live execution viewer:
Component Structure:
Key Features:
StreamingLog- Real-time log display with filtering and auto-scrollTheaterStageProgress- Visual 9-stage progress indicatorTheaterStepExecution- Recipe step-by-step visualizationRecipeStepProgress- Step completion tracker
State Management:
Sources: frontend/components/workflows/execution-kitchen.tsx:47-85
SSE Event Processing
Frontend components use the EventSource API for SSE streaming:
Sources: frontend/components/workflows/execution-kitchen.tsx:98-257
Streaming Log Component
The StreamingLog displays events with expandable details:
Features:
Auto-scroll toggle with manual override
Stage filtering (click stage badge to filter)
Expandable log entries for full response text
Color-coded event types
Timestamp display with millisecond precision
Log Entry Structure:
Sources: frontend/components/workflows/execution-kitchen.tsx:61-71, frontend/components/workflows/execution-kitchen.tsx:99-258
HTTP Polling Fallback
For recipe executions, the frontend polls the REST API when SSE is unavailable:
Response includes:
status: "pending" | "running" | "completed" | "failed"current_step: Current step index (0-based)step_results: Array of completed step resultsoutput_data: Final execution output and metadata
Sources: frontend/components/workflows/view-recipe-modal.tsx:251-329
Performance Considerations
Redis Connection Pooling
The RedisClient uses a connection pool to avoid connection overhead:
This allows up to 50 concurrent Redis connections, sufficient for multi-execution scenarios.
Sources: orchestrator/core/redis/client.py:22-31
Non-Blocking Event Publishing
Both SSE and Redis publishing use non-blocking error handling to prevent execution failures from broken connections:
This ensures execution continues even if the real-time update infrastructure is unavailable.
Sources: orchestrator/api/workflows.py:66-93, orchestrator/api/workflows.py:104-135
JSONB Step Results Efficiency
Recipe step results are stored as JSONB in PostgreSQL, allowing:
Atomic updates: Single
db.commit()per stepPartial reads: Frontend can query specific execution without full table scan
Schema flexibility: Step result structure can evolve without migrations
Index support: PostgreSQL can index into JSONB for filtering
Example query with JSONB filtering:
Sources: orchestrator/api/recipe_executor.py:413-528
Frontend Auto-Scroll Optimization
The StreamingLog component uses a ref-based scroll mechanism with manual override:
This prevents excessive re-renders while maintaining smooth scroll behavior during high-frequency log updates.
Sources: frontend/components/workflows/execution-kitchen.tsx:100-107
Error Handling and Resilience
Optional Redis Service
The system gracefully degrades when Redis is unavailable:
When get_redis_client() returns None, workflow execution continues but real-time updates are logged to console instead of Redis.
Sources: orchestrator/core/redis/client.py:149-197
Step Error Handling
Recipe execution respects per-step error_handling configuration:
stop
Halt execution immediately, mark as failed
skip
Log error, continue to next step
retry
Retry up to max_retries times with exponential backoff
Sources: orchestrator/api/recipe_executor.py:497-525
Connection Test Utility
The RedisClient provides a test_connection() method for health checks:
This is called during initialization to verify Redis availability.
Sources: orchestrator/core/redis/client.py:121-134
Last updated

