- Introduced `neo4j-schema-init.py` for creating the foundational schema for the personal knowledge graph used by multiple AI assistants. - Implemented functionality for creating constraints, indexes, and sample nodes, along with comprehensive testing of the schema. - Added `neo4j-validate.py` to perform validation checks on the Neo4j knowledge graph, including constraints, indexes, sample nodes, relationships, and junk data detection. - Enhanced logging for better traceability and debugging during schema initialization and validation processes.
588 lines
25 KiB
Python
588 lines
25 KiB
Python
"""
|
|
Neo4j Life Graph Schema Initialization
|
|
=======================================
|
|
Creates the foundational schema for a personal knowledge graph used by
|
|
seven AI assistants: Hypatia, Marcus, Seneca, Nate, Bowie, Bourdain, Cousteau
|
|
|
|
Requirements:
|
|
pip install neo4j
|
|
|
|
Usage:
|
|
python neo4j-personal-schema-init.py
|
|
python neo4j-personal-schema-init.py --uri bolt://ariel.incus:7687
|
|
python neo4j-personal-schema-init.py --test-only
|
|
|
|
Environment Variables (optional):
|
|
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
|
|
NEO4J_USER - Username (default: neo4j)
|
|
NEO4J_PASSWORD - Password (will prompt if not set)
|
|
"""
|
|
|
|
import argparse
|
|
import getpass
|
|
import os
|
|
import sys
|
|
from neo4j import GraphDatabase
|
|
from neo4j.exceptions import AuthError, ServiceUnavailable
|
|
import logging
|
|
|
|
# Module-wide logging: INFO and above on the root logger, plus a logger
# named after this module for all messages emitted by the script.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class LifeGraphSchema:
    """Create, verify and document the Neo4j "life graph" schema.

    Holds a Neo4j driver and offers helpers that create uniqueness
    constraints and property indexes, insert a few sample nodes, run smoke
    tests (including APOC checks) and print the schema documentation.
    """

    def __init__(self, uri, user, password):
        """Open a driver for ``uri`` authenticated as ``user``/``password``."""
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.uri = uri

    def close(self):
        """Close the database connection."""
        self.driver.close()

    def verify_connection(self):
        """
        Verify the connection to Neo4j is working.

        Runs ``RETURN 1`` and checks the value that comes back.

        Returns:
            True if the round trip succeeded.

        Raises:
            ConnectionError: if the query did not return the expected value.
            Driver exceptions (e.g. authentication or availability errors)
            propagate unchanged.
        """
        with self.driver.session() as session:
            record = session.run("RETURN 1 AS test").single()
            if record and record["test"] == 1:
                logger.info(f"✓ Connected to Neo4j at {self.uri}")
                return True
            raise ConnectionError("Failed to verify Neo4j connection")

    def _apply_schema_statements(self, statements, kind, tail_keyword):
        """Run each schema statement, logging success or failure per statement.

        Args:
            statements: Cypher ``CREATE CONSTRAINT`` / ``CREATE INDEX`` strings.
            kind: Word used in log messages ("constraint" or "index").
            tail_keyword: Keyword that follows the ``FOR (...)`` pattern in the
                statement ("REQUIRE" or "ON"); used only to extract the
                pattern for the log message.

        A failing statement is logged and skipped so the remaining statements
        still run (best effort).
        """
        with self.driver.session() as session:
            for statement in statements:
                # The node pattern between FOR and the tail keyword, e.g. "(p:Person)".
                target = statement.split("FOR")[1].split(tail_keyword)[0].strip()
                try:
                    # Consume the result so a server-side failure is raised
                    # here, for this statement, rather than being deferred.
                    session.run(statement).consume()
                except Exception as e:
                    # The statements use IF NOT EXISTS, so an already existing
                    # object is not an error; anything raised is a real failure.
                    logger.warning(f"Failed to create {kind} {target}: {e}")
                else:
                    logger.info(f"Created {kind}: {target}")

    def create_constraints(self):
        """
        Create uniqueness constraints on key node properties.

        Every statement uses ``IF NOT EXISTS``, so re-running is harmless.
        Failures are logged as warnings and do not stop the remaining
        statements.
        """
        constraints = [
            # Core entities
            "CREATE CONSTRAINT person_id IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE",
            "CREATE CONSTRAINT location_id IF NOT EXISTS FOR (l:Location) REQUIRE l.id IS UNIQUE",
            "CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE",

            # Media types (Bowie, Bourdain, Hypatia domains)
            "CREATE CONSTRAINT book_id IF NOT EXISTS FOR (b:Book) REQUIRE b.id IS UNIQUE",
            "CREATE CONSTRAINT film_id IF NOT EXISTS FOR (f:Film) REQUIRE f.id IS UNIQUE",
            "CREATE CONSTRAINT music_id IF NOT EXISTS FOR (m:Music) REQUIRE m.id IS UNIQUE",
            "CREATE CONSTRAINT recipe_id IF NOT EXISTS FOR (r:Recipe) REQUIRE r.id IS UNIQUE",

            # Activity/Practice nodes
            "CREATE CONSTRAINT training_id IF NOT EXISTS FOR (t:Training) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT trip_id IF NOT EXISTS FOR (t:Trip) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT reflection_id IF NOT EXISTS FOR (r:Reflection) REQUIRE r.id IS UNIQUE",

            # Knowledge/Learning (Hypatia domain)
            "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (t:Topic) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT concept_id IF NOT EXISTS FOR (c:Concept) REQUIRE c.id IS UNIQUE",

            # Nature (Cousteau domain)
            "CREATE CONSTRAINT species_id IF NOT EXISTS FOR (s:Species) REQUIRE s.id IS UNIQUE",
            "CREATE CONSTRAINT plant_id IF NOT EXISTS FOR (p:Plant) REQUIRE p.id IS UNIQUE",
        ]
        self._apply_schema_statements(constraints, "constraint", "REQUIRE")

    def create_indexes(self):
        """
        Create indexes for frequently queried properties.

        Every statement uses ``IF NOT EXISTS``, so re-running is harmless.
        Failures are logged as warnings and do not stop the remaining
        statements.
        """
        indexes = [
            # Text search indexes
            "CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name)",
            "CREATE INDEX location_name IF NOT EXISTS FOR (l:Location) ON (l.name)",
            "CREATE INDEX book_title IF NOT EXISTS FOR (b:Book) ON (b.title)",
            "CREATE INDEX film_title IF NOT EXISTS FOR (f:Film) ON (f.title)",
            "CREATE INDEX music_title IF NOT EXISTS FOR (m:Music) ON (m.title)",
            "CREATE INDEX recipe_name IF NOT EXISTS FOR (r:Recipe) ON (r.name)",

            # Date-based indexes for temporal queries
            "CREATE INDEX event_date IF NOT EXISTS FOR (e:Event) ON (e.date)",
            "CREATE INDEX training_date IF NOT EXISTS FOR (t:Training) ON (t.date)",
            "CREATE INDEX trip_start IF NOT EXISTS FOR (t:Trip) ON (t.start_date)",
            "CREATE INDEX reflection_date IF NOT EXISTS FOR (r:Reflection) ON (r.date)",

            # Category/type indexes for filtering
            "CREATE INDEX event_type IF NOT EXISTS FOR (e:Event) ON (e.type)",
            "CREATE INDEX location_category IF NOT EXISTS FOR (l:Location) ON (l.category)",
            "CREATE INDEX music_genre IF NOT EXISTS FOR (m:Music) ON (m.genre)",
        ]
        self._apply_schema_statements(indexes, "index", "ON")

    def verify_schema(self):
        """
        Count the constraints, standalone RANGE indexes and nodes in the database.

        Returns:
            dict with keys ``constraints``, ``indexes``, ``nodes`` (ints) and
            ``success`` (initialised to True and not modified by this method).
        """
        results = {"constraints": 0, "indexes": 0, "nodes": 0, "success": True}

        with self.driver.session() as session:
            # Count constraints
            results["constraints"] = len(list(session.run("SHOW CONSTRAINTS")))

            # Count indexes, excluding the ones that back a constraint:
            # those report the owning constraint's name in `owningConstraint`.
            index_rows = session.run(
                "SHOW INDEXES WHERE type = 'RANGE' AND owningConstraint IS NULL"
            )
            results["indexes"] = len(list(index_rows))

            # Count nodes
            node_result = session.run("MATCH (n) RETURN count(n) AS count")
            results["nodes"] = node_result.single()["count"]

        return results

    def run_tests(self, include_schema_tests=True):
        """
        Run smoke tests against the database and APOC.

        Each test is a (name, Cypher query, validator) triple; the validator
        receives the query result and returns a truthy value on success. A
        test that raises is counted as failed. Temporary ``_Test`` nodes
        created by the tests are deleted afterwards.

        Args:
            include_schema_tests: If True, also verify that the ``person_id``
                constraint and the ``person_name`` index exist.

        Returns:
            True if all tests pass, False otherwise.
        """
        tests_passed = 0
        tests_failed = 0

        test_cases = [
            ("Connection test", "RETURN 1 AS result", lambda r: r.single()["result"] == 1),
            ("APOC available", "RETURN apoc.version() AS version", lambda r: r.single()["version"] is not None),
            ("Create test node",
             "CREATE (t:_Test {id: 'test_' + toString(timestamp())}) RETURN t.id AS id",
             lambda r: r.single()["id"] is not None),
            ("Query test node",
             "MATCH (t:_Test) RETURN count(t) AS count",
             lambda r: r.single()["count"] >= 1),
            ("APOC collection functions",
             "RETURN apoc.coll.sum([1,2,3]) AS total",
             lambda r: r.single()["total"] == 6),
            ("APOC date functions",
             "RETURN apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd') AS today",
             lambda r: len(r.single()["today"]) == 10),
        ]

        # Schema-specific tests (only meaningful after schema creation)
        schema_tests = [
            ("Constraint exists (Person)",
             "SHOW CONSTRAINTS WHERE name = 'person_id'",
             lambda r: len(list(r)) == 1),
            ("Index exists (person_name)",
             "SHOW INDEXES WHERE name = 'person_name'",
             lambda r: len(list(r)) == 1),
        ]

        if include_schema_tests:
            test_cases.extend(schema_tests)

        logger.info("\n" + "=" * 60)
        logger.info("RUNNING SCHEMA VERIFICATION TESTS")
        logger.info("=" * 60)

        with self.driver.session() as session:
            for test_name, query, validator in test_cases:
                try:
                    result = session.run(query)
                    if validator(result):
                        logger.info(f"  ✓ {test_name}")
                        tests_passed += 1
                    else:
                        logger.error(f"  ✗ {test_name} - Validation failed")
                        tests_failed += 1
                except Exception as e:
                    logger.error(f"  ✗ {test_name} - {e}")
                    tests_failed += 1

            # Cleanup test nodes; consume so a failed delete is reported here.
            try:
                session.run("MATCH (t:_Test) DELETE t").consume()
                logger.info("  ✓ Cleanup test nodes")
            except Exception as e:
                logger.warning(f"  ⚠ Cleanup failed: {e}")

        logger.info("=" * 60)
        logger.info(f"Tests: {tests_passed} passed, {tests_failed} failed")
        logger.info("=" * 60 + "\n")

        return tests_failed == 0

    def create_sample_nodes(self):
        """
        Create sample nodes to demonstrate the schema.

        Uses MERGE on the ``id`` property, so running it again updates the
        same three nodes instead of duplicating them.
        Replace this with your actual data import logic.
        """
        queries = [
            # Central person node (you)
            """
            MERGE (p:Person {id: 'user_main'})
            SET p.name = 'Main User',
                p.relationship_type = 'self',
                p.created_at = datetime()
            """,

            # Sample interest/preference
            """
            MERGE (i:Interest {id: 'interest_cooking'})
            SET i.category = 'culinary',
                i.name = 'Cooking',
                i.intensity = 'high',
                i.notes = 'Especially interested in techniques and cultural context'
            """,

            # Sample location
            """
            MERGE (l:Location {id: 'location_costarica'})
            SET l.name = 'Costa Rica',
                l.country = 'Costa Rica',
                l.category = 'travel_destination',
                l.notes = 'Planning future trip'
            """,
        ]

        with self.driver.session() as session:
            for query in queries:
                # Consume each result so a failed write raises immediately.
                session.run(query).consume()

        logger.info("Created sample nodes")

    def document_schema(self):
        """
        Document the schema design for reference.

        Prints the node types, their properties and the relationship
        patterns, grouped by the assistant responsible for them.
        """
        schema_doc = """

════════════════════════════════════════════════════════════════
LIFE GRAPH SCHEMA - NODE TYPES AND ASSISTANT RESPONSIBILITIES
════════════════════════════════════════════════════════════════

CORE NODES (Used by all assistants):
────────────────────────────────────────────────────────────────
Person - People in your life (family, friends, contacts)
Properties: name, relationship_type, birthday,
contact_info, notes

Location - Places (home, travel, favorites)
Properties: name, city, country, coordinates,
category, notes

Event - Life events (vacations, gatherings, milestones)
Properties: name, date, location, description, type

Interest - Preferences, hobbies, goals
Properties: category, name, intensity, notes

════════════════════════════════════════════════════════════════
HYPATIA (Learning & Knowledge):
────────────────────────────────────────────────────────────────
Book - Books read or to-read
Properties: title, author, isbn, status, rating,
date_started, date_finished, notes

Topic - Subject areas of study
Properties: name, field, depth, resources

Concept - Ideas and principles learned
Properties: name, definition, examples, connections

════════════════════════════════════════════════════════════════
MARCUS (Fitness & Training):
────────────────────────────────────────────────────────────────
Training - Individual workout sessions
Properties: date, type, duration, exercises,
volume, intensity, notes, feeling

Exercise - Specific movements/activities
Properties: name, category, equipment,
target_muscles, technique_notes

════════════════════════════════════════════════════════════════
SENECA (Reflection & Wellness):
────────────────────────────────────────────────────────────────
Reflection - Journal entries and insights
Properties: date, content, mood, themes,
insights, questions

Goal - Life objectives and aspirations
Properties: name, category, timeline, status,
progress, reflections

════════════════════════════════════════════════════════════════
NATE (Travel & Adventure):
────────────────────────────────────────────────────────────────
Trip - Travel plans and experiences
Properties: name, start_date, end_date,
destinations, purpose, budget, highlights

Activity - Things to do at destinations
Properties: name, type, location, cost,
difficulty, notes

════════════════════════════════════════════════════════════════
BOWIE (Arts, Culture & Style):
────────────────────────────────────────────────────────────────
Film - Movies and TV shows
Properties: title, year, director, genre,
status, rating, date_watched, notes

Music - Songs, albums, artists
Properties: title, artist, album, genre, year,
rating, play_count, notes

Artwork - Visual art, exhibitions, collections
Properties: title, artist, medium, year, location,
notes

════════════════════════════════════════════════════════════════
BOURDAIN (Food & Drink):
────────────────────────────────────────────────────────────────
Recipe - Dishes to cook
Properties: name, cuisine, difficulty, time,
ingredients, instructions, source, notes

Restaurant - Dining destinations
Properties: name, location, cuisine, price_range,
rating, dishes_tried, notes

Ingredient - Foods and cooking components
Properties: name, category, season, source,
substitutes, notes

════════════════════════════════════════════════════════════════
COUSTEAU (Nature & Living Things):
────────────────────────────────────────────────────────────────
Species - Animals, fish, marine life
Properties: name, scientific_name, category,
habitat, conservation_status, notes

Plant - Garden plants, houseplants
Properties: name, scientific_name, type,
care_requirements, location, health_status

Ecosystem - Environments and habitats
Properties: name, type, location, characteristics,
species_present, conservation_notes

════════════════════════════════════════════════════════════════
KEY RELATIONSHIP PATTERNS:
────────────────────────────────────────────────────────────────

Cross-domain connections:
- Training -[PREPARATION_FOR]-> Trip
- Reflection -[ABOUT]-> Event/Training/Trip
- Book -[INSPIRED]-> Trip/Recipe/Concept
- Recipe -[FROM_LOCATION]-> Location
- Music -[PLAYED_AT]-> Event/Location
- Film -[SET_IN]-> Location
- Species -[OBSERVED_AT]-> Location
- Plant -[GROWS_IN]-> Location

Personal connections:
- Person -[ATTENDED]-> Event
- Person -[TRAVELED_WITH]-> Trip
- Person -[TRAINED_WITH]-> Training
- Person -[SHARED_MEAL]-> Recipe/Restaurant
- Person -[RECOMMENDED]-> Book/Film/Music/Restaurant

Learning connections:
- Book -[ABOUT]-> Topic
- Topic -[CONTAINS]-> Concept
- Concept -[RELATES_TO]-> Concept
- Training -[TEACHES]-> Concept (movement patterns, discipline)

════════════════════════════════════════════════════════════════
"""

        print(schema_doc)
        logger.info("Schema documentation displayed")
