Add Neo4j schema initialization and validation scripts

- Introduced `neo4j-schema-init.py` for creating the foundational schema for the unified knowledge graph shared by fourteen AI assistants across three teams (personal, work, engineering).
- Implemented functionality for creating constraints, indexes, and sample nodes, along with comprehensive testing of the schema.
- Added `neo4j-validate.py` to perform validation checks on the Neo4j knowledge graph, including constraints, indexes, sample nodes, relationships, and junk data detection.
- Enhanced logging for better traceability and debugging during schema initialization and validation processes.
This commit is contained in:
2026-03-06 14:11:52 +00:00
parent b654a04185
commit 7859264359
46 changed files with 11679 additions and 2 deletions

156
utils/neo4j-reset.py Normal file
View File

@@ -0,0 +1,156 @@
"""
Neo4j Database Reset
====================
Wipes all nodes, relationships, constraints, and indexes from the database.
Use before re-running neo4j-schema-init.py for a clean slate.
Usage:
python neo4j-reset.py
python neo4j-reset.py --uri bolt://ariel.incus:7687
python neo4j-reset.py --uri bolt://ariel.incus:7687 --force
Environment Variables (optional):
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
NEO4J_USER - Username (default: neo4j)
NEO4J_PASSWORD - Password (will prompt if not set)
"""
import argparse
import getpass
import os
import sys
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
def get_credentials(args):
    """Resolve URI, username, and password for the Neo4j connection.

    Precedence for each value: CLI argument, then environment variable,
    then interactive prompt.  The password is never taken from the command
    line; it comes from NEO4J_PASSWORD or a getpass prompt.  Exits with
    status 1 when no password can be obtained.

    Returns:
        (uri, user, password) tuple of strings.
    """
    env = os.environ.get

    uri = args.uri or env("NEO4J_URI")
    if not uri:
        uri = input("Neo4j URI [bolt://localhost:7687]: ").strip() or "bolt://localhost:7687"

    user = args.user or env("NEO4J_USER")
    if not user:
        user = input("Neo4j username [neo4j]: ").strip() or "neo4j"

    password = env("NEO4J_PASSWORD") or getpass.getpass("Neo4j password: ")
    if not password:
        print("ERROR: Password is required")
        sys.exit(1)

    return uri, user, password
def reset_database(driver):
    """Drop all constraints and indexes, then delete every node and relationship.

    Args:
        driver: An open ``neo4j.Driver``; one session is opened for the whole
            reset.

    Nodes are deleted in batches of 10,000 so very large graphs do not exceed
    transaction memory limits.  Built-in LOOKUP indexes are excluded — they
    cannot be dropped.  Prints a before/after summary and warns when anything
    survives the wipe.
    """
    with driver.session() as session:
        # 1. Count what exists before wiping
        node_count = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
        rel_count = session.run("MATCH ()-[r]->() RETURN count(r) AS c").single()["c"]
        constraints = list(session.run("SHOW CONSTRAINTS YIELD name RETURN name"))
        indexes = list(session.run("SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"))
        print(f"\nCurrent database contents:")
        print(f"  Nodes: {node_count}")
        print(f"  Relationships: {rel_count}")
        print(f"  Constraints: {len(constraints)}")
        print(f"  Indexes: {len(indexes)}")
        if node_count == 0 and len(constraints) == 0 and len(indexes) == 0:
            print("\nDatabase is already empty. Nothing to reset.")
            return
        # 2. Drop all constraints
        dropped_constraints = 0
        for record in constraints:
            name = record["name"]
            try:
                # Backtick-quote the name: DROP cannot take parameters, and a
                # name that is not a plain identifier would otherwise break
                # the interpolated statement.
                session.run(f"DROP CONSTRAINT `{name}` IF EXISTS")
                dropped_constraints += 1
            except Exception as e:
                print(f"  WARNING: Could not drop constraint {name}: {e}")
        print(f"\n  Dropped {dropped_constraints} constraints")
        # 3. Drop all non-lookup indexes
        dropped_indexes = 0
        for record in indexes:
            name = record["name"]
            try:
                session.run(f"DROP INDEX `{name}` IF EXISTS")
                dropped_indexes += 1
            except Exception as e:
                print(f"  WARNING: Could not drop index {name}: {e}")
        print(f"  Dropped {dropped_indexes} indexes")
        # 4. Delete all nodes and relationships (batch for large DBs)
        deleted = 1
        total_deleted = 0
        while deleted > 0:
            result = session.run(
                "MATCH (n) WITH n LIMIT 10000 DETACH DELETE n RETURN count(*) AS deleted"
            )
            deleted = result.single()["deleted"]
            total_deleted += deleted
        print(f"  Deleted {total_deleted} nodes (and their relationships)")
        # 5. Verify clean
        remaining = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
        remaining_constraints = len(list(session.run("SHOW CONSTRAINTS")))
        remaining_indexes = len(list(session.run(
            "SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"
        )))
        print(f"\nAfter reset:")
        print(f"  Nodes: {remaining}")
        print(f"  Constraints: {remaining_constraints}")
        print(f"  Indexes: {remaining_indexes}")
        if remaining == 0 and remaining_constraints == 0 and remaining_indexes == 0:
            print("\n✓ Database is clean. Ready for neo4j-schema-init.py")
        else:
            print("\n⚠ Some items remain — you may need to run this again")
def main():
    """CLI entry point: connect, confirm (unless --force), then wipe the DB.

    Exits 1 on auth/connectivity/reset failure, 0 on user cancellation.
    The driver is always closed, including on the early-exit paths.
    """
    parser = argparse.ArgumentParser(
        description="Reset Neo4j database — wipe all data, constraints, and indexes"
    )
    parser.add_argument("--uri", "-u", help="Neo4j Bolt URI")
    parser.add_argument("--user", "-U", help="Neo4j username")
    parser.add_argument("--force", "-f", action="store_true",
                        help="Skip confirmation prompt")
    args = parser.parse_args()
    uri, user, password = get_credentials(args)
    driver = GraphDatabase.driver(uri, auth=(user, password))
    # Everything after driver construction runs under try/finally so the
    # driver is closed even when sys.exit() is taken on an error path.
    try:
        try:
            # Test connection with a trivial round-trip query
            with driver.session() as session:
                session.run("RETURN 1")
            print(f"✓ Connected to {uri}")
        except AuthError:
            print(f"✗ Authentication failed for {uri}")
            sys.exit(1)
        except ServiceUnavailable:
            print(f"✗ Cannot connect to {uri}")
            sys.exit(1)
        if not args.force:
            confirm = input(f"\n⚠ This will DELETE EVERYTHING in {uri}. Type 'yes' to confirm: ")
            if confirm.strip().lower() != "yes":
                print("Cancelled.")
                sys.exit(0)
        try:
            reset_database(driver)
        except Exception as e:
            print(f"ERROR: {e}")
            sys.exit(1)
    finally:
        driver.close()


if __name__ == "__main__":
    main()

910
utils/neo4j-schema-init.py Normal file
View File

@@ -0,0 +1,910 @@
"""
Neo4j Unified Knowledge Graph Schema Initialization
=====================================================
Creates the foundational schema for a unified knowledge graph used by
fourteen AI assistants across three teams:
Personal Team:
Hypatia (Learning), Marcus (Fitness), Seneca (Reflection),
Nate (Travel), Bowie (Culture), Bourdain (Food),
Cousteau (Nature), Garth (Finance), Cristiano (Football)
Work Team:
Alan (Strategy), Ann (Marketing), Jeffrey (Sales), Jarvis (Execution)
Engineering Team:
Scotty (Infrastructure), Harper (Prototyping)
Schema Reference:
docs/neo4j-unified-schema.md
Requirements:
pip install neo4j
Usage:
python neo4j-schema-init.py
python neo4j-schema-init.py --uri bolt://ariel.incus:7687
python neo4j-schema-init.py --test-only
Environment Variables (optional):
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
NEO4J_USER - Username (default: neo4j)
NEO4J_PASSWORD - Password (will prompt if not set)
"""
import argparse
import getpass
import os
import sys
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LifeGraphSchema:
def __init__(self, uri, user, password):
    """Open a driver against the target Neo4j instance and remember its URI."""
    self.uri = uri
    self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
    """Release the underlying driver and its connection pool."""
    self.driver.close()
def verify_connection(self):
    """Round-trip a trivial query to prove the connection works.

    Returns:
        True on success.

    Raises:
        ConnectionError: if the probe query does not return the expected row.
    """
    with self.driver.session() as session:
        record = session.run("RETURN 1 AS test").single()
    if record and record["test"] == 1:
        logger.info(f"✓ Connected to Neo4j at {self.uri}")
        return True
    raise ConnectionError("Failed to verify Neo4j connection")
