"""
Neo4j Life Graph Schema Initialization
=======================================

Creates the foundational schema for a personal knowledge graph used by
seven AI assistants: Hypatia, Marcus, Seneca, Nate, Bowie, Bourdain, Cousteau

Requirements:
    pip install neo4j

Usage:
    python neo4j-personal-schema-init.py
    python neo4j-personal-schema-init.py --uri bolt://ariel.incus:7687
    python neo4j-personal-schema-init.py --test-only

Environment Variables (optional):
    NEO4J_URI      - Bolt URI (default: bolt://localhost:7687)
    NEO4J_USER     - Username (default: neo4j)
    NEO4J_PASSWORD - Password (will prompt if not set)
"""

import argparse
import getpass
import logging
import os
import sys

from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, Neo4jError, ServiceUnavailable

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

DEFAULT_URI = "bolt://localhost:7687"
DEFAULT_USER = "neo4j"

# Human-readable reference printed by LifeGraphSchema.document_schema().
_SCHEMA_DOC = """
════════════════════════════════════════════════════════════════
LIFE GRAPH SCHEMA - NODE TYPES AND ASSISTANT RESPONSIBILITIES
════════════════════════════════════════════════════════════════

CORE NODES (Used by all assistants):
────────────────────────────────────────────────────────────────
Person      - People in your life (family, friends, contacts)
              Properties: name, relationship_type, birthday, contact_info, notes

Location    - Places (home, travel, favorites)
              Properties: name, city, country, coordinates, category, notes

Event       - Life events (vacations, gatherings, milestones)
              Properties: name, date, location, description, type

Interest    - Preferences, hobbies, goals
              Properties: category, name, intensity, notes

════════════════════════════════════════════════════════════════
HYPATIA (Learning & Knowledge):
────────────────────────────────────────────────────────────────
Book        - Books read or to-read
              Properties: title, author, isbn, status, rating, date_started, date_finished, notes

Topic       - Subject areas of study
              Properties: name, field, depth, resources

Concept     - Ideas and principles learned
              Properties: name, definition, examples, connections

════════════════════════════════════════════════════════════════
MARCUS (Fitness & Training):
────────────────────────────────────────────────────────────────
Training    - Individual workout sessions
              Properties: date, type, duration, exercises, volume, intensity, notes, feeling

Exercise    - Specific movements/activities
              Properties: name, category, equipment, target_muscles, technique_notes

════════════════════════════════════════════════════════════════
SENECA (Reflection & Wellness):
────────────────────────────────────────────────────────────────
Reflection  - Journal entries and insights
              Properties: date, content, mood, themes, insights, questions

Goal        - Life objectives and aspirations
              Properties: name, category, timeline, status, progress, reflections

════════════════════════════════════════════════════════════════
NATE (Travel & Adventure):
────────────────────────────────────────────────────────────────
Trip        - Travel plans and experiences
              Properties: name, start_date, end_date, destinations, purpose, budget, highlights

Activity    - Things to do at destinations
              Properties: name, type, location, cost, difficulty, notes

════════════════════════════════════════════════════════════════
BOWIE (Arts, Culture & Style):
────────────────────────────────────────────────────────────────
Film        - Movies and TV shows
              Properties: title, year, director, genre, status, rating, date_watched, notes

Music       - Songs, albums, artists
              Properties: title, artist, album, genre, year, rating, play_count, notes

Artwork     - Visual art, exhibitions, collections
              Properties: title, artist, medium, year, location, notes

════════════════════════════════════════════════════════════════
BOURDAIN (Food & Drink):
────────────────────────────────────────────────────────────────
Recipe      - Dishes to cook
              Properties: name, cuisine, difficulty, time, ingredients, instructions, source, notes

Restaurant  - Dining destinations
              Properties: name, location, cuisine, price_range, rating, dishes_tried, notes

Ingredient  - Foods and cooking components
              Properties: name, category, season, source, substitutes, notes

════════════════════════════════════════════════════════════════
COUSTEAU (Nature & Living Things):
────────────────────────────────────────────────────────────────
Species     - Animals, fish, marine life
              Properties: name, scientific_name, category, habitat, conservation_status, notes

Plant       - Garden plants, houseplants
              Properties: name, scientific_name, type, care_requirements, location, health_status

Ecosystem   - Environments and habitats
              Properties: name, type, location, characteristics, species_present, conservation_notes

════════════════════════════════════════════════════════════════
KEY RELATIONSHIP PATTERNS:
────────────────────────────────────────────────────────────────
Cross-domain connections:
  - Training -[PREPARATION_FOR]-> Trip
  - Reflection -[ABOUT]-> Event/Training/Trip
  - Book -[INSPIRED]-> Trip/Recipe/Concept
  - Recipe -[FROM_LOCATION]-> Location
  - Music -[PLAYED_AT]-> Event/Location
  - Film -[SET_IN]-> Location
  - Species -[OBSERVED_AT]-> Location
  - Plant -[GROWS_IN]-> Location

Personal connections:
  - Person -[ATTENDED]-> Event
  - Person -[TRAVELED_WITH]-> Trip
  - Person -[TRAINED_WITH]-> Training
  - Person -[SHARED_MEAL]-> Recipe/Restaurant
  - Person -[RECOMMENDED]-> Book/Film/Music/Restaurant

Learning connections:
  - Book -[ABOUT]-> Topic
  - Topic -[CONTAINS]-> Concept
  - Concept -[RELATES_TO]-> Concept
  - Training -[TEACHES]-> Concept (movement patterns, discipline)

════════════════════════════════════════════════════════════════
"""