|
|
|
|
|
|
def _prompt_with_default(prompt, default):
    """Ask on stdin and return the stripped answer, or ``default`` if empty.

    A closed or exhausted stdin (EOFError) is treated like an empty answer so
    the script still works when no terminal is attached.
    """
    try:
        answer = input(prompt).strip()
    except EOFError:
        answer = ""
    return answer or default


def get_credentials(args):
    """
    Collect Neo4j credentials from CLI args, environment variables, or prompts.

    Priority for URI and username: CLI args > environment variables
    (NEO4J_URI / NEO4J_USER) > interactive prompt, where an empty answer
    selects the default shown in the prompt. The password is read from
    NEO4J_PASSWORD or prompted for without echo; it is never taken from the
    command line.

    Args:
        args: Parsed arguments providing ``uri`` and ``user`` attributes
            (either may be None).

    Returns:
        Tuple ``(uri, user, password)``.

    Exits with status 1 if no password is supplied.
    """
    # URI
    uri = args.uri or os.environ.get("NEO4J_URI")
    if not uri:
        uri = _prompt_with_default(
            "Neo4j URI [bolt://localhost:7687]: ", "bolt://localhost:7687"
        )

    # Username
    user = args.user or os.environ.get("NEO4J_USER")
    if not user:
        user = _prompt_with_default("Neo4j username [neo4j]: ", "neo4j")

    # Password (never from CLI for security)
    password = os.environ.get("NEO4J_PASSWORD")
    if not password:
        try:
            password = getpass.getpass("Neo4j password: ")
        except EOFError:
            # No terminal to read from: handled as "no password given" below.
            password = ""
        if not password:
            logger.error("Password is required")
            sys.exit(1)

    return uri, user, password