def create_constraints(self):
    """
    Create uniqueness constraints on key node properties.
    This ensures data integrity and creates indexes automatically.
    All 74 node types get an id uniqueness constraint.

    Idempotent: every statement uses IF NOT EXISTS, and any statement that
    still fails is logged as a warning instead of aborting the run.
    """
    constraints = [
        # ── Universal nodes ──────────────────────────────────────
        "CREATE CONSTRAINT person_id IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT location_id IF NOT EXISTS FOR (n:Location) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT event_id IF NOT EXISTS FOR (n:Event) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (n:Topic) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT goal_id IF NOT EXISTS FOR (n:Goal) REQUIRE n.id IS UNIQUE",
        # ── Nate: Travel & Adventure ─────────────────────────────
        "CREATE CONSTRAINT trip_id IF NOT EXISTS FOR (n:Trip) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT destination_id IF NOT EXISTS FOR (n:Destination) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT activity_id IF NOT EXISTS FOR (n:Activity) REQUIRE n.id IS UNIQUE",
        # ── Hypatia: Learning & Reading ──────────────────────────
        "CREATE CONSTRAINT book_id IF NOT EXISTS FOR (n:Book) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT author_id IF NOT EXISTS FOR (n:Author) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT learningpath_id IF NOT EXISTS FOR (n:LearningPath) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT concept_id IF NOT EXISTS FOR (n:Concept) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT quote_id IF NOT EXISTS FOR (n:Quote) REQUIRE n.id IS UNIQUE",
        # ── Marcus: Fitness & Training ───────────────────────────
        "CREATE CONSTRAINT training_id IF NOT EXISTS FOR (n:Training) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT exercise_id IF NOT EXISTS FOR (n:Exercise) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT program_id IF NOT EXISTS FOR (n:Program) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT personalrecord_id IF NOT EXISTS FOR (n:PersonalRecord) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT bodymetric_id IF NOT EXISTS FOR (n:BodyMetric) REQUIRE n.id IS UNIQUE",
        # ── Seneca: Reflection & Wellness ────────────────────────
        "CREATE CONSTRAINT reflection_id IF NOT EXISTS FOR (n:Reflection) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT value_id IF NOT EXISTS FOR (n:Value) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT habit_id IF NOT EXISTS FOR (n:Habit) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT lifeevent_id IF NOT EXISTS FOR (n:LifeEvent) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT intention_id IF NOT EXISTS FOR (n:Intention) REQUIRE n.id IS UNIQUE",
        # ── Bourdain: Food & Cooking ─────────────────────────────
        "CREATE CONSTRAINT recipe_id IF NOT EXISTS FOR (n:Recipe) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT restaurant_id IF NOT EXISTS FOR (n:Restaurant) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT ingredient_id IF NOT EXISTS FOR (n:Ingredient) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT meal_id IF NOT EXISTS FOR (n:Meal) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT technique_id IF NOT EXISTS FOR (n:Technique) REQUIRE n.id IS UNIQUE",
        # ── Bowie: Arts & Culture ────────────────────────────────
        "CREATE CONSTRAINT music_id IF NOT EXISTS FOR (n:Music) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT film_id IF NOT EXISTS FOR (n:Film) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT artwork_id IF NOT EXISTS FOR (n:Artwork) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT playlist_id IF NOT EXISTS FOR (n:Playlist) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT artist_id IF NOT EXISTS FOR (n:Artist) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT style_id IF NOT EXISTS FOR (n:Style) REQUIRE n.id IS UNIQUE",
        # ── Cousteau: Nature & Living Things ─────────────────────
        "CREATE CONSTRAINT species_id IF NOT EXISTS FOR (n:Species) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT plant_id IF NOT EXISTS FOR (n:Plant) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT tank_id IF NOT EXISTS FOR (n:Tank) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT garden_id IF NOT EXISTS FOR (n:Garden) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT ecosystem_id IF NOT EXISTS FOR (n:Ecosystem) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT observation_id IF NOT EXISTS FOR (n:Observation) REQUIRE n.id IS UNIQUE",
        # ── Garth: Personal Finance ──────────────────────────────
        "CREATE CONSTRAINT account_id IF NOT EXISTS FOR (n:Account) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT investment_id IF NOT EXISTS FOR (n:Investment) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT asset_id IF NOT EXISTS FOR (n:Asset) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT liability_id IF NOT EXISTS FOR (n:Liability) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT budget_id IF NOT EXISTS FOR (n:Budget) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT financialgoal_id IF NOT EXISTS FOR (n:FinancialGoal) REQUIRE n.id IS UNIQUE",
        # ── Cristiano: Football ───────────────────────────────────
        "CREATE CONSTRAINT match_id IF NOT EXISTS FOR (n:Match) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT team_id IF NOT EXISTS FOR (n:Team) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT league_id IF NOT EXISTS FOR (n:League) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT tournament_id IF NOT EXISTS FOR (n:Tournament) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT player_id IF NOT EXISTS FOR (n:Player) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT season_id IF NOT EXISTS FOR (n:Season) REQUIRE n.id IS UNIQUE",
        # ── Work: Business ───────────────────────────────────────
        "CREATE CONSTRAINT client_id IF NOT EXISTS FOR (n:Client) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT contact_id IF NOT EXISTS FOR (n:Contact) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT opportunity_id IF NOT EXISTS FOR (n:Opportunity) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT proposal_id IF NOT EXISTS FOR (n:Proposal) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT project_id IF NOT EXISTS FOR (n:Project) REQUIRE n.id IS UNIQUE",
        # ── Work: Market Intelligence ────────────────────────────
        "CREATE CONSTRAINT vendor_id IF NOT EXISTS FOR (n:Vendor) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT competitor_id IF NOT EXISTS FOR (n:Competitor) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT markettrend_id IF NOT EXISTS FOR (n:MarketTrend) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT technology_id IF NOT EXISTS FOR (n:Technology) REQUIRE n.id IS UNIQUE",
        # ── Work: Content & Visibility ───────────────────────────
        "CREATE CONSTRAINT content_id IF NOT EXISTS FOR (n:Content) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT publication_id IF NOT EXISTS FOR (n:Publication) REQUIRE n.id IS UNIQUE",
        # ── Work: Professional Development ───────────────────────
        "CREATE CONSTRAINT skill_id IF NOT EXISTS FOR (n:Skill) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT certification_id IF NOT EXISTS FOR (n:Certification) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT relationship_id IF NOT EXISTS FOR (n:Relationship) REQUIRE n.id IS UNIQUE",
        # ── Work: Daily Operations ───────────────────────────────
        "CREATE CONSTRAINT task_id IF NOT EXISTS FOR (n:Task) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT meeting_id IF NOT EXISTS FOR (n:Meeting) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT note_id IF NOT EXISTS FOR (n:Note) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT decision_id IF NOT EXISTS FOR (n:Decision) REQUIRE n.id IS UNIQUE",
        # ── Engineering: Scotty ──────────────────────────────────
        "CREATE CONSTRAINT infrastructure_id IF NOT EXISTS FOR (n:Infrastructure) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT incident_id IF NOT EXISTS FOR (n:Incident) REQUIRE n.id IS UNIQUE",
        # ── Engineering: Harper ──────────────────────────────────
        "CREATE CONSTRAINT prototype_id IF NOT EXISTS FOR (n:Prototype) REQUIRE n.id IS UNIQUE",
        "CREATE CONSTRAINT experiment_id IF NOT EXISTS FOR (n:Experiment) REQUIRE n.id IS UNIQUE",
    ]
    with self.driver.session() as session:
        created = 0
        for constraint in constraints:
            try:
                session.run(constraint)
                # Recover the constraint name from the DDL text for logging:
                # the token between "CONSTRAINT" and "IF".
                name = constraint.split("CONSTRAINT")[1].split("IF")[0].strip()
                logger.info(f" ✓ Constraint: {name}")
                created += 1
            except Exception as e:
                # IF NOT EXISTS should prevent duplicate errors, so anything
                # caught here is logged and skipped rather than aborting.
                logger.warning(f" ⚠ Constraint may already exist: {e}")
        logger.info(f"Constraints processed: {created}/{len(constraints)}")
