Real-Time Updates

This document describes the real-time update infrastructure in Automatos AI, including Redis Pub/Sub messaging, SSE streaming to the frontend, and live progress tracking for workflow and recipe executions.

Scope: This page covers the event publishing, message routing, and frontend consumption of execution updates. For workflow execution logic, see Execution Layer. For recipe execution details, see Recipe Execution. For frontend state management, see State Management.


Architecture Overview

Automatos AI uses a dual-path real-time update system: Redis Pub/Sub for backend event logging and monitoring, and SSE (Server-Sent Events) for direct streaming to the frontend UI.

Sources: orchestrator/core/redis/client.py, orchestrator/api/workflows.py:37-136, frontend/components/workflows/execution-kitchen.tsx:1-50


Redis Pub/Sub System

The RedisClient class provides a connection pool-based pub/sub infrastructure for workflow execution events.

RedisClient Implementation

Core Methods:

  • publish(channel, message) - Publish JSON message to Redis channel

  • publish_workflow_event(workflow_id, execution_id, event_type, data) - Typed workflow event publisher

  • get_async_pubsub(channel) - Get async Redis pubsub for non-blocking streaming

  • get_redis() - Get connection from pool

Configuration:

Lazy Initialization Pattern: The global get_redis_client() function accepts either a single REDIS_URL (as provided by Railway or Heroku) or individual environment variables. Redis is an optional service, so the function returns None when it is not configured.

Sources: orchestrator/core/redis/client.py:14-198
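
The lazy initialization described above can be sketched as follows. This is a minimal illustration, not the project's code: the variable names REDIS_HOST, REDIS_PORT and REDIS_DB, and the helper names resolve_redis_url and get_client, are assumptions made for the example. Only REDIS_URL and the "return None when unconfigured" behavior come from the text.

```python
import os
from typing import Any, Callable, Mapping, Optional


def resolve_redis_url(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return a Redis URL, or None when Redis is not configured."""
    url = env.get("REDIS_URL")
    if url:
        return url
    # Hypothetical individual variables; the real names may differ.
    host = env.get("REDIS_HOST")
    if not host:
        return None
    port = env.get("REDIS_PORT", "6379")  # 6379 is the default Redis port
    db = env.get("REDIS_DB", "0")
    return f"redis://{host}:{port}/{db}"


_client: Any = None
_initialised = False


def get_client(factory: Callable[[str], Any],
               env: Mapping[str, str] = os.environ) -> Any:
    """Build the client on first use and cache it (None if unconfigured)."""
    global _client, _initialised
    if not _initialised:
        url = resolve_redis_url(env)
        _client = factory(url) if url else None
        _initialised = True
    return _client
```

Callers would then check the result for None before publishing, which is what makes Redis optional rather than a hard dependency.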

Channel Naming Convention

Events are published to channels following the pattern:

Example: workflow:42:execution:1337

This allows frontend clients to subscribe to specific execution streams without receiving unrelated events.

Sources: orchestrator/core/redis/client.py:110-119

Message Structure

All Redis messages follow a consistent JSON structure:

Sources: orchestrator/core/redis/client.py:91-119
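
As a rough sketch of how a channel name and a message could be assembled: the channel pattern below is inferred from the example workflow:42:execution:1337, and the envelope fields mirror the arguments of publish_workflow_event. The timestamp field and both helper names are assumptions, not confirmed details of the implementation.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict


def workflow_channel(workflow_id: int, execution_id: int) -> str:
    """Build a per-execution channel name (pattern inferred from the example)."""
    return f"workflow:{workflow_id}:execution:{execution_id}"


def build_event(workflow_id: int, execution_id: int,
                event_type: str, data: Dict[str, Any]) -> str:
    """Serialize one event as JSON; the exact field set is an assumption."""
    envelope = {
        "workflow_id": workflow_id,
        "execution_id": execution_id,
        "event_type": event_type,
        "data": data,
        "timestamp": datetime.now(timezone.utc).isoformat(),  # assumed field
    }
    return json.dumps(envelope)
```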


SSE Streaming to Frontend

The backend has transitioned from WebSockets to SSE (Server-Sent Events) for real-time updates, using a stream_manager for broadcast operations.
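
For readers unfamiliar with SSE, the sketch below shows the text/event-stream framing that such a stream carries: an optional event field, a data field, and a blank line terminating each event. The function names are hypothetical, and the parser is deliberately simplified (it strips all surrounding whitespace from field values, where the specification removes only a single leading space).

```python
import json
from typing import Any, Dict, Tuple


def format_sse(event: str, data: Dict[str, Any]) -> str:
    """Encode one event as an SSE frame; a blank line ends the frame."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"


def parse_sse(frame: str) -> Tuple[str, Dict[str, Any]]:
    """Decode a single frame back into (event name, JSON payload)."""
    event = "message"  # default event type when no "event:" field is present
    data_lines = []
    for line in frame.splitlines():
        if line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data_lines.append(line[len("data:"):].strip())
    # Multiple data lines are joined with newlines before decoding.
    return event, json.loads("\n".join(data_lines))
```

In the browser the EventSource API performs the parsing step, so only the formatting side would normally live in backend code.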

WorkflowStageTracker SSE Integration

The WorkflowStageTracker class emits events during the 9-stage workflow execution.


Key Implementation Details:

  • start_stage() and complete_stage() both publish to SSE and Redis

  • SSE provides instant UI updates via stream_manager.broadcast_event()

  • Redis provides logging/monitoring via redis.publish_workflow_event()

  • Stage duration is calculated from the stage_start_times dict

  • Non-blocking error handling with try/except wrappers

Sources: orchestrator/api/workflows.py:37-136
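
The behavior listed above can be approximated with the following synchronous sketch. It is not the actual WorkflowStageTracker: the class name, the generic "sink" callables standing in for stream_manager.broadcast_event() and redis.publish_workflow_event(), and the payload field names are all assumptions. What it does illustrate is the dual publish, the duration lookup in stage_start_times, and the try/except wrapper that keeps a failing sink from interrupting execution.

```python
import time
from typing import Any, Callable, Dict, List

# Stage names as listed in this document.
STAGES = {
    1: "Task Decomposition", 2: "Agent Selection", 3: "Context Engineering",
    4: "Agent Execution", 5: "Result Aggregation", 6: "Learning Update",
    7: "Quality Assessment", 8: "Memory Storage", 9: "Response Generation",
}

Sink = Callable[[str, Dict[str, Any]], None]


class StageTrackerSketch:
    def __init__(self, sinks: List[Sink],
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.sinks = sinks              # e.g. one SSE sink and one Redis sink
        self.clock = clock
        self.stage_start_times: Dict[int, float] = {}

    def _emit(self, event_type: str, payload: Dict[str, Any]) -> None:
        for sink in self.sinks:
            try:
                sink(event_type, payload)
            except Exception as exc:    # a broken sink must not stop the workflow
                print(f"event sink failed: {exc}")

    def start_stage(self, stage: int) -> None:
        self.stage_start_times[stage] = self.clock()
        self._emit("stage_start", {"stage": stage, "name": STAGES[stage]})

    def complete_stage(self, stage: int) -> None:
        started = self.stage_start_times.pop(stage, None)
        duration = self.clock() - started if started is not None else None
        self._emit("stage_complete", {
            "stage": stage,
            "name": STAGES[stage],
            "duration_seconds": duration,  # assumed field name
        })
```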

9-Stage Workflow Events

The STAGES dict defines the workflow pipeline stages:

| Stage | Name | Description |
| --- | --- | --- |
| 1 | Task Decomposition | Break down complex task into subtasks |
| 2 | Agent Selection | Choose agents for each subtask |
| 3 | Context Engineering | Build optimized context for agents |
| 4 | Agent Execution | Execute agents with tools |
| 5 | Result Aggregation | Combine outputs from agents |
| 6 | Learning Update | Update learning models |
| 7 | Quality Assessment | Score execution quality |
| 8 | Memory Storage | Persist learnings |
| 9 | Response Generation | Format final response |

Sources: orchestrator/api/workflows.py:38-49, frontend/components/workflows/execution-kitchen.tsx:74-84

Legacy WebSocket Removal

Comments throughout the codebase indicate WebSocket support was removed in favor of SSE:

This appears at workflow update endpoints, delete endpoints, and creation endpoints where WebSocket broadcasts were previously emitted.

Sources: orchestrator/api/workflows.py:394-396, orchestrator/api/workflows.py:442-443, orchestrator/api/workflows.py:598-599


Recipe Step Progress Updates

Recipe executions use a different update pattern optimized for sequential step-by-step workflows.

Direct Execution with Persistence

The execute_recipe_direct() function executes recipes step-by-step and persists progress in the database.


Key Data Structures:

  1. step_results - Flat JSONB array persisted to the database

  2. step_outputs - In-memory keyed dict for inter-step data passing

Sources: orchestrator/api/recipe_executor.py:313-555
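
To make the difference between the two structures concrete, here is a small sketch of a sequential runner that fills both. The step dictionary shape ("id", "run") and the fields inside each step_results entry are assumptions chosen for the example, not the schema used by execute_recipe_direct().

```python
from typing import Any, Callable, Dict, List, Tuple

Step = Dict[str, Any]  # assumed shape: {"id": str, "run": callable(step_outputs)}


def run_steps(steps: List[Step]) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    step_results: List[Dict[str, Any]] = []  # flat, ordered, JSON-serializable
    step_outputs: Dict[str, Any] = {}        # keyed by step id, in memory only

    for index, step in enumerate(steps):
        # Each step can read the outputs of every earlier step by id.
        output = step["run"](step_outputs)
        step_outputs[step["id"]] = output
        step_results.append({
            "step_index": index,
            "step_id": step["id"],
            "status": "completed",
            "output": output,
        })
    return step_results, step_outputs
```

The flat list suits persistence and progress display because order is explicit, while the keyed dict suits lookups such as "the output of the step named fetch".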

Step Result Persistence

The _persist_step_results() helper updates the execution record after each step:

This ensures that partial progress is saved even if the recipe fails mid-execution, enabling resume/retry logic and frontend progress display.

Sources: orchestrator/api/recipe_executor.py:578-583
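
The persist-after-every-step idea can be demonstrated with the standard library alone. The sketch below substitutes SQLite with a JSON text column for PostgreSQL JSONB, and the table name, column names and helper name are hypothetical. It shows that results written before a failure remain readable afterwards.

```python
import json
import sqlite3
from typing import Any, Dict, List


def persist_step_results(conn: sqlite3.Connection, execution_id: int,
                         step_results: List[Dict[str, Any]],
                         current_step: int) -> None:
    """Overwrite the stored results and commit once per step."""
    conn.execute(
        "UPDATE executions SET step_results = ?, current_step = ? WHERE id = ?",
        (json.dumps(step_results), current_step, execution_id),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE executions "
    "(id INTEGER PRIMARY KEY, current_step INTEGER, step_results TEXT)"
)
conn.execute("INSERT INTO executions VALUES (1, 0, '[]')")

results: List[Dict[str, Any]] = []
for index in range(3):
    try:
        if index == 2:
            raise RuntimeError("step failed")  # simulate a mid-recipe failure
        results.append({"step_index": index, "status": "completed"})
    except RuntimeError as exc:
        results.append({"step_index": index, "status": "failed", "error": str(exc)})
    persist_step_results(conn, 1, results, index)
    if results[-1]["status"] == "failed":
        break

row = conn.execute("SELECT step_results FROM executions WHERE id = 1").fetchone()
saved = json.loads(row[0])
```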


Event Types and Payloads

The system emits various event types throughout execution:

Workflow Stage Events

stage_start:

stage_complete:

Sources: orchestrator/api/workflows.py:58-93, orchestrator/api/workflows.py:95-135

Agent Execution Events

Events generated during agent task execution (typically in Stage 4):

agent_spawn:

task_progress:

task_complete:

task_error:

Sources: frontend/components/workflows/execution-kitchen.tsx:61-71


Frontend Integration

Frontend components consume real-time updates via SSE streaming and HTTP polling.

ExecutionKitchen Component

The ExecutionKitchen component provides a theater-style live execution viewer:

Component Structure:

Key Features:

  • StreamingLog - Real-time log display with filtering and auto-scroll

  • TheaterStageProgress - Visual 9-stage progress indicator

  • TheaterStepExecution - Recipe step-by-step visualization

  • RecipeStepProgress - Step completion tracker

State Management:

Sources: frontend/components/workflows/execution-kitchen.tsx:47-85

SSE Event Processing

Frontend components use the EventSource API for SSE streaming:

Sources: frontend/components/workflows/execution-kitchen.tsx:98-257

Streaming Log Component

The StreamingLog displays events with expandable details:

Features:

  • Auto-scroll toggle with manual override

  • Stage filtering (click stage badge to filter)

  • Expandable log entries for full response text

  • Color-coded event types

  • Timestamp display with millisecond precision

Log Entry Structure:

Sources: frontend/components/workflows/execution-kitchen.tsx:61-71, frontend/components/workflows/execution-kitchen.tsx:99-258

HTTP Polling Fallback

For recipe executions, the frontend polls the REST API when SSE is unavailable:

Response includes:

  • status: "pending" | "running" | "completed" | "failed"

  • current_step: Current step index (0-based)

  • step_results: Array of completed step results

  • output_data: Final execution output and metadata

Sources: frontend/components/workflows/view-recipe-modal.tsx:251-329
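
The polling loop itself lives in the frontend, but its logic can be sketched in Python for consistency with the other examples. The fetch callable stands in for the REST request, and the interval and poll budget are illustrative values rather than the ones the UI uses. Only the status values and response fields come from the list above.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"completed", "failed"}


def poll_execution(fetch: Callable[[], Dict[str, Any]],
                   interval_seconds: float = 2.0,
                   max_polls: int = 150,
                   sleep: Callable[[float], None] = time.sleep) -> Dict[str, Any]:
    """Call fetch() until the execution reaches a terminal status."""
    for _ in range(max_polls):
        state = fetch()
        if state["status"] in TERMINAL_STATUSES:
            return state
        sleep(interval_seconds)
    raise TimeoutError("execution did not finish within the polling budget")
```

Bounding the number of polls keeps a stuck execution from being polled indefinitely.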


Performance Considerations

Redis Connection Pooling

The RedisClient uses a connection pool to avoid connection overhead:

This allows up to 50 concurrent Redis connections, sufficient for multi-execution scenarios.

Sources: orchestrator/core/redis/client.py:22-31
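
A bounded pool of this kind could be created with redis-py roughly as follows. This is a sketch under assumptions: the helper name make_client is hypothetical, the pool is built from a URL for brevity, and the actual RedisClient may pass different or additional options. The redis import is deferred so the module still loads when the package is not installed.

```python
MAX_CONNECTIONS = 50  # the limit stated in this section


def make_client(url: str, max_connections: int = MAX_CONNECTIONS):
    """Create a Redis client backed by a bounded connection pool."""
    import redis  # redis-py, imported lazily because Redis is optional

    # The pool hands out at most max_connections connections at a time.
    pool = redis.ConnectionPool.from_url(url, max_connections=max_connections)
    return redis.Redis(connection_pool=pool)
```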

Non-Blocking Event Publishing

Both SSE and Redis publishing use non-blocking error handling to prevent execution failures from broken connections:

This ensures execution continues even if the real-time update infrastructure is unavailable.

Sources: orchestrator/api/workflows.py:66-93, orchestrator/api/workflows.py:104-135

JSONB Step Results Efficiency

Recipe step results are stored as JSONB in PostgreSQL, allowing:

  • Atomic updates: Single db.commit() per step

  • Partial reads: The frontend can query a specific execution without a full table scan

  • Schema flexibility: Step result structure can evolve without migrations

  • Index support: PostgreSQL can index into JSONB for filtering

Example query with JSONB filtering:

Sources: orchestrator/api/recipe_executor.py:413-528

Frontend Auto-Scroll Optimization

The StreamingLog component uses a ref-based scroll mechanism with manual override:

This prevents excessive re-renders while maintaining smooth scroll behavior during high-frequency log updates.

Sources: frontend/components/workflows/execution-kitchen.tsx:100-107


Error Handling and Resilience

Optional Redis Service

The system gracefully degrades when Redis is unavailable:

When get_redis_client() returns None, workflow execution continues but real-time updates are logged to console instead of Redis.

Sources: orchestrator/core/redis/client.py:149-197
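
The degradation path can be sketched as a small wrapper that falls back to logging when no client is available. The function name and logger name are hypothetical. The only client method it relies on is publish(channel, message), which redis-py provides.

```python
import json
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger("realtime")


def publish_or_log(client: Optional[Any], channel: str,
                   message: Dict[str, Any]) -> bool:
    """Publish to Redis when configured; otherwise log the event locally."""
    payload = json.dumps(message)
    if client is None:
        logger.info("Redis unavailable, event on %s: %s", channel, payload)
        return False
    client.publish(channel, payload)
    return True
```

The boolean return value lets a caller tell the two paths apart without treating the missing service as an error.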

Step Error Handling

Recipe execution respects per-step error_handling configuration:

| Mode | Behavior |
| --- | --- |
| stop | Halt execution immediately, mark as failed |
| skip | Log error, continue to next step |
| retry | Retry up to max_retries times with exponential backoff |

Sources: orchestrator/api/recipe_executor.py:497-525
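
The three modes can be illustrated with the sketch below. The step dictionary shape, the status strings, the default of 3 retries and the 1-second base delay are assumptions for the example. It also assumes that a step which exhausts its retries is treated like stop, which the source does not state.

```python
import time
from typing import Any, Callable, Dict, List


def run_step(step: Dict[str, Any],
             sleep: Callable[[float], None] = time.sleep,
             base_delay: float = 1.0) -> Dict[str, Any]:
    mode = step.get("error_handling", "stop")
    max_retries = step.get("max_retries", 3) if mode == "retry" else 0
    attempt = 0
    while True:
        try:
            return {"status": "completed", "output": step["run"]()}
        except Exception as exc:
            if attempt < max_retries:
                sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
                attempt += 1
                continue
            # skip: record the error and move on; stop or exhausted retry: fail
            status = "skipped" if mode == "skip" else "failed"
            return {"status": status, "error": str(exc)}


def run_recipe(steps: List[Dict[str, Any]],
               sleep: Callable[[float], None] = time.sleep) -> List[Dict[str, Any]]:
    results = []
    for step in steps:
        result = run_step(step, sleep)
        results.append(result)
        if result["status"] == "failed":
            break  # halt the recipe immediately
    return results
```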

Connection Test Utility

The RedisClient provides a test_connection() method for health checks:

This is called during initialization to verify Redis availability.

Sources: orchestrator/core/redis/client.py:121-134

