"""
Neo4j Knowledge Graph Validation
=================================

Comprehensive validation report for the Koios unified knowledge graph.
Checks constraints, indexes, sample nodes, relationships, and detects junk
data. Share the output with an AI assistant to confirm everything is correct.

Usage:
    python neo4j-validate.py
    python neo4j-validate.py --uri bolt://ariel.incus:7687

Environment Variables (optional; each one only supplies the default that is
offered at the corresponding interactive prompt):
    NEO4J_URI      - Bolt URI (default: bolt://localhost:7687)
    NEO4J_USER     - Username (default: neo4j)
    NEO4J_PASSWORD - Password (accepted by pressing Enter at the prompt)
"""

import argparse
import getpass
import os
import sys
from datetime import datetime

try:
    from neo4j import GraphDatabase
    from neo4j.exceptions import AuthError, ServiceUnavailable
except ImportError:
    # Defer the failure to main() so a missing driver produces a one-line
    # error instead of an import-time traceback, and so the pure helpers in
    # this module stay importable without the driver installed.
    GraphDatabase = None

    class AuthError(Exception):
        """Placeholder used only when the neo4j driver is not installed."""

    class ServiceUnavailable(Exception):
        """Placeholder used only when the neo4j driver is not installed."""


# Schema version printed in the report header.
SCHEMA_VERSION = "2.3.0"

# ── Expected schema counts ───────────────────────────────────────────────────

EXPECTED_CONSTRAINTS = [
    # Universal
    "person_id", "location_id", "event_id", "topic_id", "goal_id",
    # Nate
    "trip_id", "destination_id", "activity_id",
    # Hypatia
    "book_id", "author_id", "learningpath_id", "concept_id", "quote_id",
    # Marcus
    "training_id", "exercise_id", "program_id", "personalrecord_id",
    "bodymetric_id",
    # Watson (Reflection & Emotional Safety)
    "reflection_id", "value_id", "habit_id", "lifeevent_id", "intention_id",
    "emotionalmemory_id", "relationshiptheme_id", "dialoguenote_id",
    "dynamicpattern_id",
    # Bourdain
    "recipe_id", "restaurant_id", "ingredient_id", "meal_id", "technique_id",
    # David (Arts & Culture)
    "music_id", "film_id", "artwork_id", "playlist_id", "artist_id",
    "style_id",
    # Cousteau
    "species_id", "plant_id", "tank_id", "garden_id", "ecosystem_id",
    "observation_id",
    # Garth
    "account_id", "investment_id", "asset_id", "liability_id", "budget_id",
    "financialgoal_id",
    # Cristiano
    "match_id", "team_id", "league_id", "tournament_id", "player_id",
    "season_id",
    # Shawn (Personal General Assistant)
    "communication_id",
    # Work: Business
    "client_id", "contact_id", "opportunity_id", "proposal_id", "project_id",
    # Work: Market Intelligence
    "vendor_id", "competitor_id", "markettrend_id", "technology_id",
    # Work: Content & Visibility
    "content_id", "publication_id",
    # Work: Professional Development
    "skill_id", "certification_id", "relationship_id",
    # Work: Daily Operations
    "task_id", "meeting_id", "note_id", "decision_id",
    # Engineering
    "infrastructure_id", "incident_id", "prototype_id", "experiment_id",
]

# Every valid node label in the schema (one per constraint listed above).
EXPECTED_LABELS = {
    "Person", "Location", "Event", "Topic", "Goal",
    "Trip", "Destination", "Activity",
    "Book", "Author", "LearningPath", "Concept", "Quote",
    "Training", "Exercise", "Program", "PersonalRecord", "BodyMetric",
    "Reflection", "Value", "Habit", "LifeEvent", "Intention",
    "EmotionalMemory", "RelationshipTheme", "DialogueNote", "DynamicPattern",
    "Recipe", "Restaurant", "Ingredient", "Meal", "Technique",
    "Music", "Film", "Artwork", "Playlist", "Artist", "Style",
    "Species", "Plant", "Tank", "Garden", "Ecosystem", "Observation",
    "Account", "Investment", "Asset", "Liability", "Budget", "FinancialGoal",
    "Match", "Team", "League", "Tournament", "Player", "Season",
    "Communication",
    "Client", "Contact", "Opportunity", "Proposal", "Project",
    "Vendor", "Competitor", "MarketTrend", "Technology",
    "Content", "Publication",
    "Skill", "Certification", "Relationship",
    "Task", "Meeting", "Note", "Decision",
    "Infrastructure", "Incident", "Prototype", "Experiment",
}