def create_indexes(self):
    """
    Create indexes for frequently queried properties.
    These improve query performance for searches and filters.
    Organized by query pattern: name/title, date, type/status, domain.

    Idempotent: every statement uses IF NOT EXISTS; failures are logged
    as warnings instead of aborting the run.
    """
    indexes = [
        # ── Name / Title text search ─────────────────────────────
        "CREATE INDEX person_name IF NOT EXISTS FOR (n:Person) ON (n.name)",
        "CREATE INDEX location_name IF NOT EXISTS FOR (n:Location) ON (n.name)",
        "CREATE INDEX topic_name IF NOT EXISTS FOR (n:Topic) ON (n.name)",
        "CREATE INDEX goal_name IF NOT EXISTS FOR (n:Goal) ON (n.name)",
        "CREATE INDEX book_title IF NOT EXISTS FOR (n:Book) ON (n.title)",
        "CREATE INDEX film_title IF NOT EXISTS FOR (n:Film) ON (n.title)",
        "CREATE INDEX music_title IF NOT EXISTS FOR (n:Music) ON (n.title)",
        "CREATE INDEX artwork_title IF NOT EXISTS FOR (n:Artwork) ON (n.title)",
        "CREATE INDEX recipe_name IF NOT EXISTS FOR (n:Recipe) ON (n.name)",
        "CREATE INDEX restaurant_name IF NOT EXISTS FOR (n:Restaurant) ON (n.name)",
        "CREATE INDEX exercise_name IF NOT EXISTS FOR (n:Exercise) ON (n.name)",
        "CREATE INDEX species_name IF NOT EXISTS FOR (n:Species) ON (n.name)",
        "CREATE INDEX plant_name IF NOT EXISTS FOR (n:Plant) ON (n.name)",
        "CREATE INDEX ingredient_name IF NOT EXISTS FOR (n:Ingredient) ON (n.name)",
        "CREATE INDEX artist_name IF NOT EXISTS FOR (n:Artist) ON (n.name)",
        "CREATE INDEX author_name IF NOT EXISTS FOR (n:Author) ON (n.name)",
        "CREATE INDEX concept_name IF NOT EXISTS FOR (n:Concept) ON (n.name)",
        "CREATE INDEX client_name IF NOT EXISTS FOR (n:Client) ON (n.name)",
        "CREATE INDEX contact_name IF NOT EXISTS FOR (n:Contact) ON (n.name)",
        "CREATE INDEX vendor_name IF NOT EXISTS FOR (n:Vendor) ON (n.name)",
        "CREATE INDEX competitor_name IF NOT EXISTS FOR (n:Competitor) ON (n.name)",
        "CREATE INDEX technology_name IF NOT EXISTS FOR (n:Technology) ON (n.name)",
        "CREATE INDEX content_title IF NOT EXISTS FOR (n:Content) ON (n.title)",
        "CREATE INDEX skill_name IF NOT EXISTS FOR (n:Skill) ON (n.name)",
        "CREATE INDEX task_title IF NOT EXISTS FOR (n:Task) ON (n.title)",
        "CREATE INDEX meeting_title IF NOT EXISTS FOR (n:Meeting) ON (n.title)",
        "CREATE INDEX infrastructure_name IF NOT EXISTS FOR (n:Infrastructure) ON (n.name)",
        "CREATE INDEX prototype_name IF NOT EXISTS FOR (n:Prototype) ON (n.name)",
        "CREATE INDEX investment_ticker IF NOT EXISTS FOR (n:Investment) ON (n.ticker)",
        "CREATE INDEX match_home IF NOT EXISTS FOR (n:Match) ON (n.home_team)",
        "CREATE INDEX match_away IF NOT EXISTS FOR (n:Match) ON (n.away_team)",
        "CREATE INDEX team_name IF NOT EXISTS FOR (n:Team) ON (n.name)",
        "CREATE INDEX league_name IF NOT EXISTS FOR (n:League) ON (n.name)",
        "CREATE INDEX tournament_name IF NOT EXISTS FOR (n:Tournament) ON (n.name)",
        "CREATE INDEX player_name IF NOT EXISTS FOR (n:Player) ON (n.name)",
        "CREATE INDEX season_team IF NOT EXISTS FOR (n:Season) ON (n.team)",
        # ── Date indexes for temporal queries ────────────────────
        "CREATE INDEX event_date IF NOT EXISTS FOR (n:Event) ON (n.date)",
        "CREATE INDEX training_date IF NOT EXISTS FOR (n:Training) ON (n.date)",
        "CREATE INDEX trip_start IF NOT EXISTS FOR (n:Trip) ON (n.start_date)",
        "CREATE INDEX reflection_date IF NOT EXISTS FOR (n:Reflection) ON (n.date)",
        "CREATE INDEX observation_date IF NOT EXISTS FOR (n:Observation) ON (n.date)",
        "CREATE INDEX meal_date IF NOT EXISTS FOR (n:Meal) ON (n.date)",
        "CREATE INDEX meeting_date IF NOT EXISTS FOR (n:Meeting) ON (n.date)",
        "CREATE INDEX task_due IF NOT EXISTS FOR (n:Task) ON (n.due_date)",
        "CREATE INDEX note_date IF NOT EXISTS FOR (n:Note) ON (n.date)",
        "CREATE INDEX decision_date IF NOT EXISTS FOR (n:Decision) ON (n.date)",
        "CREATE INDEX incident_date IF NOT EXISTS FOR (n:Incident) ON (n.date)",
        "CREATE INDEX bodymetric_date IF NOT EXISTS FOR (n:BodyMetric) ON (n.date)",
        "CREATE INDEX personalrecord_date IF NOT EXISTS FOR (n:PersonalRecord) ON (n.date)",
        "CREATE INDEX lifeevent_date IF NOT EXISTS FOR (n:LifeEvent) ON (n.date)",
        "CREATE INDEX intention_date IF NOT EXISTS FOR (n:Intention) ON (n.date)",
        "CREATE INDEX match_date IF NOT EXISTS FOR (n:Match) ON (n.date)",
        # ── Type / Status / Category indexes ─────────────────────
        "CREATE INDEX event_type IF NOT EXISTS FOR (n:Event) ON (n.type)",
        "CREATE INDEX location_type IF NOT EXISTS FOR (n:Location) ON (n.type)",
        "CREATE INDEX activity_type IF NOT EXISTS FOR (n:Activity) ON (n.type)",
        "CREATE INDEX training_type IF NOT EXISTS FOR (n:Training) ON (n.type)",
        "CREATE INDEX music_genre IF NOT EXISTS FOR (n:Music) ON (n.genre)",
        "CREATE INDEX species_category IF NOT EXISTS FOR (n:Species) ON (n.category)",
        "CREATE INDEX exercise_category IF NOT EXISTS FOR (n:Exercise) ON (n.category)",
        "CREATE INDEX book_status IF NOT EXISTS FOR (n:Book) ON (n.status)",
        "CREATE INDEX trip_status IF NOT EXISTS FOR (n:Trip) ON (n.status)",
        "CREATE INDEX goal_status IF NOT EXISTS FOR (n:Goal) ON (n.status)",
        "CREATE INDEX goal_category IF NOT EXISTS FOR (n:Goal) ON (n.category)",
        "CREATE INDEX habit_status IF NOT EXISTS FOR (n:Habit) ON (n.status)",
        "CREATE INDEX program_status IF NOT EXISTS FOR (n:Program) ON (n.status)",
        "CREATE INDEX client_status IF NOT EXISTS FOR (n:Client) ON (n.status)",
        "CREATE INDEX opportunity_status IF NOT EXISTS FOR (n:Opportunity) ON (n.status)",
        "CREATE INDEX proposal_status IF NOT EXISTS FOR (n:Proposal) ON (n.status)",
        "CREATE INDEX project_status IF NOT EXISTS FOR (n:Project) ON (n.status)",
        "CREATE INDEX task_status IF NOT EXISTS FOR (n:Task) ON (n.status)",
        "CREATE INDEX task_priority IF NOT EXISTS FOR (n:Task) ON (n.priority)",
        "CREATE INDEX content_status IF NOT EXISTS FOR (n:Content) ON (n.status)",
        "CREATE INDEX content_type IF NOT EXISTS FOR (n:Content) ON (n.type)",
        "CREATE INDEX incident_severity IF NOT EXISTS FOR (n:Incident) ON (n.severity)",
        "CREATE INDEX incident_status IF NOT EXISTS FOR (n:Incident) ON (n.status)",
        "CREATE INDEX infrastructure_status IF NOT EXISTS FOR (n:Infrastructure) ON (n.status)",
        "CREATE INDEX account_type IF NOT EXISTS FOR (n:Account) ON (n.type)",
        "CREATE INDEX investment_type IF NOT EXISTS FOR (n:Investment) ON (n.type)",
        "CREATE INDEX liability_type IF NOT EXISTS FOR (n:Liability) ON (n.type)",
        "CREATE INDEX financialgoal_status IF NOT EXISTS FOR (n:FinancialGoal) ON (n.status)",
        "CREATE INDEX skill_category IF NOT EXISTS FOR (n:Skill) ON (n.category)",
        "CREATE INDEX skill_level IF NOT EXISTS FOR (n:Skill) ON (n.level)",
        "CREATE INDEX vendor_category IF NOT EXISTS FOR (n:Vendor) ON (n.category)",
        "CREATE INDEX match_competition IF NOT EXISTS FOR (n:Match) ON (n.competition)",
        "CREATE INDEX team_league IF NOT EXISTS FOR (n:Team) ON (n.league)",
        "CREATE INDEX player_position IF NOT EXISTS FOR (n:Player) ON (n.position)",
        "CREATE INDEX player_team IF NOT EXISTS FOR (n:Player) ON (n.team)",
        "CREATE INDEX league_country IF NOT EXISTS FOR (n:League) ON (n.country)",
        "CREATE INDEX season_year IF NOT EXISTS FOR (n:Season) ON (n.season_year)",
        # ── Domain indexes for cross-team filtering ──────────────
        "CREATE INDEX event_domain IF NOT EXISTS FOR (n:Event) ON (n.domain)",
        "CREATE INDEX topic_domain IF NOT EXISTS FOR (n:Topic) ON (n.domain)",
        "CREATE INDEX goal_domain IF NOT EXISTS FOR (n:Goal) ON (n.domain)",
        "CREATE INDEX location_domain IF NOT EXISTS FOR (n:Location) ON (n.domain)",
        "CREATE INDEX person_domain IF NOT EXISTS FOR (n:Person) ON (n.domain)",
    ]
    with self.driver.session() as session:
        created = 0
        for index in indexes:
            try:
                session.run(index)
                # Recover the index name from the DDL text for logging:
                # the token between "INDEX" and "IF".
                name = index.split("INDEX")[1].split("IF")[0].strip()
                logger.info(f" ✓ Index: {name}")
                created += 1
            except Exception as e:
                # IF NOT EXISTS should prevent duplicate errors, so anything
                # caught here is logged and skipped rather than aborting.
                logger.warning(f" ⚠ Index may already exist: {e}")
        logger.info(f"Indexes processed: {created}/{len(indexes)}")
def verify_schema(self):
    """
    Verify that constraints and indexes were created successfully.

    Returns:
        dict with keys "constraints", "indexes", "nodes" (counts) and
        "success" (always True; reserved for future checks).
    """
    with self.driver.session() as session:
        constraint_count = len(list(session.run("SHOW CONSTRAINTS")))
        # Counting only RANGE indexes excludes constraint-backed and
        # built-in LOOKUP indexes.
        index_count = len(list(session.run("SHOW INDEXES WHERE type = 'RANGE'")))
        node_count = session.run("MATCH (n) RETURN count(n) AS count").single()["count"]
    return {
        "constraints": constraint_count,
        "indexes": index_count,
        "nodes": node_count,
        "success": True,
    }
def run_tests(self, include_schema_tests=True):
    """
    Run comprehensive tests to verify schema and APOC functionality.
    Returns True if all tests pass, False otherwise.

    Args:
        include_schema_tests: If True, also verify constraints/indexes exist

    Each test case is a (name, cypher, validator) tuple; the validator
    receives the raw query result and returns a bool.  Temporary nodes are
    labelled `_Test` and removed at the end.
    """
    tests_passed = 0
    tests_failed = 0
    # Baseline checks: connectivity, APOC availability, basic writes.
    test_cases = [
        ("Connection test", "RETURN 1 AS result", lambda r: r.single()["result"] == 1),
        ("APOC available", "RETURN apoc.version() AS version", lambda r: r.single()["version"] is not None),
        ("Create test node",
         "CREATE (t:_Test {id: 'test_' + toString(timestamp())}) RETURN t.id AS id",
         lambda r: r.single()["id"] is not None),
        ("Query test node",
         "MATCH (t:_Test) RETURN count(t) AS count",
         lambda r: r.single()["count"] >= 1),
        ("APOC collection functions",
         "RETURN apoc.coll.sum([1,2,3]) AS total",
         lambda r: r.single()["total"] == 6),
        ("APOC date functions",
         "RETURN apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd') AS today",
         lambda r: len(r.single()["today"]) == 10),
    ]
    # Schema-specific tests: spot-check constraints/indexes from each team
    # rather than enumerating all 74.
    schema_tests = [
        # Universal nodes
        ("Constraint: Person",
         "SHOW CONSTRAINTS WHERE name = 'person_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Location",
         "SHOW CONSTRAINTS WHERE name = 'location_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Topic",
         "SHOW CONSTRAINTS WHERE name = 'topic_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Goal",
         "SHOW CONSTRAINTS WHERE name = 'goal_id'",
         lambda r: len(list(r)) == 1),
        # Personal team samples
        ("Constraint: Book",
         "SHOW CONSTRAINTS WHERE name = 'book_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Training",
         "SHOW CONSTRAINTS WHERE name = 'training_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Recipe",
         "SHOW CONSTRAINTS WHERE name = 'recipe_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Account",
         "SHOW CONSTRAINTS WHERE name = 'account_id'",
         lambda r: len(list(r)) == 1),
        # Work team samples
        ("Constraint: Client",
         "SHOW CONSTRAINTS WHERE name = 'client_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Opportunity",
         "SHOW CONSTRAINTS WHERE name = 'opportunity_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Task",
         "SHOW CONSTRAINTS WHERE name = 'task_id'",
         lambda r: len(list(r)) == 1),
        # Engineering team samples
        ("Constraint: Infrastructure",
         "SHOW CONSTRAINTS WHERE name = 'infrastructure_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Prototype",
         "SHOW CONSTRAINTS WHERE name = 'prototype_id'",
         lambda r: len(list(r)) == 1),
        # Index checks
        ("Index: person_name",
         "SHOW INDEXES WHERE name = 'person_name'",
         lambda r: len(list(r)) == 1),
        ("Index: event_domain",
         "SHOW INDEXES WHERE name = 'event_domain'",
         lambda r: len(list(r)) == 1),
        ("Index: client_status",
         "SHOW INDEXES WHERE name = 'client_status'",
         lambda r: len(list(r)) == 1),
        # Cristiano team sample
        ("Constraint: Match",
         "SHOW CONSTRAINTS WHERE name = 'match_id'",
         lambda r: len(list(r)) == 1),
        ("Constraint: Team",
         "SHOW CONSTRAINTS WHERE name = 'team_id'",
         lambda r: len(list(r)) == 1),
        # Total constraint count (74 node types)
        ("Total constraints >= 74",
         "SHOW CONSTRAINTS",
         lambda r: len(list(r)) >= 74),
    ]
    if include_schema_tests:
        test_cases.extend(schema_tests)
    logger.info("\n" + "=" * 60)
    logger.info("RUNNING SCHEMA VERIFICATION TESTS")
    logger.info("=" * 60)
    with self.driver.session() as session:
        for test_name, query, validator in test_cases:
            try:
                result = session.run(query)
                if validator(result):
                    logger.info(f"{test_name}")
                    tests_passed += 1
                else:
                    logger.error(f"{test_name} - Validation failed")
                    tests_failed += 1
            except Exception as e:
                # A failing query (e.g. APOC missing) counts as a failure
                # but does not abort the remaining tests.
                logger.error(f"{test_name} - {e}")
                tests_failed += 1
        # Cleanup test nodes
        try:
            session.run("MATCH (t:_Test) DELETE t")
            logger.info(" ✓ Cleanup test nodes")
        except Exception as e:
            logger.warning(f" ⚠ Cleanup failed: {e}")
    logger.info("=" * 60)
    logger.info(f"Tests: {tests_passed} passed, {tests_failed} failed")
    logger.info("=" * 60 + "\n")
    return tests_failed == 0
