Document Ingestion Pipeline


This page describes the complete document ingestion flow from upload/sync through text extraction, chunking, embedding generation, and storage. The pipeline processes documents into searchable vector chunks stored in S3 Vectors with metadata tracked in PostgreSQL.

For document search and retrieval, see RAG Retrieval System. For cloud storage configuration, see Cloud Storage Integration. For chunking algorithm details, see Semantic Chunking Strategies.


Pipeline Overview

The ingestion pipeline transforms raw documents into semantically searchable chunks through five stages: acquisition, extraction, chunking, embedding, and storage. Each stage is optimized for specific file types and includes fallback mechanisms for robustness.


Sources: orchestrator/modules/rag/ingestion/manager.py:402-750, orchestrator/api/documents.py:106-262, orchestrator/modules/rag/services/cloud_sync_service.py:38-290


Entry Points

Direct Upload via API

The /api/documents/upload endpoint accepts multipart form data with MIME type validation using python-magic to detect actual file content (not just extension checking). This prevents malicious files disguised with fake extensions.

| Validation Layer | Implementation | Purpose |
|---|---|---|
| MIME Detection | `magic.from_buffer(content, mime=True)` | Detect actual file type from binary content |
| Extension Mapping | `ALLOWED_MIME_TYPES` dict | Verify extension matches detected MIME |
| Size Limit | 50MB cap | Prevent resource exhaustion |
| Content Hash | SHA-256 deduplication | Skip re-processing identical files |
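The checks above can be sketched as follows. This is an illustrative reduction, not the endpoint's actual code: the mapping shown is a subset, the function name is invented, and the real MIME string comes from `magic.from_buffer(content, mime=True)`.

```python
import hashlib

# Illustrative subset of the ALLOWED_MIME_TYPES mapping; the real dict
# covers every supported format listed below.
ALLOWED_MIME_TYPES = {
    "application/pdf": {".pdf"},
    "text/plain": {".txt", ".md", ".py"},
    "text/csv": {".csv"},
}
MAX_UPLOAD_BYTES = 50 * 1024 * 1024  # 50MB cap

def validate_upload(filename, content, detected_mime):
    """detected_mime would come from magic.from_buffer(content, mime=True)."""
    if len(content) > MAX_UPLOAD_BYTES:
        raise ValueError("file exceeds 50MB limit")
    allowed_exts = ALLOWED_MIME_TYPES.get(detected_mime)
    if allowed_exts is None:
        raise ValueError(f"unsupported MIME type: {detected_mime}")
    ext = "." + filename.rsplit(".", 1)[-1].lower()
    if ext not in allowed_exts:
        raise ValueError(f"extension {ext} does not match {detected_mime}")
    # SHA-256 content hash, used downstream for deduplication.
    return hashlib.sha256(content).hexdigest()
```

Because the MIME type is detected from the bytes themselves, renaming `malware.exe` to `report.pdf` fails the extension-to-MIME check rather than slipping through.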

Supported Formats:

  • Documents: PDF, DOCX, TXT, Markdown

  • Code: Python, JSON

  • Spreadsheets: XLSX, CSV


Sources: orchestrator/api/documents.py:106-262, orchestrator/api/documents.py:88-104

Cloud Storage Sync

The CloudSyncService orchestrates batch syncing from Google Drive, Dropbox, OneDrive, and Box via Composio actions. It maintains sync state in the cloud_documents table to track which files have been processed and their modification timestamps.


Key Features:

  • Incremental Sync: Compares cloud_modified_at to skip unchanged files

  • Parallel Processing: asyncio.Semaphore(3) limits concurrent downloads

  • File Type Filtering: Skips unsupported extensions (.ttf, .png, etc.)

  • Redis Caching: Stores folder/file listings to reduce Composio API calls
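The incremental-sync decision reduces to a timestamp comparison plus a file-type filter. A minimal sketch, assuming the cloud_documents row retains the modification timestamp seen at the last sync (function name and extension set are illustrative):

```python
# Unsupported file types are filtered out before download; this set is
# an illustrative subset.
SKIPPED_EXTENSIONS = {".ttf", ".png"}

def should_process(filename, cloud_modified_at, last_seen_modified_at):
    """Return True if the cloud file needs to be (re)ingested."""
    ext = "." + filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext in SKIPPED_EXTENSIONS:
        return False  # file-type filtering
    # Incremental sync: a file whose cloud timestamp matches the one
    # recorded at last sync is unchanged and can be skipped.
    return cloud_modified_at != last_seen_modified_at
```

A file never seen before has no recorded timestamp (`None`), so the comparison naturally marks it for processing.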

Sources: orchestrator/modules/rag/services/cloud_sync_service.py:197-402, orchestrator/modules/rag/services/cloud_file_downloader.py:60-144


Text Extraction

The DocumentProcessor class implements multimodal extraction with format-specific parsers and fallback strategies.

PDF Extraction

PDFs use a two-tier extraction strategy to handle both standard and malformed documents:

  1. Primary: pdfplumber with table extraction (tables converted to Markdown)

  2. Fallback: PyPDF2 for PDFs that crash pdfplumber
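The two-tier strategy reduces to a try/fallback pattern. In this sketch the extractor callables stand in for the pdfplumber and PyPDF2 code paths; the actual implementation lives in DocumentProcessor.

```python
def extract_pdf_text(pdf_bytes, primary, fallback):
    """Run the primary extractor; fall back if it raises.

    In the real pipeline `primary` is the pdfplumber path (with table
    extraction) and `fallback` is the PyPDF2 path for malformed PDFs.
    """
    try:
        return primary(pdf_bytes)
    except Exception:
        # PDFs that crash the primary parser get a second chance with
        # the simpler, more tolerant fallback parser.
        return fallback(pdf_bytes)
```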


Table Extraction Example: PDFs with embedded tables are converted to Markdown for better LLM comprehension:
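As an illustration (not the pipeline's actual converter), a table extracted as a list of rows, the shape `pdfplumber`'s `extract_tables()` returns, can be rendered as Markdown like this:

```python
def table_to_markdown(rows):
    """Render a list-of-rows table as a Markdown table.

    pdfplumber yields None for empty cells, so those are blanked out.
    """
    header, *body = rows
    lines = ["| " + " | ".join(str(c) for c in header) + " |"]
    lines.append("|" + "---|" * len(header))
    for row in body:
        cells = ("" if c is None else str(c) for c in row)
        lines.append("| " + " | ".join(cells) + " |")
    return "\n".join(lines)
```

The Markdown form preserves row/column alignment as explicit delimiters, which downstream LLMs parse far more reliably than whitespace-positioned text.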

Sources: orchestrator/modules/rag/ingestion/manager.py:157-194, orchestrator/modules/rag/ingestion/manager.py:480-514

Spreadsheet Extraction

XLSX and CSV files are converted to Markdown tables per sheet:

| Format | Library | Processing |
|---|---|---|
| XLSX | openpyxl | Load workbook → iterate sheets → convert each sheet to Markdown table |
| CSV | csv.reader | Read rows → convert to single Markdown table |
| Encoding | UTF-8 with latin-1 fallback | Handles international characters |

Sources: orchestrator/modules/rag/ingestion/manager.py:208-273

DOCX and Plain Text

  • DOCX: python-docx extracts paragraph text sequentially

  • Markdown/TXT/JSON/Python: Direct UTF-8 read with latin-1 fallback

  • Character Cleaning: All extracted text has null bytes (\x00, \x01, \x02) stripped
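The plain-text read path described above (decode with fallback, then strip control bytes) can be sketched as:

```python
def read_text(raw: bytes) -> str:
    """Decode file bytes as UTF-8, falling back to latin-1, then strip
    the null/control bytes that would break downstream storage."""
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        # latin-1 maps every byte to a character, so this never fails.
        text = raw.decode("latin-1")
    for ch in ("\x00", "\x01", "\x02"):
        text = text.replace(ch, "")
    return text
```

The latin-1 fallback is lossless at the byte level, which is why it works as a catch-all for legacy single-byte encodings.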

Sources: orchestrator/modules/rag/ingestion/manager.py:196-207, orchestrator/modules/rag/ingestion/manager.py:261-287


Semantic Chunking

The SemanticChunker implements five strategies based on information theory and semantic similarity. The default strategy is TOPIC_COHERENCE for fast, keyword-based chunking without local model dependencies.

Chunking Strategies

| Strategy | Method | Use Case | Performance |
|---|---|---|---|
| SEMANTIC_SIMILARITY | Embedding-based cosine similarity | High-quality semantic boundaries | Slow (embedding calls) |
| INFORMATION_DENSITY | Shannon entropy calculations | Technical documents with varying density | Medium |
| TOPIC_COHERENCE | Keyword overlap (Jaccard similarity) | Default - fast, no embeddings required | Fast |
| HIERARCHICAL | Parent-child chunk relationships | Long documents needing context expansion | Medium |
| ADAPTIVE | Runs all 3 strategies, picks best | Unknown document types | Slow |

Chunk Size Constraints: chunk sizes are bounded by CHUNK_SIZE (target 500 tokens), MIN_CHUNK_SIZE (100), and MAX_CHUNK_SIZE (1500); see Configuration below.

TOPIC_COHERENCE Strategy (Default)

This strategy uses keyword extraction and Jaccard similarity to maintain topic continuity without requiring embeddings:

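The core of the strategy is the Jaccard keyword-overlap check. A minimal sketch; the threshold value here is illustrative, not the chunker's actual cutoff:

```python
def jaccard(a: set, b: set) -> float:
    """Jaccard similarity: |intersection| / |union|."""
    if not a and not b:
        return 0.0
    return len(a & b) / len(a | b)

def same_topic(prev_keywords, next_keywords, threshold=0.3):
    """Decide whether two adjacent segments continue the same topic.

    Segments whose extracted keywords overlap enough stay in one chunk;
    a drop below the threshold marks a chunk boundary.
    """
    return jaccard(set(prev_keywords), set(next_keywords)) >= threshold
```

Because this only needs keyword sets, no embedding calls are made, which is what makes TOPIC_COHERENCE the fast default.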

Metadata Captured: each chunk is annotated with metrics computed during chunking (for example, information-density and keyword-overlap scores).

Sources: orchestrator/modules/rag/ingestion/manager.py:289-400, orchestrator/modules/rag/chunking/semantic_chunker.py:200-252

Chunk Quality Filters

The pipeline applies post-processing filters that drop separator-only chunks and low-information fragments before embedding.
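A hedged sketch of such filters; the character-based minimum here is illustrative, since the real limit is the token-based MIN_CHUNK_SIZE:

```python
MIN_CHUNK_CHARS = 20  # illustrative; the pipeline's limit is token-based

def filter_chunks(chunks):
    """Drop separator-only chunks and low-information fragments."""
    kept = []
    for chunk in chunks:
        # Strip whitespace and common separator characters; a chunk
        # that is nothing but separators carries no information.
        stripped = chunk.strip(" \n\t-=*_|")
        if not stripped:
            continue  # separator-only chunk
        if len(stripped) < MIN_CHUNK_CHARS:
            continue  # low-information fragment
        kept.append(chunk)
    return kept
```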

Sources: orchestrator/modules/rag/ingestion/manager.py:357-400


Embedding Generation

Embeddings are generated using the centralized EmbeddingManager which abstracts multiple providers:


Batch Processing:

The pipeline generates embeddings in batches to optimize API usage.
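The batching pattern looks like this; the batch size and the `embed_batch` callable are placeholders, not the EmbeddingManager's actual interface:

```python
def embed_in_batches(texts, embed_batch, batch_size=64):
    """Embed a list of texts in fixed-size batches.

    One provider call per batch instead of one per text cuts request
    overhead and keeps payloads within provider limits.
    """
    vectors = []
    for i in range(0, len(texts), batch_size):
        vectors.extend(embed_batch(texts[i:i + batch_size]))
    return vectors
```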

Provider Selection:

The system uses a 6-level credential resolution strategy (see Agent Factory & Runtime):

  1. Workspace-level provider override

  2. Agent-specific credentials

  3. User credentials

  4. Workspace default

  5. System default

  6. Hardcoded fallback
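The resolution order reduces to a first-match-wins chain. In this sketch the callables are placeholders for the six lookup levels listed above:

```python
def resolve_credentials(sources):
    """Return credentials from the first source that yields any.

    `sources` is an ordered list of zero-arg callables, highest
    priority first (workspace override ... hardcoded fallback).
    """
    for source in sources:
        creds = source()
        if creds is not None:
            return creds
    raise RuntimeError("no embedding credentials available")
```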

Sources: orchestrator/modules/rag/ingestion/manager.py:402-450


Storage Architecture

The ingestion pipeline uses a dual-storage model: S3 for large blobs (raw files, vectors) and PostgreSQL for metadata (document records, chunk tracking).

Document Storage

Raw uploaded files are stored in S3 with workspace isolation:


Sources: orchestrator/modules/rag/ingestion/manager.py:652-687

Vector Storage

Embeddings are stored in S3 Vectors (dedicated workspace buckets) rather than PostgreSQL pgvector for scalability:

| Storage Type | Use Case | Trade-offs |
|---|---|---|
| S3 Vectors (default) | Production deployment, unlimited scale | Slightly higher latency (~100-200ms) |
| PostgreSQL pgvector | Development, low-volume | Fast queries, storage limits |

S3 Vectors Schema:

Bucket Isolation: each workspace gets a dedicated S3 bucket for its vectors.

Sources: orchestrator/modules/rag/ingestion/manager.py:405-445, orchestrator/modules/rag/service.py:808-821

PostgreSQL Metadata

The database tracks document lifecycle and chunk metadata for fast querying:

Schema Overview:

Indexes for Performance:

Sources: orchestrator/modules/rag/ingestion/manager.py:578-643


Error Handling and Retry Logic

The pipeline implements multi-layer error handling to ensure robustness:

Document-Level Retry


Deduplication Strategy:
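The SHA-256 content-hash check from the upload path can be sketched as follows; the in-memory set stands in for the hashes tracked in PostgreSQL:

```python
import hashlib

def is_duplicate(content: bytes, known_hashes: set) -> bool:
    """Return True if this exact content was already ingested.

    Identical bytes always produce the same SHA-256 digest, so a hash
    hit means the full pipeline can be skipped for this file.
    """
    digest = hashlib.sha256(content).hexdigest()
    if digest in known_hashes:
        return True
    known_hashes.add(digest)
    return False
```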

Sources: orchestrator/modules/rag/ingestion/manager.py:688-728

Cloud Download Fallback

The CloudFileDownloader implements a 2-layer download strategy for Google Drive (which truncates inline responses):


Provider-Specific Actions:

Sources: orchestrator/modules/rag/services/cloud_file_downloader.py:60-143, orchestrator/modules/rag/services/cloud_file_downloader.py:341-423


Performance Considerations

Token Efficiency

The semantic chunker achieves 80-90% token savings compared to naive fixed-size splitting by:

  1. Intelligent Boundaries: Splits at semantic breaks (sentence/paragraph) rather than arbitrary character counts

  2. Quality Filters: Removes separator-only chunks and low-information fragments

  3. Overlap Management: Uses 10% overlap only where semantically beneficial

Parallel Processing

Cloud sync uses asyncio.Semaphore(3) to bound concurrent downloads.

Throughput: Processes ~3 files simultaneously, reducing sync time by 70% for large folders.
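The bounded-concurrency pattern looks like this; `download` is a placeholder for the Composio-backed downloader:

```python
import asyncio

async def download_all(files, download):
    """Download files concurrently, at most 3 in flight at once."""
    sem = asyncio.Semaphore(3)

    async def bounded(f):
        async with sem:  # acquire one of 3 slots before downloading
            return await download(f)

    # gather preserves input order in its results.
    return await asyncio.gather(*(bounded(f) for f in files))
```

The semaphore gives most of the wall-clock benefit of full parallelism while keeping memory use and provider rate-limit pressure predictable.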

Sources: orchestrator/modules/rag/services/cloud_sync_service.py:290-384

Embedding Cache

The SemanticChunker maintains an in-memory embedding cache with FIFO eviction.
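A FIFO-evicting cache can be sketched with `collections.OrderedDict`; the capacity here is illustrative, not the chunker's configured value:

```python
from collections import OrderedDict

class EmbeddingCache:
    """In-memory text -> vector cache with FIFO eviction."""

    def __init__(self, capacity=1000):
        self.capacity = capacity
        self._store = OrderedDict()  # insertion order = eviction order

    def get(self, text):
        return self._store.get(text)

    def put(self, text, vector):
        if text not in self._store and len(self._store) >= self.capacity:
            # Evict the oldest inserted entry (FIFO, not LRU: reads do
            # not refresh an entry's position).
            self._store.popitem(last=False)
        self._store[text] = vector
```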

Sources: orchestrator/modules/rag/chunking/semantic_chunker.py:78-82, orchestrator/modules/rag/chunking/semantic_chunker.py:372-381


Configuration

Key configuration parameters from config.py:

| Variable | Default | Purpose |
|---|---|---|
| S3_DOCUMENTS_BUCKET | automatos-documents | Raw file storage bucket |
| S3_VECTORS_ENABLED | true | Use S3 Vectors vs PostgreSQL pgvector |
| CHUNK_SIZE | 500 | Target chunk size in tokens |
| MIN_CHUNK_SIZE | 100 | Minimum viable chunk |
| MAX_CHUNK_SIZE | 1500 | Hard limit per chunk |
| DIVERSITY_FACTOR | 0.3 | MMR diversity in retrieval |
| MIN_SIMILARITY | 0.5 | Minimum cosine similarity for RAG results |

System Settings Integration: the RAG module reads the system_settings table for runtime tuning of these parameters.

Sources: orchestrator/modules/rag/service.py:47-140


Integration Points

The ingestion pipeline integrates with several other subsystems, notably the RAG service layer and the search module's entity extractor.

Sources: orchestrator/modules/rag/service.py:210-295, orchestrator/modules/search/services/entity_extractor.py:40-88

