mnemosyne/docs/PHASE_2_EMBEDDING_PIPELINE.md

Phase 2: Embedding Pipeline

Objective

Build the complete document ingestion and embedding pipeline: upload content → parse (text + images) → chunk (content-type-aware) → embed via configurable model → store vectors in Neo4j → extract concepts for knowledge graph.

Heritage

The embedding pipeline adapts proven patterns from Spelunker's rag/services/embeddings.py — semantic chunking, batch embedding, S3 chunk storage, and progress tracking — enhanced with multimodal capabilities, knowledge graph relationships, and content-type awareness.

Architecture Overview

Upload (API/Admin)
  → S3 Storage (original file)
    → Document Parsing (PyMuPDF — text + images)
      → Content-Type-Aware Chunking (semantic-text-splitter)
        → Text Embedding (system embedding model via LLM Manager)
        → Image Embedding (multimodal model, if available)
          → Neo4j Graph Storage (Chunk nodes, Image nodes, vectors)
            → Concept Extraction (system chat model)
              → Knowledge Graph (Concept nodes, MENTIONS/REFERENCES edges)

Deliverables

1. Document Parsing Service (library/services/parsers.py)

Primary parser: PyMuPDF — a single library handling all document formats with unified text + image extraction.

Supported Formats

| Format | Extensions | Text Extraction | Image Extraction |
|---|---|---|---|
| PDF | .pdf | Layout-preserving text | Embedded images, diagrams |
| EPUB | .epub | Chapter-structured HTML | Cover art, illustrations |
| DOCX | .docx | Via HTML conversion | Inline images, diagrams |
| PPTX | .pptx | Via HTML conversion | Slide images, charts |
| XLSX | .xlsx | Via HTML conversion | Embedded charts |
| XPS | .xps | Native | Native |
| MOBI | .mobi | Native | Native |
| FB2 | .fb2 | Native | Native |
| CBZ | .cbz | Native | Native (comic pages) |
| Plain text | .txt, .md | Direct read | N/A |
| HTML | .html, .htm | PyMuPDF or direct | Inline images |
| Images | .jpg, .png, etc. | N/A (OCR future) | The image itself |

Text Sanitization

Ported from Spelunker's text_utils.py:

  • Remove null bytes and control characters
  • Remove zero-width characters
  • Normalize Unicode to NFC
  • Replace invalid UTF-8 sequences
  • Clean PDF ligatures and artifacts
  • Normalize whitespace
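The steps above can be sketched in Python; the function name and exact character classes are illustrative, not Spelunker's actual implementation:

```python
import re
import unicodedata

def sanitize_text(text: str) -> str:
    """Illustrative sanitization: control chars, zero-width chars,
    NFC normalization, PDF ligatures, whitespace."""
    # Remove null bytes and other control characters (keep \n and \t)
    text = re.sub(r"[\x00-\x08\x0b-\x1f\x7f]", "", text)
    # Remove zero-width characters and BOM
    text = re.sub(r"[\u200b\u200c\u200d\ufeff]", "", text)
    # Normalize Unicode to NFC
    text = unicodedata.normalize("NFC", text)
    # Clean common PDF ligatures (NFC does not decompose these)
    for lig, repl in {"\ufb01": "fi", "\ufb02": "fl"}.items():
        text = text.replace(lig, repl)
    # Normalize whitespace: collapse runs of spaces/tabs, strip line edges
    text = re.sub(r"[ \t]+", " ", text)
    return "\n".join(line.strip() for line in text.splitlines()).strip()
```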

Image Extraction

For each document page/section, extract embedded images via page.get_images() and doc.extract_image(xref):

  • Raw image bytes (PNG/JPEG)
  • Dimensions (width × height)
  • Source page/position for chunk-image association
  • Store in S3: images/{item_uid}/{image_index}.{ext}

Parse Result Structure

@dataclass
class TextBlock:
    text: str
    page: int
    metadata: dict  # {heading_level, section_name, etc.}

@dataclass
class ExtractedImage:
    data: bytes
    ext: str        # png, jpg, etc.
    width: int
    height: int
    source_page: int
    source_index: int

@dataclass
class ParseResult:
    text_blocks: list[TextBlock]
    images: list[ExtractedImage]
    metadata: dict  # {page_count, title, author, etc.}
    file_type: str

2. Content-Type-Aware Chunking Service (library/services/chunker.py)

Uses semantic-text-splitter with HuggingFace tokenizer (proven in Spelunker).

Strategy Dispatch

Based on Library.chunking_config:

| Strategy | Library Type | Boundary Markers | Chunk Size | Overlap |
|---|---|---|---|---|
| chapter_aware | Fiction | chapter, scene, paragraph | 1024 | 128 |
| section_aware | Technical | section, subsection, code_block, list | 512 | 64 |
| song_level | Music | song, verse, chorus | 512 | 32 |
| scene_level | Film | scene, act, sequence | 768 | 64 |
| description_level | Art | artwork, description, analysis | 512 | 32 |
| entry_level | Journal | entry, date, paragraph | 512 | 32 |
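The dispatch itself can be a simple registry keyed on Library.chunking_config; the values below come from the strategy table, while the registry shape and fallback choice are illustrative:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class ChunkStrategy:
    boundary_markers: tuple[str, ...]
    chunk_size: int  # tokens
    overlap: int     # tokens

# Values from the strategy table above.
STRATEGIES: dict[str, ChunkStrategy] = {
    "chapter_aware": ChunkStrategy(("chapter", "scene", "paragraph"), 1024, 128),
    "section_aware": ChunkStrategy(("section", "subsection", "code_block", "list"), 512, 64),
    "song_level": ChunkStrategy(("song", "verse", "chorus"), 512, 32),
    "scene_level": ChunkStrategy(("scene", "act", "sequence"), 768, 64),
    "description_level": ChunkStrategy(("artwork", "description", "analysis"), 512, 32),
    "entry_level": ChunkStrategy(("entry", "date", "paragraph"), 512, 32),
}

def get_strategy(chunking_config: str) -> ChunkStrategy:
    # Falling back to section_aware for unknown configs is an assumption
    return STRATEGIES.get(chunking_config, STRATEGIES["section_aware"])
```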

Chunk-Image Association

Track which images appeared near which text chunks:

  • PDF: image bounding boxes on specific pages
  • DOCX/PPTX: images associated with slides/sections
  • EPUB: images referenced from specific chapters

Creates Chunk -[HAS_NEARBY_IMAGE]-> Image relationships with proximity metadata.
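For the PDF case, the simplest association rule is page co-occurrence; a minimal sketch, assuming chunks track the pages they span and images carry their source page:

```python
def associate_images_with_chunks(chunks: list[dict], images: list[dict]) -> list[tuple]:
    """Pair each chunk with images extracted from the same page (PDF case).

    `chunks` carry a 'pages' set; `images` carry 'source_page'. Returns
    (chunk_index, image_index, proximity) triples. Field names are
    illustrative, not the actual service's schema.
    """
    pairs = []
    for ci, chunk in enumerate(chunks):
        for ii, image in enumerate(images):
            if image["source_page"] in chunk["pages"]:
                pairs.append((ci, ii, "same_page"))
    return pairs
```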

Chunk Storage

  • Chunk text stored in S3: chunks/{item_uid}/chunk_{index}.txt
  • text_preview (first 500 chars) stored on Chunk node for full-text indexing

3. Embedding Client (library/services/embedding_client.py)

Multi-backend embedding client dispatching by LLMApi.api_type.

Backend Support

| API Type | Protocol | Auth | Batch Support |
|---|---|---|---|
| openai | HTTP POST /embeddings | API key header | Native batch |
| vllm | HTTP POST /embeddings | API key header | Native batch |
| llama-cpp | HTTP POST /embeddings | API key header | Native batch |
| ollama | HTTP POST /embeddings | None | Native batch |
| bedrock | HTTP POST /model/{id}/invoke | Bearer token | Client-side loop |

Bedrock Integration

Uses Amazon Bedrock API keys (Bearer token auth) — no boto3 SDK required:

POST https://bedrock-runtime.{region}.amazonaws.com/model/{model_id}/invoke
Authorization: Bearer {bedrock_api_key}
Content-Type: application/json

{"inputText": "text to embed", "dimensions": 1024, "normalize": true}
→ {"embedding": [float, ...], "inputTextTokenCount": 42}
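In plain Python this request can be built without any SDK; the helper names below are ours, and the client-side loop reflects the fact that the invoke endpoint accepts one input at a time:

```python
import json
import urllib.request

def build_bedrock_embed_request(base_url: str, api_key: str, model_id: str,
                                text: str, dimensions: int = 1024) -> urllib.request.Request:
    """Construct (but do not send) a Titan-style Bedrock embedding request,
    following the endpoint and payload shape shown above."""
    url = f"{base_url}/model/{model_id}/invoke"
    body = json.dumps({"inputText": text, "dimensions": dimensions, "normalize": True})
    return urllib.request.Request(
        url,
        data=body.encode(),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

def embed_batch_bedrock(texts: list, send) -> list:
    """Bedrock invoke takes one input per call, so batching is a client-side
    loop; `send` performs the HTTP call and returns the parsed JSON body."""
    return [send(t)["embedding"] for t in texts]
```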

LLMApi setup for Bedrock embeddings:

  • api_type: "bedrock"
  • base_url: https://bedrock-runtime.us-east-1.amazonaws.com
  • api_key: Bedrock API key (encrypted)

LLMApi setup for Bedrock chat (Claude, etc.):

  • api_type: "openai" (Mantle endpoint is OpenAI-compatible)
  • base_url: https://bedrock-mantle.us-east-1.api.aws/v1
  • api_key: Same Bedrock API key

Embedding Instruction Prefix

Before embedding, prepend the library's embedding_instruction to each chunk:

"{embedding_instruction}\n\n{chunk_text}"

Image Embedding

For multimodal models (model.supports_multimodal):

  • Send base64-encoded image to the embedding endpoint
  • Create ImageEmbedding node with the resulting vector
  • If no multimodal model is available, skip (images stored but not embedded)

Model Matching

Track embedded model by name (not UUID). Multiple APIs can serve the same model — matching by name allows provider switching without re-embedding.

4. Pipeline Orchestrator (library/services/pipeline.py)

Coordinates the full flow: parse → chunk → embed → store → graph.

Pipeline Stages

  1. Parse: Extract text blocks + images from document
  2. Chunk: Split text using content-type-aware strategy
  3. Store chunks: S3 + Chunk nodes in Neo4j
  4. Embed text: Generate vectors for all chunks
  5. Store images: S3 + Image nodes in Neo4j
  6. Embed images: Multimodal vectors (if available)
  7. Extract concepts: Named entities from chunk text (via system chat model)
  8. Build graph: Create Concept nodes, MENTIONS/REFERENCES edges

Idempotency

  • Check Item.content_hash — skip if already processed with same hash
  • Re-embedding deletes existing Chunk/Image nodes before re-processing
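The hash check is a two-step comparison; SHA-256 is an assumption here (the document does not specify the algorithm):

```python
from __future__ import annotations
import hashlib

def content_hash(data: bytes) -> str:
    """SHA-256 hex digest of the raw file bytes (algorithm is an assumption)."""
    return hashlib.sha256(data).hexdigest()

def should_embed(stored_hash: str | None, file_bytes: bytes) -> bool:
    """Skip the pipeline when the stored Item.content_hash matches the file."""
    return stored_hash != content_hash(file_bytes)
```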

Dimension Compatibility

  • Validate that the system embedding model's vector_dimensions matches the Neo4j vector index dimensions
  • Warn at embed time if mismatch detected

5. Concept Extraction (library/services/concepts.py)

Uses the system chat model for LLM-based named entity recognition.

  • Extract: people, places, topics, techniques, themes
  • Create/update Concept nodes (deduplicated by name via unique_index)
  • Connect: Chunk -[MENTIONS]-> Concept, Item -[REFERENCES]-> Concept
  • Embed concept names for vector search
  • If no system chat model configured, concept extraction is skipped

6. Celery Tasks (library/tasks.py)

All tasks pass IDs (not model instances) per Red Panda Standards.

| Task | Queue | Purpose |
|---|---|---|
| embed_item(item_uid) | embedding | Full pipeline for single item |
| embed_collection(collection_uid) | batch | All items in a collection |
| embed_library(library_uid) | batch | All items in a library |
| batch_embed_items(item_uids) | batch | Specific items |
| reembed_item(item_uid) | embedding | Delete + re-embed |

Tasks are idempotent, include retry logic, and track progress via Memcached: library:task:{task_id}:progress.

7. Prometheus Metrics (library/metrics.py)

Custom metrics for pipeline observability:

| Metric | Type | Labels | Purpose |
|---|---|---|---|
| mnemosyne_documents_parsed_total | Counter | file_type, status | Parse throughput |
| mnemosyne_document_parse_duration_seconds | Histogram | file_type | Parse latency |
| mnemosyne_images_extracted_total | Counter | file_type | Image extraction volume |
| mnemosyne_chunks_created_total | Counter | library_type, strategy | Chunk throughput |
| mnemosyne_chunk_size_tokens | Histogram | — | Chunk size distribution |
| mnemosyne_embeddings_generated_total | Counter | model_name, api_type, content_type | Embedding throughput |
| mnemosyne_embedding_batch_duration_seconds | Histogram | model_name, api_type | API latency |
| mnemosyne_embedding_api_errors_total | Counter | model_name, api_type, error_type | API failures |
| mnemosyne_embedding_tokens_total | Counter | model_name | Token consumption |
| mnemosyne_pipeline_items_total | Counter | status | Pipeline throughput |
| mnemosyne_pipeline_item_duration_seconds | Histogram | — | End-to-end latency |
| mnemosyne_pipeline_items_in_progress | Gauge | — | Concurrent processing |
| mnemosyne_concepts_extracted_total | Counter | concept_type | Concept extraction volume |

8. Model Changes

Item Node — New Fields

| Field | Type | Purpose |
|---|---|---|
| embedding_status | StringProperty | pending / processing / completed / failed |
| embedding_model_name | StringProperty | Name of model that generated embeddings |
| chunk_count | IntegerProperty | Number of chunks created |
| image_count | IntegerProperty | Number of images extracted |
| error_message | StringProperty | Last error message (if failed) |

New Relationship Model

class NearbyImageRel(StructuredRel):
    proximity = StringProperty(default="same_page")  # same_page, inline, same_slide, same_chapter

Chunk Node — New Relationship

nearby_images = RelationshipTo('Image', 'HAS_NEARBY_IMAGE', model=NearbyImageRel)

LLMApi Model — New API Type

Add ("bedrock", "Amazon Bedrock") to api_type choices.

9. API Enhancements

  • POST /api/v1/library/items/ — File upload with auto-trigger of embed_item task
  • POST /api/v1/library/items/<uid>/reembed/ — Re-embed endpoint
  • GET /api/v1/library/items/<uid>/status/ — Embedding status check
  • Admin views: File upload field on item create, embedding status display

10. Management Commands

| Command | Purpose |
|---|---|
| embed_item <uid> | CLI embedding for testing |
| embed_collection <uid> | CLI batch embedding |
| embedding_status | Show embedding progress/statistics |

11. Dynamic Vector Index Dimensions

Update setup_neo4j_indexes to read dimensions from LLMModel.get_system_embedding_model().vector_dimensions instead of hardcoding 4096.
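A minimal sketch of the index statement this produces, using Neo4j 5.x CREATE VECTOR INDEX syntax; the index naming convention is illustrative:

```python
def vector_index_cypher(label: str, prop: str, dimensions: int) -> str:
    """Build a Neo4j 5.x vector index statement with dimensions taken from
    the system embedding model rather than a hardcoded 4096."""
    return (
        f"CREATE VECTOR INDEX {label.lower()}_{prop}_idx IF NOT EXISTS "
        f"FOR (n:{label}) ON (n.{prop}) "
        f"OPTIONS {{indexConfig: {{"
        f"`vector.dimensions`: {dimensions}, "
        f"`vector.similarity_function`: 'cosine'}}}}"
    )
```

For example, a system model reporting vector_dimensions=1024 yields an index over Chunk.embedding with `vector.dimensions`: 1024.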

Celery Workers & Scheduler

Prerequisites

  • RabbitMQ running on oberon.incus:5672 with mnemosyne vhost and user
  • .env configured with CELERY_BROKER_URL=amqp://mnemosyne:password@oberon.incus:5672/mnemosyne
  • Virtual environment activated: source ~/env/mnemosyne/bin/activate

Queues

Mnemosyne uses three Celery queues with task routing configured in settings.py:

| Queue | Tasks | Purpose | Recommended Concurrency |
|---|---|---|---|
| celery (default) | llm_manager.validate_all_llm_apis, llm_manager.validate_single_api | LLM API validation & model discovery | 2 |
| embedding | library.tasks.embed_item, library.tasks.reembed_item | Single-item embedding pipeline (GPU-bound) | 1 |
| batch | library.tasks.embed_collection, library.tasks.embed_library, library.tasks.batch_embed_items | Batch orchestration (dispatches to embedding queue) | 2 |

Task routing (settings.py):

CELERY_TASK_ROUTES = {
    "library.tasks.embed_*": {"queue": "embedding"},
    "library.tasks.batch_*": {"queue": "batch"},
}

Starting Workers

All commands run from the Django project root (mnemosyne/):

Development — single worker, all queues:

cd mnemosyne
celery -A mnemosyne worker -l info -Q celery,embedding,batch

Development — eager mode (no worker needed):

Set CELERY_TASK_ALWAYS_EAGER=True in .env. All tasks execute synchronously in the web process. Useful for debugging but does not test async behavior.

Production — separate workers per queue:

# Embedding worker (single concurrency — GPU is sequential)
celery -A mnemosyne worker \
    -l info \
    -Q embedding \
    -c 1 \
    -n embedding@%h \
    --max-tasks-per-child=100

# Batch orchestration worker
celery -A mnemosyne worker \
    -l info \
    -Q batch \
    -c 2 \
    -n batch@%h

# Default queue worker (LLM API validation, etc.)
celery -A mnemosyne worker \
    -l info \
    -Q celery \
    -c 2 \
    -n default@%h

Celery Beat (Periodic Scheduler)

Celery Beat runs scheduled tasks (e.g., periodic LLM API validation):

# File-based scheduler (simple, stores schedule in celerybeat-schedule file)
celery -A mnemosyne beat -l info

# Or with Django database scheduler (if django-celery-beat is installed)
celery -A mnemosyne beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

Example periodic task schedule (add to settings.py if needed):

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "validate-llm-apis-daily": {
        "task": "llm_manager.validate_all_llm_apis",
        "schedule": crontab(hour=6, minute=0),  # Daily at 6 AM
    },
}

Flower (Task Monitoring)

Flower provides a real-time web UI for monitoring Celery workers and tasks:

celery -A mnemosyne flower --port=5555

Access at http://localhost:5555. Shows:

  • Active/completed/failed tasks
  • Worker status and resource usage
  • Task execution times and retry counts
  • Queue depths

Reliability Configuration

The following settings are already configured in settings.py:

| Setting | Value | Purpose |
|---|---|---|
| CELERY_TASK_ACKS_LATE | True | Acknowledge tasks after execution (not on receipt) — prevents task loss on worker crash |
| CELERY_WORKER_PREFETCH_MULTIPLIER | 1 | Workers fetch one task at a time — ensures fair distribution across workers |
| CELERY_ACCEPT_CONTENT | ["json"] | Only accept JSON-serialized tasks |
| CELERY_TASK_SERIALIZER | "json" | Serialize task arguments as JSON |

Task Progress Tracking

Embedding tasks report progress via Memcached using the key pattern:

library:task:{task_id}:progress → {"percent": 45, "message": "Embedded 12/27 chunks"}

Tasks also update Celery's native state:

# Query task progress from Python
from celery.result import AsyncResult
result = AsyncResult(task_id)
result.state    # "PROGRESS", "SUCCESS", "FAILURE"
result.info     # {"percent": 45, "message": "..."}

Dependencies

# New additions to pyproject.toml
"PyMuPDF>=1.24,<2.0",
"pymupdf4llm>=0.0.17,<1.0",
"semantic-text-splitter>=0.20,<1.0",
"tokenizers>=0.20,<1.0",
"Pillow>=10.0,<12.0",
"django-prometheus>=2.3,<3.0",

License Note

PyMuPDF is AGPL-3.0 licensed. Acceptable for self-hosted personal use. Commercial distribution would require Artifex's commercial license.

File Structure

mnemosyne/library/
├── services/
│   ├── __init__.py
│   ├── parsers.py          # PyMuPDF universal document parsing
│   ├── text_utils.py       # Text sanitization (from Spelunker)
│   ├── chunker.py          # Content-type-aware chunking
│   ├── embedding_client.py # Multi-backend embedding API client
│   ├── pipeline.py         # Orchestration: parse → chunk → embed → graph
│   └── concepts.py         # LLM-based concept extraction
├── metrics.py              # Prometheus metrics definitions
├── tasks.py                # Celery tasks for async embedding
├── management/commands/
│   ├── embed_item.py
│   ├── embed_collection.py
│   └── embedding_status.py
└── tests/
    ├── test_parsers.py
    ├── test_text_utils.py
    ├── test_chunker.py
    ├── test_embedding_client.py
    ├── test_pipeline.py
    ├── test_concepts.py
    └── test_tasks.py

Testing Strategy

All tests use Django TestCase. External services (LLM APIs, Neo4j) are mocked.

| Test File | Scope |
|---|---|
| test_parsers.py | PyMuPDF parsing for each file type, image extraction, text sanitization |
| test_text_utils.py | Sanitization functions, PDF artifact cleaning, Unicode normalization |
| test_chunker.py | Content-type strategies, boundary detection, chunk-image association |
| test_embedding_client.py | OpenAI-compat + Bedrock backends (mocked HTTP), batch processing, usage tracking |
| test_pipeline.py | Full pipeline integration (mocked), S3 storage, idempotency |
| test_concepts.py | Concept extraction, deduplication, graph relationships |
| test_tasks.py | Celery tasks (eager mode), retry logic, error handling |

Success Criteria

  • Upload a document (PDF, EPUB, DOCX, PPTX, TXT) via API or admin → file stored in S3
  • Images extracted from documents and stored as Image nodes in Neo4j
  • Document automatically chunked using content-type-aware strategy
  • Chunks embedded via system embedding model and vectors stored in Neo4j Chunk nodes
  • Images embedded multimodally into ImageEmbedding nodes (when multimodal model available)
  • Chunk-image proximity relationships established in graph
  • Concepts extracted and graph populated with MENTIONS/REFERENCES relationships
  • Neo4j vector indexes usable for similarity queries on stored embeddings
  • Celery tasks handle async embedding with progress tracking
  • Re-embedding works (delete old chunks, re-process)
  • Content hash prevents redundant re-embedding
  • Prometheus metrics exposed at /metrics for pipeline monitoring
  • All tests pass with mocked LLM/embedding APIs
  • Bedrock embedding works via Bearer token HTTP (no boto3)