def create_sample_nodes(self):
    """
    Create sample nodes spanning all three teams to demonstrate
    the unified schema and cross-domain relationships.
    Uses explicit write transactions for reliable commits.

    Idempotent: all writes use MERGE keyed on id, so re-running updates
    rather than duplicates.  Nodes are committed before relationships are
    created so the MATCH endpoints are guaranteed to exist.
    """
    # (label:id description, MERGE query) pairs; the label:id string is
    # only used for logging.
    node_queries = [
        # ── Central person node ──────────────────────────────────
        ("Person:user_main", """
            MERGE (p:Person {id: 'user_main'})
            ON CREATE SET p.created_at = datetime()
            SET p.name = 'Main User',
                p.relationship = 'self',
                p.domain = 'both',
                p.updated_at = datetime()
            RETURN p.id AS id
        """),
        # ── Personal: Sample location ────────────────────────────
        ("Location:location_home", """
            MERGE (l:Location {id: 'location_home'})
            ON CREATE SET l.created_at = datetime()
            SET l.name = 'Home',
                l.type = 'residence',
                l.domain = 'personal',
                l.updated_at = datetime()
            RETURN l.id AS id
        """),
        # ── Personal: Sample trip (Nate) ─────────────────────────
        ("Trip:trip_sample_2025", """
            MERGE (t:Trip {id: 'trip_sample_2025'})
            ON CREATE SET t.created_at = datetime()
            SET t.name = 'Sample Trip',
                t.status = 'planning',
                t.updated_at = datetime()
            RETURN t.id AS id
        """),
        # ── Personal: Sample book (Hypatia) ──────────────────────
        ("Book:book_meditations_aurelius", """
            MERGE (b:Book {id: 'book_meditations_aurelius'})
            ON CREATE SET b.created_at = datetime()
            SET b.title = 'Meditations',
                b.author = 'Marcus Aurelius',
                b.status = 'completed',
                b.rating = 5,
                b.updated_at = datetime()
            RETURN b.id AS id
        """),
        # ── Personal: Sample goal (Seneca) ───────────────────────
        ("Goal:goal_sample_2025", """
            MERGE (g:Goal {id: 'goal_sample_2025'})
            ON CREATE SET g.created_at = datetime()
            SET g.name = 'Sample Goal',
                g.category = 'personal_growth',
                g.domain = 'personal',
                g.status = 'in_progress',
                g.updated_at = datetime()
            RETURN g.id AS id
        """),
        # ── Personal: Sample topic (universal) ───────────────────
        ("Topic:topic_stoicism", """
            MERGE (t:Topic {id: 'topic_stoicism'})
            ON CREATE SET t.created_at = datetime()
            SET t.name = 'Stoicism',
                t.category = 'philosophy',
                t.domain = 'personal',
                t.updated_at = datetime()
            RETURN t.id AS id
        """),
        # ── Personal: Sample account (Garth) ─────────────────────
        ("Account:account_tfsa_sample", """
            MERGE (a:Account {id: 'account_tfsa_sample'})
            ON CREATE SET a.created_at = datetime()
            SET a.name = 'TFSA - Sample',
                a.type = 'TFSA',
                a.updated_at = datetime()
            RETURN a.id AS id
        """),
        # ── Work: Sample client ──────────────────────────────────
        ("Client:client_sample_corp", """
            MERGE (c:Client {id: 'client_sample_corp'})
            ON CREATE SET c.created_at = datetime()
            SET c.name = 'Sample Corp',
                c.industry = 'Technology',
                c.status = 'prospect',
                c.updated_at = datetime()
            RETURN c.id AS id
        """),
        # ── Work: Sample skill ───────────────────────────────────
        ("Skill:skill_cx_strategy", """
            MERGE (s:Skill {id: 'skill_cx_strategy'})
            ON CREATE SET s.created_at = datetime()
            SET s.name = 'CX Strategy',
                s.category = 'consulting',
                s.level = 'expert',
                s.updated_at = datetime()
            RETURN s.id AS id
        """),
        # ── Work: Sample topic ───────────────────────────────────
        ("Topic:topic_ai_in_cx", """
            MERGE (t:Topic {id: 'topic_ai_in_cx'})
            ON CREATE SET t.created_at = datetime()
            SET t.name = 'AI in Customer Experience',
                t.category = 'technology',
                t.domain = 'work',
                t.updated_at = datetime()
            RETURN t.id AS id
        """),
        # ── Engineering: Sample infrastructure (Scotty) ──────────
        ("Infrastructure:infra_neo4j_prod", """
            MERGE (i:Infrastructure {id: 'infra_neo4j_prod'})
            ON CREATE SET i.created_at = datetime()
            SET i.name = 'Neo4j Production',
                i.type = 'database',
                i.status = 'running',
                i.environment = 'production',
                i.updated_at = datetime()
            RETURN i.id AS id
        """),
        # ── Personal: Sample team (Cristiano) ──────────────────────
        ("Team:team_arsenal", """
            MERGE (t:Team {id: 'team_arsenal'})
            ON CREATE SET t.created_at = datetime()
            SET t.name = 'Arsenal',
                t.league = 'Premier League',
                t.country = 'England',
                t.followed = true,
                t.updated_at = datetime()
            RETURN t.id AS id
        """),
    ]
    # Create all nodes in one explicit transaction (auto-commits on exit)
    created_nodes = 0
    with self.driver.session() as session:
        with session.begin_transaction() as tx:
            for label, query in node_queries:
                try:
                    result = tx.run(query)
                    record = result.single()
                    logger.info(f" ✓ Node: {label}{record['id']}")
                    created_nodes += 1
                except Exception as e:
                    logger.error(f" ✗ Node {label}: {e}")
            # tx auto-commits when context exits normally
    logger.info(f" Created {created_nodes}/{len(node_queries)} sample nodes")
    # Verify nodes exist before creating relationships
    with self.driver.session() as session:
        count = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
        logger.info(f" Verified {count} nodes exist before creating relationships")
    # Create all relationships in one explicit transaction.
    # Each spec is (rel_type, from_label, from_id, to_label, to_id).
    rel_specs = [
        ("SUPPORTS", "Person", "user_main", "Team", "team_arsenal"),
        ("COMPLETED", "Person", "user_main", "Book", "book_meditations_aurelius"),
        ("PURSUING", "Person", "user_main", "Goal", "goal_sample_2025"),
        ("EXPLORES", "Book", "book_meditations_aurelius", "Topic", "topic_stoicism"),
        ("OWNS", "Person", "user_main", "Account", "account_tfsa_sample"),
    ]
    created_rels = 0
    with self.driver.session() as session:
        with session.begin_transaction() as tx:
            for rel_type, from_label, from_id, to_label, to_id in rel_specs:
                desc = f"({from_id})-[:{rel_type}]->({to_id})"
                try:
                    # Labels and rel types cannot be parameterized in Cypher,
                    # so they are interpolated; the ids go in as parameters.
                    query = (
                        f"MATCH (a:{from_label} {{id: $from_id}}) "
                        f"MATCH (b:{to_label} {{id: $to_id}}) "
                        f"MERGE (a)-[r:{rel_type}]->(b) "
                        f"RETURN type(r) AS rel"
                    )
                    result = tx.run(query, from_id=from_id, to_id=to_id)
                    record = result.single()
                    # MERGE returns no row when either MATCH found nothing.
                    if record is None:
                        logger.error(f" ✗ Rel {desc}: endpoints not found")
                    else:
                        logger.info(f" ✓ Rel: {desc}")
                        created_rels += 1
                except Exception as e:
                    logger.error(f" ✗ Rel {desc}: {e}")
            # tx auto-commits when context exits normally
    logger.info(f" Created {created_rels}/{len(rel_specs)} sample relationships")
