Document Ingestion Pipeline
This page describes the complete document ingestion flow from upload/sync through text extraction, chunking, embedding generation, and storage. The pipeline processes documents into searchable vector chunks stored in S3 Vectors with metadata tracked in PostgreSQL.
For document search and retrieval, see RAG Retrieval System. For cloud storage configuration, see Cloud Storage Integration. For chunking algorithm details, see Semantic Chunking Strategies.
Pipeline Overview
The ingestion pipeline transforms raw documents into semantically searchable chunks through five stages: acquisition, extraction, chunking, embedding, and storage. Each stage applies format-specific handling and includes fallback mechanisms for robustness.
Sources: orchestrator/modules/rag/ingestion/manager.py:402-750, orchestrator/api/documents.py:106-262, orchestrator/modules/rag/services/cloud_sync_service.py:38-290
Entry Points
Direct Upload via API
The /api/documents/upload endpoint accepts multipart form data with MIME type validation using python-magic to detect actual file content (not just extension checking). This prevents malicious files disguised with fake extensions.
| Check | Mechanism | Purpose |
| --- | --- | --- |
| MIME Detection | magic.from_buffer(content, mime=True) | Detect actual file type from binary content |
| Extension Mapping | ALLOWED_MIME_TYPES dict | Verify extension matches detected MIME |
| Size Limit | 50MB cap | Prevent resource exhaustion |
| Content Hash | SHA-256 deduplication | Skip re-processing identical files |
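The checks above can be sketched as a single validation step. This is an illustrative sketch, not the actual orchestrator/api/documents.py code: the names are hypothetical, the MIME mapping is a subset, and detect_mime is injected so the real endpoint can pass magic.from_buffer(content, mime=True).

```python
# Hypothetical sketch of the upload validation checks; `detect_mime` stands
# in for python-magic's content-based detection.
import hashlib

ALLOWED_MIME_TYPES = {                  # illustrative subset of the mapping
    "application/pdf": {".pdf"},
    "text/plain": {".txt", ".md"},
    "text/csv": {".csv"},
}
MAX_UPLOAD_BYTES = 50 * 1024 * 1024     # 50MB cap

def validate_upload(filename: str, content: bytes, detect_mime) -> str:
    """Run size, MIME, and extension checks; return the SHA-256 dedup key."""
    if len(content) > MAX_UPLOAD_BYTES:
        raise ValueError("file exceeds 50MB limit")
    mime = detect_mime(content)         # inspects bytes, not the extension
    ext = "." + filename.rsplit(".", 1)[-1].lower()
    if ext not in ALLOWED_MIME_TYPES.get(mime, set()):
        raise ValueError(f"extension {ext!r} does not match detected type {mime!r}")
    return hashlib.sha256(content).hexdigest()
```

Because the MIME check runs on the bytes rather than the filename, a file renamed from .exe to .pdf fails the extension-to-MIME match.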
Supported Formats:
Documents: PDF, DOCX, TXT, Markdown
Code: Python, JSON
Spreadsheets: XLSX, CSV
Processing Flow:
Sources: orchestrator/api/documents.py:106-262, orchestrator/api/documents.py:88-104
Cloud Storage Sync
The CloudSyncService orchestrates batch syncing from Google Drive, Dropbox, OneDrive, and Box via Composio actions. It maintains sync state in the cloud_documents table to track which files have been processed and their modification timestamps.
Sync Architecture:
Key Features:
Incremental Sync: Compares cloud_modified_at to skip unchanged files
Parallel Processing: asyncio.Semaphore(3) limits concurrent downloads
File Type Filtering: Skips unsupported extensions (.ttf, .png, etc.)
Redis Caching: Stores folder/file listings to reduce Composio API calls
Sources: orchestrator/modules/rag/services/cloud_sync_service.py:197-402, orchestrator/modules/rag/services/cloud_file_downloader.py:60-144
Text Extraction
The DocumentProcessor class implements multimodal extraction with format-specific parsers and fallback strategies.
PDF Extraction
PDFs use a two-tier extraction strategy to handle both standard and malformed documents:
Primary: pdfplumber with table extraction (tables converted to Markdown)
Fallback: PyPDF2 for PDFs that crash pdfplumber
Table Extraction Example: PDFs with embedded tables are converted to Markdown for better LLM comprehension:
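The two-tier fallback and the table conversion can be sketched together. This is an illustrative sketch of the pattern, not the manager.py implementation: the extractor callables stand in for pdfplumber and PyPDF2, and the row format mirrors the list-of-rows shape pdfplumber returns from page.extract_tables().

```python
def extract_with_fallback(path, primary, fallback):
    """Tier 1: primary parser; tier 2: fallback if it crashes or returns nothing."""
    try:
        text = primary(path)
        if text and text.strip():
            return text
    except Exception:
        pass                              # malformed PDF crashed the primary parser
    return fallback(path)

def table_to_markdown(rows):
    """Convert a list-of-rows table into a Markdown table for LLM comprehension."""
    header, *body = rows
    lines = ["| " + " | ".join(header) + " |",
             "| " + " | ".join("---" for _ in header) + " |"]
    lines += ["| " + " | ".join(cell or "" for cell in row) + " |" for row in body]
    return "\n".join(lines)
```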
Sources: orchestrator/modules/rag/ingestion/manager.py:157-194, orchestrator/modules/rag/ingestion/manager.py:480-514
Spreadsheet Extraction
XLSX and CSV files are converted to Markdown tables per sheet:
| Format | Parser | Processing |
| --- | --- | --- |
| XLSX | openpyxl | Load workbook → iterate sheets → convert each sheet to Markdown table |
| CSV | csv.reader | Read rows → convert to single Markdown table |
| Encoding | UTF-8 with latin-1 fallback | Handles international characters |
Sources: orchestrator/modules/rag/ingestion/manager.py:208-273
DOCX and Plain Text
DOCX: python-docx extracts paragraph text sequentially
Markdown/TXT/JSON/Python: Direct UTF-8 read with latin-1 fallback
Character Cleaning: All extracted text has null bytes (\x00, \x01, \x02) stripped
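The decode-and-clean step shared by the plain-text and spreadsheet paths can be sketched as follows; the function name is illustrative, not the manager.py symbol.

```python
# Sketch of the shared decode step: UTF-8 with latin-1 fallback, then
# stripping the control bytes listed above.
def decode_and_clean(raw: bytes) -> str:
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        text = raw.decode("latin-1")     # fallback for legacy encodings
    return text.translate({0: None, 1: None, 2: None})  # drop \x00 \x01 \x02
```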
Sources: orchestrator/modules/rag/ingestion/manager.py:196-207, orchestrator/modules/rag/ingestion/manager.py:261-287
Semantic Chunking
The SemanticChunker implements five strategies based on information theory and semantic similarity. The default strategy is TOPIC_COHERENCE for fast, keyword-based chunking without local model dependencies.
Chunking Strategies
| Strategy | Method | Best For | Speed |
| --- | --- | --- | --- |
| SEMANTIC_SIMILARITY | Embedding-based cosine similarity | High-quality semantic boundaries | Slow (embedding calls) |
| INFORMATION_DENSITY | Shannon entropy calculations | Technical documents with varying density | Medium |
| TOPIC_COHERENCE | Keyword overlap (Jaccard similarity) | Default: fast, no embeddings required | Fast |
| HIERARCHICAL | Parent-child chunk relationships | Long documents needing context expansion | Medium |
| ADAPTIVE | Runs all 3 strategies, picks best | Unknown document types | Slow |
Chunk Size Constraints:
TOPIC_COHERENCE Strategy (Default)
This strategy uses keyword extraction and Jaccard similarity to maintain topic continuity without requiring embeddings:
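An illustrative version of the overlap test: Jaccard similarity is the size of the keyword intersection over the size of the union, and a new chunk opens when it drops below a threshold. The stopword set and threshold here are assumptions; the real keyword extraction in SemanticChunker is more sophisticated.

```python
# Illustrative keyword-overlap test behind TOPIC_COHERENCE chunking.
import re

STOPWORDS = {"the", "a", "an", "and", "or", "of", "to", "in", "is", "are"}

def keywords(sentence: str) -> set[str]:
    return {w for w in re.findall(r"[a-z]+", sentence.lower())
            if w not in STOPWORDS}

def jaccard(a: set[str], b: set[str]) -> float:
    return len(a & b) / len(a | b) if (a | b) else 0.0

def starts_new_chunk(prev: str, curr: str, threshold: float = 0.1) -> bool:
    """Open a new chunk when keyword overlap with the previous sentence drops."""
    return jaccard(keywords(prev), keywords(curr)) < threshold
```

Because this needs only string operations, it avoids the embedding calls that make SEMANTIC_SIMILARITY slow.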
Metadata Captured:
Each chunk includes these mathematical metrics:
Sources: orchestrator/modules/rag/ingestion/manager.py:289-400, orchestrator/modules/rag/chunking/semantic_chunker.py:200-252
Chunk Quality Filters
The pipeline applies post-processing filters to remove low-quality chunks:
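The concrete filters are not reproduced above; this sketch covers the two kinds the Performance Considerations section names, separator-only chunks and low-information fragments, with an illustrative length threshold.

```python
# Hypothetical post-processing filters for low-quality chunks.
def is_low_quality(chunk: str, min_chars: int = 50) -> bool:
    stripped = chunk.strip()
    if len(stripped) < min_chars:
        return True                      # low-information fragment
    if not any(c.isalnum() for c in stripped):
        return True                      # separator-only chunk ("-----", "====")
    return False

def filter_chunks(chunks: list[str]) -> list[str]:
    return [c for c in chunks if not is_low_quality(c)]
```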
Sources: orchestrator/modules/rag/ingestion/manager.py:357-400
Embedding Generation
Embeddings are generated using the centralized EmbeddingManager which abstracts multiple providers:
Batch Processing:
The pipeline generates embeddings in batches to optimize API usage:
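The batching loop can be sketched as follows; embed_batch stands in for whatever provider call EmbeddingManager resolves to, and the batch size is illustrative.

```python
# Hedged sketch of batched embedding generation: one API call per slice
# of texts instead of one call per chunk.
def embed_in_batches(texts, embed_batch, batch_size=64):
    vectors = []
    for start in range(0, len(texts), batch_size):
        vectors.extend(embed_batch(texts[start:start + batch_size]))
    return vectors
```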
Provider Selection:
The system uses a 6-level credential resolution strategy (see Agent Factory & Runtime):
Workspace-level provider override
Agent-specific credentials
User credentials
Workspace default
System default
Hardcoded fallback
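The six-level strategy amounts to taking the first configured source in priority order. This first-non-None sketch abstracts away how each level is actually looked up; the function name is hypothetical.

```python
# Illustrative resolution over the six levels listed above, ordered
# most-specific first (workspace override ... hardcoded fallback).
def resolve_embedding_credentials(*levels):
    for cred in levels:
        if cred is not None:
            return cred
    raise RuntimeError("no embedding credentials configured at any level")
```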
Sources: orchestrator/modules/rag/ingestion/manager.py:402-450
Storage Architecture
The ingestion pipeline uses a dual-storage model: S3 for large blobs (raw files, vectors) and PostgreSQL for metadata (document records, chunk tracking).
Document Storage
Raw uploaded files are stored in S3 with workspace isolation:
Upload Flow:
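A hypothetical key layout for workspace-isolated storage; the actual prefix scheme in manager.py may differ, and the path components here are assumptions.

```python
# Hypothetical S3 key scheme: every object is scoped under its workspace so
# tenants never share prefixes, and the content hash supports deduplication.
def document_key(workspace_id: str, content_hash: str, filename: str) -> str:
    return f"workspaces/{workspace_id}/documents/{content_hash}/{filename}"
```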
Sources: orchestrator/modules/rag/ingestion/manager.py:652-687
Vector Storage
Embeddings are stored in S3 Vectors (dedicated workspace buckets) rather than PostgreSQL pgvector for scalability:
| Backend | Use Case | Trade-off |
| --- | --- | --- |
| S3 Vectors (default) | Production deployment, unlimited scale | Slightly higher latency (~100-200ms) |
| PostgreSQL pgvector | Development, low-volume | Fast queries, storage limits |
S3 Vectors Schema:
Bucket Isolation:
Each workspace gets a dedicated S3 bucket:
Sources: orchestrator/modules/rag/ingestion/manager.py:405-445, orchestrator/modules/rag/service.py:808-821
PostgreSQL Metadata
The database tracks document lifecycle and chunk metadata for fast querying:
Schema Overview:
Indexes for Performance:
Sources: orchestrator/modules/rag/ingestion/manager.py:578-643
Error Handling and Retry Logic
The pipeline implements multi-layer error handling to ensure robustness:
Document-Level Retry
Deduplication Strategy:
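The two mechanisms named above can be sketched generically; the real attempt counts and backoff policy in manager.py are not reproduced here, and the delay default is set to zero purely for illustration.

```python
# Generic sketches of content-hash deduplication and document-level retry
# with exponential backoff.
import hashlib
import time

def is_duplicate(content: bytes, known_hashes: set[str]) -> bool:
    """Content-hash dedup: identical bytes are never re-processed."""
    return hashlib.sha256(content).hexdigest() in known_hashes

def with_retries(fn, attempts: int = 3, base_delay: float = 0.0):
    """Retry a document-level step, doubling the delay after each failure."""
    for attempt in range(attempts):
        try:
            return fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)
```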
Sources: orchestrator/modules/rag/ingestion/manager.py:688-728
Cloud Download Fallback
The CloudFileDownloader implements a 2-layer download strategy for Google Drive (which truncates inline responses):
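The two-layer strategy can be sketched as: accept the inline Composio payload when it is complete, otherwise fall back to a full download. The fetch callables and the size-based truncation check are assumptions, not the real CloudFileDownloader API.

```python
# Hypothetical two-layer download: layer 1 is the inline response, layer 2
# is a full download when Drive truncated the payload.
def download_with_fallback(file_id, inline_fetch, full_fetch, expected_size=None):
    data = inline_fetch(file_id)                 # layer 1: inline response
    if expected_size is not None and len(data) < expected_size:
        data = full_fetch(file_id)               # layer 2: payload was truncated
    return data
```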
Provider-Specific Actions:
Sources: orchestrator/modules/rag/services/cloud_file_downloader.py:60-143, orchestrator/modules/rag/services/cloud_file_downloader.py:341-423
Performance Considerations
Token Efficiency
The semantic chunker achieves 80-90% token savings compared to naive fixed-size splitting by:
Intelligent Boundaries: Splits at semantic breaks (sentence/paragraph) rather than arbitrary character counts
Quality Filters: Removes separator-only chunks and low-information fragments
Overlap Management: Uses 10% overlap only where semantically beneficial
Parallel Processing
Cloud sync uses asyncio.Semaphore(3) for concurrent downloads:
Throughput: Processes up to 3 files simultaneously, reducing sync time by roughly 70% for large folders.
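The bounded-concurrency pattern looks like the following; the download coroutine stands in for the real per-file work in cloud_sync_service.py.

```python
# Sketch of bounding concurrent downloads with asyncio.Semaphore(3).
import asyncio

async def sync_files(files, download, limit=3):
    sem = asyncio.Semaphore(limit)       # at most `limit` downloads in flight

    async def bounded(f):
        async with sem:
            return await download(f)

    return await asyncio.gather(*(bounded(f) for f in files))
```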
Sources: orchestrator/modules/rag/services/cloud_sync_service.py:290-384
Embedding Cache
The SemanticChunker maintains an in-memory embedding cache with FIFO eviction:
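A minimal FIFO cache in the spirit described; the capacity and API of the real cache in semantic_chunker.py may differ. Unlike an LRU cache, reads do not bump an entry's position, so eviction always removes the oldest insertion.

```python
# Hypothetical FIFO embedding cache with insertion-order eviction.
from collections import OrderedDict

class FIFOEmbeddingCache:
    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._store: "OrderedDict[str, list[float]]" = OrderedDict()

    def get(self, text: str):
        return self._store.get(text)     # no recency bump: pure FIFO, not LRU

    def put(self, text: str, vector: list[float]) -> None:
        if text not in self._store and len(self._store) >= self.max_size:
            self._store.popitem(last=False)   # evict the oldest insertion
        self._store[text] = vector
```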
Sources: orchestrator/modules/rag/chunking/semantic_chunker.py:78-82, orchestrator/modules/rag/chunking/semantic_chunker.py:372-381
Configuration
Key configuration parameters from config.py:
| Parameter | Default | Purpose |
| --- | --- | --- |
| S3_DOCUMENTS_BUCKET | automatos-documents | Raw file storage bucket |
| S3_VECTORS_ENABLED | true | Use S3 Vectors vs PostgreSQL pgvector |
| CHUNK_SIZE | 500 | Target chunk size in tokens |
| MIN_CHUNK_SIZE | 100 | Minimum viable chunk |
| MAX_CHUNK_SIZE | 1500 | Hard limit per chunk |
| DIVERSITY_FACTOR | 0.3 | MMR diversity in retrieval |
| MIN_SIMILARITY | 0.5 | Minimum cosine similarity for RAG results |
System Settings Integration:
The RAG module reads from system_settings table for runtime tuning:
Sources: orchestrator/modules/rag/service.py:47-140
Related Components
The ingestion pipeline integrates with several other subsystems:
RAG Retrieval System: Uses ingested chunks for semantic search
Knowledge Graph & Entity Extraction: Extracts entities from chunks for graph-based retrieval
Document Management: UI layer for upload, browsing, and deletion
Cloud Storage Integration: OAuth flow and connection management
Tools & Integrations: Composio actions for cloud provider APIs
Sources: orchestrator/modules/rag/service.py:210-295, orchestrator/modules/search/services/entity_extractor.py:40-88