""" Neo4j Unified Knowledge Graph Schema Initialization ===================================================== Creates the foundational schema for a unified knowledge graph used by sixteen AI assistants across three teams: Personal Team (Iolaus): Shawn (General — calendar/contacts/comms), Nate (Travel), Hypatia (Learning), Marcus (Fitness), Watson (Reflection & Emotional Safety), Bourdain (Food), David (Arts & Culture), Cousteau (Nature), Garth (Finance), Cristiano (Football) Work Team (Mentor): Alan (Strategy), Ann (Marketing), Jeffrey (Sales), Jarvis (Execution), AWS SA (Architecture) Engineering Team (Kottos): Scotty (Infrastructure), Harper (Prototyping) Schema Reference: docs/neo4j-unified-schema.md (v2.3.0) Requirements: pip install neo4j Usage: python neo4j-schema-init.py python neo4j-schema-init.py --uri bolt://ariel.incus:7687 python neo4j-schema-init.py --test-only Environment Variables (optional): NEO4J_URI - Bolt URI (default: bolt://localhost:7687) NEO4J_USER - Username (default: neo4j) NEO4J_PASSWORD - Password (will prompt if not set) """ import argparse import getpass import os import sys from neo4j import GraphDatabase from neo4j.exceptions import AuthError, ServiceUnavailable import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class LifeGraphSchema: def __init__(self, uri, user, password): """Initialize connection to Neo4j database""" self.driver = GraphDatabase.driver(uri, auth=(user, password)) self.uri = uri def close(self): """Close the database connection""" self.driver.close() def verify_connection(self): """ Verify the connection to Neo4j is working. Returns True if successful, raises exception otherwise. 
""" with self.driver.session() as session: result = session.run("RETURN 1 AS test") record = result.single() if record and record["test"] == 1: logger.info(f"✓ Connected to Neo4j at {self.uri}") return True raise ConnectionError("Failed to verify Neo4j connection") def create_constraints(self): """ Create uniqueness constraints on key node properties. This ensures data integrity and creates indexes automatically. All 79 node types get an id uniqueness constraint. """ constraints = [ # ── Universal nodes ────────────────────────────────────── "CREATE CONSTRAINT person_id IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT location_id IF NOT EXISTS FOR (n:Location) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT event_id IF NOT EXISTS FOR (n:Event) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (n:Topic) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT goal_id IF NOT EXISTS FOR (n:Goal) REQUIRE n.id IS UNIQUE", # ── Nate: Travel & Adventure ───────────────────────────── "CREATE CONSTRAINT trip_id IF NOT EXISTS FOR (n:Trip) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT destination_id IF NOT EXISTS FOR (n:Destination) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT activity_id IF NOT EXISTS FOR (n:Activity) REQUIRE n.id IS UNIQUE", # ── Hypatia: Learning & Reading ────────────────────────── "CREATE CONSTRAINT book_id IF NOT EXISTS FOR (n:Book) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT author_id IF NOT EXISTS FOR (n:Author) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT learningpath_id IF NOT EXISTS FOR (n:LearningPath) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT concept_id IF NOT EXISTS FOR (n:Concept) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT quote_id IF NOT EXISTS FOR (n:Quote) REQUIRE n.id IS UNIQUE", # ── Marcus: Fitness & Training ─────────────────────────── "CREATE CONSTRAINT training_id IF NOT EXISTS FOR (n:Training) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT exercise_id IF NOT EXISTS FOR (n:Exercise) REQUIRE n.id IS UNIQUE", 
"CREATE CONSTRAINT program_id IF NOT EXISTS FOR (n:Program) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT personalrecord_id IF NOT EXISTS FOR (n:PersonalRecord) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT bodymetric_id IF NOT EXISTS FOR (n:BodyMetric) REQUIRE n.id IS UNIQUE", # ── Watson: Reflection & Emotional Safety ──────────────── "CREATE CONSTRAINT reflection_id IF NOT EXISTS FOR (n:Reflection) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT value_id IF NOT EXISTS FOR (n:Value) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT habit_id IF NOT EXISTS FOR (n:Habit) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT lifeevent_id IF NOT EXISTS FOR (n:LifeEvent) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT intention_id IF NOT EXISTS FOR (n:Intention) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT emotionalmemory_id IF NOT EXISTS FOR (n:EmotionalMemory) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT relationshiptheme_id IF NOT EXISTS FOR (n:RelationshipTheme) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT dialoguenote_id IF NOT EXISTS FOR (n:DialogueNote) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT dynamicpattern_id IF NOT EXISTS FOR (n:DynamicPattern) REQUIRE n.id IS UNIQUE", # ── Bourdain: Food & Cooking ───────────────────────────── "CREATE CONSTRAINT recipe_id IF NOT EXISTS FOR (n:Recipe) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT restaurant_id IF NOT EXISTS FOR (n:Restaurant) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT ingredient_id IF NOT EXISTS FOR (n:Ingredient) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT meal_id IF NOT EXISTS FOR (n:Meal) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT technique_id IF NOT EXISTS FOR (n:Technique) REQUIRE n.id IS UNIQUE", # ── David: Arts & Culture ──────────────────────────────── "CREATE CONSTRAINT music_id IF NOT EXISTS FOR (n:Music) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT film_id IF NOT EXISTS FOR (n:Film) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT artwork_id IF NOT EXISTS FOR (n:Artwork) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT playlist_id IF 
NOT EXISTS FOR (n:Playlist) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT artist_id IF NOT EXISTS FOR (n:Artist) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT style_id IF NOT EXISTS FOR (n:Style) REQUIRE n.id IS UNIQUE", # ── Cousteau: Nature & Living Things ───────────────────── "CREATE CONSTRAINT species_id IF NOT EXISTS FOR (n:Species) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT plant_id IF NOT EXISTS FOR (n:Plant) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT tank_id IF NOT EXISTS FOR (n:Tank) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT garden_id IF NOT EXISTS FOR (n:Garden) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT ecosystem_id IF NOT EXISTS FOR (n:Ecosystem) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT observation_id IF NOT EXISTS FOR (n:Observation) REQUIRE n.id IS UNIQUE", # ── Garth: Personal Finance ────────────────────────────── "CREATE CONSTRAINT account_id IF NOT EXISTS FOR (n:Account) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT investment_id IF NOT EXISTS FOR (n:Investment) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT asset_id IF NOT EXISTS FOR (n:Asset) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT liability_id IF NOT EXISTS FOR (n:Liability) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT budget_id IF NOT EXISTS FOR (n:Budget) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT financialgoal_id IF NOT EXISTS FOR (n:FinancialGoal) REQUIRE n.id IS UNIQUE", # ── Cristiano: Football ─────────────────────────────────── "CREATE CONSTRAINT match_id IF NOT EXISTS FOR (n:Match) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT team_id IF NOT EXISTS FOR (n:Team) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT league_id IF NOT EXISTS FOR (n:League) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT tournament_id IF NOT EXISTS FOR (n:Tournament) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT player_id IF NOT EXISTS FOR (n:Player) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT season_id IF NOT EXISTS FOR (n:Season) REQUIRE n.id IS UNIQUE", # ── Shawn: Personal General Assistant ──────────────────── 
"CREATE CONSTRAINT communication_id IF NOT EXISTS FOR (n:Communication) REQUIRE n.id IS UNIQUE", # ── Work: Business ─────────────────────────────────────── "CREATE CONSTRAINT client_id IF NOT EXISTS FOR (n:Client) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT contact_id IF NOT EXISTS FOR (n:Contact) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT opportunity_id IF NOT EXISTS FOR (n:Opportunity) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT proposal_id IF NOT EXISTS FOR (n:Proposal) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT project_id IF NOT EXISTS FOR (n:Project) REQUIRE n.id IS UNIQUE", # ── Work: Market Intelligence ──────────────────────────── "CREATE CONSTRAINT vendor_id IF NOT EXISTS FOR (n:Vendor) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT competitor_id IF NOT EXISTS FOR (n:Competitor) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT markettrend_id IF NOT EXISTS FOR (n:MarketTrend) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT technology_id IF NOT EXISTS FOR (n:Technology) REQUIRE n.id IS UNIQUE", # ── Work: Content & Visibility ─────────────────────────── "CREATE CONSTRAINT content_id IF NOT EXISTS FOR (n:Content) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT publication_id IF NOT EXISTS FOR (n:Publication) REQUIRE n.id IS UNIQUE", # ── Work: Professional Development ─────────────────────── "CREATE CONSTRAINT skill_id IF NOT EXISTS FOR (n:Skill) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT certification_id IF NOT EXISTS FOR (n:Certification) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT relationship_id IF NOT EXISTS FOR (n:Relationship) REQUIRE n.id IS UNIQUE", # ── Work: Daily Operations ─────────────────────────────── "CREATE CONSTRAINT task_id IF NOT EXISTS FOR (n:Task) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (n:Meeting) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT note_id IF NOT EXISTS FOR (n:Note) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT decision_id IF NOT EXISTS FOR (n:Decision) REQUIRE n.id IS UNIQUE", # ── Engineering: Scotty 
────────────────────────────────── "CREATE CONSTRAINT infrastructure_id IF NOT EXISTS FOR (n:Infrastructure) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT incident_id IF NOT EXISTS FOR (n:Incident) REQUIRE n.id IS UNIQUE", # ── Engineering: Harper ────────────────────────────────── "CREATE CONSTRAINT prototype_id IF NOT EXISTS FOR (n:Prototype) REQUIRE n.id IS UNIQUE", "CREATE CONSTRAINT experiment_id IF NOT EXISTS FOR (n:Experiment) REQUIRE n.id IS UNIQUE", ] with self.driver.session() as session: created = 0 for constraint in constraints: try: session.run(constraint) name = constraint.split("CONSTRAINT")[1].split("IF")[0].strip() logger.info(f" ✓ Constraint: {name}") created += 1 except Exception as e: logger.warning(f" ⚠ Constraint may already exist: {e}") logger.info(f"Constraints processed: {created}/{len(constraints)}") def create_indexes(self): """ Create indexes for frequently queried properties. These improve query performance for searches and filters. Organized by query pattern: name/title, date, type/status, domain. 
""" indexes = [ # ── Name / Title text search ───────────────────────────── "CREATE INDEX person_name IF NOT EXISTS FOR (n:Person) ON (n.name)", "CREATE INDEX location_name IF NOT EXISTS FOR (n:Location) ON (n.name)", "CREATE INDEX topic_name IF NOT EXISTS FOR (n:Topic) ON (n.name)", "CREATE INDEX goal_name IF NOT EXISTS FOR (n:Goal) ON (n.name)", "CREATE INDEX book_title IF NOT EXISTS FOR (n:Book) ON (n.title)", "CREATE INDEX film_title IF NOT EXISTS FOR (n:Film) ON (n.title)", "CREATE INDEX music_title IF NOT EXISTS FOR (n:Music) ON (n.title)", "CREATE INDEX artwork_title IF NOT EXISTS FOR (n:Artwork) ON (n.title)", "CREATE INDEX recipe_name IF NOT EXISTS FOR (n:Recipe) ON (n.name)", "CREATE INDEX restaurant_name IF NOT EXISTS FOR (n:Restaurant) ON (n.name)", "CREATE INDEX exercise_name IF NOT EXISTS FOR (n:Exercise) ON (n.name)", "CREATE INDEX species_name IF NOT EXISTS FOR (n:Species) ON (n.name)", "CREATE INDEX plant_name IF NOT EXISTS FOR (n:Plant) ON (n.name)", "CREATE INDEX ingredient_name IF NOT EXISTS FOR (n:Ingredient) ON (n.name)", "CREATE INDEX artist_name IF NOT EXISTS FOR (n:Artist) ON (n.name)", "CREATE INDEX author_name IF NOT EXISTS FOR (n:Author) ON (n.name)", "CREATE INDEX concept_name IF NOT EXISTS FOR (n:Concept) ON (n.name)", "CREATE INDEX client_name IF NOT EXISTS FOR (n:Client) ON (n.name)", "CREATE INDEX contact_name IF NOT EXISTS FOR (n:Contact) ON (n.name)", "CREATE INDEX vendor_name IF NOT EXISTS FOR (n:Vendor) ON (n.name)", "CREATE INDEX competitor_name IF NOT EXISTS FOR (n:Competitor) ON (n.name)", "CREATE INDEX technology_name IF NOT EXISTS FOR (n:Technology) ON (n.name)", "CREATE INDEX content_title IF NOT EXISTS FOR (n:Content) ON (n.title)", "CREATE INDEX skill_name IF NOT EXISTS FOR (n:Skill) ON (n.name)", "CREATE INDEX task_title IF NOT EXISTS FOR (n:Task) ON (n.title)", "CREATE INDEX meeting_title IF NOT EXISTS FOR (n:Meeting) ON (n.title)", "CREATE INDEX infrastructure_name IF NOT EXISTS FOR (n:Infrastructure) ON (n.name)", 
"CREATE INDEX prototype_name IF NOT EXISTS FOR (n:Prototype) ON (n.name)", "CREATE INDEX investment_ticker IF NOT EXISTS FOR (n:Investment) ON (n.ticker)", "CREATE INDEX match_home IF NOT EXISTS FOR (n:Match) ON (n.home_team)", "CREATE INDEX match_away IF NOT EXISTS FOR (n:Match) ON (n.away_team)", "CREATE INDEX team_name IF NOT EXISTS FOR (n:Team) ON (n.name)", "CREATE INDEX league_name IF NOT EXISTS FOR (n:League) ON (n.name)", "CREATE INDEX tournament_name IF NOT EXISTS FOR (n:Tournament) ON (n.name)", "CREATE INDEX player_name IF NOT EXISTS FOR (n:Player) ON (n.name)", "CREATE INDEX season_team IF NOT EXISTS FOR (n:Season) ON (n.team)", # ── Date indexes for temporal queries ──────────────────── "CREATE INDEX event_date IF NOT EXISTS FOR (n:Event) ON (n.date)", "CREATE INDEX training_date IF NOT EXISTS FOR (n:Training) ON (n.date)", "CREATE INDEX trip_start IF NOT EXISTS FOR (n:Trip) ON (n.start_date)", "CREATE INDEX reflection_date IF NOT EXISTS FOR (n:Reflection) ON (n.date)", "CREATE INDEX observation_date IF NOT EXISTS FOR (n:Observation) ON (n.date)", "CREATE INDEX meal_date IF NOT EXISTS FOR (n:Meal) ON (n.date)", "CREATE INDEX meeting_date IF NOT EXISTS FOR (n:Meeting) ON (n.date)", "CREATE INDEX task_due IF NOT EXISTS FOR (n:Task) ON (n.due_date)", "CREATE INDEX note_date IF NOT EXISTS FOR (n:Note) ON (n.date)", "CREATE INDEX decision_date IF NOT EXISTS FOR (n:Decision) ON (n.date)", "CREATE INDEX incident_date IF NOT EXISTS FOR (n:Incident) ON (n.date)", "CREATE INDEX bodymetric_date IF NOT EXISTS FOR (n:BodyMetric) ON (n.date)", "CREATE INDEX personalrecord_date IF NOT EXISTS FOR (n:PersonalRecord) ON (n.date)", "CREATE INDEX lifeevent_date IF NOT EXISTS FOR (n:LifeEvent) ON (n.date)", "CREATE INDEX intention_date IF NOT EXISTS FOR (n:Intention) ON (n.date)", "CREATE INDEX match_date IF NOT EXISTS FOR (n:Match) ON (n.date)", "CREATE INDEX emotionalmemory_date IF NOT EXISTS FOR (n:EmotionalMemory) ON (n.date)", "CREATE INDEX dialoguenote_date IF NOT 
EXISTS FOR (n:DialogueNote) ON (n.date)", "CREATE INDEX dynamicpattern_date IF NOT EXISTS FOR (n:DynamicPattern) ON (n.date)", "CREATE INDEX communication_date IF NOT EXISTS FOR (n:Communication) ON (n.date)", # ── Type / Status / Category indexes ───────────────────── "CREATE INDEX event_type IF NOT EXISTS FOR (n:Event) ON (n.type)", "CREATE INDEX location_type IF NOT EXISTS FOR (n:Location) ON (n.type)", "CREATE INDEX activity_type IF NOT EXISTS FOR (n:Activity) ON (n.type)", "CREATE INDEX training_type IF NOT EXISTS FOR (n:Training) ON (n.type)", "CREATE INDEX music_genre IF NOT EXISTS FOR (n:Music) ON (n.genre)", "CREATE INDEX species_category IF NOT EXISTS FOR (n:Species) ON (n.category)", "CREATE INDEX exercise_category IF NOT EXISTS FOR (n:Exercise) ON (n.category)", "CREATE INDEX book_status IF NOT EXISTS FOR (n:Book) ON (n.status)", "CREATE INDEX trip_status IF NOT EXISTS FOR (n:Trip) ON (n.status)", "CREATE INDEX goal_status IF NOT EXISTS FOR (n:Goal) ON (n.status)", "CREATE INDEX goal_category IF NOT EXISTS FOR (n:Goal) ON (n.category)", "CREATE INDEX habit_status IF NOT EXISTS FOR (n:Habit) ON (n.status)", "CREATE INDEX program_status IF NOT EXISTS FOR (n:Program) ON (n.status)", "CREATE INDEX client_status IF NOT EXISTS FOR (n:Client) ON (n.status)", "CREATE INDEX opportunity_status IF NOT EXISTS FOR (n:Opportunity) ON (n.status)", "CREATE INDEX proposal_status IF NOT EXISTS FOR (n:Proposal) ON (n.status)", "CREATE INDEX project_status IF NOT EXISTS FOR (n:Project) ON (n.status)", "CREATE INDEX task_status IF NOT EXISTS FOR (n:Task) ON (n.status)", "CREATE INDEX task_priority IF NOT EXISTS FOR (n:Task) ON (n.priority)", "CREATE INDEX content_status IF NOT EXISTS FOR (n:Content) ON (n.status)", "CREATE INDEX content_type IF NOT EXISTS FOR (n:Content) ON (n.type)", "CREATE INDEX incident_severity IF NOT EXISTS FOR (n:Incident) ON (n.severity)", "CREATE INDEX incident_status IF NOT EXISTS FOR (n:Incident) ON (n.status)", "CREATE INDEX 
infrastructure_status IF NOT EXISTS FOR (n:Infrastructure) ON (n.status)", "CREATE INDEX account_type IF NOT EXISTS FOR (n:Account) ON (n.type)", "CREATE INDEX investment_type IF NOT EXISTS FOR (n:Investment) ON (n.type)", "CREATE INDEX liability_type IF NOT EXISTS FOR (n:Liability) ON (n.type)", "CREATE INDEX financialgoal_status IF NOT EXISTS FOR (n:FinancialGoal) ON (n.status)", "CREATE INDEX skill_category IF NOT EXISTS FOR (n:Skill) ON (n.category)", "CREATE INDEX skill_level IF NOT EXISTS FOR (n:Skill) ON (n.level)", "CREATE INDEX vendor_category IF NOT EXISTS FOR (n:Vendor) ON (n.category)", "CREATE INDEX match_competition IF NOT EXISTS FOR (n:Match) ON (n.competition)", "CREATE INDEX team_league IF NOT EXISTS FOR (n:Team) ON (n.league)", "CREATE INDEX player_position IF NOT EXISTS FOR (n:Player) ON (n.position)", "CREATE INDEX player_team IF NOT EXISTS FOR (n:Player) ON (n.team)", "CREATE INDEX league_country IF NOT EXISTS FOR (n:League) ON (n.country)", "CREATE INDEX season_year IF NOT EXISTS FOR (n:Season) ON (n.season_year)", # ── Domain indexes for cross-team filtering ────────────── "CREATE INDEX event_domain IF NOT EXISTS FOR (n:Event) ON (n.domain)", "CREATE INDEX topic_domain IF NOT EXISTS FOR (n:Topic) ON (n.domain)", "CREATE INDEX goal_domain IF NOT EXISTS FOR (n:Goal) ON (n.domain)", "CREATE INDEX location_domain IF NOT EXISTS FOR (n:Location) ON (n.domain)", "CREATE INDEX person_domain IF NOT EXISTS FOR (n:Person) ON (n.domain)", "CREATE INDEX contact_domain IF NOT EXISTS FOR (n:Contact) ON (n.domain)", "CREATE INDEX task_domain IF NOT EXISTS FOR (n:Task) ON (n.domain)", ] with self.driver.session() as session: created = 0 for index in indexes: try: session.run(index) name = index.split("INDEX")[1].split("IF")[0].strip() logger.info(f" ✓ Index: {name}") created += 1 except Exception as e: logger.warning(f" ⚠ Index may already exist: {e}") logger.info(f"Indexes processed: {created}/{len(indexes)}") def verify_schema(self): """ Verify that 
constraints and indexes were created successfully. Returns a dict with counts and status. """ results = {"constraints": 0, "indexes": 0, "nodes": 0, "success": True} with self.driver.session() as session: # Count constraints constraint_result = session.run("SHOW CONSTRAINTS") constraints = list(constraint_result) results["constraints"] = len(constraints) # Count indexes (excluding constraint-created ones) index_result = session.run("SHOW INDEXES WHERE type = 'RANGE'") indexes = list(index_result) results["indexes"] = len(indexes) # Count nodes node_result = session.run("MATCH (n) RETURN count(n) AS count") results["nodes"] = node_result.single()["count"] return results def run_tests(self, include_schema_tests=True): """ Run comprehensive tests to verify schema and APOC functionality. Returns True if all tests pass, False otherwise. Args: include_schema_tests: If True, also verify constraints/indexes exist """ tests_passed = 0 tests_failed = 0 test_cases = [ ("Connection test", "RETURN 1 AS result", lambda r: r.single()["result"] == 1), ("APOC available", "RETURN apoc.version() AS version", lambda r: r.single()["version"] is not None), ("Create test node", "CREATE (t:_Test {id: 'test_' + toString(timestamp())}) RETURN t.id AS id", lambda r: r.single()["id"] is not None), ("Query test node", "MATCH (t:_Test) RETURN count(t) AS count", lambda r: r.single()["count"] >= 1), ("APOC collection functions", "RETURN apoc.coll.sum([1,2,3]) AS total", lambda r: r.single()["total"] == 6), ("APOC date functions", "RETURN apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd') AS today", lambda r: len(r.single()["today"]) == 10), ] # Schema-specific tests schema_tests = [ # Universal nodes ("Constraint: Person", "SHOW CONSTRAINTS WHERE name = 'person_id'", lambda r: len(list(r)) == 1), ("Constraint: Location", "SHOW CONSTRAINTS WHERE name = 'location_id'", lambda r: len(list(r)) == 1), ("Constraint: Topic", "SHOW CONSTRAINTS WHERE name = 'topic_id'", lambda r: len(list(r)) == 1), 
("Constraint: Goal", "SHOW CONSTRAINTS WHERE name = 'goal_id'", lambda r: len(list(r)) == 1), # Personal team samples ("Constraint: Book", "SHOW CONSTRAINTS WHERE name = 'book_id'", lambda r: len(list(r)) == 1), ("Constraint: Training", "SHOW CONSTRAINTS WHERE name = 'training_id'", lambda r: len(list(r)) == 1), ("Constraint: Recipe", "SHOW CONSTRAINTS WHERE name = 'recipe_id'", lambda r: len(list(r)) == 1), ("Constraint: Account", "SHOW CONSTRAINTS WHERE name = 'account_id'", lambda r: len(list(r)) == 1), # Work team samples ("Constraint: Client", "SHOW CONSTRAINTS WHERE name = 'client_id'", lambda r: len(list(r)) == 1), ("Constraint: Opportunity", "SHOW CONSTRAINTS WHERE name = 'opportunity_id'", lambda r: len(list(r)) == 1), ("Constraint: Task", "SHOW CONSTRAINTS WHERE name = 'task_id'", lambda r: len(list(r)) == 1), # Engineering team samples ("Constraint: Infrastructure", "SHOW CONSTRAINTS WHERE name = 'infrastructure_id'", lambda r: len(list(r)) == 1), ("Constraint: Prototype", "SHOW CONSTRAINTS WHERE name = 'prototype_id'", lambda r: len(list(r)) == 1), # Index checks ("Index: person_name", "SHOW INDEXES WHERE name = 'person_name'", lambda r: len(list(r)) == 1), ("Index: event_domain", "SHOW INDEXES WHERE name = 'event_domain'", lambda r: len(list(r)) == 1), ("Index: client_status", "SHOW INDEXES WHERE name = 'client_status'", lambda r: len(list(r)) == 1), # Cristiano team sample ("Constraint: Match", "SHOW CONSTRAINTS WHERE name = 'match_id'", lambda r: len(list(r)) == 1), ("Constraint: Team", "SHOW CONSTRAINTS WHERE name = 'team_id'", lambda r: len(list(r)) == 1), # Watson emotional-memory samples (v2.2.0) ("Constraint: EmotionalMemory", "SHOW CONSTRAINTS WHERE name = 'emotionalmemory_id'", lambda r: len(list(r)) == 1), ("Constraint: RelationshipTheme", "SHOW CONSTRAINTS WHERE name = 'relationshiptheme_id'", lambda r: len(list(r)) == 1), ("Constraint: DialogueNote", "SHOW CONSTRAINTS WHERE name = 'dialoguenote_id'", lambda r: len(list(r)) == 1), 
("Constraint: DynamicPattern", "SHOW CONSTRAINTS WHERE name = 'dynamicpattern_id'", lambda r: len(list(r)) == 1), # Shawn sample (v2.3.0) ("Constraint: Communication", "SHOW CONSTRAINTS WHERE name = 'communication_id'", lambda r: len(list(r)) == 1), # Total constraint count (79 node types as of v2.3.0) ("Total constraints >= 79", "SHOW CONSTRAINTS", lambda r: len(list(r)) >= 79), ] if include_schema_tests: test_cases.extend(schema_tests) logger.info("\n" + "=" * 60) logger.info("RUNNING SCHEMA VERIFICATION TESTS") logger.info("=" * 60) with self.driver.session() as session: for test_name, query, validator in test_cases: try: result = session.run(query) if validator(result): logger.info(f" ✓ {test_name}") tests_passed += 1 else: logger.error(f" ✗ {test_name} - Validation failed") tests_failed += 1 except Exception as e: logger.error(f" ✗ {test_name} - {e}") tests_failed += 1 # Cleanup test nodes try: session.run("MATCH (t:_Test) DELETE t") logger.info(" ✓ Cleanup test nodes") except Exception as e: logger.warning(f" ⚠ Cleanup failed: {e}") logger.info("=" * 60) logger.info(f"Tests: {tests_passed} passed, {tests_failed} failed") logger.info("=" * 60 + "\n") return tests_failed == 0 def create_sample_nodes(self): """ Create sample nodes spanning all three teams to demonstrate the unified schema and cross-domain relationships. Uses explicit write transactions for reliable commits. 
""" node_queries = [ # ── Central person node ────────────────────────────────── ("Person:user_main", """ MERGE (p:Person {id: 'user_main'}) ON CREATE SET p.created_at = datetime() SET p.name = 'Main User', p.relationship = 'self', p.domain = 'both', p.updated_at = datetime() RETURN p.id AS id """), # ── Personal: Sample location ──────────────────────────── ("Location:location_home", """ MERGE (l:Location {id: 'location_home'}) ON CREATE SET l.created_at = datetime() SET l.name = 'Home', l.type = 'residence', l.domain = 'personal', l.updated_at = datetime() RETURN l.id AS id """), # ── Personal: Sample trip (Nate) ───────────────────────── ("Trip:trip_sample_2025", """ MERGE (t:Trip {id: 'trip_sample_2025'}) ON CREATE SET t.created_at = datetime() SET t.name = 'Sample Trip', t.status = 'planning', t.updated_at = datetime() RETURN t.id AS id """), # ── Personal: Sample book (Hypatia) ────────────────────── ("Book:book_meditations_aurelius", """ MERGE (b:Book {id: 'book_meditations_aurelius'}) ON CREATE SET b.created_at = datetime() SET b.title = 'Meditations', b.author = 'Marcus Aurelius', b.status = 'completed', b.rating = 5, b.updated_at = datetime() RETURN b.id AS id """), # ── Personal: Sample goal (Watson) ─────────────────────── ("Goal:goal_sample_2025", """ MERGE (g:Goal {id: 'goal_sample_2025'}) ON CREATE SET g.created_at = datetime() SET g.name = 'Sample Goal', g.category = 'personal_growth', g.domain = 'personal', g.status = 'in_progress', g.updated_at = datetime() RETURN g.id AS id """), # ── Personal: Sample topic (universal) ─────────────────── ("Topic:topic_stoicism", """ MERGE (t:Topic {id: 'topic_stoicism'}) ON CREATE SET t.created_at = datetime() SET t.name = 'Stoicism', t.category = 'philosophy', t.domain = 'personal', t.updated_at = datetime() RETURN t.id AS id """), # ── Personal: Sample account (Garth) ───────────────────── ("Account:account_tfsa_sample", """ MERGE (a:Account {id: 'account_tfsa_sample'}) ON CREATE SET a.created_at = datetime() 
SET a.name = 'TFSA - Sample', a.type = 'TFSA', a.updated_at = datetime() RETURN a.id AS id """), # ── Shawn: Sample personal contact + communication ─────── ("Contact:contact_sample_personal", """ MERGE (c:Contact {id: 'contact_sample_personal'}) ON CREATE SET c.created_at = datetime() SET c.name = 'Sample Personal Contact', c.domain = 'personal', c.relationship_strength = 'developing', c.updated_at = datetime() RETURN c.id AS id """), ("Communication:comm_sample", """ MERGE (c:Communication {id: 'comm_sample'}) ON CREATE SET c.created_at = datetime() SET c.type = 'in_person', c.contact_id = 'contact_sample_personal', c.date = date(), c.summary = 'Sample interaction record', c.updated_at = datetime() RETURN c.id AS id """), # ── Watson: Sample emotional memory ────────────────────── ("EmotionalMemory:memory_sample", """ MERGE (m:EmotionalMemory {id: 'memory_sample'}) ON CREATE SET m.created_at = datetime() SET m.date = date(), m.theme = 'safety', m.intensity = 3, m.content = 'Sample emotional memory entry', m.updated_at = datetime() RETURN m.id AS id """), # ── Work: Sample client ────────────────────────────────── ("Client:client_sample_corp", """ MERGE (c:Client {id: 'client_sample_corp'}) ON CREATE SET c.created_at = datetime() SET c.name = 'Sample Corp', c.industry = 'Technology', c.status = 'prospect', c.domain = 'work', c.updated_at = datetime() RETURN c.id AS id """), # ── Work: Sample skill ─────────────────────────────────── ("Skill:skill_cx_strategy", """ MERGE (s:Skill {id: 'skill_cx_strategy'}) ON CREATE SET s.created_at = datetime() SET s.name = 'CX Strategy', s.category = 'consulting', s.level = 'expert', s.updated_at = datetime() RETURN s.id AS id """), # ── Work: Sample topic ─────────────────────────────────── ("Topic:topic_ai_in_cx", """ MERGE (t:Topic {id: 'topic_ai_in_cx'}) ON CREATE SET t.created_at = datetime() SET t.name = 'AI in Customer Experience', t.category = 'technology', t.domain = 'work', t.updated_at = datetime() RETURN t.id AS id 
"""), # ── Engineering: Sample infrastructure (Scotty) ────────── ("Infrastructure:infra_neo4j_prod", """ MERGE (i:Infrastructure {id: 'infra_neo4j_prod'}) ON CREATE SET i.created_at = datetime() SET i.name = 'Neo4j Production', i.type = 'database', i.status = 'running', i.environment = 'production', i.updated_at = datetime() RETURN i.id AS id """), # ── Personal: Sample team (Cristiano) ────────────────────── ("Team:team_arsenal", """ MERGE (t:Team {id: 'team_arsenal'}) ON CREATE SET t.created_at = datetime() SET t.name = 'Arsenal', t.league = 'Premier League', t.country = 'England', t.followed = true, t.updated_at = datetime() RETURN t.id AS id """), ] # Create all nodes in one explicit transaction (auto-commits on exit) created_nodes = 0 with self.driver.session() as session: with session.begin_transaction() as tx: for label, query in node_queries: try: result = tx.run(query) record = result.single() logger.info(f" ✓ Node: {label} → {record['id']}") created_nodes += 1 except Exception as e: logger.error(f" ✗ Node {label}: {e}") # tx auto-commits when context exits normally logger.info(f" Created {created_nodes}/{len(node_queries)} sample nodes") # Verify nodes exist before creating relationships with self.driver.session() as session: count = session.run("MATCH (n) RETURN count(n) AS c").single()["c"] logger.info(f" Verified {count} nodes exist before creating relationships") # Create all relationships in one explicit transaction rel_specs = [ ("SUPPORTS", "Person", "user_main", "Team", "team_arsenal"), ("COMPLETED", "Person", "user_main", "Book", "book_meditations_aurelius"), ("PURSUING", "Person", "user_main", "Goal", "goal_sample_2025"), ("EXPLORES", "Book", "book_meditations_aurelius", "Topic", "topic_stoicism"), ("OWNS", "Person", "user_main", "Account", "account_tfsa_sample"), ("HAD", "Person", "user_main", "Communication", "comm_sample"), ("WITH", "Communication", "comm_sample", "Contact", "contact_sample_personal"), ] created_rels = 0 with 
def document_schema(self):
    """
    Print a human-readable overview of the unified graph schema.

    The overview lists the universal node types, the node types grouped by
    assistant and team, the overall totals, and a handful of example
    cross-team relationships. It is written to stdout with print(), and an
    INFO log line records that it was shown.

    Full documentation: docs/neo4j-unified-schema.md
    """
    overview = """
════════════════════════════════════════════════════════════════
UNIFIED KNOWLEDGE GRAPH SCHEMA
One graph for all assistants across personal, work, and engineering
════════════════════════════════════════════════════════════════

UNIVERSAL NODES (any assistant can read/write):
────────────────────────────────────────────────────────────────
Person       People (self, family, friends, colleagues)
Location     Physical places (cities, venues, offices, trails)
Event        Significant occurrences (celebrations, conferences)
Topic        Subjects of interest (stoicism, AI in CX)
Goal         Objectives (personal growth, career, fitness, financial)

PERSONAL TEAM (Iolaus):
────────────────────────────────────────────────────────────────
Shawn (General)       Contact, Event, Task (all domain='personal'),
                      Communication
Nate (Travel)         Trip, Destination, Activity
Hypatia (Learning)    Book, Author, LearningPath, Concept, Quote
Marcus (Fitness)      Training, Exercise, Program, PersonalRecord, BodyMetric
Watson (Reflection)   Reflection, Value, Habit, LifeEvent, Intention,
                      EmotionalMemory, RelationshipTheme, DialogueNote,
                      DynamicPattern
Bourdain (Food)       Recipe, Restaurant, Ingredient, Meal, Technique
David (Culture)       Music, Film, Artwork, Playlist, Artist, Style
Cousteau (Nature)     Species, Plant, Tank, Garden, Ecosystem, Observation
Garth (Finance)       Account, Investment, Asset, Liability, Budget,
                      FinancialGoal
Cristiano (Football)  Match, Team, League, Tournament, Player, Season

WORK TEAM (Mentor):
────────────────────────────────────────────────────────────────
Alan (Strategy)        Client, Vendor, Competitor, MarketTrend, Technology,
                       Decision
Ann (Marketing)        Content, Publication, Topic, Event (domain='work')
Jeffrey (Sales)        Contact (domain='work'), Opportunity, Proposal, Meeting
Jarvis (Execution)     Task (domain='work'), Meeting, Note, Decision, Project
AWS SA (Architecture)  No domain ownership — writes Note (messages) only

ENGINEERING TEAM (Kottos):
────────────────────────────────────────────────────────────────
Scotty (Infra)         Infrastructure, Incident
Harper (Hacking)       Prototype, Experiment

TOTAL: 79 node types, 16 assistants.
All node types have id uniqueness constraints.
Contact/Event/Task are Universal with a `domain` field
('personal' or 'work') disambiguating Shawn vs. Jarvis/Jeffrey ownership.

CROSS-TEAM CONNECTIONS (examples):
────────────────────────────────────────────────────────────────
Trip -[FOR_EVENT]-> Event                  (Personal ↔ Work)
Book -[DEVELOPS]-> Skill                   (Personal ↔ Work)
Book -[INFORMS]-> Content                  (Personal ↔ Work)
Infrastructure -[HOSTS]-> Project          (Engineering ↔ Work)
Prototype -[SUPPORTS]-> Opportunity        (Engineering ↔ Work)
Project -[GENERATES_REVENUE]-> Account     (Work ↔ Personal)
Training -[BUILDS]-> Skill                 (Personal ↔ Work)
Communication -[WITH]-> Contact            (Shawn: personal interaction history)

Full schema: docs/neo4j-unified-schema.md (v2.3.0)
════════════════════════════════════════════════════════════════
"""
    print(overview)
    logger.info("Schema documentation displayed")
def preview_changes(self):
    """
    Print what a full init run WOULD create, without writing anything.

    Compares the live database's current state to the v2.3.0 schema spec and
    reports, per category, what is already present and what a full run would
    add: constraints (79 expected), indexes, total node / relationship
    counts, and the sample nodes and relationships.

    Purely read-only: it runs SHOW CONSTRAINTS, SHOW INDEXES and counting
    MATCH queries through ``self.driver`` and prints the report to stdout.
    """
    # One id-uniqueness constraint per node type in the v2.3.0 schema.
    expected_constraints = 79

    # Sample data a full run would MERGE, written as "Label:id" per owner.
    sample_nodes_by_team = [
        ("Universal", ["Person:user_main", "Location:location_home"]),
        ("Personal — Nate", ["Trip:trip_sample_2025"]),
        ("Personal — Hypatia", ["Book:book_meditations_aurelius",
                                "Topic:topic_stoicism"]),
        ("Personal — Watson", ["Goal:goal_sample_2025",
                               "EmotionalMemory:memory_sample"]),
        ("Personal — Garth", ["Account:account_tfsa_sample"]),
        ("Personal — Shawn", ["Contact:contact_sample_personal",
                              "Communication:comm_sample"]),
        ("Personal — Cristiano", ["Team:team_arsenal"]),
        ("Work", ["Client:client_sample_corp", "Skill:skill_cx_strategy",
                  "Topic:topic_ai_in_cx"]),
        ("Engineering — Scotty", ["Infrastructure:infra_neo4j_prod"]),
    ]
    sample_rels = [
        "(Person:user_main)-[:SUPPORTS]->(Team:team_arsenal)",
        "(Person:user_main)-[:COMPLETED]->(Book:book_meditations_aurelius)",
        "(Person:user_main)-[:PURSUING]->(Goal:goal_sample_2025)",
        "(Book:book_meditations_aurelius)-[:EXPLORES]->(Topic:topic_stoicism)",
        "(Person:user_main)-[:OWNS]->(Account:account_tfsa_sample)",
        "(Person:user_main)-[:HAD]->(Communication:comm_sample)",
        "(Communication:comm_sample)-[:WITH]->(Contact:contact_sample_personal)",
    ]

    # Derive the totals from the listings so the summary line can never
    # disagree with the bullets printed below it.
    # NOTE(review): the total used to be hard-coded as 14 while the listing
    # has 15 entries — confirm the listing against create_sample_nodes().
    expected_sample_nodes = sum(len(nodes) for _, nodes in sample_nodes_by_team)
    expected_sample_rels = len(sample_rels)

    print()
    print("═" * 60)
    print("  DRY RUN — Preview of what a full init would create")
    print("═" * 60)

    with self.driver.session() as session:
        # ── Constraints ──────────────────────────────────────────
        # This counts every constraint in the database, not only the ones
        # this script defines, so "would be created" is a lower bound when
        # unrelated constraints exist.
        existing_constraint_count = len(list(
            session.run("SHOW CONSTRAINTS YIELD name RETURN name")
        ))
        constraints_to_add = max(
            0, expected_constraints - existing_constraint_count
        )
        print(f"\n  Constraints: {existing_constraint_count} present / "
              f"{expected_constraints} expected")
        print(f"    {constraints_to_add} would be created "
              f"(or skipped via IF NOT EXISTS)")

        # ── Indexes ──────────────────────────────────────────────
        # Index count varies as the schema evolves; just report current.
        existing_indexes = list(session.run(
            "SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"
        ))
        print(f"\n  Indexes: {len(existing_indexes)} present "
              f"(includes constraint-backed)")
        print("    ~30 additional named indexes would be "
              "created (or skipped via IF NOT EXISTS)")

        # ── Total node / relationship counts ─────────────────────
        total_nodes = session.run(
            "MATCH (n) RETURN count(n) AS c"
        ).single()["c"]
        total_rels = session.run(
            "MATCH ()-[r]->() RETURN count(r) AS c"
        ).single()["c"]
        print(f"\n  Current data: {total_nodes} nodes, "
              f"{total_rels} relationships")
        print(f"    {expected_sample_nodes} sample nodes + "
              f"{expected_sample_rels} sample rels would be MERGEd")

    # ── Node-type breakdown by team ──────────────────────────────
    # --skip-samples is a plain on/off switch (store_true), so describe the
    # condition in words rather than as an invalid "--skip-samples=false".
    print("\n  Sample data by team "
          "(created by a full run, i.e. without --skip-samples):")
    for team, samples in sample_nodes_by_team:
        print(f"    {team}:")
        for sample in samples:
            print(f"      • {sample}")

    print("\n  Sample relationships:")
    for rel in sample_rels:
        print(f"    • {rel}")

    print()
    print("  All writes use MERGE + IF NOT EXISTS, so re-running is")
    print("  idempotent. Nothing has been changed by this dry run.")
    print("═" * 60)
    print()
Priority for each default value: CLI arg > Environment variable > Built-in default """ print() print("─" * 60) print(" Neo4j Connection") print("─" * 60) # URI uri_default = args.uri or os.environ.get("NEO4J_URI") or "bolt://localhost:7687" uri = input(f" Neo4j URI [{uri_default}]: ").strip() or uri_default # Username user_default = args.user or os.environ.get("NEO4J_USER") or "neo4j" user = input(f" Neo4j username [{user_default}]: ").strip() or user_default # Password (always via getpass, never echoed) env_password = os.environ.get("NEO4J_PASSWORD") if env_password: prompt = " Neo4j password [from $NEO4J_PASSWORD, Enter to accept]: " else: prompt = " Neo4j password: " password = getpass.getpass(prompt) or env_password or "" if not password: logger.error("Password is required") sys.exit(1) # Summary + confirm print() print("─" * 60) print(" Connection summary") print("─" * 60) print(f" URI: {uri}") print(f" User: {user}") print(f" Password: {_mask_password(password)}") print("─" * 60) confirm = input("Proceed with these credentials? [Y/n]: ").strip().lower() if confirm and confirm not in ("y", "yes"): logger.info("Cancelled by user. 
def parse_args():
    """Build the command-line parser and return the parsed arguments."""
    usage_notes = """
Examples:
  %(prog)s                                # Interactive prompts
  %(prog)s --uri bolt://ariel.incus:7687  # Specify URI, prompt for rest
  %(prog)s --test-only                    # Run tests without creating schema
  %(prog)s --skip-samples                 # Create schema without sample data

Environment Variables:
  NEO4J_URI       Bolt connection URI
  NEO4J_USER      Database username
  NEO4J_PASSWORD  Database password (recommended for scripts)

Schema Reference: docs/neo4j-unified-schema.md
"""
    cli = argparse.ArgumentParser(
        description="Initialize Neo4j Unified Knowledge Graph schema for all AI assistants",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    # (option strings, add_argument keyword settings), in help-output order.
    option_table = [
        (("--uri", "-u"),
         {"help": "Neo4j Bolt URI (default: bolt://localhost:7687)"}),
        (("--user", "-U"),
         {"help": "Neo4j username (default: neo4j)"}),
        (("--test-only", "-t"),
         {"action": "store_true",
          "help": "Only run verification tests, don't create schema"}),
        (("--skip-samples",),
         {"action": "store_true",
          "help": "Skip creating sample nodes"}),
        (("--skip-docs",),
         {"action": "store_true",
          "help": "Skip displaying schema documentation"}),
        (("--quiet", "-q"),
         {"action": "store_true",
          "help": "Reduce output verbosity"}),
    ]
    for option_strings, settings in option_table:
        cli.add_argument(*option_strings, **settings)

    return cli.parse_args()


def prompt_action(args):
    """
    Decide which action to run, asking the user when no flag decides it.

    Returns one of: "full", "schema_only", "tests_only", "quit".

    A CLI flag that pre-selects an action wins and the menu is not shown:
    --test-only gives "tests_only" (checked first), --skip-samples gives
    "schema_only". Otherwise the menu is printed and the prompt repeats
    until the user enters 1-4 or one of q / quit / exit.
    """
    # CLI flags take precedence over the interactive menu
    if args.test_only:
        return "tests_only"
    if args.skip_samples:
        return "schema_only"

    rule = "─" * 60
    menu_lines = (
        "",
        rule,
        "  What would you like to do?",
        rule,
        "  1) Full init with sample data",
        "     constraints + indexes + sample nodes + verification tests",
        "  2) Schema only (no sample data)",
        "     constraints + indexes + verification tests",
        "  3) Tests only (read-only, no writes)",
        "     runs connection + APOC + basic functional checks",
        "  4) Quit",
        "",
        "  All writes use MERGE + IF NOT EXISTS — running options 1 or 2",
        "  against an already-initialized database is safe and idempotent.",
        rule,
    )
    for line in menu_lines:
        print(line)

    by_number = {
        "1": "full",
        "2": "schema_only",
        "3": "tests_only",
        "4": "quit",
    }
    quit_words = ("q", "quit", "exit")

    while True:
        answer = input("Choice [1-4]: ").strip()
        if answer in by_number:
            return by_number[answer]
        if answer.lower() in quit_words:
            return "quit"
        print("  Please enter 1, 2, 3, or 4.")
def main():
    """
    Main execution function.

    Flow:
      1. Parse CLI args (mostly to allow defaults and automation overrides).
      2. Collect URI, username and password via get_credentials().
      3. Open a connection and verify it works.
      4. Run a read-only dry-run preview showing what would be created.
      5. Choose an action via prompt_action() (full / schema only /
         tests only / quit).
      6. Execute the chosen action.

    Exit status: 0 on success or when the user cancels, 1 on any failure
    (driver creation, authentication, connectivity, failed tests, unexpected
    error), 130 when interrupted with Ctrl-C — including while the
    credential prompts are being answered.
    """
    args = parse_args()

    # --quiet raises the root logger threshold so only warnings/errors show.
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)

    # Get credentials (interactive prompt + summary + confirm). No driver
    # exists yet, so this sits outside the try/finally that closes it, but
    # Ctrl-C here is still an ordinary cancel and must not dump a traceback.
    try:
        uri, user, password = get_credentials(args)
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)

    logger.info(f"Connecting to Neo4j at {uri}...")

    try:
        schema = LifeGraphSchema(uri, user, password)
    except Exception as e:
        logger.error(f"Failed to create database driver: {e}")
        sys.exit(1)

    try:
        # Verify connection first
        try:
            schema.verify_connection()
        except AuthError:
            logger.error("✗ Authentication failed - check username/password")
            sys.exit(1)
        except ServiceUnavailable:
            logger.error(f"✗ Cannot connect to Neo4j at {uri}")
            sys.exit(1)

        # Dry-run preview: read-only, shows what a full run would create
        schema.preview_changes()

        # Decide what to do (CLI flags override the interactive menu)
        action = prompt_action(args)

        if action == "quit":
            logger.info("Cancelled by user. No changes made.")
            sys.exit(0)

        if action == "tests_only":
            success = schema.run_tests(include_schema_tests=False)
            sys.exit(0 if success else 1)

        # Both "full" and "schema_only" go through the same provisioning
        # path; the only difference is whether sample nodes get created.
        create_samples = (action == "full")

        # Display schema documentation
        if not args.skip_docs:
            schema.document_schema()

        # Create constraints (includes automatic indexes)
        logger.info("Creating constraints (79 node types)...")
        schema.create_constraints()

        # Create additional indexes
        logger.info("Creating indexes...")
        schema.create_indexes()

        # Create sample nodes to validate schema
        if create_samples:
            logger.info("Creating sample nodes...")
            schema.create_sample_nodes()
        else:
            logger.info("Skipping sample nodes (schema-only run).")

        # Run verification tests (including schema tests)
        logger.info("Verifying schema...")
        test_success = schema.run_tests(include_schema_tests=True)

        # Summary
        stats = schema.verify_schema()
        logger.info("=" * 60)
        logger.info("SCHEMA INITIALIZATION COMPLETE")
        logger.info("=" * 60)
        logger.info(f"  Constraints: {stats['constraints']}")
        logger.info(f"  Indexes:     {stats['indexes']}")
        logger.info(f"  Nodes:       {stats['nodes']}")
        logger.info("=" * 60)

        if test_success:
            logger.info("✓ All tests passed!")
            logger.info("\nUnified graph ready for all 16 assistants.")
            logger.info("Schema reference: docs/neo4j-unified-schema.md")
            logger.info("\nNext steps:")
            logger.info("  1. Import data (Plex, Calibre, etc.)")
            logger.info("  2. Configure MCP servers for each assistant")
            logger.info("  3. Update assistant prompts with unified graph sections")
        else:
            logger.warning("⚠ Some tests failed - review output above")
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Error during schema initialization: {e}")
        sys.exit(1)
    finally:
        # Runs on every path above, including the sys.exit() calls, so the
        # driver is always released.
        schema.close()


if __name__ == "__main__":
    main()