def document_schema(self):
    """
    Display a summary of the unified schema design.
    Full documentation: docs/neo4j-unified-schema.md
    """
    # NOTE(review): this summary is a static string, not derived from the live
    # database — keep it in sync with create_constraints()/create_indexes().
    schema_doc = """
════════════════════════════════════════════════════════════════
UNIFIED KNOWLEDGE GRAPH SCHEMA
One graph for all assistants across personal, work, and engineering
════════════════════════════════════════════════════════════════
UNIVERSAL NODES (any assistant can read/write):
────────────────────────────────────────────────────────────────
Person People (self, family, friends, colleagues)
Location Physical places (cities, venues, offices, trails)
Event Significant occurrences (celebrations, conferences)
Topic Subjects of interest (stoicism, AI in CX)
Goal Objectives (personal growth, career, fitness, financial)
PERSONAL TEAM:
────────────────────────────────────────────────────────────────
Nate (Travel) Trip, Destination, Activity
Hypatia (Learning) Book, Author, LearningPath, Concept, Quote
Marcus (Fitness) Training, Exercise, Program, PersonalRecord, BodyMetric
Seneca (Reflection) Reflection, Value, Habit, LifeEvent, Intention
Bourdain (Food) Recipe, Restaurant, Ingredient, Meal, Technique
Bowie (Culture) Music, Film, Artwork, Playlist, Artist, Style
Cousteau (Nature) Species, Plant, Tank, Garden, Ecosystem, Observation
Garth (Finance) Account, Investment, Asset, Liability, Budget, FinancialGoal
Cristiano (Football) Match, Team, League, Tournament, Player, Season
WORK TEAM:
────────────────────────────────────────────────────────────────
Alan (Strategy) Client, Vendor, Competitor, MarketTrend, Technology, Decision
Ann (Marketing) Content, Publication, Topic, Event
Jeffrey (Sales) Contact, Opportunity, Proposal, Meeting
Jarvis (Execution) Task, Meeting, Note, Decision, Project
ENGINEERING TEAM:
────────────────────────────────────────────────────────────────
Scotty (Infra) Infrastructure, Incident
Harper (Hacking) Prototype, Experiment
TOTAL: 74 node types, all with id uniqueness constraints
CROSS-TEAM CONNECTIONS (examples):
────────────────────────────────────────────────────────────────
Trip -[FOR_EVENT]-> Event (Personal ↔ Work)
Book -[DEVELOPS]-> Skill (Personal ↔ Work)
Book -[INFORMS]-> Content (Personal ↔ Work)
Infrastructure -[HOSTS]-> Project (Engineering ↔ Work)
Prototype -[SUPPORTS]-> Opportunity (Engineering ↔ Work)
Project -[GENERATES_REVENUE]-> Account (Work ↔ Personal)
Training -[BUILDS]-> Skill (Personal ↔ Work)
Full schema: docs/neo4j-unified-schema.md
════════════════════════════════════════════════════════════════
"""
    # print (not logger) so the summary is readable even under --quiet.
    print(schema_doc)
    logger.info("Schema documentation displayed")
def get_credentials(args):
    """
    Resolve Neo4j connection settings.

    Precedence: CLI arguments, then environment variables, then interactive
    prompts. The password is deliberately never accepted on the CLI; it comes
    from NEO4J_PASSWORD or a hidden getpass prompt.

    Returns:
        (uri, user, password) tuple of strings.
    """
    # URI: fall back to a prompt, and to the well-known default on empty input.
    uri = args.uri or os.environ.get("NEO4J_URI")
    if not uri:
        uri = input("Neo4j URI [bolt://localhost:7687]: ").strip() or "bolt://localhost:7687"

    # Username: same resolution order as the URI.
    user = args.user or os.environ.get("NEO4J_USER")
    if not user:
        user = input("Neo4j username [neo4j]: ").strip() or "neo4j"

    # Password: environment first, otherwise a hidden prompt; empty is fatal.
    password = os.environ.get("NEO4J_PASSWORD") or getpass.getpass("Neo4j password: ")
    if not password:
        logger.error("Password is required")
        sys.exit(1)

    return uri, user, password