class LifeGraphSchema:
    """Create, test and summarise the Life Graph schema over one Neo4j driver."""

    def __init__(self, uri, user, password):
        """Open a driver for ``uri`` using basic auth ``(user, password)``."""
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.uri = uri

    def close(self):
        """Close the database connection."""
        self.driver.close()

    def verify_connection(self) -> bool:
        """
        Verify the connection to Neo4j is working.

        Returns:
            True when ``RETURN 1`` round-trips successfully.

        Raises:
            ConnectionError: if the query returns anything other than 1.
            Driver exceptions (e.g. AuthError, ServiceUnavailable) propagate
            unchanged so the caller can report them specifically.
        """
        with self.driver.session() as session:
            record = session.run("RETURN 1 AS test").single()
            if record and record["test"] == 1:
                logger.info(f"✓ Connected to Neo4j at {self.uri}")
                return True
        raise ConnectionError("Failed to verify Neo4j connection")

    def _apply_schema_statements(self, statements, kind, end_keyword) -> int:
        """
        Run each schema statement and return how many of them failed.

        Args:
            statements: Cypher ``CREATE CONSTRAINT`` / ``CREATE INDEX`` strings.
            kind: "constraint" or "index"; used only in log messages.
            end_keyword: keyword that follows the node pattern in the
                statement ("REQUIRE" or "ON"); used to extract the
                ``(x:Label)`` pattern for log messages.
        """
        failures = 0
        with self.driver.session() as session:
            for statement in statements:
                target = statement.split("FOR")[1].split(end_keyword)[0].strip()
                try:
                    # consume() runs the statement to completion so that a
                    # failure is raised here, for this statement, instead of
                    # surfacing on a later run() or being lost on close.
                    session.run(statement).consume()
                except Neo4jError as e:
                    # Every statement uses IF NOT EXISTS, so an error is a
                    # genuine failure (e.g. existing data violates the
                    # constraint), not "already exists".
                    failures += 1
                    logger.error(f"Failed to create {kind} for {target}: {e}")
                else:
                    logger.info(f"Created {kind}: {target}")
        return failures

    def create_constraints(self) -> int:
        """
        Create uniqueness constraints on key node properties.

        This ensures data integrity and creates indexes automatically.

        Returns:
            The number of constraint statements that failed (0 on success).
        """
        constraints = [
            # Core entities
            "CREATE CONSTRAINT person_id IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE",
            "CREATE CONSTRAINT location_id IF NOT EXISTS FOR (l:Location) REQUIRE l.id IS UNIQUE",
            "CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE",
            # Media types (Bowie, Bourdain, Hypatia domains)
            "CREATE CONSTRAINT book_id IF NOT EXISTS FOR (b:Book) REQUIRE b.id IS UNIQUE",
            "CREATE CONSTRAINT film_id IF NOT EXISTS FOR (f:Film) REQUIRE f.id IS UNIQUE",
            "CREATE CONSTRAINT music_id IF NOT EXISTS FOR (m:Music) REQUIRE m.id IS UNIQUE",
            "CREATE CONSTRAINT recipe_id IF NOT EXISTS FOR (r:Recipe) REQUIRE r.id IS UNIQUE",
            # Activity/Practice nodes
            "CREATE CONSTRAINT training_id IF NOT EXISTS FOR (t:Training) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT trip_id IF NOT EXISTS FOR (t:Trip) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT reflection_id IF NOT EXISTS FOR (r:Reflection) REQUIRE r.id IS UNIQUE",
            # Knowledge/Learning (Hypatia domain)
            "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (t:Topic) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT concept_id IF NOT EXISTS FOR (c:Concept) REQUIRE c.id IS UNIQUE",
            # Nature (Cousteau domain)
            "CREATE CONSTRAINT species_id IF NOT EXISTS FOR (s:Species) REQUIRE s.id IS UNIQUE",
            "CREATE CONSTRAINT plant_id IF NOT EXISTS FOR (p:Plant) REQUIRE p.id IS UNIQUE",
        ]
        return self._apply_schema_statements(constraints, "constraint", "REQUIRE")

    def create_indexes(self) -> int:
        """
        Create indexes for frequently queried properties.

        These improve query performance for searches and filters.

        Returns:
            The number of index statements that failed (0 on success).
        """
        indexes = [
            # Text search indexes
            "CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name)",
            "CREATE INDEX location_name IF NOT EXISTS FOR (l:Location) ON (l.name)",
            "CREATE INDEX book_title IF NOT EXISTS FOR (b:Book) ON (b.title)",
            "CREATE INDEX film_title IF NOT EXISTS FOR (f:Film) ON (f.title)",
            "CREATE INDEX music_title IF NOT EXISTS FOR (m:Music) ON (m.title)",
            "CREATE INDEX recipe_name IF NOT EXISTS FOR (r:Recipe) ON (r.name)",
            # Date-based indexes for temporal queries
            "CREATE INDEX event_date IF NOT EXISTS FOR (e:Event) ON (e.date)",
            "CREATE INDEX training_date IF NOT EXISTS FOR (t:Training) ON (t.date)",
            "CREATE INDEX trip_start IF NOT EXISTS FOR (t:Trip) ON (t.start_date)",
            "CREATE INDEX reflection_date IF NOT EXISTS FOR (r:Reflection) ON (r.date)",
            # Category/type indexes for filtering
            "CREATE INDEX event_type IF NOT EXISTS FOR (e:Event) ON (e.type)",
            "CREATE INDEX location_category IF NOT EXISTS FOR (l:Location) ON (l.category)",
            "CREATE INDEX music_genre IF NOT EXISTS FOR (m:Music) ON (m.genre)",
        ]
        return self._apply_schema_statements(indexes, "index", "ON")

    def verify_schema(self) -> dict:
        """
        Count the constraints, standalone RANGE indexes and nodes in the database.

        Returns:
            A dict with integer counts under "constraints", "indexes" and
            "nodes", plus "success", which this method always leaves True.
        """
        results = {"constraints": 0, "indexes": 0, "nodes": 0, "success": True}

        with self.driver.session() as session:
            results["constraints"] = len(list(session.run("SHOW CONSTRAINTS")))

            # Exclude the indexes that back uniqueness constraints. Filtering
            # client-side means a server that does not return an
            # owningConstraint column simply counts every RANGE index.
            range_indexes = list(session.run("SHOW INDEXES WHERE type = 'RANGE'"))
            results["indexes"] = sum(
                1
                for record in range_indexes
                if record.get("owningConstraint") is None
            )

            node_result = session.run("MATCH (n) RETURN count(n) AS count")
            results["nodes"] = node_result.single()["count"]

        return results

    def run_tests(self, include_schema_tests=True) -> bool:
        """
        Run comprehensive tests to verify schema and APOC functionality.

        Temporary ``_Test`` nodes are created by the tests and deleted again
        before returning.

        Args:
            include_schema_tests: If True, also verify constraints/indexes exist

        Returns:
            True if all tests pass, False otherwise.
        """
        tests_passed = 0
        tests_failed = 0

        # Each entry: (label, Cypher query, validator taking the driver result).
        test_cases = [
            ("Connection test",
             "RETURN 1 AS result",
             lambda r: r.single()["result"] == 1),
            ("APOC available",
             "RETURN apoc.version() AS version",
             lambda r: r.single()["version"] is not None),
            ("Create test node",
             "CREATE (t:_Test {id: 'test_' + toString(timestamp())}) RETURN t.id AS id",
             lambda r: r.single()["id"] is not None),
            ("Query test node",
             "MATCH (t:_Test) RETURN count(t) AS count",
             lambda r: r.single()["count"] >= 1),
            ("APOC collection functions",
             "RETURN apoc.coll.sum([1,2,3]) AS total",
             lambda r: r.single()["total"] == 6),
            ("APOC date functions",
             "RETURN apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd') AS today",
             lambda r: len(r.single()["today"]) == 10),
        ]

        # Schema-specific tests (only run after schema creation)
        schema_tests = [
            ("Constraint exists (Person)",
             "SHOW CONSTRAINTS WHERE name = 'person_id'",
             lambda r: len(list(r)) == 1),
            ("Index exists (person_name)",
             "SHOW INDEXES WHERE name = 'person_name'",
             lambda r: len(list(r)) == 1),
        ]

        if include_schema_tests:
            test_cases.extend(schema_tests)

        logger.info("\n" + "=" * 60)
        logger.info("RUNNING SCHEMA VERIFICATION TESTS")
        logger.info("=" * 60)

        with self.driver.session() as session:
            for test_name, query, validator in test_cases:
                # Broad catch is intentional: any error means this test
                # failed, and the remaining tests should still run.
                try:
                    passed = validator(session.run(query))
                except Exception as e:
                    logger.error(f" ✗ {test_name} - {e}")
                    tests_failed += 1
                    continue

                if passed:
                    logger.info(f" ✓ {test_name}")
                    tests_passed += 1
                else:
                    logger.error(f" ✗ {test_name} - Validation failed")
                    tests_failed += 1

            # Cleanup test nodes; consume() makes a failed delete visible here
            # instead of being dropped when the session closes.
            try:
                session.run("MATCH (t:_Test) DELETE t").consume()
                logger.info(" ✓ Cleanup test nodes")
            except Exception as e:
                logger.warning(f" ⚠ Cleanup failed: {e}")

        logger.info("=" * 60)
        logger.info(f"Tests: {tests_passed} passed, {tests_failed} failed")
        logger.info("=" * 60 + "\n")

        return tests_failed == 0

    def create_sample_nodes(self):
        """
        Create sample nodes to demonstrate the schema.

        Uses MERGE keyed on ``id``, so re-running updates the same three nodes
        rather than adding more. Replace this with your actual data import
        logic.
        """
        queries = [
            # Central person node (you)
            """
            MERGE (p:Person {id: 'user_main'})
            SET p.name = 'Main User',
                p.relationship_type = 'self',
                p.created_at = datetime()
            """,
            # Sample interest/preference
            """
            MERGE (i:Interest {id: 'interest_cooking'})
            SET i.category = 'culinary',
                i.name = 'Cooking',
                i.intensity = 'high',
                i.notes = 'Especially interested in techniques and cultural context'
            """,
            # Sample location
            """
            MERGE (l:Location {id: 'location_costarica'})
            SET l.name = 'Costa Rica',
                l.country = 'Costa Rica',
                l.category = 'travel_destination',
                l.notes = 'Planning future trip'
            """,
        ]

        with self.driver.session() as session:
            for query in queries:
                # consume() so a failed write raises before success is logged.
                session.run(query).consume()

        logger.info("Created sample nodes")

    def document_schema(self):
        """
        Document the schema design for reference.

        This prints the node types and their intended use by each assistant.
        """
        print(_SCHEMA_DOC)
        logger.info("Schema documentation displayed")