# (label, id) pairs that must exist as nodes.
EXPECTED_SAMPLE_NODES = [
    ("Person", "user_main"),
    ("Location", "location_home"),
    ("Trip", "trip_sample_2025"),
    ("Book", "book_meditations_aurelius"),
    ("Goal", "goal_sample_2025"),
    ("Topic", "topic_stoicism"),
    ("Topic", "topic_ai_in_cx"),
    ("Account", "account_tfsa_sample"),
    ("Contact", "contact_sample_personal"),
    ("Communication", "comm_sample"),
    ("EmotionalMemory", "memory_sample"),
    ("Client", "client_sample_corp"),
    ("Skill", "skill_cx_strategy"),
    ("Infrastructure", "infra_neo4j_prod"),
    ("Team", "team_arsenal"),
]

# (from_label, from_id, rel_type, to_label, to_id) tuples that must exist.
EXPECTED_SAMPLE_RELS = [
    ("Person", "user_main", "SUPPORTS", "Team", "team_arsenal"),
    ("Person", "user_main", "COMPLETED", "Book", "book_meditations_aurelius"),
    ("Person", "user_main", "PURSUING", "Goal", "goal_sample_2025"),
    ("Book", "book_meditations_aurelius", "EXPLORES", "Topic", "topic_stoicism"),
    ("Person", "user_main", "OWNS", "Account", "account_tfsa_sample"),
    ("Person", "user_main", "HAD", "Communication", "comm_sample"),
    ("Communication", "comm_sample", "WITH", "Contact", "contact_sample_personal"),
]

# A sampling of expected indexes (not exhaustive, just key ones to spot-check)
EXPECTED_INDEX_SAMPLES = [
    "person_name", "book_title", "client_name",
    "event_date", "training_date",
    "client_status", "task_status",
    "event_domain", "team_name", "player_name",
    "match_competition", "contact_domain", "task_domain",
]


def _mask_password(pw):
    """Mask a password for display: keep first and last char, hide the middle.

    Returns "(empty)" for a falsy value and only asterisks for passwords of
    one or two characters; longer passwords also get their length appended.
    """
    if not pw:
        return "(empty)"
    if len(pw) <= 2:
        return "*" * len(pw)
    return f"{pw[0]}{'*' * (len(pw) - 2)}{pw[-1]} ({len(pw)} chars)"


