Files
ouranos/utils/neo4j-personal-schema-init.py
Robert Helewka b4d60f2f38 docs: rewrite README with structured overview and quick start guide
Replaces the minimal project description with a comprehensive README
including a component overview table, quick start instructions, common
Ansible operations, and links to detailed documentation. Aligns with
Red Panda Approval™ standards.
2026-03-03 12:49:06 +00:00

588 lines
25 KiB
Python

"""
Neo4j Life Graph Schema Initialization
=======================================
Creates the foundational schema for a personal knowledge graph used by
seven AI assistants: Hypatia, Marcus, Seneca, Nate, Bowie, Bourdain, Cousteau
Requirements:
pip install neo4j
Usage:
python neo4j-personal-schema-init.py
python neo4j-personal-schema-init.py --uri bolt://ariel.incus:7687
python neo4j-personal-schema-init.py --test-only
Environment Variables (optional):
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
NEO4J_USER - Username (default: neo4j)
NEO4J_PASSWORD - Password (will prompt if not set)
"""
import argparse
import getpass
import os
import sys
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
import logging
# Configure logging
# Root logger defaults to INFO; main() raises it to WARNING under --quiet.
logging.basicConfig(level=logging.INFO)
# Module-level logger used throughout this script.
logger = logging.getLogger(__name__)
class LifeGraphSchema:
    """
    Creates and verifies the Neo4j schema for the personal life graph.

    Wraps a single neo4j driver instance. Callers are responsible for
    calling close() when finished (see main()'s try/finally).
    """

    def __init__(self, uri: str, user: str, password: str):
        """Initialize connection to Neo4j database"""
        # Driver construction is lazy: connectivity/auth failures surface on
        # first session use (see verify_connection), not here.
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.uri = uri

    def close(self):
        """Close the database connection"""
        self.driver.close()

    def verify_connection(self) -> bool:
        """
        Verify the connection to Neo4j is working.
        Returns True if successful, raises an exception otherwise
        (ConnectionError here; AuthError/ServiceUnavailable from the driver).
        """
        with self.driver.session() as session:
            result = session.run("RETURN 1 AS test")
            record = result.single()
            if record and record["test"] == 1:
                logger.info(f"✓ Connected to Neo4j at {self.uri}")
                return True
            raise ConnectionError("Failed to verify Neo4j connection")

    def create_constraints(self):
        """
        Create uniqueness constraints on key node properties.
        This ensures data integrity and creates indexes automatically.
        """
        constraints = [
            # Core entities
            "CREATE CONSTRAINT person_id IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE",
            "CREATE CONSTRAINT location_id IF NOT EXISTS FOR (l:Location) REQUIRE l.id IS UNIQUE",
            "CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE",
            # Media types (Bowie, Bourdain, Hypatia domains)
            "CREATE CONSTRAINT book_id IF NOT EXISTS FOR (b:Book) REQUIRE b.id IS UNIQUE",
            "CREATE CONSTRAINT film_id IF NOT EXISTS FOR (f:Film) REQUIRE f.id IS UNIQUE",
            "CREATE CONSTRAINT music_id IF NOT EXISTS FOR (m:Music) REQUIRE m.id IS UNIQUE",
            "CREATE CONSTRAINT recipe_id IF NOT EXISTS FOR (r:Recipe) REQUIRE r.id IS UNIQUE",
            # Activity/Practice nodes
            "CREATE CONSTRAINT training_id IF NOT EXISTS FOR (t:Training) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT trip_id IF NOT EXISTS FOR (t:Trip) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT reflection_id IF NOT EXISTS FOR (r:Reflection) REQUIRE r.id IS UNIQUE",
            # Knowledge/Learning (Hypatia domain)
            "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (t:Topic) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT concept_id IF NOT EXISTS FOR (c:Concept) REQUIRE c.id IS UNIQUE",
            # Nature (Cousteau domain)
            "CREATE CONSTRAINT species_id IF NOT EXISTS FOR (s:Species) REQUIRE s.id IS UNIQUE",
            "CREATE CONSTRAINT plant_id IF NOT EXISTS FOR (p:Plant) REQUIRE p.id IS UNIQUE",
        ]
        with self.driver.session() as session:
            for constraint in constraints:
                try:
                    session.run(constraint)
                    # Log only the "(var:Label)" fragment between FOR and REQUIRE.
                    logger.info(f"Created constraint: {constraint.split('FOR')[1].split('REQUIRE')[0].strip()}")
                except Exception as e:
                    # Best-effort: IF NOT EXISTS makes re-runs idempotent, so a
                    # failure here is logged but does not abort the remaining
                    # statements.
                    logger.warning(f"Constraint may already exist: {e}")

    def create_indexes(self):
        """
        Create indexes for frequently queried properties.
        These improve query performance for searches and filters.
        """
        indexes = [
            # Text search indexes
            "CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name)",
            "CREATE INDEX location_name IF NOT EXISTS FOR (l:Location) ON (l.name)",
            "CREATE INDEX book_title IF NOT EXISTS FOR (b:Book) ON (b.title)",
            "CREATE INDEX film_title IF NOT EXISTS FOR (f:Film) ON (f.title)",
            "CREATE INDEX music_title IF NOT EXISTS FOR (m:Music) ON (m.title)",
            "CREATE INDEX recipe_name IF NOT EXISTS FOR (r:Recipe) ON (r.name)",
            # Date-based indexes for temporal queries
            "CREATE INDEX event_date IF NOT EXISTS FOR (e:Event) ON (e.date)",
            "CREATE INDEX training_date IF NOT EXISTS FOR (t:Training) ON (t.date)",
            "CREATE INDEX trip_start IF NOT EXISTS FOR (t:Trip) ON (t.start_date)",
            "CREATE INDEX reflection_date IF NOT EXISTS FOR (r:Reflection) ON (r.date)",
            # Category/type indexes for filtering
            "CREATE INDEX event_type IF NOT EXISTS FOR (e:Event) ON (e.type)",
            "CREATE INDEX location_category IF NOT EXISTS FOR (l:Location) ON (l.category)",
            "CREATE INDEX music_genre IF NOT EXISTS FOR (m:Music) ON (m.genre)",
        ]
        with self.driver.session() as session:
            for index in indexes:
                try:
                    session.run(index)
                    # Log only the "(var:Label)" fragment between FOR and ON.
                    logger.info(f"Created index: {index.split('FOR')[1].split('ON')[0].strip()}")
                except Exception as e:
                    # Same best-effort policy as create_constraints.
                    logger.warning(f"Index may already exist: {e}")

    def verify_schema(self) -> dict:
        """
        Verify that constraints and indexes were created successfully.
        Returns a dict with counts and status.
        """
        results = {"constraints": 0, "indexes": 0, "nodes": 0, "success": True}
        with self.driver.session() as session:
            # Count constraints
            constraint_result = session.run("SHOW CONSTRAINTS")
            results["constraints"] = len(list(constraint_result))
            # Count indexes, excluding constraint-backed ones: uniqueness
            # constraints create RANGE indexes too, so filter on
            # owningConstraint IS NULL to count only the explicit indexes
            # created by create_indexes().
            index_result = session.run(
                "SHOW INDEXES WHERE type = 'RANGE' AND owningConstraint IS NULL"
            )
            results["indexes"] = len(list(index_result))
            # Count nodes
            node_result = session.run("MATCH (n) RETURN count(n) AS count")
            results["nodes"] = node_result.single()["count"]
        return results

    def run_tests(self, include_schema_tests: bool = True) -> bool:
        """
        Run comprehensive tests to verify schema and APOC functionality.
        Returns True if all tests pass, False otherwise.

        Args:
            include_schema_tests: If True, also verify constraints/indexes exist
        """
        tests_passed = 0
        tests_failed = 0
        # Each case is (name, cypher, validator); the validator receives the
        # raw result cursor and returns a truthy value on success.
        test_cases = [
            ("Connection test", "RETURN 1 AS result", lambda r: r.single()["result"] == 1),
            ("APOC available", "RETURN apoc.version() AS version", lambda r: r.single()["version"] is not None),
            ("Create test node",
             "CREATE (t:_Test {id: 'test_' + toString(timestamp())}) RETURN t.id AS id",
             lambda r: r.single()["id"] is not None),
            ("Query test node",
             "MATCH (t:_Test) RETURN count(t) AS count",
             lambda r: r.single()["count"] >= 1),
            ("APOC collection functions",
             "RETURN apoc.coll.sum([1,2,3]) AS total",
             lambda r: r.single()["total"] == 6),
            ("APOC date functions",
             "RETURN apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd') AS today",
             lambda r: len(r.single()["today"]) == 10),
        ]
        # Schema-specific tests (only run after schema creation)
        schema_tests = [
            ("Constraint exists (Person)",
             "SHOW CONSTRAINTS WHERE name = 'person_id'",
             lambda r: len(list(r)) == 1),
            ("Index exists (person_name)",
             "SHOW INDEXES WHERE name = 'person_name'",
             lambda r: len(list(r)) == 1),
        ]
        if include_schema_tests:
            test_cases.extend(schema_tests)
        logger.info("\n" + "=" * 60)
        logger.info("RUNNING SCHEMA VERIFICATION TESTS")
        logger.info("=" * 60)
        with self.driver.session() as session:
            for test_name, query, validator in test_cases:
                try:
                    result = session.run(query)
                    if validator(result):
                        # ✓ marker for consistency with the rest of the output.
                        logger.info(f"✓ {test_name}")
                        tests_passed += 1
                    else:
                        logger.error(f"{test_name} - Validation failed")
                        tests_failed += 1
                except Exception as e:
                    logger.error(f"{test_name} - {e}")
                    tests_failed += 1
            # Cleanup test nodes. DETACH DELETE so cleanup still succeeds if a
            # test node ever acquires relationships (plain DELETE would fail).
            try:
                session.run("MATCH (t:_Test) DETACH DELETE t")
                logger.info(" ✓ Cleanup test nodes")
            except Exception as e:
                logger.warning(f" ⚠ Cleanup failed: {e}")
        logger.info("=" * 60)
        logger.info(f"Tests: {tests_passed} passed, {tests_failed} failed")
        logger.info("=" * 60 + "\n")
        return tests_failed == 0

    def create_sample_nodes(self):
        """
        Create sample nodes to demonstrate the schema.
        Replace this with your actual data import logic.
        """
        # MERGE keeps this idempotent across re-runs.
        queries = [
            # Central person node (you)
            """
            MERGE (p:Person {id: 'user_main'})
            SET p.name = 'Main User',
                p.relationship_type = 'self',
                p.created_at = datetime()
            """,
            # Sample interest/preference
            """
            MERGE (i:Interest {id: 'interest_cooking'})
            SET i.category = 'culinary',
                i.name = 'Cooking',
                i.intensity = 'high',
                i.notes = 'Especially interested in techniques and cultural context'
            """,
            # Sample location
            """
            MERGE (l:Location {id: 'location_costarica'})
            SET l.name = 'Costa Rica',
                l.country = 'Costa Rica',
                l.category = 'travel_destination',
                l.notes = 'Planning future trip'
            """,
        ]
        with self.driver.session() as session:
            for query in queries:
                session.run(query)
        logger.info("Created sample nodes")

    def document_schema(self):
        """
        Document the schema design for reference.
        This prints the node types and their intended use by each assistant.
        """
        schema_doc = """
════════════════════════════════════════════════════════════════
LIFE GRAPH SCHEMA - NODE TYPES AND ASSISTANT RESPONSIBILITIES
════════════════════════════════════════════════════════════════
CORE NODES (Used by all assistants):
────────────────────────────────────────────────────────────────
Person     - People in your life (family, friends, contacts)
             Properties: name, relationship_type, birthday,
                         contact_info, notes
Location   - Places (home, travel, favorites)
             Properties: name, city, country, coordinates,
                         category, notes
Event      - Life events (vacations, gatherings, milestones)
             Properties: name, date, location, description, type
Interest   - Preferences, hobbies, goals
             Properties: category, name, intensity, notes
════════════════════════════════════════════════════════════════
HYPATIA (Learning & Knowledge):
────────────────────────────────────────────────────────────────
Book       - Books read or to-read
             Properties: title, author, isbn, status, rating,
                         date_started, date_finished, notes
Topic      - Subject areas of study
             Properties: name, field, depth, resources
Concept    - Ideas and principles learned
             Properties: name, definition, examples, connections
════════════════════════════════════════════════════════════════
MARCUS (Fitness & Training):
────────────────────────────────────────────────────────────────
Training   - Individual workout sessions
             Properties: date, type, duration, exercises,
                         volume, intensity, notes, feeling
Exercise   - Specific movements/activities
             Properties: name, category, equipment,
                         target_muscles, technique_notes
════════════════════════════════════════════════════════════════
SENECA (Reflection & Wellness):
────────────────────────────────────────────────────────────────
Reflection - Journal entries and insights
             Properties: date, content, mood, themes,
                         insights, questions
Goal       - Life objectives and aspirations
             Properties: name, category, timeline, status,
                         progress, reflections
════════════════════════════════════════════════════════════════
NATE (Travel & Adventure):
────────────────────────────────────────────────────────────────
Trip       - Travel plans and experiences
             Properties: name, start_date, end_date,
                         destinations, purpose, budget, highlights
Activity   - Things to do at destinations
             Properties: name, type, location, cost,
                         difficulty, notes
════════════════════════════════════════════════════════════════
BOWIE (Arts, Culture & Style):
────────────────────────────────────────────────────────────────
Film       - Movies and TV shows
             Properties: title, year, director, genre,
                         status, rating, date_watched, notes
Music      - Songs, albums, artists
             Properties: title, artist, album, genre, year,
                         rating, play_count, notes
Artwork    - Visual art, exhibitions, collections
             Properties: title, artist, medium, year, location,
                         notes
════════════════════════════════════════════════════════════════
BOURDAIN (Food & Drink):
────────────────────────────────────────────────────────────────
Recipe     - Dishes to cook
             Properties: name, cuisine, difficulty, time,
                         ingredients, instructions, source, notes
Restaurant - Dining destinations
             Properties: name, location, cuisine, price_range,
                         rating, dishes_tried, notes
Ingredient - Foods and cooking components
             Properties: name, category, season, source,
                         substitutes, notes
════════════════════════════════════════════════════════════════
COUSTEAU (Nature & Living Things):
────────────────────────────────────────────────────────────────
Species    - Animals, fish, marine life
             Properties: name, scientific_name, category,
                         habitat, conservation_status, notes
Plant      - Garden plants, houseplants
             Properties: name, scientific_name, type,
                         care_requirements, location, health_status
Ecosystem  - Environments and habitats
             Properties: name, type, location, characteristics,
                         species_present, conservation_notes
════════════════════════════════════════════════════════════════
KEY RELATIONSHIP PATTERNS:
────────────────────────────────────────────────────────────────
Cross-domain connections:
  - Training -[PREPARATION_FOR]-> Trip
  - Reflection -[ABOUT]-> Event/Training/Trip
  - Book -[INSPIRED]-> Trip/Recipe/Concept
  - Recipe -[FROM_LOCATION]-> Location
  - Music -[PLAYED_AT]-> Event/Location
  - Film -[SET_IN]-> Location
  - Species -[OBSERVED_AT]-> Location
  - Plant -[GROWS_IN]-> Location
Personal connections:
  - Person -[ATTENDED]-> Event
  - Person -[TRAVELED_WITH]-> Trip
  - Person -[TRAINED_WITH]-> Training
  - Person -[SHARED_MEAL]-> Recipe/Restaurant
  - Person -[RECOMMENDED]-> Book/Film/Music/Restaurant
Learning connections:
  - Book -[ABOUT]-> Topic
  - Topic -[CONTAINS]-> Concept
  - Concept -[RELATES_TO]-> Concept
  - Training -[TEACHES]-> Concept (movement patterns, discipline)
════════════════════════════════════════════════════════════════
"""
        print(schema_doc)
        logger.info("Schema documentation displayed")