|
|
|
|
|
|
def parse_args():
    """Build the command-line parser and return the parsed arguments.

    Options: ``--uri/-u`` and ``--user/-U`` take a value; ``--test-only/-t``,
    ``--skip-samples``, ``--skip-docs`` and ``--quiet/-q`` are boolean
    switches that default to False.
    """
    usage_notes = """
Examples:
%(prog)s # Interactive prompts
%(prog)s --uri bolt://ariel.incus:7687 # Specify URI, prompt for rest
%(prog)s --test-only # Run tests without creating schema
%(prog)s --skip-samples # Create schema without sample data

Environment Variables:
NEO4J_URI Bolt connection URI
NEO4J_USER Database username
NEO4J_PASSWORD Database password (recommended for scripts)
"""

    cli = argparse.ArgumentParser(
        description="Initialize Neo4j Life Graph schema for AI assistants",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_notes,
    )

    # Options that carry a value.
    value_options = (
        (("--uri", "-u"), "Neo4j Bolt URI (default: bolt://localhost:7687)"),
        (("--user", "-U"), "Neo4j username (default: neo4j)"),
    )
    for flags, help_text in value_options:
        cli.add_argument(*flags, help=help_text)

    # Boolean switches, registered in the order they appear in --help.
    switch_options = (
        (("--test-only", "-t"), "Only run verification tests, don't create schema"),
        (("--skip-samples",), "Skip creating sample nodes"),
        (("--skip-docs",), "Skip displaying schema documentation"),
        (("--quiet", "-q"), "Reduce output verbosity"),
    )
    for flags, help_text in switch_options:
        cli.add_argument(*flags, action="store_true", help=help_text)

    return cli.parse_args()