def get_credentials(args):
    """
    Collect Neo4j credentials by prompting for each value sequentially.

    For each of URI and username, the current default (CLI argument, then
    environment variable, then built-in fallback) is shown in brackets; the
    user hits Enter to accept it or types a new value to override. The
    password is read with getpass so it is not echoed; pressing Enter accepts
    $NEO4J_PASSWORD when that variable is set.

    Finally a summary (password masked) is printed and the user is asked for
    confirmation.

    Args:
        args: Parsed CLI namespace providing ``uri`` and ``user`` attributes
            (either may be None).

    Returns:
        A ``(uri, user, password)`` tuple of strings.

    Exits:
        Status 1 if no password is available; status 0 if the user declines
        the confirmation prompt.
    """
    print()
    print("─" * 60)
    print(" Neo4j Connection")
    print("─" * 60)

    # URI
    uri_default = args.uri or os.environ.get("NEO4J_URI") or "bolt://localhost:7687"
    uri = input(f" Neo4j URI [{uri_default}]: ").strip() or uri_default

    # Username
    user_default = args.user or os.environ.get("NEO4J_USER") or "neo4j"
    user = input(f" Neo4j username [{user_default}]: ").strip() or user_default

    # Password (always via getpass, never echoed)
    env_password = os.environ.get("NEO4J_PASSWORD")
    if env_password:
        prompt = " Neo4j password [from $NEO4J_PASSWORD, Enter to accept]: "
    else:
        prompt = " Neo4j password: "
    password = getpass.getpass(prompt) or env_password or ""
    if not password:
        print("ERROR: Password is required")
        sys.exit(1)

    # Summary + confirm
    print()
    print("─" * 60)
    print(" Connection summary")
    print("─" * 60)
    print(f" URI: {uri}")
    print(f" User: {user}")
    print(f" Password: {_mask_password(password)}")
    print("─" * 60)
    print(" Validation is read-only — no graph data will be modified.")
    confirm = input("Proceed with these credentials? [Y/n]: ").strip().lower()
    # An empty answer means "yes" (the default shown as capital Y).
    if confirm and confirm not in ("y", "yes"):
        print("Cancelled by user.")
        sys.exit(0)

    return uri, user, password


class _Report:
    """Tally of check outcomes; prints one line per recorded result."""

    def __init__(self):
        self.passed = 0
        self.failed = 0
        self.warnings = 0

    def check(self, ok, label):
        """Record a pass or failure and print it with a ✓ / ✗ marker."""
        if ok:
            self.passed += 1
            print(f" ✓ {label}")
        else:
            self.failed += 1
            print(f" ✗ {label}")

    def warn(self, label):
        """Record a warning; warnings never make the validation fail."""
        self.warnings += 1
        print(f" ⚠ {label}")


def _check_connection(session, report):
    """Run a trivial query and probe for APOC; return False if unreachable."""
    print("\n[CONNECTION]")
    try:
        r = session.run("RETURN 1 AS test").single()["test"]
        report.check(r == 1, "Database reachable")
    except Exception as e:
        report.check(False, f"Database reachable — {e}")
        print("\nCannot proceed without a connection.")
        return False

    # APOC is optional, so any failure here is only a warning.
    try:
        v = session.run("RETURN apoc.version() AS v").single()["v"]
        report.check(True, f"APOC available (v{v})")
    except Exception:
        report.warn("APOC not available — some assistants may need it")
    return True


def _check_constraints(session, report):
    """Compare the constraints in the database against EXPECTED_CONSTRAINTS."""
    print(f"\n[CONSTRAINTS] (expecting {len(EXPECTED_CONSTRAINTS)})")
    existing = {
        rec["name"]
        for rec in session.run("SHOW CONSTRAINTS YIELD name RETURN name")
    }

    # Don't print every single constraint — just summarize what is missing.
    missing = [name for name in EXPECTED_CONSTRAINTS if name not in existing]
    if not missing:
        report.check(True, f"All {len(EXPECTED_CONSTRAINTS)} constraints present")
    else:
        report.check(
            False,
            f"Missing {len(missing)} constraints: {', '.join(missing[:10])}",
        )
        if len(missing) > 10:
            print(f" ... and {len(missing) - 10} more")

    extra = existing - set(EXPECTED_CONSTRAINTS)
    if extra:
        report.warn(f"Extra constraints not in schema: {', '.join(sorted(extra))}")


def _check_indexes(session, report):
    """Check the total index count and spot-check EXPECTED_INDEX_SAMPLES."""
    print("\n[INDEXES]")
    existing = {
        rec["name"]
        for rec in session.run(
            "SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"
        )
    }

    # Constraint-backed indexes are included in this total, so it must be at
    # least as large as the number of expected constraints.
    total = len(existing)
    report.check(
        total >= len(EXPECTED_CONSTRAINTS),
        f"Total indexes: {total} (includes {len(EXPECTED_CONSTRAINTS)} from constraints)",
    )

    missing = [name for name in EXPECTED_INDEX_SAMPLES if name not in existing]
    if not missing:
        report.check(
            True,
            f"Key index spot-check passed ({len(EXPECTED_INDEX_SAMPLES)} sampled)",
        )
    else:
        report.check(False, f"Missing indexes: {', '.join(missing)}")