def get_credentials(args) -> tuple:
    """
    Collect Neo4j credentials from CLI args, environment variables, or prompts.

    Priority: CLI args > Environment variables > Interactive prompts.
    The password is never read from the command line; it comes from
    NEO4J_PASSWORD or a hidden prompt.

    Args:
        args: parsed arguments providing ``uri`` and ``user`` attributes.

    Returns:
        A ``(uri, user, password)`` tuple of strings.

    Exits with status 1 if no password is provided.
    """
    # URI
    uri = args.uri or os.environ.get("NEO4J_URI")
    if not uri:
        uri = input(f"Neo4j URI [{DEFAULT_URI}]: ").strip() or DEFAULT_URI

    # Username
    user = args.user or os.environ.get("NEO4J_USER")
    if not user:
        user = input(f"Neo4j username [{DEFAULT_USER}]: ").strip() or DEFAULT_USER

    # Password (never from CLI for security)
    password = os.environ.get("NEO4J_PASSWORD")
    if not password:
        password = getpass.getpass("Neo4j password: ")
    if not password:
        logger.error("Password is required")
        sys.exit(1)

    return uri, user, password


def parse_args():
    """Parse command line arguments and return the argparse namespace."""
    parser = argparse.ArgumentParser(
        description="Initialize Neo4j Life Graph schema for AI assistants",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                                 # Interactive prompts
  %(prog)s --uri bolt://ariel.incus:7687   # Specify URI, prompt for rest
  %(prog)s --test-only                     # Run tests without creating schema
  %(prog)s --skip-samples                  # Create schema without sample data

Environment Variables:
  NEO4J_URI        Bolt connection URI
  NEO4J_USER       Database username
  NEO4J_PASSWORD   Database password (recommended for scripts)
""",
    )

    parser.add_argument(
        "--uri", "-u",
        help="Neo4j Bolt URI (default: bolt://localhost:7687)",
    )
    parser.add_argument(
        "--user", "-U",
        help="Neo4j username (default: neo4j)",
    )
    parser.add_argument(
        "--test-only", "-t",
        action="store_true",
        help="Only run verification tests, don't create schema",
    )
    parser.add_argument(
        "--skip-samples",
        action="store_true",
        help="Skip creating sample nodes",
    )
    parser.add_argument(
        "--skip-docs",
        action="store_true",
        help="Skip displaying schema documentation",
    )
    parser.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="Reduce output verbosity",
    )

    return parser.parse_args()


def main():
    """
    Main execution function.

    Collects credentials via prompts or environment variables, then creates
    the schema (or only runs the tests with --test-only).

    Exit status: 0 on success; 1 on connection/authentication failure, any
    failed test or schema statement, or unexpected error; 130 when
    interrupted by the user.
    """
    args = parse_args()

    # Set log level
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)

    # Get credentials. Prompts can be interrupted (Ctrl-C) or hit end of input
    # when stdin is not a terminal; report both cleanly instead of a traceback.
    try:
        uri, user, password = get_credentials(args)
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except EOFError:
        logger.error(
            "Input ended before credentials were provided - "
            "set NEO4J_URI, NEO4J_USER and NEO4J_PASSWORD"
        )
        sys.exit(1)

    logger.info(f"Connecting to Neo4j at {uri}...")

    try:
        schema = LifeGraphSchema(uri, user, password)
    except Exception as e:
        logger.error(f"Failed to create database driver: {e}")
        sys.exit(1)

    try:
        # Verify connection first
        try:
            schema.verify_connection()
        except AuthError:
            logger.error("✗ Authentication failed - check username/password")
            sys.exit(1)
        except ServiceUnavailable:
            logger.error(f"✗ Cannot connect to Neo4j at {uri}")
            sys.exit(1)

        if args.test_only:
            # Just run basic tests (no schema verification)
            success = schema.run_tests(include_schema_tests=False)
            sys.exit(0 if success else 1)

        # Display schema documentation
        if not args.skip_docs:
            schema.document_schema()

        # Create constraints (includes automatic indexes)
        logger.info("Creating constraints...")
        schema_failures = schema.create_constraints()

        # Create additional indexes
        logger.info("Creating indexes...")
        schema_failures += schema.create_indexes()

        # Create sample nodes to validate schema
        if not args.skip_samples:
            logger.info("Creating sample nodes...")
            schema.create_sample_nodes()

        # Run verification tests (including schema tests)
        logger.info("Verifying schema...")
        test_success = schema.run_tests(include_schema_tests=True)

        # Summary
        stats = schema.verify_schema()
        logger.info("=" * 60)
        logger.info("SCHEMA INITIALIZATION COMPLETE")
        logger.info("=" * 60)
        logger.info(f" Constraints: {stats['constraints']}")
        logger.info(f" Indexes: {stats['indexes']}")
        logger.info(f" Nodes: {stats['nodes']}")
        logger.info("=" * 60)

        if schema_failures:
            logger.warning(
                f"⚠ {schema_failures} schema statement(s) failed - review output above"
            )
        if not test_success:
            logger.warning("⚠ Some tests failed - review output above")
        if schema_failures or not test_success:
            sys.exit(1)

        logger.info("✓ All tests passed!")
        logger.info("\nNext steps:")
        logger.info(" 1. Import your Plex library (Film, Music nodes)")
        logger.info(" 2. Import your Calibre library (Book nodes)")
        logger.info(" 3. Configure your AI assistants to write to this graph")

    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Error during schema initialization: {e}")
        sys.exit(1)
    finally:
        schema.close()


if __name__ == "__main__":
    main()