|
|
|
|
|
|
def main():
    """
    Main execution function.

    Parses the command line, collects credentials, connects to Neo4j and then
    either runs the basic tests only (``--test-only``) or prints the schema
    documentation, creates constraints, indexes and sample nodes, and runs
    the verification tests.

    Exit status: 0 on success, 1 on any failure (driver creation,
    authentication, connection, failed tests, unexpected error), 130 when
    interrupted by the user.
    """
    args = parse_args()

    # Set log level
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)

    # Get credentials. Ctrl-C at a prompt is handled like an interrupt later
    # in the run instead of surfacing as a traceback.
    try:
        uri, user, password = get_credentials(args)
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)

    logger.info(f"Connecting to Neo4j at {uri}...")

    try:
        schema = LifeGraphSchema(uri, user, password)
    except Exception as e:
        logger.error(f"Failed to create database driver: {e}")
        sys.exit(1)

    try:
        # Verify connection first
        try:
            schema.verify_connection()
        except AuthError:
            logger.error("✗ Authentication failed - check username/password")
            sys.exit(1)
        except ServiceUnavailable:
            logger.error(f"✗ Cannot connect to Neo4j at {uri}")
            sys.exit(1)

        if args.test_only:
            # Just run basic tests (no schema verification)
            success = schema.run_tests(include_schema_tests=False)
            sys.exit(0 if success else 1)

        # Display schema documentation
        if not args.skip_docs:
            schema.document_schema()

        # Create constraints (includes automatic indexes)
        logger.info("Creating constraints...")
        schema.create_constraints()

        # Create additional indexes
        logger.info("Creating indexes...")
        schema.create_indexes()

        # Create sample nodes to validate schema
        if not args.skip_samples:
            logger.info("Creating sample nodes...")
            schema.create_sample_nodes()

        # Run verification tests (including schema tests)
        logger.info("Verifying schema...")
        test_success = schema.run_tests(include_schema_tests=True)

        # Summary
        stats = schema.verify_schema()
        logger.info("=" * 60)
        logger.info("SCHEMA INITIALIZATION COMPLETE")
        logger.info("=" * 60)
        logger.info(f"  Constraints: {stats['constraints']}")
        logger.info(f"  Indexes: {stats['indexes']}")
        logger.info(f"  Nodes: {stats['nodes']}")
        logger.info("=" * 60)

        if test_success:
            logger.info("✓ All tests passed!")
            logger.info("\nNext steps:")
            logger.info("  1. Import your Plex library (Film, Music nodes)")
            logger.info("  2. Import your Calibre library (Book nodes)")
            logger.info("  3. Configure your AI assistants to write to this graph")
        else:
            logger.warning("⚠ Some tests failed - review output above")
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Error during schema initialization: {e}")
        sys.exit(1)
    finally:
        # Runs on every path above, including the sys.exit() calls.
        schema.close()
|
|
|
|
|
|
# Run the initializer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|