def _check_labels(session, report):
    """Count nodes per label, flag labels outside the schema, return counts."""
    print("\n[NODE LABELS]")

    # Read the full label list before issuing the per-label count queries so
    # the two result streams are never interleaved on the same session.
    labels = [
        rec["label"]
        for rec in session.run("CALL db.labels() YIELD label RETURN label")
    ]

    label_counts = {}
    for label in labels:
        # Labels come from the database and cannot be passed as parameters;
        # double any backtick so the quoted identifier stays well-formed.
        quoted = label.replace("`", "``")
        count_rec = session.run(f"MATCH (n:`{quoted}`) RETURN count(n) AS c").single()
        label_counts[label] = count_rec["c"]

    unexpected = set(label_counts) - EXPECTED_LABELS - {"_Test"}  # _Test is from test suite
    if not unexpected:
        report.check(True, "No unexpected labels (no junk from Memory server)")
    else:
        report.check(False, f"Unexpected labels found: {', '.join(sorted(unexpected))}")
        for ul in sorted(unexpected):
            print(f" → {ul}: {label_counts.get(ul, '?')} nodes — SHOULD BE REMOVED")

    # Only schema labels are counted, so junk labels cannot inflate the
    # "X of N schema types" figure.
    schema_labels_with_data = sum(
        1
        for label, count in label_counts.items()
        if count > 0 and label in EXPECTED_LABELS
    )
    print(
        f" ℹ Labels with data: {schema_labels_with_data} of "
        f"{len(EXPECTED_LABELS)} schema types"
    )
    return label_counts


def _check_sample_nodes(session, report):
    """Verify every node listed in EXPECTED_SAMPLE_NODES exists."""
    print(f"\n[SAMPLE NODES] (expecting {len(EXPECTED_SAMPLE_NODES)})")
    for label, node_id in EXPECTED_SAMPLE_NODES:
        result = session.run(
            f"MATCH (n:`{label}` {{id: $id}}) RETURN n.id AS id, n.name AS name, "
            "n.title AS title, n.updated_at AS updated",
            id=node_id,
        ).single()
        if result:
            # Prefer a human-readable field, falling back to the id.
            display = result["name"] or result["title"] or result["id"]
            report.check(True, f"{label}:{node_id} → {display}")
        else:
            report.check(False, f"{label}:{node_id} — NOT FOUND")


def _check_sample_relationships(session, report):
    """Verify every relationship listed in EXPECTED_SAMPLE_RELS exists."""
    print(f"\n[SAMPLE RELATIONSHIPS] (expecting {len(EXPECTED_SAMPLE_RELS)})")
    for from_label, from_id, rel_type, to_label, to_id in EXPECTED_SAMPLE_RELS:
        result = session.run(
            f"MATCH (a:`{from_label}` {{id: $from_id}})-[r:`{rel_type}`]->(b:`{to_label}` {{id: $to_id}}) "
            "RETURN type(r) AS rel",
            from_id=from_id,
            to_id=to_id,
        ).single()
        if result:
            report.check(True, f"({from_id})-[:{rel_type}]->({to_id})")
        else:
            report.check(False, f"({from_id})-[:{rel_type}]->({to_id}) — NOT FOUND")


def _report_relationship_summary(session, report):
    """Print relationship totals per type; fail if the graph has none."""
    print("\n[RELATIONSHIP SUMMARY]")
    total_rels = session.run("MATCH ()-[r]->() RETURN count(r) AS c").single()["c"]
    print(f" ℹ Total relationships: {total_rels}")
    if total_rels > 0:
        by_type = session.run(
            "MATCH ()-[r]->() RETURN type(r) AS type, count(r) AS count ORDER BY count DESC"
        )
        for rec in by_type:
            print(f" {rec['type']}: {rec['count']}")
    else:
        report.check(False, "No relationships exist — sample data may not have been created")


