mnemosyne/docs/PHASE_2_EMBEDDING_PIPELINE.md
# Phase 2: Embedding Pipeline
## Objective
Build the complete document ingestion and embedding pipeline: upload content → parse (text + images) → chunk (content-type-aware) → embed via configurable model → store vectors in Neo4j → extract concepts for knowledge graph.
## Heritage
The embedding pipeline adapts proven patterns from [Spelunker](https://git.helu.ca/r/spelunker)'s `rag/services/embeddings.py` — semantic chunking, batch embedding, S3 chunk storage, and progress tracking — enhanced with multimodal capabilities, knowledge graph relationships, and content-type awareness.
## Architecture Overview
```
Upload (API/Admin)
→ S3 Storage (original file)
→ Document Parsing (PyMuPDF — text + images)
→ Content-Type-Aware Chunking (semantic-text-splitter)
→ Text Embedding (system embedding model via LLM Manager)
→ Image Embedding (multimodal model, if available)
→ Neo4j Graph Storage (Chunk nodes, Image nodes, vectors)
→ Concept Extraction (system chat model)
→ Knowledge Graph (Concept nodes, MENTIONS/REFERENCES edges)
```
## Deliverables
### 1. Document Parsing Service (`library/services/parsers.py`)
**Primary parser: PyMuPDF** — a single library handling every supported document format with unified text + image extraction.
#### Supported Formats
| Format | Extensions | Text Extraction | Image Extraction |
|--------|-----------|----------------|-----------------|
| PDF | `.pdf` | Layout-preserving text | Embedded images, diagrams |
| EPUB | `.epub` | Chapter-structured HTML | Cover art, illustrations |
| DOCX | `.docx` | Via HTML conversion | Inline images, diagrams |
| PPTX | `.pptx` | Via HTML conversion | Slide images, charts |
| XLSX | `.xlsx` | Via HTML conversion | Embedded charts |
| XPS | `.xps` | Native | Native |
| MOBI | `.mobi` | Native | Native |
| FB2 | `.fb2` | Native | Native |
| CBZ | `.cbz` | Native | Native (comic pages) |
| Plain text | `.txt`, `.md` | Direct read | N/A |
| HTML | `.html`, `.htm` | PyMuPDF or direct | Inline images |
| Images | `.jpg`, `.png`, etc. | N/A (OCR future) | The image itself |
#### Text Sanitization
Ported from Spelunker's `text_utils.py`:
- Remove null bytes and control characters
- Remove zero-width characters
- Normalize Unicode to NFC
- Replace invalid UTF-8 sequences
- Clean PDF ligatures and artifacts
- Normalize whitespace
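The steps above can be sketched as a single pass (a hedged sketch — the function name and ligature table are illustrative, not the actual Spelunker `text_utils.py` API):

```python
import re
import unicodedata

# Common PDF ligatures and their ASCII expansions (illustrative subset)
LIGATURES = {"\ufb00": "ff", "\ufb01": "fi", "\ufb02": "fl", "\ufb03": "ffi", "\ufb04": "ffl"}

def sanitize_text(raw: str) -> str:
    # Normalize Unicode to NFC so combining sequences collapse
    text = unicodedata.normalize("NFC", raw)
    # Expand PDF ligature codepoints into plain letters
    for lig, ascii_form in LIGATURES.items():
        text = text.replace(lig, ascii_form)
    # Strip null bytes, control characters, zero-width characters, and BOM
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\u200b\u200c\u200d\ufeff]", "", text)
    # Normalize whitespace: collapse runs, cap blank lines at one
    text = re.sub(r"[ \t]+", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)
    return text.strip()
```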
#### Image Extraction
For each document page/section, extract embedded images via `page.get_images()` and `doc.extract_image(xref)`:
- Raw image bytes (PNG/JPEG)
- Dimensions (width × height)
- Source page/position for chunk-image association
- Store in S3: `images/{item_uid}/{image_index}.{ext}`
#### Parse Result Structure
```python
from dataclasses import dataclass

@dataclass
class TextBlock:
    text: str
    page: int
    metadata: dict  # {heading_level, section_name, etc.}

@dataclass
class ExtractedImage:
    data: bytes
    ext: str  # png, jpg, etc.
    width: int
    height: int
    source_page: int
    source_index: int

@dataclass
class ParseResult:
    text_blocks: list[TextBlock]
    images: list[ExtractedImage]
    metadata: dict  # {page_count, title, author, etc.}
    file_type: str
```
### 2. Content-Type-Aware Chunking Service (`library/services/chunker.py`)
Uses `semantic-text-splitter` with HuggingFace tokenizer (proven in Spelunker).
#### Strategy Dispatch
Based on `Library.chunking_config`:
| Strategy | Library Type | Boundary Markers | Chunk Size | Overlap |
|----------|-------------|-----------------|-----------|---------|
| `chapter_aware` | Fiction | chapter, scene, paragraph | 1024 | 128 |
| `section_aware` | Technical | section, subsection, code_block, list | 512 | 64 |
| `song_level` | Music | song, verse, chorus | 512 | 32 |
| `scene_level` | Film | scene, act, sequence | 768 | 64 |
| `description_level` | Art | artwork, description, analysis | 512 | 32 |
| `entry_level` | Journal | entry, date, paragraph | 512 | 32 |
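The dispatch can be sketched as a lookup table (field names here are assumptions about `Library.chunking_config`, not the real schema):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ChunkingStrategy:
    boundary_markers: tuple[str, ...]
    chunk_size: int  # max tokens per chunk
    overlap: int     # token overlap between adjacent chunks

STRATEGIES = {
    "chapter_aware": ChunkingStrategy(("chapter", "scene", "paragraph"), 1024, 128),
    "section_aware": ChunkingStrategy(("section", "subsection", "code_block", "list"), 512, 64),
    "song_level": ChunkingStrategy(("song", "verse", "chorus"), 512, 32),
    "scene_level": ChunkingStrategy(("scene", "act", "sequence"), 768, 64),
    "description_level": ChunkingStrategy(("artwork", "description", "analysis"), 512, 32),
    "entry_level": ChunkingStrategy(("entry", "date", "paragraph"), 512, 32),
}

def get_strategy(name: str) -> ChunkingStrategy:
    # Fail loudly on unknown strategies rather than silently defaulting
    try:
        return STRATEGIES[name]
    except KeyError:
        raise ValueError(f"Unknown chunking strategy: {name}") from None
```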
#### Chunk-Image Association
Track which images appeared near which text chunks:
- PDF: image bounding boxes on specific pages
- DOCX/PPTX: images associated with slides/sections
- EPUB: images referenced from specific chapters
Creates `Chunk -[HAS_NEARBY_IMAGE]-> Image` relationships with proximity metadata.
#### Chunk Storage
- Chunk text stored in S3: `chunks/{item_uid}/chunk_{index}.txt`
- `text_preview` (first 500 chars) stored on Chunk node for full-text indexing
### 3. Embedding Client (`library/services/embedding_client.py`)
Multi-backend embedding client dispatching by `LLMApi.api_type`.
#### Backend Support
| API Type | Protocol | Auth | Batch Support |
|----------|---------|------|---------------|
| `openai` | HTTP POST `/embeddings` | API key header | Native batch |
| `vllm` | HTTP POST `/embeddings` | API key header | Native batch |
| `llama-cpp` | HTTP POST `/embeddings` | API key header | Native batch |
| `ollama` | HTTP POST `/embeddings` | None | Native batch |
| `bedrock` | HTTP POST `/model/{id}/invoke` | Bearer token | Client-side loop |
#### Bedrock Integration
Uses Amazon Bedrock API keys (Bearer token auth) — no boto3 SDK required:
```
POST https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/invoke
Authorization: Bearer {bedrock_api_key}
Content-Type: application/json
{"inputText": "text to embed", "dimensions": 1024, "normalize": true}
→ {"embedding": [float, ...], "inputTextTokenCount": 42}
```
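The request above can be assembled with a small builder (a sketch — the helper name is hypothetical, and the model ID in the usage note is an example):

```python
import json

def build_bedrock_embed_request(region: str, model_id: str, api_key: str,
                                text: str, dimensions: int = 1024) -> tuple[str, dict, str]:
    """Return (url, headers, body) for a Titan-style embedding invoke."""
    url = f"https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/invoke"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    body = json.dumps({"inputText": text, "dimensions": dimensions, "normalize": True})
    return url, headers, body
```

Usage with any HTTP client, e.g. `httpx.post(url, headers=headers, content=body).json()["embedding"]` with `model_id="amazon.titan-embed-text-v2:0"`.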
**LLMApi setup for Bedrock embeddings:**
- `api_type`: `"bedrock"`
- `base_url`: `https://bedrock-runtime.us-east-1.amazonaws.com`
- `api_key`: Bedrock API key (encrypted)
**LLMApi setup for Bedrock chat (Claude, etc.):**
- `api_type`: `"openai"` (Mantle endpoint is OpenAI-compatible)
- `base_url`: `https://bedrock-mantle.us-east-1.api.aws/v1`
- `api_key`: Same Bedrock API key
#### Embedding Instruction Prefix
Before embedding, prepend the library's `embedding_instruction` to each chunk:
```
"{embedding_instruction}\n\n{chunk_text}"
```
#### Image Embedding
For multimodal models (`model.supports_multimodal`):
- Send base64-encoded image to the embedding endpoint
- Create `ImageEmbedding` node with the resulting vector
- If no multimodal model available, skip (images stored but not embedded)
#### Model Matching
Track embedded model by **name** (not UUID). Multiple APIs can serve the same model — matching by name allows provider switching without re-embedding.
### 4. Pipeline Orchestrator (`library/services/pipeline.py`)
Coordinates the full flow: parse → chunk → embed → store → graph.
#### Pipeline Stages
1. **Parse**: Extract text blocks + images from document
2. **Chunk**: Split text using content-type-aware strategy
3. **Store chunks**: S3 + Chunk nodes in Neo4j
4. **Embed text**: Generate vectors for all chunks
5. **Store images**: S3 + Image nodes in Neo4j
6. **Embed images**: Multimodal vectors (if available)
7. **Extract concepts**: Named entities from chunk text (via system chat model)
8. **Build graph**: Create Concept nodes, MENTIONS/REFERENCES edges
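The stage sequencing above can be sketched as a driver loop (hedged: real stages take richer context than a bare `item_uid`, and all names here are illustrative):

```python
from collections.abc import Callable

Stage = tuple[str, Callable[[str], None]]

def run_stages(item_uid: str, stages: list[Stage],
               on_progress: Callable[[int, str], None]) -> None:
    """Run each stage in order, reporting percent complete after each."""
    total = len(stages)
    for i, (name, stage) in enumerate(stages, start=1):
        stage(item_uid)                      # each stage raises on failure
        on_progress(i * 100 // total, name)  # report after each stage
```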
#### Idempotency
- Check `Item.content_hash` — skip if already processed with same hash
- Re-embedding deletes existing Chunk/Image nodes before re-processing
#### Dimension Compatibility
- Validate that the system embedding model's `vector_dimensions` matches the Neo4j vector index dimensions
- Warn at embed time if mismatch detected
### 5. Concept Extraction (`library/services/concepts.py`)
Uses the system chat model for LLM-based named entity recognition.
- Extract: people, places, topics, techniques, themes
- Create/update `Concept` nodes (deduplicated by name via unique_index)
- Connect: `Chunk -[MENTIONS]-> Concept`, `Item -[REFERENCES]-> Concept`
- Embed concept names for vector search
- If no system chat model configured, concept extraction is skipped
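The extraction round-trip can be sketched as a prompt plus a response parser (the prompt wording and JSON schema are implementation choices, not fixed by this spec):

```python
import json

EXTRACTION_PROMPT = (
    "Extract named entities from the text below. Return JSON: "
    '{"concepts": [{"name": "...", "type": "person|place|topic|technique|theme"}]}'
    "\n\n{chunk_text}"
)

def parse_concepts(llm_response: str) -> list[tuple[str, str]]:
    """Parse the chat model's JSON reply into (name, type) pairs,
    lower-casing names so Concept nodes deduplicate by unique name."""
    payload = json.loads(llm_response)
    return [(c["name"].strip().lower(), c["type"]) for c in payload.get("concepts", [])]
```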
### 6. Celery Tasks (`library/tasks.py`)
All tasks pass IDs (not model instances) per Red Panda Standards.
| Task | Queue | Purpose |
|------|-------|---------|
| `embed_item(item_uid)` | `embedding` | Full pipeline for single item |
| `embed_collection(collection_uid)` | `batch` | All items in a collection |
| `embed_library(library_uid)` | `batch` | All items in a library |
| `batch_embed_items(item_uids)` | `batch` | Specific items |
| `reembed_item(item_uid)` | `embedding` | Delete + re-embed |
Tasks are idempotent, include retry logic, and track progress via Memcached: `library:task:{task_id}:progress`.
### 7. Prometheus Metrics (`library/metrics.py`)
Custom metrics for pipeline observability:
| Metric | Type | Labels | Purpose |
|--------|------|--------|---------|
| `mnemosyne_documents_parsed_total` | Counter | file_type, status | Parse throughput |
| `mnemosyne_document_parse_duration_seconds` | Histogram | file_type | Parse latency |
| `mnemosyne_images_extracted_total` | Counter | file_type | Image extraction volume |
| `mnemosyne_chunks_created_total` | Counter | library_type, strategy | Chunk throughput |
| `mnemosyne_chunk_size_tokens` | Histogram | — | Chunk size distribution |
| `mnemosyne_embeddings_generated_total` | Counter | model_name, api_type, content_type | Embedding throughput |
| `mnemosyne_embedding_batch_duration_seconds` | Histogram | model_name, api_type | API latency |
| `mnemosyne_embedding_api_errors_total` | Counter | model_name, api_type, error_type | API failures |
| `mnemosyne_embedding_tokens_total` | Counter | model_name | Token consumption |
| `mnemosyne_pipeline_items_total` | Counter | status | Pipeline throughput |
| `mnemosyne_pipeline_item_duration_seconds` | Histogram | — | End-to-end latency |
| `mnemosyne_pipeline_items_in_progress` | Gauge | — | Concurrent processing |
| `mnemosyne_concepts_extracted_total` | Counter | concept_type | Concept extraction volume |
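Two representative definitions using `prometheus_client` (the full set mirrors the table; help strings are illustrative):

```python
from prometheus_client import Counter, Histogram

DOCUMENTS_PARSED = Counter(
    "mnemosyne_documents_parsed_total",
    "Documents parsed by the pipeline",
    ["file_type", "status"],
)
PARSE_DURATION = Histogram(
    "mnemosyne_document_parse_duration_seconds",
    "Time spent parsing a document",
    ["file_type"],
)

# Instrumented call sites increment/observe with labels, e.g.:
# DOCUMENTS_PARSED.labels(file_type="pdf", status="success").inc()
# with PARSE_DURATION.labels(file_type="pdf").time(): parse(...)
```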
### 8. Model Changes
#### Item Node — New Fields
| Field | Type | Purpose |
|-------|------|---------|
| `embedding_status` | StringProperty | pending / processing / completed / failed |
| `embedding_model_name` | StringProperty | Name of model that generated embeddings |
| `chunk_count` | IntegerProperty | Number of chunks created |
| `image_count` | IntegerProperty | Number of images extracted |
| `error_message` | StringProperty | Last error message (if failed) |
#### New Relationship Model
```python
from neomodel import StringProperty, StructuredRel

class NearbyImageRel(StructuredRel):
    # same_page, inline, same_slide, same_chapter
    proximity = StringProperty(default="same_page")
```
#### Chunk Node — New Relationship
```python
nearby_images = RelationshipTo('Image', 'HAS_NEARBY_IMAGE', model=NearbyImageRel)
```
#### LLMApi Model — New API Type
Add `("bedrock", "Amazon Bedrock")` to `api_type` choices.
### 9. API Enhancements
- `POST /api/v1/library/items/` — File upload with auto-trigger of `embed_item` task
- `POST /api/v1/library/items/<uid>/reembed/` — Re-embed endpoint
- `GET /api/v1/library/items/<uid>/status/` — Embedding status check
- Admin views: File upload field on item create, embedding status display
### 10. Management Commands
| Command | Purpose |
|---------|---------|
| `embed_item <uid>` | CLI embedding for testing |
| `embed_collection <uid>` | CLI batch embedding |
| `embedding_status` | Show embedding progress/statistics |
### 11. Dynamic Vector Index Dimensions
Update `setup_neo4j_indexes` to read dimensions from `LLMModel.get_system_embedding_model().vector_dimensions` instead of hardcoding 4096.
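A dimension-aware index statement can be built like this (a sketch: the Cypher template follows Neo4j 5 vector index syntax, and the usage note assumes `LLMModel.get_system_embedding_model()` from the LLM Manager):

```python
def vector_index_cypher(index_name: str, label: str, prop: str, dimensions: int) -> str:
    """Build a CREATE VECTOR INDEX statement for the given node label/property."""
    return (
        f"CREATE VECTOR INDEX {index_name} IF NOT EXISTS "
        f"FOR (n:{label}) ON (n.{prop}) "
        f"OPTIONS {{indexConfig: {{`vector.dimensions`: {dimensions}, "
        f"`vector.similarity_function`: 'cosine'}}}}"
    )

# dims = LLMModel.get_system_embedding_model().vector_dimensions
# db.cypher_query(vector_index_cypher("chunk_embedding", "Chunk", "embedding", dims))
```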
## Celery Workers & Scheduler
### Prerequisites
- RabbitMQ running on `oberon.incus:5672` with `mnemosyne` vhost and user
- `.env` configured with `CELERY_BROKER_URL=amqp://mnemosyne:password@oberon.incus:5672/mnemosyne`
- Virtual environment activated: `source ~/env/mnemosyne/bin/activate`
### Queues
Mnemosyne uses three Celery queues with task routing configured in `settings.py`:
| Queue | Tasks | Purpose | Recommended Concurrency |
|-------|-------|---------|------------------------|
| `celery` (default) | `llm_manager.validate_all_llm_apis`, `llm_manager.validate_single_api` | LLM API validation & model discovery | 2 |
| `embedding` | `library.tasks.embed_item`, `library.tasks.reembed_item` | Single-item embedding pipeline (GPU-bound) | 1 |
| `batch` | `library.tasks.embed_collection`, `library.tasks.embed_library`, `library.tasks.batch_embed_items` | Batch orchestration (dispatches to embedding queue) | 2 |
Task routing (`settings.py`):
```python
CELERY_TASK_ROUTES = {
    "library.tasks.embed_item": {"queue": "embedding"},
    "library.tasks.reembed_item": {"queue": "embedding"},
    "library.tasks.embed_collection": {"queue": "batch"},
    "library.tasks.embed_library": {"queue": "batch"},
    "library.tasks.batch_embed_items": {"queue": "batch"},
}
```
### Starting Workers
All commands run from the Django project root (`mnemosyne/`):
**Development — single worker, all queues:**
```bash
cd mnemosyne
celery -A mnemosyne worker -l info -Q celery,embedding,batch
```
**Development — eager mode (no worker needed):**
Set `CELERY_TASK_ALWAYS_EAGER=True` in `.env`. All tasks execute synchronously in the web process. Useful for debugging but does not test async behavior.
**Production — separate workers per queue:**
```bash
# Embedding worker (single concurrency — GPU is sequential)
celery -A mnemosyne worker \
    -l info \
    -Q embedding \
    -c 1 \
    -n embedding@%h \
    --max-tasks-per-child=100

# Batch orchestration worker
celery -A mnemosyne worker \
    -l info \
    -Q batch \
    -c 2 \
    -n batch@%h

# Default queue worker (LLM API validation, etc.)
celery -A mnemosyne worker \
    -l info \
    -Q celery \
    -c 2 \
    -n default@%h
```
### Celery Beat (Periodic Scheduler)
Celery Beat runs scheduled tasks (e.g., periodic LLM API validation):
```bash
# File-based scheduler (simple, stores schedule in celerybeat-schedule file)
celery -A mnemosyne beat -l info
# Or with Django database scheduler (if django-celery-beat is installed)
celery -A mnemosyne beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```
Example periodic task schedule (add to `settings.py` if needed):
```python
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "validate-llm-apis-daily": {
        "task": "llm_manager.validate_all_llm_apis",
        "schedule": crontab(hour=6, minute=0),  # Daily at 6 AM
    },
}
```
### Flower (Task Monitoring)
[Flower](https://flower.readthedocs.io/) provides a real-time web UI for monitoring Celery workers and tasks:
```bash
celery -A mnemosyne flower --port=5555
```
Access at `http://localhost:5555`. Shows:
- Active/completed/failed tasks
- Worker status and resource usage
- Task execution times and retry counts
- Queue depths
### Reliability Configuration
The following settings are already configured in `settings.py`:
| Setting | Value | Purpose |
|---------|-------|---------|
| `CELERY_TASK_ACKS_LATE` | `True` | Acknowledge tasks after execution (not on receipt) — prevents task loss on worker crash |
| `CELERY_WORKER_PREFETCH_MULTIPLIER` | `1` | Workers fetch one task at a time — ensures fair distribution across workers |
| `CELERY_ACCEPT_CONTENT` | `["json"]` | Only accept JSON-serialized tasks |
| `CELERY_TASK_SERIALIZER` | `"json"` | Serialize task arguments as JSON |
### Task Progress Tracking
Embedding tasks report progress via Memcached using the key pattern:
```
library:task:{task_id}:progress → {"percent": 45, "message": "Embedded 12/27 chunks"}
```
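A minimal reporting helper (hedged sketch: the dict stands in for Django's Memcached-backed cache, and the function name is illustrative):

```python
import json

def progress_key(task_id: str) -> str:
    return f"library:task:{task_id}:progress"

def report_progress(cache: dict, task_id: str, done: int, total: int, message: str) -> dict:
    """Write the percent/message payload under the documented key pattern."""
    payload = {"percent": done * 100 // total, "message": message}
    cache[progress_key(task_id)] = json.dumps(payload)
    return payload
```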
Tasks also update Celery's native state:
```python
# Query task progress from Python
from celery.result import AsyncResult
result = AsyncResult(task_id)
result.state # "PROGRESS", "SUCCESS", "FAILURE"
result.info # {"percent": 45, "message": "..."}
```
## Dependencies
```toml
# New additions to pyproject.toml
"PyMuPDF>=1.24,<2.0",
"pymupdf4llm>=0.0.17,<1.0",
"semantic-text-splitter>=0.20,<1.0",
"tokenizers>=0.20,<1.0",
"Pillow>=10.0,<12.0",
"django-prometheus>=2.3,<3.0",
```
### License Note
PyMuPDF is AGPL-3.0 licensed. Acceptable for self-hosted personal use. Commercial distribution would require Artifex's commercial license.
## File Structure
```
mnemosyne/library/
├── services/
│   ├── __init__.py
│   ├── parsers.py           # PyMuPDF universal document parsing
│   ├── text_utils.py        # Text sanitization (from Spelunker)
│   ├── chunker.py           # Content-type-aware chunking
│   ├── embedding_client.py  # Multi-backend embedding API client
│   ├── pipeline.py          # Orchestration: parse → chunk → embed → graph
│   └── concepts.py          # LLM-based concept extraction
├── metrics.py               # Prometheus metrics definitions
├── tasks.py                 # Celery tasks for async embedding
├── management/commands/
│   ├── embed_item.py
│   ├── embed_collection.py
│   └── embedding_status.py
└── tests/
    ├── test_parsers.py
    ├── test_text_utils.py
    ├── test_chunker.py
    ├── test_embedding_client.py
    ├── test_pipeline.py
    ├── test_concepts.py
    └── test_tasks.py
```
## Testing Strategy
All tests use Django `TestCase`. External services (LLM APIs, Neo4j) are mocked.
| Test File | Scope |
|-----------|-------|
| `test_parsers.py` | PyMuPDF parsing for each file type, image extraction, text sanitization |
| `test_text_utils.py` | Sanitization functions, PDF artifact cleaning, Unicode normalization |
| `test_chunker.py` | Content-type strategies, boundary detection, chunk-image association |
| `test_embedding_client.py` | OpenAI-compat + Bedrock backends (mocked HTTP), batch processing, usage tracking |
| `test_pipeline.py` | Full pipeline integration (mocked), S3 storage, idempotency |
| `test_concepts.py` | Concept extraction, deduplication, graph relationships |
| `test_tasks.py` | Celery tasks (eager mode), retry logic, error handling |
## Success Criteria
- [ ] Upload a document (PDF, EPUB, DOCX, PPTX, TXT) via API or admin → file stored in S3
- [ ] Images extracted from documents and stored as Image nodes in Neo4j
- [ ] Document automatically chunked using content-type-aware strategy
- [ ] Chunks embedded via system embedding model and vectors stored in Neo4j Chunk nodes
- [ ] Images embedded multimodally into ImageEmbedding nodes (when multimodal model available)
- [ ] Chunk-image proximity relationships established in graph
- [ ] Concepts extracted and graph populated with MENTIONS/REFERENCES relationships
- [ ] Neo4j vector indexes usable for similarity queries on stored embeddings
- [ ] Celery tasks handle async embedding with progress tracking
- [ ] Re-embedding works (delete old chunks, re-process)
- [ ] Content hash prevents redundant re-embedding
- [x] Prometheus metrics exposed at `/metrics` for pipeline monitoring
- [ ] All tests pass with mocked LLM/embedding APIs
- [ ] Bedrock embedding works via Bearer token HTTP (no boto3)