def parse_args():
    """Build the CLI parser and return the parsed argument namespace."""
    parser = argparse.ArgumentParser(
        description="Initialize Neo4j Unified Knowledge Graph schema for all AI assistants",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                                # Interactive prompts
  %(prog)s --uri bolt://ariel.incus:7687  # Specify URI, prompt for rest
  %(prog)s --test-only                    # Run tests without creating schema
  %(prog)s --skip-samples                 # Create schema without sample data
Environment Variables:
  NEO4J_URI        Bolt connection URI
  NEO4J_USER       Database username
  NEO4J_PASSWORD   Database password (recommended for scripts)
Schema Reference:
  docs/neo4j-unified-schema.md
"""
    )
    # Declarative flag table: one entry per option, registered in order so the
    # generated --help output is unchanged.
    option_table = [
        (("--uri", "-u"),
         dict(help="Neo4j Bolt URI (default: bolt://localhost:7687)")),
        (("--user", "-U"),
         dict(help="Neo4j username (default: neo4j)")),
        (("--test-only", "-t"),
         dict(action="store_true",
              help="Only run verification tests, don't create schema")),
        (("--skip-samples",),
         dict(action="store_true", help="Skip creating sample nodes")),
        (("--skip-docs",),
         dict(action="store_true", help="Skip displaying schema documentation")),
        (("--quiet", "-q"),
         dict(action="store_true", help="Reduce output verbosity")),
    ]
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    return parser.parse_args()
def main():
    """
    Main execution function.
    Collects credentials via prompts or environment variables.

    Exit codes: 0 on success, 1 on any failure, 130 on Ctrl-C.
    """
    args = parse_args()
    # Set log level
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    # Get credentials
    uri, user, password = get_credentials(args)
    logger.info(f"Connecting to Neo4j at {uri}...")
    try:
        schema = LifeGraphSchema(uri, user, password)
    except Exception as e:
        # Driver construction failed (e.g. malformed URI) — nothing to close yet,
        # so this sits outside the try/finally below.
        logger.error(f"Failed to create database driver: {e}")
        sys.exit(1)
    try:
        # Verify connection first
        try:
            schema.verify_connection()
        except AuthError:
            logger.error("✗ Authentication failed - check username/password")
            sys.exit(1)
        except ServiceUnavailable:
            logger.error(f"✗ Cannot connect to Neo4j at {uri}")
            sys.exit(1)
        if args.test_only:
            # Just run basic tests (no schema verification)
            success = schema.run_tests(include_schema_tests=False)
            sys.exit(0 if success else 1)
        # Display schema documentation
        if not args.skip_docs:
            schema.document_schema()
        # Create constraints (includes automatic indexes)
        logger.info("Creating constraints (74 node types)...")
        schema.create_constraints()
        # Create additional indexes
        logger.info("Creating indexes...")
        schema.create_indexes()
        # Create sample nodes to validate schema
        if not args.skip_samples:
            logger.info("Creating sample nodes...")
            schema.create_sample_nodes()
        # Run verification tests (including schema tests)
        logger.info("Verifying schema...")
        test_success = schema.run_tests(include_schema_tests=True)
        # Summary
        stats = schema.verify_schema()
        logger.info("=" * 60)
        logger.info("SCHEMA INITIALIZATION COMPLETE")
        logger.info("=" * 60)
        logger.info(f" Constraints: {stats['constraints']}")
        logger.info(f" Indexes: {stats['indexes']}")
        logger.info(f" Nodes: {stats['nodes']}")
        logger.info("=" * 60)
        if test_success:
            logger.info("✓ All tests passed!")
            logger.info("\nUnified graph ready for all 15 assistants.")
            logger.info("Schema reference: docs/neo4j-unified-schema.md")
            logger.info("\nNext steps:")
            logger.info(" 1. Import data (Plex, Calibre, etc.)")
            logger.info(" 2. Configure MCP servers for each assistant")
            logger.info(" 3. Update assistant prompts with unified graph sections")
        else:
            logger.warning("⚠ Some tests failed - review output above")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Error during schema initialization: {e}")
        sys.exit(1)
    finally:
        # Always release the driver — runs even on sys.exit (SystemExit
        # propagates through the except clauses above).
        schema.close()
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,587 @@
"""
Neo4j Life Graph Schema Initialization
=======================================
Creates the foundational schema for a personal knowledge graph used by
seven AI assistants: Hypatia, Marcus, Seneca, Nate, Bowie, Bourdain, Cousteau
Requirements:
pip install neo4j
Usage:
python neo4j-personal-schema-init.py
python neo4j-personal-schema-init.py --uri bolt://ariel.incus:7687
python neo4j-personal-schema-init.py --test-only
Environment Variables (optional):
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
NEO4J_USER - Username (default: neo4j)
NEO4J_PASSWORD - Password (will prompt if not set)
"""
import argparse
import getpass
import os
import sys
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class LifeGraphSchema:
    """
    Owner of the Neo4j driver plus all schema-management operations for the
    personal "Life Graph": connection checks, constraints, indexes,
    verification tests, sample data, and a printable schema summary.
    """
    def __init__(self, uri, user, password):
        """Initialize connection to Neo4j database"""
        # Driver construction is lazy in the neo4j package; connectivity is
        # only proven by verify_connection().
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.uri = uri  # retained for log messages only
    def close(self):
        """Close the database connection"""
        self.driver.close()
    def verify_connection(self):
        """
        Verify the connection to Neo4j is working.
        Returns True if successful, raises exception otherwise.
        """
        with self.driver.session() as session:
            result = session.run("RETURN 1 AS test")
            record = result.single()
            if record and record["test"] == 1:
                logger.info(f"✓ Connected to Neo4j at {self.uri}")
                return True
        # Reached only if the round-trip query returned something unexpected.
        raise ConnectionError("Failed to verify Neo4j connection")
    def create_constraints(self):
        """
        Create uniqueness constraints on key node properties.
        This ensures data integrity and creates indexes automatically.
        """
        constraints = [
            # Core entities
            "CREATE CONSTRAINT person_id IF NOT EXISTS FOR (p:Person) REQUIRE p.id IS UNIQUE",
            "CREATE CONSTRAINT location_id IF NOT EXISTS FOR (l:Location) REQUIRE l.id IS UNIQUE",
            "CREATE CONSTRAINT event_id IF NOT EXISTS FOR (e:Event) REQUIRE e.id IS UNIQUE",
            # Media types (Bowie, Bourdain, Hypatia domains)
            "CREATE CONSTRAINT book_id IF NOT EXISTS FOR (b:Book) REQUIRE b.id IS UNIQUE",
            "CREATE CONSTRAINT film_id IF NOT EXISTS FOR (f:Film) REQUIRE f.id IS UNIQUE",
            "CREATE CONSTRAINT music_id IF NOT EXISTS FOR (m:Music) REQUIRE m.id IS UNIQUE",
            "CREATE CONSTRAINT recipe_id IF NOT EXISTS FOR (r:Recipe) REQUIRE r.id IS UNIQUE",
            # Activity/Practice nodes
            "CREATE CONSTRAINT training_id IF NOT EXISTS FOR (t:Training) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT trip_id IF NOT EXISTS FOR (t:Trip) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT reflection_id IF NOT EXISTS FOR (r:Reflection) REQUIRE r.id IS UNIQUE",
            # Knowledge/Learning (Hypatia domain)
            "CREATE CONSTRAINT topic_id IF NOT EXISTS FOR (t:Topic) REQUIRE t.id IS UNIQUE",
            "CREATE CONSTRAINT concept_id IF NOT EXISTS FOR (c:Concept) REQUIRE c.id IS UNIQUE",
            # Nature (Cousteau domain)
            "CREATE CONSTRAINT species_id IF NOT EXISTS FOR (s:Species) REQUIRE s.id IS UNIQUE",
            "CREATE CONSTRAINT plant_id IF NOT EXISTS FOR (p:Plant) REQUIRE p.id IS UNIQUE",
        ]
        with self.driver.session() as session:
            for constraint in constraints:
                try:
                    session.run(constraint)
                    # Log only the "(x:Label)" fragment of the DDL for readability.
                    logger.info(f"Created constraint: {constraint.split('FOR')[1].split('REQUIRE')[0].strip()}")
                except Exception as e:
                    # IF NOT EXISTS makes re-runs safe; genuine failures are
                    # surfaced here as warnings rather than aborting the batch.
                    logger.warning(f"Constraint may already exist: {e}")
    def create_indexes(self):
        """
        Create indexes for frequently queried properties.
        These improve query performance for searches and filters.
        """
        indexes = [
            # Text search indexes
            "CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.name)",
            "CREATE INDEX location_name IF NOT EXISTS FOR (l:Location) ON (l.name)",
            "CREATE INDEX book_title IF NOT EXISTS FOR (b:Book) ON (b.title)",
            "CREATE INDEX film_title IF NOT EXISTS FOR (f:Film) ON (f.title)",
            "CREATE INDEX music_title IF NOT EXISTS FOR (m:Music) ON (m.title)",
            "CREATE INDEX recipe_name IF NOT EXISTS FOR (r:Recipe) ON (r.name)",
            # Date-based indexes for temporal queries
            "CREATE INDEX event_date IF NOT EXISTS FOR (e:Event) ON (e.date)",
            "CREATE INDEX training_date IF NOT EXISTS FOR (t:Training) ON (t.date)",
            "CREATE INDEX trip_start IF NOT EXISTS FOR (t:Trip) ON (t.start_date)",
            "CREATE INDEX reflection_date IF NOT EXISTS FOR (r:Reflection) ON (r.date)",
            # Category/type indexes for filtering
            "CREATE INDEX event_type IF NOT EXISTS FOR (e:Event) ON (e.type)",
            "CREATE INDEX location_category IF NOT EXISTS FOR (l:Location) ON (l.category)",
            "CREATE INDEX music_genre IF NOT EXISTS FOR (m:Music) ON (m.genre)",
        ]
        with self.driver.session() as session:
            for index in indexes:
                try:
                    session.run(index)
                    logger.info(f"Created index: {index.split('FOR')[1].split('ON')[0].strip()}")
                except Exception as e:
                    logger.warning(f"Index may already exist: {e}")
    def verify_schema(self):
        """
        Verify that constraints and indexes were created successfully.
        Returns a dict with counts and status.
        """
        results = {"constraints": 0, "indexes": 0, "nodes": 0, "success": True}
        with self.driver.session() as session:
            # Count constraints
            constraint_result = session.run("SHOW CONSTRAINTS")
            constraints = list(constraint_result)
            results["constraints"] = len(constraints)
            # Count indexes (excluding constraint-created ones)
            # NOTE(review): RANGE-type filtering also excludes LOOKUP indexes;
            # constraint-backed indexes are RANGE too, so this is approximate.
            index_result = session.run("SHOW INDEXES WHERE type = 'RANGE'")
            indexes = list(index_result)
            results["indexes"] = len(indexes)
            # Count nodes
            node_result = session.run("MATCH (n) RETURN count(n) AS count")
            results["nodes"] = node_result.single()["count"]
        return results
    def run_tests(self, include_schema_tests=True):
        """
        Run comprehensive tests to verify schema and APOC functionality.
        Returns True if all tests pass, False otherwise.
        Args:
            include_schema_tests: If True, also verify constraints/indexes exist
        """
        tests_passed = 0
        tests_failed = 0
        # Each entry: (display name, Cypher query, validator over the Result).
        test_cases = [
            ("Connection test", "RETURN 1 AS result", lambda r: r.single()["result"] == 1),
            ("APOC available", "RETURN apoc.version() AS version", lambda r: r.single()["version"] is not None),
            ("Create test node",
             "CREATE (t:_Test {id: 'test_' + toString(timestamp())}) RETURN t.id AS id",
             lambda r: r.single()["id"] is not None),
            ("Query test node",
             "MATCH (t:_Test) RETURN count(t) AS count",
             lambda r: r.single()["count"] >= 1),
            ("APOC collection functions",
             "RETURN apoc.coll.sum([1,2,3]) AS total",
             lambda r: r.single()["total"] == 6),
            ("APOC date functions",
             "RETURN apoc.date.format(timestamp(), 'ms', 'yyyy-MM-dd') AS today",
             lambda r: len(r.single()["today"]) == 10),
        ]
        # Schema-specific tests (only run after schema creation)
        schema_tests = [
            ("Constraint exists (Person)",
             "SHOW CONSTRAINTS WHERE name = 'person_id'",
             lambda r: len(list(r)) == 1),
            ("Index exists (person_name)",
             "SHOW INDEXES WHERE name = 'person_name'",
             lambda r: len(list(r)) == 1),
        ]
        if include_schema_tests:
            test_cases.extend(schema_tests)
        logger.info("\n" + "=" * 60)
        logger.info("RUNNING SCHEMA VERIFICATION TESTS")
        logger.info("=" * 60)
        with self.driver.session() as session:
            for test_name, query, validator in test_cases:
                try:
                    result = session.run(query)
                    if validator(result):
                        logger.info(f"✓ {test_name}")
                        tests_passed += 1
                    else:
                        logger.error(f"✗ {test_name} - Validation failed")
                        tests_failed += 1
                except Exception as e:
                    logger.error(f"✗ {test_name} - {e}")
                    tests_failed += 1
            # Cleanup test nodes
            try:
                session.run("MATCH (t:_Test) DELETE t")
                logger.info(" ✓ Cleanup test nodes")
            except Exception as e:
                logger.warning(f" ⚠ Cleanup failed: {e}")
        logger.info("=" * 60)
        logger.info(f"Tests: {tests_passed} passed, {tests_failed} failed")
        logger.info("=" * 60 + "\n")
        return tests_failed == 0
    def create_sample_nodes(self):
        """
        Create sample nodes to demonstrate the schema.
        Replace this with your actual data import logic.
        """
        # MERGE keys on id only; SET overwrites the remaining properties, so
        # re-running is idempotent.
        queries = [
            # Central person node (you)
            """
            MERGE (p:Person {id: 'user_main'})
            SET p.name = 'Main User',
                p.relationship_type = 'self',
                p.created_at = datetime()
            """,
            # Sample interest/preference
            """
            MERGE (i:Interest {id: 'interest_cooking'})
            SET i.category = 'culinary',
                i.name = 'Cooking',
                i.intensity = 'high',
                i.notes = 'Especially interested in techniques and cultural context'
            """,
            # Sample location
            """
            MERGE (l:Location {id: 'location_costarica'})
            SET l.name = 'Costa Rica',
                l.country = 'Costa Rica',
                l.category = 'travel_destination',
                l.notes = 'Planning future trip'
            """,
        ]
        with self.driver.session() as session:
            for query in queries:
                session.run(query)
        logger.info("Created sample nodes")
    def document_schema(self):
        """
        Document the schema design for reference.
        This prints the node types and their intended use by each assistant.
        """
        # NOTE(review): static text, not derived from the database — keep it
        # in sync with create_constraints()/create_indexes().
        schema_doc = """
════════════════════════════════════════════════════════════════
LIFE GRAPH SCHEMA - NODE TYPES AND ASSISTANT RESPONSIBILITIES
════════════════════════════════════════════════════════════════
CORE NODES (Used by all assistants):
────────────────────────────────────────────────────────────────
Person - People in your life (family, friends, contacts)
Properties: name, relationship_type, birthday,
contact_info, notes
Location - Places (home, travel, favorites)
Properties: name, city, country, coordinates,
category, notes
Event - Life events (vacations, gatherings, milestones)
Properties: name, date, location, description, type
Interest - Preferences, hobbies, goals
Properties: category, name, intensity, notes
════════════════════════════════════════════════════════════════
HYPATIA (Learning & Knowledge):
────────────────────────────────────────────────────────────────
Book - Books read or to-read
Properties: title, author, isbn, status, rating,
date_started, date_finished, notes
Topic - Subject areas of study
Properties: name, field, depth, resources
Concept - Ideas and principles learned
Properties: name, definition, examples, connections
════════════════════════════════════════════════════════════════
MARCUS (Fitness & Training):
────────────────────────────────────────────────────────────────
Training - Individual workout sessions
Properties: date, type, duration, exercises,
volume, intensity, notes, feeling
Exercise - Specific movements/activities
Properties: name, category, equipment,
target_muscles, technique_notes
════════════════════════════════════════════════════════════════
SENECA (Reflection & Wellness):
────────────────────────────────────────────────────────────────
Reflection - Journal entries and insights
Properties: date, content, mood, themes,
insights, questions
Goal - Life objectives and aspirations
Properties: name, category, timeline, status,
progress, reflections
════════════════════════════════════════════════════════════════
NATE (Travel & Adventure):
────────────────────────────────────────────────────────────────
Trip - Travel plans and experiences
Properties: name, start_date, end_date,
destinations, purpose, budget, highlights
Activity - Things to do at destinations
Properties: name, type, location, cost,
difficulty, notes
════════════════════════════════════════════════════════════════
BOWIE (Arts, Culture & Style):
────────────────────────────────────────────────────────────────
Film - Movies and TV shows
Properties: title, year, director, genre,
status, rating, date_watched, notes
Music - Songs, albums, artists
Properties: title, artist, album, genre, year,
rating, play_count, notes
Artwork - Visual art, exhibitions, collections
Properties: title, artist, medium, year, location,
notes
════════════════════════════════════════════════════════════════
BOURDAIN (Food & Drink):
────────────────────────────────────────────────────────────────
Recipe - Dishes to cook
Properties: name, cuisine, difficulty, time,
ingredients, instructions, source, notes
Restaurant - Dining destinations
Properties: name, location, cuisine, price_range,
rating, dishes_tried, notes
Ingredient - Foods and cooking components
Properties: name, category, season, source,
substitutes, notes
════════════════════════════════════════════════════════════════
COUSTEAU (Nature & Living Things):
────────────────────────────────────────────────────────────────
Species - Animals, fish, marine life
Properties: name, scientific_name, category,
habitat, conservation_status, notes
Plant - Garden plants, houseplants
Properties: name, scientific_name, type,
care_requirements, location, health_status
Ecosystem - Environments and habitats
Properties: name, type, location, characteristics,
species_present, conservation_notes
════════════════════════════════════════════════════════════════
KEY RELATIONSHIP PATTERNS:
────────────────────────────────────────────────────────────────
Cross-domain connections:
- Training -[PREPARATION_FOR]-> Trip
- Reflection -[ABOUT]-> Event/Training/Trip
- Book -[INSPIRED]-> Trip/Recipe/Concept
- Recipe -[FROM_LOCATION]-> Location
- Music -[PLAYED_AT]-> Event/Location
- Film -[SET_IN]-> Location
- Species -[OBSERVED_AT]-> Location
- Plant -[GROWS_IN]-> Location
Personal connections:
- Person -[ATTENDED]-> Event
- Person -[TRAVELED_WITH]-> Trip
- Person -[TRAINED_WITH]-> Training
- Person -[SHARED_MEAL]-> Recipe/Restaurant
- Person -[RECOMMENDED]-> Book/Film/Music/Restaurant
Learning connections:
- Book -[ABOUT]-> Topic
- Topic -[CONTAINS]-> Concept
- Concept -[RELATES_TO]-> Concept
- Training -[TEACHES]-> Concept (movement patterns, discipline)
════════════════════════════════════════════════════════════════
"""
        # print (not logger) so the summary is readable even under --quiet.
        print(schema_doc)
        logger.info("Schema documentation displayed")
def get_credentials(args):
    """
    Collect Neo4j credentials from environment variables, CLI args, or prompts.
    Priority: CLI args > Environment variables > Interactive prompts
    """
    def prompted(label, fallback):
        # Ask once; empty input falls back to the documented default.
        answer = input(label).strip()
        return answer or fallback

    uri = args.uri or os.environ.get("NEO4J_URI")
    if not uri:
        uri = prompted("Neo4j URI [bolt://localhost:7687]: ", "bolt://localhost:7687")

    user = args.user or os.environ.get("NEO4J_USER")
    if not user:
        user = prompted("Neo4j username [neo4j]: ", "neo4j")

    # Password is never taken from the CLI for security reasons.
    password = os.environ.get("NEO4J_PASSWORD")
    if not password:
        password = getpass.getpass("Neo4j password: ")
    if not password:
        logger.error("Password is required")
        sys.exit(1)

    return uri, user, password
def parse_args():
    """Construct the argument parser and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description="Initialize Neo4j Life Graph schema for AI assistants",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s                                # Interactive prompts
  %(prog)s --uri bolt://ariel.incus:7687  # Specify URI, prompt for rest
  %(prog)s --test-only                    # Run tests without creating schema
  %(prog)s --skip-samples                 # Create schema without sample data
Environment Variables:
  NEO4J_URI        Bolt connection URI
  NEO4J_USER       Database username
  NEO4J_PASSWORD   Database password (recommended for scripts)
"""
    )
    # Register flags from a single table so each option appears exactly once;
    # order is preserved so --help output is unchanged.
    specs = [
        (("--uri", "-u"),
         dict(help="Neo4j Bolt URI (default: bolt://localhost:7687)")),
        (("--user", "-U"),
         dict(help="Neo4j username (default: neo4j)")),
        (("--test-only", "-t"),
         dict(action="store_true",
              help="Only run verification tests, don't create schema")),
        (("--skip-samples",),
         dict(action="store_true", help="Skip creating sample nodes")),
        (("--skip-docs",),
         dict(action="store_true", help="Skip displaying schema documentation")),
        (("--quiet", "-q"),
         dict(action="store_true", help="Reduce output verbosity")),
    ]
    for flags, kwargs in specs:
        parser.add_argument(*flags, **kwargs)
    return parser.parse_args()
def main():
    """
    Main execution function.
    Collects credentials via prompts or environment variables.

    Exit codes: 0 on success, 1 on any failure, 130 on Ctrl-C.
    """
    args = parse_args()
    # Set log level
    if args.quiet:
        logging.getLogger().setLevel(logging.WARNING)
    # Get credentials
    uri, user, password = get_credentials(args)
    logger.info(f"Connecting to Neo4j at {uri}...")
    try:
        schema = LifeGraphSchema(uri, user, password)
    except Exception as e:
        # Driver construction failed — nothing to close, so no finally needed here.
        logger.error(f"Failed to create database driver: {e}")
        sys.exit(1)
    try:
        # Verify connection first
        try:
            schema.verify_connection()
        except AuthError:
            logger.error("✗ Authentication failed - check username/password")
            sys.exit(1)
        except ServiceUnavailable:
            logger.error(f"✗ Cannot connect to Neo4j at {uri}")
            sys.exit(1)
        if args.test_only:
            # Just run basic tests (no schema verification)
            success = schema.run_tests(include_schema_tests=False)
            sys.exit(0 if success else 1)
        # Display schema documentation
        if not args.skip_docs:
            schema.document_schema()
        # Create constraints (includes automatic indexes)
        logger.info("Creating constraints...")
        schema.create_constraints()
        # Create additional indexes
        logger.info("Creating indexes...")
        schema.create_indexes()
        # Create sample nodes to validate schema
        if not args.skip_samples:
            logger.info("Creating sample nodes...")
            schema.create_sample_nodes()
        # Run verification tests (including schema tests)
        logger.info("Verifying schema...")
        test_success = schema.run_tests(include_schema_tests=True)
        # Summary
        stats = schema.verify_schema()
        logger.info("=" * 60)
        logger.info("SCHEMA INITIALIZATION COMPLETE")
        logger.info("=" * 60)
        logger.info(f" Constraints: {stats['constraints']}")
        logger.info(f" Indexes: {stats['indexes']}")
        logger.info(f" Nodes: {stats['nodes']}")
        logger.info("=" * 60)
        if test_success:
            logger.info("✓ All tests passed!")
            logger.info("\nNext steps:")
            logger.info(" 1. Import your Plex library (Film, Music nodes)")
            logger.info(" 2. Import your Calibre library (Book nodes)")
            logger.info(" 3. Configure your AI assistants to write to this graph")
        else:
            logger.warning("⚠ Some tests failed - review output above")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(130)
    except Exception as e:
        logger.error(f"Error during schema initialization: {e}")
        sys.exit(1)
    finally:
        # Always release the driver — runs even on sys.exit (SystemExit).
        schema.close()
if __name__ == "__main__":
    main()

348
utils/neo4j-validate.py Normal file
View File

@@ -0,0 +1,348 @@
"""
Neo4j Knowledge Graph Validation
=================================
Comprehensive validation report for the Koios unified knowledge graph.
Checks constraints, indexes, sample nodes, relationships, and detects junk data.
Share the output with an AI assistant to confirm everything is correct.
Usage:
python neo4j-validate.py
python neo4j-validate.py --uri bolt://ariel.incus:7687
Environment Variables (optional):
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
NEO4J_USER - Username (default: neo4j)
NEO4J_PASSWORD - Password (will prompt if not set)
"""
import argparse
import getpass
import os
import sys
from datetime import datetime
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
# ── Expected schema counts ───────────────────────────────────────────────────
# NOTE(review): these tables mirror what neo4j-schema-init.py creates; any
# change to the init script's constraints/indexes/samples must be reflected here.
EXPECTED_CONSTRAINTS = [
    # Universal
    "person_id", "location_id", "event_id", "topic_id", "goal_id",
    # Nate
    "trip_id", "destination_id", "activity_id",
    # Hypatia
    "book_id", "author_id", "learningpath_id", "concept_id", "quote_id",
    # Marcus
    "training_id", "exercise_id", "program_id", "personalrecord_id", "bodymetric_id",
    # Seneca
    "reflection_id", "value_id", "habit_id", "lifeevent_id", "intention_id",
    # Bourdain
    "recipe_id", "restaurant_id", "ingredient_id", "meal_id", "technique_id",
    # Bowie
    "music_id", "film_id", "artwork_id", "playlist_id", "artist_id", "style_id",
    # Cousteau
    "species_id", "plant_id", "tank_id", "garden_id", "ecosystem_id", "observation_id",
    # Garth
    "account_id", "investment_id", "asset_id", "liability_id", "budget_id", "financialgoal_id",
    # Cristiano
    "match_id", "team_id", "league_id", "tournament_id", "player_id", "season_id",
    # Work: Business
    "client_id", "contact_id", "opportunity_id", "proposal_id", "project_id",
    # Work: Market Intelligence
    "vendor_id", "competitor_id", "markettrend_id", "technology_id",
    # Work: Content & Visibility
    "content_id", "publication_id",
    # Work: Professional Development
    "skill_id", "certification_id", "relationship_id",
    # Work: Daily Operations
    "task_id", "meeting_id", "note_id", "decision_id",
    # Engineering
    "infrastructure_id", "incident_id", "prototype_id", "experiment_id",
]
# All 74 valid node labels from the schema
EXPECTED_LABELS = {
    "Person", "Location", "Event", "Topic", "Goal",
    "Trip", "Destination", "Activity",
    "Book", "Author", "LearningPath", "Concept", "Quote",
    "Training", "Exercise", "Program", "PersonalRecord", "BodyMetric",
    "Reflection", "Value", "Habit", "LifeEvent", "Intention",
    "Recipe", "Restaurant", "Ingredient", "Meal", "Technique",
    "Music", "Film", "Artwork", "Playlist", "Artist", "Style",
    "Species", "Plant", "Tank", "Garden", "Ecosystem", "Observation",
    "Account", "Investment", "Asset", "Liability", "Budget", "FinancialGoal",
    "Match", "Team", "League", "Tournament", "Player", "Season",
    "Client", "Contact", "Opportunity", "Proposal", "Project",
    "Vendor", "Competitor", "MarketTrend", "Technology",
    "Content", "Publication",
    "Skill", "Certification", "Relationship",
    "Task", "Meeting", "Note", "Decision",
    "Infrastructure", "Incident", "Prototype", "Experiment",
}
# (label, id) pairs the init script's create_sample_nodes() is expected to MERGE.
EXPECTED_SAMPLE_NODES = [
    ("Person", "user_main"),
    ("Location", "location_home"),
    ("Trip", "trip_sample_2025"),
    ("Book", "book_meditations_aurelius"),
    ("Goal", "goal_sample_2025"),
    ("Topic", "topic_stoicism"),
    ("Topic", "topic_ai_in_cx"),
    ("Account", "account_tfsa_sample"),
    ("Client", "client_sample_corp"),
    ("Skill", "skill_cx_strategy"),
    ("Infrastructure", "infra_neo4j_prod"),
    ("Team", "team_arsenal"),
]
# (from_label, from_id, rel_type, to_label, to_id) tuples expected in the graph.
EXPECTED_SAMPLE_RELS = [
    ("Person", "user_main", "SUPPORTS", "Team", "team_arsenal"),
    ("Person", "user_main", "COMPLETED", "Book", "book_meditations_aurelius"),
    ("Person", "user_main", "PURSUING", "Goal", "goal_sample_2025"),
    ("Book", "book_meditations_aurelius", "EXPLORES", "Topic", "topic_stoicism"),
    ("Person", "user_main", "OWNS", "Account", "account_tfsa_sample"),
]
# A sampling of expected indexes (not exhaustive, just key ones to spot-check)
EXPECTED_INDEX_SAMPLES = [
    "person_name", "book_title", "client_name", "event_date",
    "training_date", "client_status", "task_status", "event_domain",
    "team_name", "player_name", "match_competition",
]
def get_credentials(args):
    """Gather URI, username, and password (CLI args > env vars > prompts)."""
    uri = args.uri or os.environ.get("NEO4J_URI")
    if not uri:
        entered = input("Neo4j URI [bolt://localhost:7687]: ").strip()
        uri = entered if entered else "bolt://localhost:7687"
    user = args.user or os.environ.get("NEO4J_USER")
    if not user:
        entered = input("Neo4j username [neo4j]: ").strip()
        user = entered if entered else "neo4j"
    # Password comes from the environment or a hidden prompt, never the CLI.
    password = os.environ.get("NEO4J_PASSWORD") or getpass.getpass("Neo4j password: ")
    if not password:
        print("ERROR: Password is required")
        sys.exit(1)
    return uri, user, password
def validate(driver, uri):
    """Run all validation checks and print the report.

    Sections: connection + APOC, constraints, indexes, node labels,
    sample nodes, sample relationships, and relationship/node summaries.
    Compares live schema against the EXPECTED_* module constants.

    Args:
        driver: An open neo4j ``Driver`` instance.
        uri: Bolt URI string, echoed in the report header only.

    Returns:
        bool: True when every check passed (warnings never fail the run).
    """
    passed = 0
    failed = 0
    warnings = 0

    def check(ok, label):
        # Tally and print one result line. NOTE(review): the original
        # printed identical text for both branches — the ✓/✗ markers had
        # been stripped (main() at the bottom of the file still has them),
        # making failures indistinguishable from passes. Restored here.
        nonlocal passed, failed
        if ok:
            passed += 1
            print(f"  ✓ {label}")
        else:
            failed += 1
            print(f"  ✗ {label}")

    def warn(label):
        # Warnings are counted and reported but do not affect the result.
        nonlocal warnings
        warnings += 1
        print(f"  ⚠ {label}")

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print()
    # Separator was `"" * 65` (an empty string) — the box-drawing char had
    # been lost to the same encoding mishap as the ✓/✗ markers.
    print("═" * 65)
    print(" VALIDATION REPORT — Koios Unified Knowledge Graph")
    print("═" * 65)
    print(f" Schema Version: 2.1.0")
    print(f" Database: {uri}")
    print(f" Timestamp: {now}")
    print("═" * 65)

    with driver.session() as session:
        # ── 1. CONNECTION ────────────────────────────────────────────
        print("\n[CONNECTION]")
        try:
            r = session.run("RETURN 1 AS test").single()["test"]
            check(r == 1, "Database reachable")
        except Exception as e:
            check(False, f"Database reachable — {e}")
            print("\nCannot proceed without a connection.")
            return False

        # APOC is optional tooling, so absence is only a warning.
        try:
            v = session.run("RETURN apoc.version() AS v").single()["v"]
            check(True, f"APOC available (v{v})")
        except Exception:
            warn("APOC not available — some assistants may need it")

        # ── 2. CONSTRAINTS ───────────────────────────────────────────
        print(f"\n[CONSTRAINTS] (expecting {len(EXPECTED_CONSTRAINTS)})")
        existing_constraints = set()
        for rec in session.run("SHOW CONSTRAINTS YIELD name RETURN name"):
            existing_constraints.add(rec["name"])
        # Summarize rather than printing a line per constraint.
        missing_constraints = [
            name for name in EXPECTED_CONSTRAINTS
            if name not in existing_constraints
        ]
        if not missing_constraints:
            check(True, f"All {len(EXPECTED_CONSTRAINTS)} constraints present")
        else:
            check(False, f"Missing {len(missing_constraints)} constraints: {', '.join(missing_constraints[:10])}")
            if len(missing_constraints) > 10:
                print(f"      ... and {len(missing_constraints) - 10} more")
        extra_constraints = existing_constraints - set(EXPECTED_CONSTRAINTS)
        if extra_constraints:
            warn(f"Extra constraints not in schema: {', '.join(sorted(extra_constraints))}")

        # ── 3. INDEXES ───────────────────────────────────────────────
        print(f"\n[INDEXES]")
        existing_indexes = set()
        for rec in session.run("SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"):
            existing_indexes.add(rec["name"])
        # Don't count constraint-backed indexes (they share names with constraints)
        pure_indexes = existing_indexes - set(EXPECTED_CONSTRAINTS)
        total_indexes = len(existing_indexes)
        check(total_indexes >= len(EXPECTED_CONSTRAINTS),
              f"Total indexes: {total_indexes} (includes {len(EXPECTED_CONSTRAINTS)} from constraints)")
        missing_index_samples = [i for i in EXPECTED_INDEX_SAMPLES if i not in existing_indexes]
        if not missing_index_samples:
            check(True, f"Key index spot-check passed ({len(EXPECTED_INDEX_SAMPLES)} sampled)")
        else:
            check(False, f"Missing indexes: {', '.join(missing_index_samples)}")

        # ── 4. NODE LABELS ───────────────────────────────────────────
        print(f"\n[NODE LABELS]")
        label_counts = {}
        # One count query per label; acceptable for a small admin tool.
        for rec in session.run("CALL db.labels() YIELD label RETURN label"):
            label = rec["label"]
            count_rec = session.run(f"MATCH (n:`{label}`) RETURN count(n) AS c").single()
            label_counts[label] = count_rec["c"]
        labels_in_db = set(label_counts.keys())
        unexpected_labels = labels_in_db - EXPECTED_LABELS - {"_Test"}  # _Test is from test suite
        if not unexpected_labels:
            check(True, "No unexpected labels (no junk from Memory server)")
        else:
            check(False, f"Unexpected labels found: {', '.join(sorted(unexpected_labels))}")
            for ul in sorted(unexpected_labels):
                print(f"      {ul}: {label_counts.get(ul, '?')} nodes — SHOULD BE REMOVED")
        labels_with_data = {lbl for lbl, c in label_counts.items() if c > 0}
        print(f"  Labels with data: {len(labels_with_data)} of {len(EXPECTED_LABELS)} schema types")

        # ── 5. SAMPLE NODES ──────────────────────────────────────────
        print(f"\n[SAMPLE NODES] (expecting {len(EXPECTED_SAMPLE_NODES)})")
        for label, node_id in EXPECTED_SAMPLE_NODES:
            result = session.run(
                f"MATCH (n:`{label}` {{id: $id}}) RETURN n.id AS id, n.name AS name, "
                f"n.title AS title, n.updated_at AS updated",
                id=node_id
            ).single()
            if result:
                # Prefer the most human-readable property for display.
                display = result["name"] or result["title"] or result["id"]
                check(True, f"{label}:{node_id} — {display}")
            else:
                check(False, f"{label}:{node_id} — NOT FOUND")

        # ── 6. SAMPLE RELATIONSHIPS ──────────────────────────────────
        print(f"\n[SAMPLE RELATIONSHIPS] (expecting {len(EXPECTED_SAMPLE_RELS)})")
        for from_label, from_id, rel_type, to_label, to_id in EXPECTED_SAMPLE_RELS:
            result = session.run(
                f"MATCH (a:`{from_label}` {{id: $from_id}})-[r:`{rel_type}`]->(b:`{to_label}` {{id: $to_id}}) "
                f"RETURN type(r) AS rel",
                from_id=from_id, to_id=to_id
            ).single()
            if result:
                check(True, f"({from_id})-[:{rel_type}]->({to_id})")
            else:
                check(False, f"({from_id})-[:{rel_type}]->({to_id}) — NOT FOUND")

        # ── 7. RELATIONSHIP SUMMARY ──────────────────────────────────
        print(f"\n[RELATIONSHIP SUMMARY]")
        total_rels = session.run("MATCH ()-[r]->() RETURN count(r) AS c").single()["c"]
        print(f"  Total relationships: {total_rels}")
        if total_rels > 0:
            rel_types = list(session.run(
                "MATCH ()-[r]->() RETURN type(r) AS type, count(r) AS count ORDER BY count DESC"
            ))
            for rec in rel_types:
                print(f"    {rec['type']}: {rec['count']}")
        else:
            check(False, "No relationships exist — sample data may not have been created")

        # ── 8. TOTAL NODE COUNT ──────────────────────────────────────
        print(f"\n[NODE SUMMARY]")
        total_nodes = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
        print(f"  Total nodes: {total_nodes}")
        if label_counts:
            for label in sorted(label_counts.keys()):
                if label_counts[label] > 0:
                    print(f"    {label}: {label_counts[label]}")

    # ── SUMMARY ──────────────────────────────────────────────────────
    print()
    print("═" * 65)
    total = passed + failed
    if failed == 0:
        print(f" RESULT: ALL {passed} CHECKS PASSED ✓")
        if warnings:
            print(f" ({warnings} warning{'s' if warnings != 1 else ''})")
    else:
        print(f" RESULT: {failed} FAILED / {passed} passed / {total} total")
        if warnings:
            print(f" ({warnings} warning{'s' if warnings != 1 else ''})")
    print("═" * 65)
    print()
    return failed == 0
def main():
    """CLI entry point: parse args, connect, run validation, exit 0/1."""
    arg_parser = argparse.ArgumentParser(
        description="Validate Koios Neo4j knowledge graph schema and data"
    )
    arg_parser.add_argument("--uri", "-u", help="Neo4j Bolt URI")
    arg_parser.add_argument("--user", "-U", help="Neo4j username")
    uri, user, password = get_credentials(arg_parser.parse_args())

    # Smoke-test the connection up front so auth/availability problems
    # produce a friendly message instead of a mid-report stack trace.
    try:
        driver = GraphDatabase.driver(uri, auth=(user, password))
        with driver.session() as smoke:
            smoke.run("RETURN 1")
    except AuthError:
        print(f"✗ Authentication failed for {uri}")
        sys.exit(1)
    except ServiceUnavailable:
        print(f"✗ Cannot connect to {uri}")
        sys.exit(1)
    print(f"✓ Connected to {uri}")

    exit_code = 1
    try:
        exit_code = 0 if validate(driver, uri) else 1
    except Exception as exc:
        print(f"ERROR: {exc}")
    finally:
        driver.close()
    sys.exit(exit_code)
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()