def _report_node_summary(session, label_counts):
    """Print the total node count and the count of every non-empty label."""
    print("\n[NODE SUMMARY]")
    total_nodes = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
    print(f" ℹ Total nodes: {total_nodes}")
    for label in sorted(label_counts):
        if label_counts[label] > 0:
            print(f" {label}: {label_counts[label]}")


def _print_summary(report):
    """Print the final pass/fail banner, including any warning count."""
    print()
    print("═" * 65)
    if report.failed == 0:
        print(f" RESULT: ALL {report.passed} CHECKS PASSED ✓")
    else:
        total = report.passed + report.failed
        print(f" RESULT: {report.failed} FAILED / {report.passed} passed / {total} total")
    if report.warnings:
        print(f" ({report.warnings} warning{'s' if report.warnings != 1 else ''})")
    print("═" * 65)
    print()


def validate(driver, uri):
    """Run all validation checks and print the report.

    Args:
        driver: Object whose ``session()`` returns a context manager exposing
            ``run(query, **params)``.
        uri: Connection URI, used only for display in the report header.

    Returns:
        True when no check failed (warnings are allowed). False when at
        least one check failed, or immediately (without the summary banner)
        when the connection check itself fails.
    """
    report = _Report()
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    print()
    print("═" * 65)
    print(" VALIDATION REPORT — Koios Unified Knowledge Graph")
    print("═" * 65)
    print(f" Schema Version: {SCHEMA_VERSION}")
    print(f" Database: {uri}")
    print(f" Timestamp: {now}")
    print("═" * 65)

    with driver.session() as session:
        # 1. Connection: nothing else is meaningful if this fails.
        if not _check_connection(session, report):
            return False
        # 2-3. Schema objects.
        _check_constraints(session, report)
        _check_indexes(session, report)
        # 4. Labels (the counts are reused by the node summary below).
        label_counts = _check_labels(session, report)
        # 5-6. Sample data.
        _check_sample_nodes(session, report)
        _check_sample_relationships(session, report)
        # 7-8. Informational totals.
        _report_relationship_summary(session, report)
        _report_node_summary(session, label_counts)

    _print_summary(report)
    return report.failed == 0


def main():
    """CLI entry point: gather credentials, connect, validate, and exit.

    Exit status is 0 when every check passes and 1 on any failed check,
    connection/authentication failure, missing driver, or unexpected error.
    """
    parser = argparse.ArgumentParser(
        description="Validate Koios Neo4j knowledge graph schema and data"
    )
    parser.add_argument("--uri", "-u", help="Neo4j Bolt URI")
    parser.add_argument("--user", "-U", help="Neo4j username")
    args = parser.parse_args()

    # Checked after argument parsing so --help works without the driver, and
    # before prompting so the user is not asked for credentials needlessly.
    if GraphDatabase is None:
        print("ERROR: The 'neo4j' Python driver is not installed (pip install neo4j)")
        sys.exit(1)

    uri, user, password = get_credentials(args)

    driver = None
    try:
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            with driver.session() as session:
                session.run("RETURN 1")
            print(f"✓ Connected to {uri}")
        except AuthError:
            print(f"✗ Authentication failed for {uri}")
            sys.exit(1)
        except ServiceUnavailable:
            print(f"✗ Cannot connect to {uri}")
            sys.exit(1)

        try:
            success = validate(driver, uri)
        except Exception as e:
            print(f"ERROR: {e}")
            sys.exit(1)
        sys.exit(0 if success else 1)
    finally:
        # Runs on every path, including the sys.exit() calls above, so the
        # driver is released even when the connectivity probe fails.
        if driver is not None:
            driver.close()


if __name__ == "__main__":
    main()