Task Management
The task management system enables asynchronous execution of workspace operations—shell commands, file operations, git operations—in isolated, sandboxed environments. Tasks are queued in Redis priority queues, consumed by workspace workers, and executed within persistent workspace directories on a mounted volume. This system supports both explicit step-based task submission (via /api/tasks/submit) and higher-level operations like GitHub repository cloning (via /api/workspaces/{workspace_id}/github/clone).
For information about the workspace filesystem architecture and security model, see Security & Sandboxing. For details on workspace file operations and the code viewer, see File Operations.
Task Lifecycle
Tasks progress through five states: queued → running → completed (or failed, cancelled, timed_out). State transitions are tracked in both Redis (for fast worker access) and PostgreSQL (for durable persistence and API queries).
Task Lifecycle Diagram
Sources: orchestrator/api/tasks.py:1-404, services/workspace-worker/main.py:227-358
Task Submission
Direct Task Submission
The /api/tasks/submit endpoint accepts explicit task steps for immediate execution. Unlike workflow execution (which uses LLM agents to generate steps), this endpoint enqueues concrete operations directly to the worker.
Request Model:
steps
List[TaskStep]
Ordered list of actions to execute
priority
str
Queue priority: low, normal, high, critical
timeout_seconds
int
Max execution time (10-3600s)
TaskStep Actions:
execute_command
command, cwd, timeout
Run shell command
git_clone
repo, branch, shallow
Clone repository
git_pull
repo_name, branch
Update cached repo
read_file
path
Read file contents
write_file
path, content
Write or create file
list_directory
path
List directory entries
create_directory
path
Create directory tree
Sources: orchestrator/api/tasks.py:39-173, services/workspace-worker/executor.py:322-360
GitHub Repository Cloning
The /api/workspaces/{workspace_id}/github/clone endpoint provides a specialized task submission flow for cloning repositories. It retrieves the user's GitHub OAuth token via Composio and injects it into the clone URL for authenticated access to private repos.
Sources: orchestrator/api/workspace_github.py:167-293, services/workspace-worker/executor.py:368-419
Worker Architecture
WorkspaceWorker Consumer Loop
The WorkspaceWorker class implements a priority queue consumer that polls Redis queues in order (critical → high → normal → low) and executes tasks concurrently up to a configurable limit (default: 3).
Configuration:
WORKER_CONCURRENCY
3
Max concurrent tasks
REDIS_URL
redis://localhost:6379/0
Redis connection string
WORKSPACE_VOLUME_PATH
//workspaces
Persistent volume mount
WORKER_HEALTH_PORT
8081
Health check HTTP port
WORKER_INTERNAL_TOKEN
(empty)
Auth token for internal API
Sources: services/workspace-worker/main.py:60-180, services/workspace-worker/main.py:44-57
Task Execution Flow
When a worker dequeues a task, it progresses through a multi-stage execution pipeline with quota checks, credential injection, step execution, and cleanup.
Sources: services/workspace-worker/main.py:227-358
Task Status Tracking
Task state is maintained in two storage tiers:
Redis (Fast Access for Workers)
workspace:task:{task_id}:status
Hash
7200s
Current status, worker_id, timestamps
workspace:task:{task_id}:result
String (JSON)
3600s
Full execution result payload
workspace:ws:{workspace_id}:active_tasks
Set
N/A
Active task IDs for workspace
workspace:worker:{worker_id}:heartbeat
String
60s
Worker health timestamp
workspace:worker:{worker_id}:tasks
Set
60s
Tasks assigned to this worker
Sources: services/workspace-worker/main.py:44-57, services/workspace-worker/main.py:363-421
PostgreSQL (Durable Persistence)
The task_executions table stores complete task metadata and is updated in lockstep with Redis by the worker.
Sources: services/workspace-worker/main.py:377-415
Real-Time Event Streaming
The /api/tasks/{task_id}/events endpoint provides Server-Sent Events (SSE) for live task progress. The worker publishes events to Redis pub/sub, which the orchestrator streams to the client.
Event Types
status_changed
status, execution_time_ms
Status transition
progress_update
step, total_steps, description
Before each step
error
error, step
Step failure or exception
Example SSE Stream:
Sources: orchestrator/api/tasks.py:352-403, services/workspace-worker/main.py:422-434
Task Cancellation
Tasks can be cancelled via POST /api/tasks/{task_id}/cancel. The cancellation mechanism varies by task state:
Queued tasks: Marked as cancelled in Redis status key; worker skips execution when dequeued
Running tasks: Worker checks cancellation flag between steps; stops execution gracefully
Completed/failed tasks: Cannot be cancelled (terminal state)
Sources: orchestrator/api/tasks.py:304-346, services/workspace-worker/main.py:236-286
Workspace HTTP API
The workspace worker exposes an HTTP server on port 8081 (configurable via WORKER_HEALTH_PORT) for health checks and direct file/command operations. This API is used by the orchestrator to proxy file browsing and command execution requests to the worker.
Endpoints
GET
/health
Worker health check
None
GET
/workspaces/{ws_id}/files
List directory
Internal token
GET
/workspaces/{ws_id}/files/content
Read file
Internal token
POST
/workspaces/{ws_id}/files/write
Write file
Internal token
POST
/workspaces/{ws_id}/exec
Execute command
Internal token
GET
/workspaces/{ws_id}/files/grep
Search files
Internal token
POST
/workspaces/{ws_id}/git
Git operation
Internal token
Internal Authentication
Requests from the orchestrator to the worker HTTP API are authenticated via the X-Internal-Token header. This prevents external access to workspace operations.
Configuration:
Sources: services/workspace-worker/main.py:461-819, orchestrator/core/workspace_client.py:1-185
Task Execution Environment
WorkspaceToolExecutor
The WorkspaceToolExecutor class provides sandboxed execution of task steps with command whitelisting, path containment, and output limits.
Allowed Commands
The executor enforces a strict whitelist of 73 allowed commands. Path-based binaries (e.g., /usr/bin/python) and relative paths (e.g., ./malicious) are rejected.
Categories:
Shell
sh, bash, cd, pwd, export, source, test
VCS
git
Python
python, python3, pip, pip3, uv, pytest, ruff, black, mypy
Node.js
node, npm, npx, pnpm, yarn, vitest, jest, tsc, eslint
Search
ls, cat, grep, egrep, fgrep, rg, find, tree
Build
make, cmake, cargo, go, mvn, gradle, rustc, gcc
Blocked Patterns:
rm -rf /andrm -rf /{anything}sudo,su,passwd,useradd,userdelchmod 777,mount,umount,mkfsBackticks, embedded newlines, device access (
> /dev/)Kubernetes, iptables, systemctl
Sources: services/workspace-worker/executor.py:35-99, services/workspace-worker/executor.py:448-501
Workspace Directory Structure
Each workspace gets a persistent directory tree on the worker volume with repos, ephemeral task dirs, and artifacts.
Workspace Metadata:
Sources: services/workspace-worker/workspace_manager.py:1-308
Storage Quota Enforcement
The WorkspaceManager enforces per-workspace storage quotas (default: 5GB) by calculating disk usage before task execution. Tasks are rejected if the quota is exceeded.
Configuration:
Quota Check:
Sources: services/workspace-worker/workspace_manager.py:83-115, services/workspace-worker/main.py:250-264
Integration with Platform Tools
The task management system integrates with the platform tools registry to provide workspace actions (workspace_read_file, workspace_exec, etc.) that agents can invoke during chat or recipe execution.
Workspace Action Definitions
workspace_read_file
workspace_files
read
Read file contents
workspace_write_file
workspace_files
write
Write or create file
workspace_list_dir
workspace_files
read
List directory entries
workspace_grep
workspace_files
read
Search files with regex
workspace_exec
workspace_exec
write
Run sandboxed command
workspace_git
workspace_git
write
Execute git operation
These actions proxy requests through the WorkspaceClient to the worker HTTP API, which executes them via WorkspaceToolExecutor.
Execution Flow:
Sources: orchestrator/modules/tools/discovery/workspace_actions.py:1-249, orchestrator/core/workspace_client.py:56-185
Error Handling
Task execution errors are captured at multiple levels and persisted for debugging:
Error Capture Hierarchy
Step-level errors: Returned in
result["steps"][i]["result"]["error"]but don't stop execution (agent decides)Task-level exceptions: Caught by
_execute_task_wrapper, written to result, status set tofailedTimeout errors: Subprocess killed, status set to
timed_out, elapsed time recordedCancellation: Checked between steps, status set to
cancelledif detected
Error Result Structure
Sources: services/workspace-worker/main.py:331-354
Heartbeat and Health Monitoring
Workers report health via two mechanisms:
Heartbeat Loop
Every 30 seconds, the worker updates a heartbeat key in Redis with the current timestamp and reports active task IDs.
Health Check Endpoint
The /health endpoint returns worker status including concurrency, active task count, and volume path.
Sources: services/workspace-worker/main.py:440-459, services/workspace-worker/main.py:516-523
Last updated