def get_credentials(args):
    """
    Resolve Neo4j connection credentials.

    Resolution order for URI and username: CLI argument, then the matching
    environment variable, then an interactive prompt with a default. The
    password comes from NEO4J_PASSWORD or a hidden prompt only — never from
    the command line.

    Returns:
        (uri, user, password) tuple.
    """
    def _resolve(cli_value, env_name, prompt, default):
        # CLI beats environment; an empty prompt answer falls back to default.
        value = cli_value or os.environ.get(env_name)
        if not value:
            value = input(f"{prompt} [{default}]: ").strip() or default
        return value

    uri = _resolve(args.uri, "NEO4J_URI", "Neo4j URI", "bolt://localhost:7687")
    user = _resolve(args.user, "NEO4J_USER", "Neo4j username", "neo4j")

    # Password intentionally has no CLI flag so it never lands in shell history.
    password = os.environ.get("NEO4J_PASSWORD") or getpass.getpass("Neo4j password: ")
    if not password:
        logger.error("Password is required")
        sys.exit(1)
    return uri, user, password
def parse_args():
    """Build the command line interface and return the parsed arguments."""
    parser = argparse.ArgumentParser(
        description="Initialize Neo4j Life Graph schema for AI assistants",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                                 # Interactive prompts
  %(prog)s --uri bolt://ariel.incus:7687   # Specify URI, prompt for rest
  %(prog)s --test-only                     # Run tests without creating schema
  %(prog)s --skip-samples                  # Create schema without sample data

Environment Variables:
  NEO4J_URI        Bolt connection URI
  NEO4J_USER       Database username
  NEO4J_PASSWORD   Database password (recommended for scripts)
""",
    )
    # Connection options; the password is deliberately env/prompt only.
    parser.add_argument("--uri", "-u",
                        help="Neo4j Bolt URI (default: bolt://localhost:7687)")
    parser.add_argument("--user", "-U",
                        help="Neo4j username (default: neo4j)")
    # Boolean behaviour toggles, all defaulting to False.
    for names, help_text in (
        (("--test-only", "-t"), "Only run verification tests, don't create schema"),
        (("--skip-samples",), "Skip creating sample nodes"),
        (("--skip-docs",), "Skip displaying schema documentation"),
        (("--quiet", "-q"), "Reduce output verbosity"),
    ):
        parser.add_argument(*names, action="store_true", help=help_text)
    return parser.parse_args()
def main():
    """
    Main execution function.
    Collects credentials via prompts or environment variables, then either
    runs verification tests only (--test-only) or performs the full schema
    setup: docs display, constraints, indexes, optional samples, tests.

    Exit codes: 0 on success, 1 on auth/connection/test failure, 130 on
    Ctrl-C.
    """
    args = parse_args()
    # Set log level
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    # Get credentials
    uri, user, password = get_credentials(args)
    logger.info(f"Connecting to Neo4j at {uri}...")
    try:
        schema = LifeGraphSchema(uri, user, password)
    except Exception as e:
        # Driver construction failed; nothing to close yet, exit directly.
        logger.error(f"Failed to create database driver: {e}")
        sys.exit(1)
    try:
        # Verify connection first
        try:
            schema.verify_connection()
        except AuthError:
            logger.error("✗ Authentication failed - check username/password")
            sys.exit(1)
        except ServiceUnavailable:
            logger.error(f"✗ Cannot connect to Neo4j at {uri}")
            sys.exit(1)
        if args.test_only:
            # Just run basic tests (no schema verification)
            success = schema.run_tests(include_schema_tests=False)
            sys.exit(0 if success else 1)
        # Display schema documentation
        if not args.skip_docs:
            schema.document_schema()
        # Create constraints (includes automatic indexes)
        logger.info("Creating constraints...")
        schema.create_constraints()
        # Create additional indexes
        logger.info("Creating indexes...")
        schema.create_indexes()
        # Create sample nodes to validate schema
        if not args.skip_samples:
            logger.info("Creating sample nodes...")
            schema.create_sample_nodes()
        # Run verification tests (including schema tests)
        logger.info("Verifying schema...")
        test_success = schema.run_tests(include_schema_tests=True)
        # Summary
        stats = schema.verify_schema()
        logger.info("=" * 60)
        logger.info("SCHEMA INITIALIZATION COMPLETE")
        logger.info("=" * 60)
        logger.info(f" Constraints: {stats['constraints']}")
        logger.info(f" Indexes: {stats['indexes']}")
        logger.info(f" Nodes: {stats['nodes']}")
        logger.info("=" * 60)
        if test_success:
            logger.info("✓ All tests passed!")
            logger.info("\nNext steps:")
            logger.info(" 1. Import your Plex library (Film, Music nodes)")
            logger.info(" 2. Import your Calibre library (Book nodes)")
            logger.info(" 3. Configure your AI assistants to write to this graph")
        else:
            logger.warning("⚠ Some tests failed - review output above")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except Exception as e:
        # NOTE: sys.exit raises SystemExit, which is NOT an Exception
        # subclass, so the early exits above are not swallowed here.
        logger.error(f"Error during schema initialization: {e}")
        sys.exit(1)
    finally:
        # Runs even on sys.exit, so the driver is always closed.
        schema.close()
# Run only when executed as a script; importing this module has no side effects.
if __name__ == "__main__":
    main()