feat(library): add business + finance types, workspace_id, IngestJob

Adds two new content-type-aware library types — `business` for
proposals/marketing/strategy (used by the work-team agents) and `finance`
for statements/tax/market commentary (used by Garth). Each ships with
chunking config, embedding/reranker instructions, an LLM-context prompt
that forbids fabricating financial figures, and a vision prompt.

Adds a unique-indexed `workspace_id` property to `Library` so a node
can be scoped to a Daedalus workspace. Null means a global library;
non-null means workspace-scoped. Search Cypher (added in a later
commit) enforces the boundary.

Adds an `IngestJob` Django ORM model — separate from neomodel — that
tracks asynchronous ingestion lifecycle (Daedalus → S3 → Celery →
embedding pipeline) with idempotency on (library, source_ref, hash).
Migration 0001_initial creates the table.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-29 06:26:17 -04:00
parent 81426327bf
commit 33658fbc8d
4 changed files with 228 additions and 8 deletions

View File

@@ -1,11 +1,16 @@
"""
Neo4j graph models for the Mnemosyne content library.
Models for the Mnemosyne content library.
All content data (libraries, collections, items, chunks, concepts, images)
lives in Neo4j as a knowledge graph. These models use neomodel's StructuredNode
OGM — they do NOT participate in Django's ORM or migrations.
Most content (libraries, collections, items, chunks, concepts, images)
lives in Neo4j as a knowledge graph via neomodel StructuredNode. These do
NOT participate in Django's ORM or migrations.
The IngestJob model at the bottom of this file is the exception: it tracks
the lifecycle of asynchronous ingestion requests (file → embedding pipeline)
in PostgreSQL via Django's ORM.
"""
from django.db import models
from neomodel import (
ArrayProperty,
DateTimeProperty,
@@ -50,8 +55,14 @@ class Library(StructuredNode):
"""
Top-level container representing a content library.
Each library has a type (fiction, technical, music, film, art, journal)
that drives chunking strategy, embedding instructions, and LLM prompts.
Each library has a type (fiction, nonfiction, technical, music, film,
art, journal, business, finance) that drives chunking strategy,
embedding instructions, and LLM prompts.
A library may be either *global* (workspace_id is null — searchable
across the whole instance) or *workspace-scoped* (workspace_id set —
visible only to agents inside that Daedalus workspace). Scoping is
enforced structurally by every search query.
"""
uid = UniqueIdProperty()
@@ -66,10 +77,16 @@ class Library(StructuredNode):
"film": "Film",
"art": "Art",
"journal": "Journal",
"business": "Business",
"finance": "Finance",
},
)
description = StringProperty(default="")
# Daedalus workspace UUID this library is scoped to. Null for global
# libraries. Unique-indexed so a workspace cannot have two libraries.
workspace_id = StringProperty(unique_index=True, required=False)
# Content-type configuration
chunking_config = JSONProperty(default={})
embedding_instruction = StringProperty(default="")
@@ -270,3 +287,78 @@ class ImageEmbedding(StructuredNode):
def __str__(self):
return f"ImageEmbedding ({self.uid})"
# --- Django ORM models (PostgreSQL) ---
class IngestJob(models.Model):
"""
Tracks the lifecycle of an asynchronous ingestion + embedding job.
Created when an external client (e.g. Daedalus) posts a file via the
REST ingest API. The Celery worker reads and updates this row as the
job moves through fetch / chunk / embed / graph stages.
Idempotency: a (library, source_ref, content_hash) triple uniquely
identifies a piece of content. A second POST with the same triple
returns the existing job; a POST with the same source_ref but a new
content_hash supersedes the prior Item.
"""
STATUS_CHOICES = [
("pending", "Pending"),
("processing", "Processing"),
("completed", "Completed"),
("failed", "Failed"),
]
id = models.CharField(max_length=64, primary_key=True)
item_uid = models.CharField(max_length=64, db_index=True, blank=True)
library_uid = models.CharField(max_length=64, db_index=True)
celery_task_id = models.CharField(max_length=255, blank=True)
status = models.CharField(
max_length=20,
choices=STATUS_CHOICES,
default="pending",
db_index=True,
)
progress = models.CharField(max_length=50, default="queued")
error = models.TextField(blank=True, null=True)
retry_count = models.PositiveIntegerField(default=0)
chunks_created = models.PositiveIntegerField(default=0)
concepts_extracted = models.PositiveIntegerField(default=0)
embedding_model = models.CharField(max_length=100, blank=True)
# The file's content hash (sha256). Used for idempotency: a second
# ingest with the same source_ref + same hash is a no-op; a second
# ingest with the same source_ref + different hash supersedes.
content_hash = models.CharField(max_length=64, db_index=True, blank=True)
# Where the file came from. For Daedalus: source="daedalus",
# source_ref="<workspace_id>/<file_id>".
source = models.CharField(max_length=50, default="")
source_ref = models.CharField(max_length=200, blank=True, db_index=True)
s3_key = models.CharField(max_length=500)
# Optional metadata carried forward to the Item node.
title = models.CharField(max_length=500, blank=True)
file_type = models.CharField(max_length=50, blank=True)
file_size = models.PositiveBigIntegerField(default=0)
collection_uid = models.CharField(max_length=64, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)
class Meta:
ordering = ["-created_at"]
indexes = [
models.Index(fields=["status", "-created_at"]),
models.Index(fields=["source", "source_ref"]),
]
def __str__(self):
return f"IngestJob {self.id} [{self.status}]"