Streaming Chat Service


Purpose and Scope

The Streaming Chat Service provides real-time, token-by-token chat responses using Server-Sent Events (SSE) in the AI SDK Data Stream format. It orchestrates the flow between user input, LLM generation, tool execution, and memory management to deliver intelligent, context-aware responses with function calling capabilities.

For information about the chatbot frontend components, see Chat Components. For details on tool execution mechanics, see Tool Router & Execution. For memory retrieval logic, see Memory Integration.

Sources: orchestrator/consumers/chatbot/service.py:1-14


Architecture Overview

The streaming chat service orchestrates real-time responses by activating specialized agents, integrating memory context, and executing tools through a composable pipeline. The service delegates LLM configuration and tool execution to agent-specific runtimes, enabling per-agent model selection and credential management.


Key Components:

| Component | Purpose |
|---|---|
| StreamingChatService | Main orchestrator for streaming responses |
| stream_response_with_agent() | Core streaming method with agent activation |
| ChatService | Database operations for chats/messages |
| ToolExecutionTracker | Prevents infinite tool loops via deduplication |
| SmartChatIntegration | Orchestrates memory, persona, and tool filtering |
| AgentFactory | Creates agent runtimes with LLM configuration |
| ContextGuard | Auto-compacts messages approaching the context limit |

Sources: orchestrator/consumers/chatbot/service.py:1-40, orchestrator/consumers/chatbot/service.py:456-896, orchestrator/modules/agents/factory/agent_factory.py:503-675


AI SDK Data Stream Format

The service outputs responses in Vercel's AI SDK Data Stream format, which uses newline-delimited structured messages. This format enables streaming text content, structured data events, and error handling in a unified protocol.

Format Specification

| Prefix | Type | Example | Purpose |
|---|---|---|---|
| 0: | Text chunk | 0:"Hello"\n | Incremental text streaming |
| d: | Data event | d:{"type":"tool_call"}\n | Structured events (tool calls, metadata) |
| e: | Error | e:{"message":"Failed"}\n | Error propagation to client |
| 8: | Chat ID | 8:"chat-uuid"\n | Session identifier |

Example Stream Sequence
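The original example was not preserved here; the sequence below is an illustrative reconstruction using the prefixes above (the payloads are invented for illustration, not taken from the service):

```
8:"chat-uuid"
d:{"type":"tool_call","toolName":"search_knowledge"}
0:"Here are "
0:"the results."
d:{"type":"message_complete"}
```

Each line is newline-terminated, matching the \n shown in the format table.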

Implementation: The streaming_handler.format_aisdk_* methods generate these prefixed lines, and the service yields each line to the client over SSE.

Sources: orchestrator/consumers/chatbot/service.py:500-503, orchestrator/api/workflows.py:36-136


StreamingChatService Class

The StreamingChatService class is the main entry point for streaming chat operations. It initializes dependencies and provides the stream_response_with_agent() method for generating streamed responses with agent-specific configuration.

Initialization

Lazy Module Loading: Dependencies are retrieved via factory functions (get_*()) to avoid circular imports and enable clean module boundaries.

Widget Mode: When widget_mode=True, the service skips workspace-scoped memory to ensure embedded widgets don't leak context across users.

Sources: orchestrator/consumers/chatbot/service.py:456-475

Main Streaming Method

The stream_response_with_agent() method is an async generator that yields AI SDK formatted chunks:
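A minimal sketch of the generator's shape, using the parameter names from the table below; the body is illustrative (the real pipeline wires in agents, tools, and memory), and the hard-coded chunks stand in for the LLM token stream:

```python
import json
from typing import Any, AsyncGenerator, Dict, List, Optional

class StreamingChatService:
    """Sketch only: the real service activates agents and executes tools."""

    async def stream_response_with_agent(
        self,
        chat_id: str,
        messages: List[Dict],
        agent_id: int,
        user_id: int,
        use_system_llm: bool = False,
        skip_composio: bool = False,
        complexity_assessment: Optional[Any] = None,
    ) -> AsyncGenerator[str, None]:
        # Emit the session identifier first (prefix 8), then text chunks (prefix 0).
        yield f"8:{json.dumps(chat_id)}\n"
        for chunk in ("Hello", ", world"):  # stand-in for the real token stream
            yield f"0:{json.dumps(chunk)}\n"
```

json.dumps handles the double-quoted JSON string encoding that the AI SDK format expects for each payload.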

Parameters:

| Parameter | Type | Purpose |
|---|---|---|
| chat_id | str | UUID of the chat session |
| messages | List[Dict] | Conversation history in OpenAI format |
| agent_id | int | Agent ID to activate (determines model, tools, persona) |
| user_id | int | User ID for memory scoping |
| use_system_llm | bool | Use orchestrator LLM settings instead of the agent's model |
| skip_composio | bool | Disable Composio tool injection (for testing) |
| complexity_assessment | Any | PRD-68: AutoBrain complexity assessment with tool hints |

Unified Agent-Chat System: The method activates the agent via AgentFactory, which loads the agent's LLM configuration, skills, and tool permissions. This enables per-agent model selection (e.g., Agent A uses GPT-4, Agent B uses Claude 3.5).

Sources: orchestrator/consumers/chatbot/service.py:493-522


Tool Loop Prevention

The ToolExecutionTracker class prevents infinite loops where the LLM repeatedly calls the same tool with identical or similar parameters. This is critical for preventing runaway token costs and ensuring stable execution.

ToolExecutionTracker Architecture


Search Tools Semantic Deduplication

For search-related tools, the tracker performs fuzzy matching on query parameters to detect semantically similar requests:

Search Tools Set:

Similarity Algorithm:

  1. Normalize queries (lowercase, remove punctuation)

  2. Use SequenceMatcher.ratio() for fuzzy matching

  3. Threshold: 75% similarity triggers skip

Example: "search for API docs" and "search api documentation" are detected as duplicates.
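The normalization and fuzzy match can be sketched as follows; the 75% threshold comes from the text above, but the exact normalization rules are assumptions, and with a plain SequenceMatcher the resulting ratio is sensitive to word order (the production check may normalize more aggressively):

```python
import re
from difflib import SequenceMatcher

SIMILARITY_THRESHOLD = 0.75  # 75% similarity triggers a skip

def normalize_query(query: str) -> str:
    # Lowercase and strip punctuation so formatting differences don't matter.
    return re.sub(r"[^\w\s]", "", query.lower()).strip()

def is_duplicate_search(new_query: str, previous_query: str) -> bool:
    # Fuzzy-match normalized queries; >= 75% similarity counts as a duplicate.
    ratio = SequenceMatcher(
        None, normalize_query(new_query), normalize_query(previous_query)
    ).ratio()
    return ratio >= SIMILARITY_THRESHOLD
```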

Sources: orchestrator/consumers/chatbot/service.py:88-186

Retry Limits

Different tools have different retry limits based on their characteristics:

| Tool Type | Limit | Rationale |
|---|---|---|
| composio_execute | 2 | External APIs may be flaky, but repeated calls are expensive |
| search_knowledge | 2 | First attempt plus one refinement is usually sufficient |
| read_file | 3 | File reads are cheap and may need multiple attempts |
| write_file | 2 | Writing twice to the same file likely indicates a loop |
| Default | 3 | Conservative fallback for unknown tools |

Sources: orchestrator/consumers/chatbot/service.py:104-116


Request Flow

The streaming response follows a multi-stage pipeline with agent activation, memory orchestration, and tool loop prevention.


Key Decision Points:

  1. Agent Activation: AgentFactory.activate_agent() loads agent-specific model, credentials, and skills

  2. PRD-68 ATOM Path: Simple queries skip tools/memory/orchestration for fastest response (~200ms saved)

  3. Fresh Start Detection: Keywords like "start over", "forget", "new" trigger context reset

  4. SmartChatIntegration: Orchestrates memory retrieval, persona injection, and tool filtering based on intent

  5. Composio Tool Resolution: Per-action tools (primary) or hint-based mega-tool (fallback)

  6. Context Guard: Auto-compacts messages if approaching model's context window limit

  7. Tool Loop: Max 10 iterations prevents runaway execution

  8. Image Upload: Base64 inline images replaced with S3 URLs before DB storage

Sources: orchestrator/consumers/chatbot/service.py:493-896, orchestrator/modules/agents/factory/agent_factory.py:676-820


Message Format and Conversion

The service converts between multiple message formats to integrate with various LLM providers and the frontend chat UI.

OpenAI Format (Internal)

Chat UI Format (Database)

Messages stored in the Message table use a parts-based format:

Conversion Logic

The prompt_analyzer.convert_to_llm_messages() method handles conversion:

  1. User messages: Concatenate parts[].text, preserve attachments as context

  2. Assistant messages: Extract text content, serialize tool calls

  3. Tool messages: Format tool results for LLM context

Sources: orchestrator/consumers/chatbot/service.py:590-594


Memory Integration

The service integrates with the Mem0 memory system via SmartChatIntegration, which orchestrates memory retrieval, persona injection, and tool filtering in a single preparation step. This replaces the legacy MemoryInjector with a more sophisticated orchestration layer.

SmartChatIntegration Architecture


3-Tier Memory Retrieval:

| Tier | Scope | Query Strategy | Purpose |
|---|---|---|---|
| Global | Workspace-wide | Semantic search across all conversations | Long-term facts, preferences, patterns |
| Session | Current chat_id | Semantic search within chat | Chat-specific context, multi-turn reasoning |
| Immediate | Last 5 messages | Exact retrieval | Recent context for continuity |

Memory Injection Format:

Memories are injected into the system prompt, not as separate messages.

This ensures the LLM sees all context as part of its core instructions rather than fragmented across multiple system messages.
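A sketch of this injection step; the section header text inside the prompt and the bullet formatting are assumptions:

```python
from typing import List

def build_system_prompt(base_prompt: str, memories: List[str]) -> str:
    """Append retrieved memories to the system prompt as one consolidated block."""
    if not memories:
        return base_prompt
    memory_block = "\n".join(f"- {m}" for m in memories)
    return f"{base_prompt}\n\nRelevant context from memory:\n{memory_block}"
```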

Skip Conditions:

  • Query is too short (< 10 characters)

  • Query is a greeting ("hello", "hi")

  • Query is a simple command ("clear", "reset")

  • Widget mode is enabled (prevents cross-user context leakage)
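The skip conditions above can be sketched as a single guard; the greeting and command sets shown are illustrative subsets, not the service's actual lists:

```python
GREETINGS = {"hello", "hi", "hey"}       # assumption: illustrative subset
SIMPLE_COMMANDS = {"clear", "reset"}     # assumption: illustrative subset

def should_skip_memory(query: str, widget_mode: bool = False) -> bool:
    """Mirror the skip conditions: widget mode, short queries, greetings, commands."""
    if widget_mode:
        return True  # prevents cross-user context leakage in embedded widgets
    normalized = query.strip().lower()
    if len(normalized) < 10:
        return True
    return normalized in GREETINGS or normalized in SIMPLE_COMMANDS
```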

Memory Injection Event (US-015)

When memories are retrieved, the service emits a structured SSE event to the frontend.

This enables the frontend to display which memories were used to generate the response, improving transparency and debuggability.

Sources: orchestrator/consumers/chatbot/service.py:583-709, orchestrator/consumers/chatbot/integration.py


Tool Execution Flow

Tool execution follows a three-tier resolution strategy: per-action Composio tools (primary), hint-based Composio mega-tool (fallback), and built-in platform/workspace tools (always available).

Tool Resolution Strategy


Composio Tool Modes

The service uses ComposioToolService.get_tools_for_step() with a three-tier resolution strategy:

1. Per-Action Tools (Primary - PRD-64):

Resolution Strategies (in order):

| Strategy | Condition | Example | Benefit |
|---|---|---|---|
| Exact action name | tool_hints contains the exact action | ["GITHUB_GET_ISSUE"] | 0 ms overhead, 100% accuracy |
| SDK semantic search | Composio SDK available | Search for "github issues" | ~50 ms, 95% accuracy |
| Cache fallback | SDK unavailable | Query the ComposioActionCache table | ~5 ms, 90% accuracy |

2. Hint-Based Fallback:

Provides the composio_execute mega-tool + LLM hints for action selection. Used when SDK search returns empty results.

Sources: orchestrator/consumers/chatbot/service.py:743-800, orchestrator/modules/tools/services/composio_tool_service.py:56-360

Tool Result Formatting

Tool results are routed through ToolRouter.execute_tool().

The router delegates to the appropriate executor:

| Tool Pattern | Executor | Example Tools |
|---|---|---|
| APP_ACTION_* | ComposioToolService | GITHUB_GET_ISSUE, SLACK_SEND_MESSAGE |
| platform_* | PlatformActionExecutor | platform_list_agents, platform_get_workspace_info |
| workspace_* | WorkspaceClient | workspace_read_file, workspace_execute_command |
| search_* | RAGService | search_knowledge, semantic_search |
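The prefix dispatch in the table can be sketched as a lookup function; returning executor names (rather than executor objects) and treating all-uppercase names as Composio actions are simplifications of the real router:

```python
def route_tool(tool_name: str) -> str:
    """Map a tool name to its executor using the prefix conventions above."""
    # Composio actions are uppercase action names (e.g. GITHUB_GET_ISSUE);
    # treating any all-caps name as Composio is an assumption of this sketch.
    if tool_name.startswith("APP_ACTION_") or tool_name.isupper():
        return "ComposioToolService"
    if tool_name.startswith("platform_"):
        return "PlatformActionExecutor"
    if tool_name.startswith("workspace_"):
        return "WorkspaceClient"
    if tool_name.startswith("search_"):
        return "RAGService"
    raise ValueError(f"No executor registered for tool: {tool_name}")
```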

Sources: orchestrator/modules/tools/tool_router.py:1-575, orchestrator/api/recipe_executor.py:314-332


Frontend Integration

The frontend consumes the AI SDK Data Stream using the useChat hook from ai/react, which automatically handles streaming text and structured data events.

React Hook Usage

Event Handling

The ai library automatically parses the stream format:

| Stream Event | Hook Behavior |
|---|---|
| 0:"text" | Appends to messages[].content incrementally |
| 8:"chat-id" | Sets the chat session ID |
| d:{...} | Fires the onToolCall callback with structured data |
| e:{...} | Fires the onError callback |

Recipe Execution Streaming

For recipe executions, the frontend uses a similar pattern with stage tracking.

Sources: frontend/components/workflows/execution-kitchen.tsx:47-56, frontend/components/workflows/execution-kitchen.tsx:420-510


Image Upload Handling

The service automatically detects base64-encoded inline images in markdown format and replaces them with S3 URLs before storing messages in the database.

Detection and Upload Flow


Implementation

Storage Path: workspaces/{workspace_id}/generated-images/{image_id}.{ext}

Access Pattern: Images are served via /api/generated-images/{image_id} endpoint with workspace validation.
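The detection step can be sketched with a regex over markdown image syntax; the regex, function names, and uploader interface are assumptions (the real implementation lives in service.py):

```python
import re

# Matches markdown images with inline base64 data:  ![alt](data:image/png;base64,...)
INLINE_IMAGE_RE = re.compile(
    r"!\[([^\]]*)\]\(data:image/(\w+);base64,([A-Za-z0-9+/=]+)\)"
)

def replace_inline_images(content: str, upload) -> str:
    """Replace each base64 inline image with the URL returned by `upload`."""
    def _sub(match: re.Match) -> str:
        alt, ext, b64_data = match.groups()
        # `upload` is a hypothetical callable that stores the bytes in S3
        # and returns the serving URL.
        url = upload(b64_data, ext)
        return f"![{alt}]({url})"
    return INLINE_IMAGE_RE.sub(_sub, content)
```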

Sources: orchestrator/consumers/chatbot/service.py:424-449, orchestrator/consumers/chatbot/service.py:831-835


Error Handling and Recovery

The service implements defensive error handling to ensure partial failures don't break the streaming flow:

Error Categories

| Error Type | Handling Strategy | User Impact |
|---|---|---|
| LLM API failure | Log warning, return cached response if available | Graceful degradation |
| Tool execution error | Return error message to LLM, allow retry | LLM can adjust its approach |
| Memory retrieval failure | Log warning, continue without memory | Slight context loss |
| Image upload failure | Log warning, preserve base64 in message | Image not optimized |
| Database save failure | Log error, continue streaming | Response visible but not persisted |

Tool Error Format

When a tool fails, the error is returned to the LLM as a tool message:
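The exact payload is not shown in the source; a plausible shape, following the OpenAI tool-message convention, might look like:

```python
import json

def format_tool_error(tool_call_id: str, tool_name: str, error: Exception) -> dict:
    """Wrap a tool failure as a tool-role message so the LLM can react to it."""
    return {
        "role": "tool",
        "tool_call_id": tool_call_id,
        "content": json.dumps({
            "error": True,
            "tool": tool_name,
            "message": str(error),
        }),
    }
```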

This allows the LLM to:

  1. Understand what went wrong

  2. Reformulate the request with corrected parameters

  3. Try an alternative approach

  4. Inform the user about the limitation

Sources: orchestrator/consumers/chatbot/service.py:790-855


Performance Optimizations

PRD-68 ATOM Path

For simple queries (greetings, basic questions), the service bypasses tools, memory, and orchestration:
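The bypass decision might be sketched as follows; the level attribute and config keys are assumptions about the assessment object's shape:

```python
def is_atom_path(complexity_assessment) -> bool:
    """Take the fast path for ATOM-level queries; `level` is an assumed attribute."""
    return getattr(complexity_assessment, "level", None) == "ATOM"

def build_pipeline_config(complexity_assessment) -> dict:
    # ATOM-level queries skip memory, tool resolution, and orchestration entirely.
    if is_atom_path(complexity_assessment):
        return {"use_memory": False, "use_tools": False, "minimal_prompt": True}
    return {"use_memory": True, "use_tools": True, "minimal_prompt": False}
```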

Performance Impact:

  • Skip SmartChatIntegration: Saves ~200ms (memory retrieval + tool filtering)

  • Skip Composio resolution: Saves ~50ms (SDK search)

  • Minimal system prompt: Reduces input tokens by ~80%

Triggers: AutoBrain complexity assessment detects ATOM-level queries (greetings, acknowledgments, simple questions).

Sources: orchestrator/consumers/chatbot/service.py:604-617

Context Window Guard

The ContextGuard auto-compacts messages when they approach the model's context window limit.

Strategy: If messages exceed 85% of the context window, the guard uses an LLM to summarize older messages, leaving recent context and the system prompt intact.

Impact: Prevents context overflow errors and enables longer conversations without manual intervention.
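The trigger condition is a simple threshold check; the function name is an assumption (the summarization itself is delegated to an LLM and is not sketched here):

```python
def needs_compaction(token_count: int, context_window: int,
                     threshold: float = 0.85) -> bool:
    """Trigger compaction once usage crosses 85% of the model's context window."""
    return token_count >= context_window * threshold
```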

Sources: orchestrator/consumers/chatbot/service.py:821-835

Tool Execution Deduplication

The ToolExecutionTracker maintains per-turn caches to prevent redundant tool calls:

Strategies:

| Strategy | Mechanism | Savings |
|---|---|---|
| Exact dedup | Hash of (tool_name, args) | ~500 ms + API cost |
| Semantic dedup | Query similarity >= 75% for search tools | ~500 ms + API cost |
| Retry limits | Max 2-3 executions per tool type | Prevents infinite loops |

Sources: orchestrator/consumers/chatbot/service.py:88-186

Composio Action Caching

The service maintains a per-execution cache for Composio action results.

When the LLM calls the same Composio action twice (e.g., retrying after error), the cached result is returned instead of making another API call.

Average savings: 500ms + Composio API cost per duplicate call.

Sources: orchestrator/api/recipe_executor.py:209, orchestrator/api/recipe_executor.py:269-273


Usage Tracking

All LLM calls made by the streaming service are automatically tracked via the LLMManager's usage tracking context:
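The tracking context's exact API is not shown in the source; a sketch of the pattern, with assumed names and an in-memory record list standing in for the LLMUsage table, might look like:

```python
import time
from contextlib import contextmanager

@contextmanager
def track_usage(records: list, workspace_id: int, agent_id: int, model: str):
    """Record one LLM call's metadata and latency (field names are assumptions)."""
    usage = {"workspace_id": workspace_id, "agent_id": agent_id, "model": model}
    start = time.monotonic()
    try:
        yield usage  # the caller fills in input_tokens / output_tokens
    finally:
        usage["latency_ms"] = (time.monotonic() - start) * 1000
        records.append(usage)  # in production, this row lands in the LLMUsage table
```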

This populates the LLMUsage table with:

  • Workspace ID

  • Agent ID

  • Input/output token counts

  • Model provider and name

  • Cost calculation (based on LLMModel.pricing)

  • Latency

  • BYOK flag (user-provided vs platform API key)

For detailed usage tracking mechanics, see LLM Usage Tracking.

Sources: orchestrator/api/recipe_executor.py:202-205, orchestrator/core/llm/manager.py:1-68

