# Phase 2: Embedding Pipeline

## Objective

Build the complete document ingestion and embedding pipeline: upload content → parse (text + images) → chunk (content-type-aware) → embed via configurable model → store vectors in Neo4j → extract concepts for the knowledge graph.

## Heritage

The embedding pipeline adapts proven patterns from [Spelunker](https://git.helu.ca/r/spelunker)'s `rag/services/embeddings.py` — semantic chunking, batch embedding, S3 chunk storage, and progress tracking — enhanced with multimodal capabilities, knowledge graph relationships, and content-type awareness.

## Architecture Overview

```
Upload (API/Admin)
  → S3 Storage (original file)
  → Document Parsing (PyMuPDF — text + images)
  → Content-Type-Aware Chunking (semantic-text-splitter)
  → Text Embedding (system embedding model via LLM Manager)
  → Image Embedding (multimodal model, if available)
  → Neo4j Graph Storage (Chunk nodes, Image nodes, vectors)
  → Concept Extraction (system chat model)
  → Knowledge Graph (Concept nodes, MENTIONS/REFERENCES edges)
```

## Deliverables

### 1. Document Parsing Service (`library/services/parsers.py`)

**Primary parser: PyMuPDF** — a single library handling all document formats with unified text + image extraction.
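Most formats route through PyMuPDF while plain text is read directly; a minimal sketch of how `parsers.py` might dispatch by extension (the function name and return values are assumptions, not the actual API):

```python
from pathlib import Path

# Extensions PyMuPDF opens natively (sketch — mirrors the format table below)
PYMUPDF_EXTENSIONS = {
    ".pdf", ".epub", ".docx", ".pptx", ".xlsx",
    ".xps", ".mobi", ".fb2", ".cbz", ".html", ".htm",
}
TEXT_EXTENSIONS = {".txt", ".md"}

def choose_parser(filename: str) -> str:
    """Return which parse path a file takes, based on its extension."""
    ext = Path(filename).suffix.lower()
    if ext in PYMUPDF_EXTENSIONS:
        return "pymupdf"
    if ext in TEXT_EXTENSIONS:
        return "direct_read"
    raise ValueError(f"Unsupported file type: {ext}")

print(choose_parser("report.pdf"))  # pymupdf
print(choose_parser("notes.md"))    # direct_read
```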
#### Supported Formats

| Format | Extensions | Text Extraction | Image Extraction |
|--------|-----------|-----------------|------------------|
| PDF | `.pdf` | Layout-preserving text | Embedded images, diagrams |
| EPUB | `.epub` | Chapter-structured HTML | Cover art, illustrations |
| DOCX | `.docx` | Via HTML conversion | Inline images, diagrams |
| PPTX | `.pptx` | Via HTML conversion | Slide images, charts |
| XLSX | `.xlsx` | Via HTML conversion | Embedded charts |
| XPS | `.xps` | Native | Native |
| MOBI | `.mobi` | Native | Native |
| FB2 | `.fb2` | Native | Native |
| CBZ | `.cbz` | Native | Native (comic pages) |
| Plain text | `.txt`, `.md` | Direct read | N/A |
| HTML | `.html`, `.htm` | PyMuPDF or direct | Inline images |
| Images | `.jpg`, `.png`, etc. | N/A (OCR future) | The image itself |

#### Text Sanitization

Ported from Spelunker's `text_utils.py`:

- Remove null bytes and control characters
- Remove zero-width characters
- Normalize Unicode to NFC
- Replace invalid UTF-8 sequences
- Clean PDF ligatures and artifacts
- Normalize whitespace

#### Image Extraction

For each document page/section, extract embedded images via `page.get_images()` → `doc.extract_image(xref)`:

- Raw image bytes (PNG/JPEG)
- Dimensions (width × height)
- Source page/position for chunk-image association
- Store in S3: `images/{item_uid}/{image_index}.{ext}`

#### Parse Result Structure

```python
@dataclass
class TextBlock:
    text: str
    page: int
    metadata: dict  # {heading_level, section_name, etc.}

@dataclass
class ExtractedImage:
    data: bytes
    ext: str  # png, jpg, etc.
    width: int
    height: int
    source_page: int
    source_index: int

@dataclass
class ParseResult:
    text_blocks: list[TextBlock]
    images: list[ExtractedImage]
    metadata: dict  # {page_count, title, author, etc.}
    file_type: str
```

### 2. Content-Type-Aware Chunking Service (`library/services/chunker.py`)

Uses `semantic-text-splitter` with a HuggingFace tokenizer (proven in Spelunker).
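The strategy dispatch described next can be sketched as data plus a lookup — a sketch with assumed names (`ChunkParams`, the config shape, and the default strategy are illustrations; sizes and markers come from the dispatch table):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ChunkParams:
    chunk_size: int              # tokens
    overlap: int                 # tokens
    boundary_markers: tuple[str, ...]

# The strategy table expressed as data
STRATEGIES = {
    "chapter_aware":     ChunkParams(1024, 128, ("chapter", "scene", "paragraph")),
    "section_aware":     ChunkParams(512, 64, ("section", "subsection", "code_block", "list")),
    "song_level":        ChunkParams(512, 32, ("song", "verse", "chorus")),
    "scene_level":       ChunkParams(768, 64, ("scene", "act", "sequence")),
    "description_level": ChunkParams(512, 32, ("artwork", "description", "analysis")),
    "entry_level":       ChunkParams(512, 32, ("entry", "date", "paragraph")),
}

def resolve_strategy(chunking_config: dict) -> ChunkParams:
    """Map a Library.chunking_config dict to splitter parameters."""
    return STRATEGIES[chunking_config.get("strategy", "section_aware")]
```

The resolved `chunk_size`/`overlap` would then configure the `semantic-text-splitter` instance for that library.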
#### Strategy Dispatch

Based on `Library.chunking_config`:

| Strategy | Library Type | Boundary Markers | Chunk Size | Overlap |
|----------|-------------|------------------|-----------|---------|
| `chapter_aware` | Fiction | chapter, scene, paragraph | 1024 | 128 |
| `section_aware` | Technical | section, subsection, code_block, list | 512 | 64 |
| `song_level` | Music | song, verse, chorus | 512 | 32 |
| `scene_level` | Film | scene, act, sequence | 768 | 64 |
| `description_level` | Art | artwork, description, analysis | 512 | 32 |
| `entry_level` | Journal | entry, date, paragraph | 512 | 32 |

#### Chunk-Image Association

Track which images appeared near which text chunks:

- PDF: image bounding boxes on specific pages
- DOCX/PPTX: images associated with slides/sections
- EPUB: images referenced from specific chapters

Creates `Chunk -[HAS_NEARBY_IMAGE]-> Image` relationships with proximity metadata.

#### Chunk Storage

- Chunk text stored in S3: `chunks/{item_uid}/chunk_{index}.txt`
- `text_preview` (first 500 chars) stored on the Chunk node for full-text indexing

### 3. Embedding Client (`library/services/embedding_client.py`)

Multi-backend embedding client dispatching by `LLMApi.api_type`.
#### Backend Support

| API Type | Protocol | Auth | Batch Support |
|----------|---------|------|---------------|
| `openai` | HTTP POST `/embeddings` | API key header | Native batch |
| `vllm` | HTTP POST `/embeddings` | API key header | Native batch |
| `llama-cpp` | HTTP POST `/embeddings` | API key header | Native batch |
| `ollama` | HTTP POST `/embeddings` | None | Native batch |
| `bedrock` | HTTP POST `/model/{id}/invoke` | Bearer token | Client-side loop |

#### Bedrock Integration

Uses Amazon Bedrock API keys (Bearer token auth) — no boto3 SDK required:

```
POST https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/invoke
Authorization: Bearer {bedrock_api_key}
Content-Type: application/json

{"inputText": "text to embed", "dimensions": 1024, "normalize": true}

→ {"embedding": [float, ...], "inputTextTokenCount": 42}
```

**LLMApi setup for Bedrock embeddings:**

- `api_type`: `"bedrock"`
- `base_url`: `https://bedrock-runtime.us-east-1.amazonaws.com`
- `api_key`: Bedrock API key (encrypted)

**LLMApi setup for Bedrock chat (Claude, etc.):**

- `api_type`: `"openai"` (the Mantle endpoint is OpenAI-compatible)
- `base_url`: `https://bedrock-mantle.us-east-1.api.aws/v1`
- `api_key`: Same Bedrock API key

#### Embedding Instruction Prefix

Before embedding, prepend the library's `embedding_instruction` to each chunk:

```
"{embedding_instruction}\n\n{chunk_text}"
```

#### Image Embedding

For multimodal models (`model.supports_multimodal`):

- Send the base64-encoded image to the embedding endpoint
- Create an `ImageEmbedding` node with the resulting vector
- If no multimodal model is available, skip (images stored but not embedded)

#### Model Matching

Track the embedding model by **name** (not UUID). Multiple APIs can serve the same model — matching by name allows provider switching without re-embedding.

### 4. Pipeline Orchestrator (`library/services/pipeline.py`)

Coordinates the full flow: parse → chunk → embed → store → graph.

#### Pipeline Stages

1. **Parse**: Extract text blocks + images from the document
2. **Chunk**: Split text using the content-type-aware strategy
3. **Store chunks**: S3 + Chunk nodes in Neo4j
4. **Embed text**: Generate vectors for all chunks
5. **Store images**: S3 + Image nodes in Neo4j
6. **Embed images**: Multimodal vectors (if available)
7. **Extract concepts**: Named entities from chunk text (via the system chat model)
8. **Build graph**: Create Concept nodes, MENTIONS/REFERENCES edges

#### Idempotency

- Check `Item.content_hash` — skip if already processed with the same hash
- Re-embedding deletes existing Chunk/Image nodes before re-processing

#### Dimension Compatibility

- Validate that the system embedding model's `vector_dimensions` matches the Neo4j vector index dimensions
- Warn at embed time if a mismatch is detected

### 5. Concept Extraction (`library/services/concepts.py`)

Uses the system chat model for LLM-based named entity recognition.

- Extract: people, places, topics, techniques, themes
- Create/update `Concept` nodes (deduplicated by name via `unique_index`)
- Connect: `Chunk -[MENTIONS]-> Concept`, `Item -[REFERENCES]-> Concept`
- Embed concept names for vector search
- If no system chat model is configured, concept extraction is skipped

### 6. Celery Tasks (`library/tasks.py`)

All tasks pass IDs (not model instances) per Red Panda Standards.

| Task | Queue | Purpose |
|------|-------|---------|
| `embed_item(item_uid)` | `embedding` | Full pipeline for a single item |
| `embed_collection(collection_uid)` | `batch` | All items in a collection |
| `embed_library(library_uid)` | `batch` | All items in a library |
| `batch_embed_items(item_uids)` | `batch` | Specific items |
| `reembed_item(item_uid)` | `embedding` | Delete + re-embed |

Tasks are idempotent, include retry logic, and track progress via Memcached: `library:task:{task_id}:progress`.

### 7. Prometheus Metrics (`library/metrics.py`)

Custom metrics for pipeline observability:

| Metric | Type | Labels | Purpose |
|--------|------|--------|---------|
| `mnemosyne_documents_parsed_total` | Counter | file_type, status | Parse throughput |
| `mnemosyne_document_parse_duration_seconds` | Histogram | file_type | Parse latency |
| `mnemosyne_images_extracted_total` | Counter | file_type | Image extraction volume |
| `mnemosyne_chunks_created_total` | Counter | library_type, strategy | Chunk throughput |
| `mnemosyne_chunk_size_tokens` | Histogram | — | Chunk size distribution |
| `mnemosyne_embeddings_generated_total` | Counter | model_name, api_type, content_type | Embedding throughput |
| `mnemosyne_embedding_batch_duration_seconds` | Histogram | model_name, api_type | API latency |
| `mnemosyne_embedding_api_errors_total` | Counter | model_name, api_type, error_type | API failures |
| `mnemosyne_embedding_tokens_total` | Counter | model_name | Token consumption |
| `mnemosyne_pipeline_items_total` | Counter | status | Pipeline throughput |
| `mnemosyne_pipeline_item_duration_seconds` | Histogram | — | End-to-end latency |
| `mnemosyne_pipeline_items_in_progress` | Gauge | — | Concurrent processing |
| `mnemosyne_concepts_extracted_total` | Counter | concept_type | Concept extraction volume |

### 8. Model Changes

#### Item Node — New Fields

| Field | Type | Purpose |
|-------|------|---------|
| `embedding_status` | StringProperty | pending / processing / completed / failed |
| `embedding_model_name` | StringProperty | Name of the model that generated the embeddings |
| `chunk_count` | IntegerProperty | Number of chunks created |
| `image_count` | IntegerProperty | Number of images extracted |
| `error_message` | StringProperty | Last error message (if failed) |

#### New Relationship Model

```python
class NearbyImageRel(StructuredRel):
    proximity = StringProperty(default="same_page")  # same_page, inline, same_slide, same_chapter
```

#### Chunk Node — New Relationship

```python
nearby_images = RelationshipTo('Image', 'HAS_NEARBY_IMAGE', model=NearbyImageRel)
```

#### LLMApi Model — New API Type

Add `("bedrock", "Amazon Bedrock")` to the `api_type` choices.

### 9. API Enhancements

- `POST /api/v1/library/items/` — File upload with auto-trigger of the `embed_item` task
- `POST /api/v1/library/items/{uid}/reembed/` — Re-embed endpoint
- `GET /api/v1/library/items/{uid}/status/` — Embedding status check
- Admin views: File upload field on item create, embedding status display

### 10. Management Commands

| Command | Purpose |
|---------|---------|
| `embed_item <item_uid>` | CLI embedding for testing |
| `embed_collection <collection_uid>` | CLI batch embedding |
| `embedding_status` | Show embedding progress/statistics |

### 11. Dynamic Vector Index Dimensions

Update `setup_neo4j_indexes` to read dimensions from `LLMModel.get_system_embedding_model().vector_dimensions` instead of hardcoding 4096.
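A dynamic index statement could be built as below — a sketch: the index, label, and property names are assumptions, while the DDL syntax is Neo4j 5's vector index `CREATE VECTOR INDEX ... OPTIONS` form:

```python
def vector_index_cypher(index_name: str, label: str, prop: str, dimensions: int) -> str:
    """Build a Neo4j 5 vector index statement with runtime dimensions."""
    return (
        f"CREATE VECTOR INDEX {index_name} IF NOT EXISTS "
        f"FOR (n:{label}) ON (n.{prop}) "
        "OPTIONS {indexConfig: {"
        f"`vector.dimensions`: {dimensions}, "
        "`vector.similarity_function`: 'cosine'}}"
    )

# Dimensions read from the system embedding model instead of the hardcoded 4096
stmt = vector_index_cypher("chunk_embedding_index", "Chunk", "embedding", 1024)
```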
## Celery Workers & Scheduler

### Prerequisites

- RabbitMQ running on `oberon.incus:5672` with the `mnemosyne` vhost and user
- `.env` configured with `CELERY_BROKER_URL=amqp://mnemosyne:password@oberon.incus:5672/mnemosyne`
- Virtual environment activated: `source ~/env/mnemosyne/bin/activate`

### Queues

Mnemosyne uses three Celery queues with task routing configured in `settings.py`:

| Queue | Tasks | Purpose | Recommended Concurrency |
|-------|-------|---------|------------------------|
| `celery` (default) | `llm_manager.validate_all_llm_apis`, `llm_manager.validate_single_api` | LLM API validation & model discovery | 2 |
| `embedding` | `library.tasks.embed_item`, `library.tasks.reembed_item` | Single-item embedding pipeline (GPU-bound) | 1 |
| `batch` | `library.tasks.embed_collection`, `library.tasks.embed_library`, `library.tasks.batch_embed_items` | Batch orchestration (dispatches to the embedding queue) | 2 |

Task routing (`settings.py`):

```python
CELERY_TASK_ROUTES = {
    # Single-item pipeline tasks → embedding queue
    "library.tasks.embed_item": {"queue": "embedding"},
    "library.tasks.reembed_item": {"queue": "embedding"},
    # Batch orchestration tasks → batch queue
    "library.tasks.embed_collection": {"queue": "batch"},
    "library.tasks.embed_library": {"queue": "batch"},
    "library.tasks.batch_embed_items": {"queue": "batch"},
}
```

### Starting Workers

All commands run from the Django project root (`mnemosyne/`):

**Development — single worker, all queues:**

```bash
cd mnemosyne
celery -A mnemosyne worker -l info -Q celery,embedding,batch
```

**Development — eager mode (no worker needed):**

Set `CELERY_TASK_ALWAYS_EAGER=True` in `.env`. All tasks execute synchronously in the web process. Useful for debugging, but it does not exercise async behavior.

**Production — separate workers per queue:**

```bash
# Embedding worker (single concurrency — GPU is sequential)
celery -A mnemosyne worker \
    -l info \
    -Q embedding \
    -c 1 \
    -n embedding@%h \
    --max-tasks-per-child=100

# Batch orchestration worker
celery -A mnemosyne worker \
    -l info \
    -Q batch \
    -c 2 \
    -n batch@%h

# Default queue worker (LLM API validation, etc.)
celery -A mnemosyne worker \
    -l info \
    -Q celery \
    -c 2 \
    -n default@%h
```

### Celery Beat (Periodic Scheduler)

Celery Beat runs scheduled tasks (e.g., periodic LLM API validation):

```bash
# File-based scheduler (simple, stores the schedule in a celerybeat-schedule file)
celery -A mnemosyne beat -l info

# Or with the Django database scheduler (if django-celery-beat is installed)
celery -A mnemosyne beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```

Example periodic task schedule (add to `settings.py` if needed):

```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "validate-llm-apis-daily": {
        "task": "llm_manager.validate_all_llm_apis",
        "schedule": crontab(hour=6, minute=0),  # Daily at 6 AM
    },
}
```

### Flower (Task Monitoring)

[Flower](https://flower.readthedocs.io/) provides a real-time web UI for monitoring Celery workers and tasks:

```bash
celery -A mnemosyne flower --port=5555
```

Access at `http://localhost:5555`. Shows:

- Active/completed/failed tasks
- Worker status and resource usage
- Task execution times and retry counts
- Queue depths

### Reliability Configuration

The following settings are already configured in `settings.py`:

| Setting | Value | Purpose |
|---------|-------|---------|
| `CELERY_TASK_ACKS_LATE` | `True` | Acknowledge tasks after execution (not on receipt) — prevents task loss on worker crash |
| `CELERY_WORKER_PREFETCH_MULTIPLIER` | `1` | Workers fetch one task at a time — ensures fair distribution across workers |
| `CELERY_ACCEPT_CONTENT` | `["json"]` | Only accept JSON-serialized tasks |
| `CELERY_TASK_SERIALIZER` | `"json"` | Serialize task arguments as JSON |

### Task Progress Tracking

Embedding tasks report progress via Memcached using the key pattern:

```
library:task:{task_id}:progress → {"percent": 45, "message": "Embedded 12/27 chunks"}
```

Tasks also update Celery's native state:

```python
# Query task progress from Python
from celery.result import AsyncResult

result = AsyncResult(task_id)
result.state  # "PROGRESS", "SUCCESS", "FAILURE"
result.info   # {"percent": 45, "message": "..."}
```

## Dependencies

```toml
# New additions to pyproject.toml
"PyMuPDF>=1.24,<2.0",
"pymupdf4llm>=0.0.17,<1.0",
"semantic-text-splitter>=0.20,<1.0",
"tokenizers>=0.20,<1.0",
"Pillow>=10.0,<12.0",
"django-prometheus>=2.3,<3.0",
```

### License Note

PyMuPDF is AGPL-3.0 licensed. This is acceptable for self-hosted personal use; commercial distribution would require Artifex's commercial license.

## File Structure

```
mnemosyne/library/
├── services/
│   ├── __init__.py
│   ├── parsers.py            # PyMuPDF universal document parsing
│   ├── text_utils.py         # Text sanitization (from Spelunker)
│   ├── chunker.py            # Content-type-aware chunking
│   ├── embedding_client.py   # Multi-backend embedding API client
│   ├── pipeline.py           # Orchestration: parse → chunk → embed → graph
│   └── concepts.py           # LLM-based concept extraction
├── metrics.py                # Prometheus metrics definitions
├── tasks.py                  # Celery tasks for async embedding
├── management/commands/
│   ├── embed_item.py
│   ├── embed_collection.py
│   └── embedding_status.py
└── tests/
    ├── test_parsers.py
    ├── test_text_utils.py
    ├── test_chunker.py
    ├── test_embedding_client.py
    ├── test_pipeline.py
    ├── test_concepts.py
    └── test_tasks.py
```

## Testing Strategy

All tests use Django `TestCase`. External services (LLM APIs, Neo4j) are mocked.
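One way the HTTP mocking could look — a sketch assuming the embedding client exposes an injectable transport (`embed_batch` and the `post` seam are hypothetical names; the real client may patch at a different layer):

```python
from unittest import mock

def embed_batch(texts, post):
    """Minimal stand-in for the client's batch call (hypothetical seam)."""
    resp = post("/embeddings", json={"input": texts})
    return [row["embedding"] for row in resp["data"]]

# Fake HTTP POST returning an OpenAI-style embeddings response
fake_post = mock.Mock(return_value={
    "data": [{"embedding": [0.1, 0.2]}, {"embedding": [0.3, 0.4]}],
    "usage": {"total_tokens": 12},
})

vectors = embed_batch(["a", "b"], post=fake_post)
assert vectors == [[0.1, 0.2], [0.3, 0.4]]
fake_post.assert_called_once_with("/embeddings", json={"input": ["a", "b"]})
```

In the Django `TestCase` files listed below, the same pattern would sit inside test methods, with Neo4j calls mocked analogously.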
| Test File | Scope |
|-----------|-------|
| `test_parsers.py` | PyMuPDF parsing for each file type, image extraction, text sanitization |
| `test_text_utils.py` | Sanitization functions, PDF artifact cleaning, Unicode normalization |
| `test_chunker.py` | Content-type strategies, boundary detection, chunk-image association |
| `test_embedding_client.py` | OpenAI-compat + Bedrock backends (mocked HTTP), batch processing, usage tracking |
| `test_pipeline.py` | Full pipeline integration (mocked), S3 storage, idempotency |
| `test_concepts.py` | Concept extraction, deduplication, graph relationships |
| `test_tasks.py` | Celery tasks (eager mode), retry logic, error handling |

## Success Criteria

- [ ] Upload a document (PDF, EPUB, DOCX, PPTX, TXT) via API or admin → file stored in S3
- [ ] Images extracted from documents and stored as Image nodes in Neo4j
- [ ] Document automatically chunked using the content-type-aware strategy
- [ ] Chunks embedded via the system embedding model and vectors stored on Neo4j Chunk nodes
- [ ] Images embedded multimodally into ImageEmbedding nodes (when a multimodal model is available)
- [ ] Chunk-image proximity relationships established in the graph
- [ ] Concepts extracted and the graph populated with MENTIONS/REFERENCES relationships
- [ ] Neo4j vector indexes usable for similarity queries on stored embeddings
- [ ] Celery tasks handle async embedding with progress tracking
- [ ] Re-embedding works (delete old chunks, re-process)
- [ ] Content hash prevents redundant re-embedding
- [x] Prometheus metrics exposed at `/metrics` for pipeline monitoring
- [ ] All tests pass with mocked LLM/embedding APIs
- [ ] Bedrock embedding works via Bearer-token HTTP (no boto3)
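The last criterion — Bedrock over plain HTTP with a Bearer token — can be sketched with the standard library alone (the region, model id, and key below are placeholders, and the body fields follow the Titan-style example in the Bedrock Integration section):

```python
import json
import urllib.request

def build_bedrock_embed_request(region: str, model_id: str, api_key: str,
                                text: str) -> urllib.request.Request:
    """Build (but do not send) a Bedrock /invoke embedding request."""
    url = f"https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/invoke"
    body = json.dumps({"inputText": text, "dimensions": 1024, "normalize": True})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

req = build_bedrock_embed_request(
    "us-east-1", "amazon.titan-embed-text-v2:0", "example-key", "text to embed"
)
# urllib.request.urlopen(req) would POST it; per the example above, the
# response JSON carries "embedding" and "inputTextTokenCount"
```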