# Files
# koios/utils/neo4j-validate.py
# Robert Helewka 7859264359 Add Neo4j schema initialization and validation scripts
# - Introduced `neo4j-schema-init.py` for creating the foundational schema for the personal knowledge graph used by multiple AI assistants.
# - Implemented functionality for creating constraints, indexes, and sample nodes, along with comprehensive testing of the schema.
# - Added `neo4j-validate.py` to perform validation checks on the Neo4j knowledge graph, including constraints, indexes, sample nodes, relationships, and junk data detection.
# - Enhanced logging for better traceability and debugging during schema initialization and validation processes.
# 2026-03-06 14:11:52 +00:00
#
# 349 lines
# 14 KiB
# Python
# Raw Permalink Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Neo4j Knowledge Graph Validation
=================================
Comprehensive validation report for the Koios unified knowledge graph.
Checks constraints, indexes, sample nodes, relationships, and detects junk data.
Share the output with an AI assistant to confirm everything is correct.
Usage:
python neo4j-validate.py
python neo4j-validate.py --uri bolt://ariel.incus:7687
Environment Variables (optional):
NEO4J_URI - Bolt URI (default: bolt://localhost:7687)
NEO4J_USER - Username (default: neo4j)
NEO4J_PASSWORD - Password (will prompt if not set)
"""
import argparse
import getpass
import os
import sys
from datetime import datetime
from neo4j import GraphDatabase
from neo4j.exceptions import AuthError, ServiceUnavailable
# ── Expected schema contents ─────────────────────────────────────────────────
# Names of the constraints the validator looks for, grouped by the assistant
# or work domain they belong to. Each name is checked against the output of
# SHOW CONSTRAINTS.
EXPECTED_CONSTRAINTS = [
    # Universal
    "person_id", "location_id", "event_id", "topic_id", "goal_id",
    # Nate
    "trip_id", "destination_id", "activity_id",
    # Hypatia
    "book_id", "author_id", "learningpath_id", "concept_id", "quote_id",
    # Marcus
    "training_id", "exercise_id", "program_id", "personalrecord_id", "bodymetric_id",
    # Seneca
    "reflection_id", "value_id", "habit_id", "lifeevent_id", "intention_id",
    # Bourdain
    "recipe_id", "restaurant_id", "ingredient_id", "meal_id", "technique_id",
    # Bowie
    "music_id", "film_id", "artwork_id", "playlist_id", "artist_id", "style_id",
    # Cousteau
    "species_id", "plant_id", "tank_id", "garden_id", "ecosystem_id", "observation_id",
    # Garth
    "account_id", "investment_id", "asset_id", "liability_id", "budget_id", "financialgoal_id",
    # Cristiano
    "match_id", "team_id", "league_id", "tournament_id", "player_id", "season_id",
    # Work: Business
    "client_id", "contact_id", "opportunity_id", "proposal_id", "project_id",
    # Work: Market Intelligence
    "vendor_id", "competitor_id", "markettrend_id", "technology_id",
    # Work: Content & Visibility
    "content_id", "publication_id",
    # Work: Professional Development
    "skill_id", "certification_id", "relationship_id",
    # Work: Daily Operations
    "task_id", "meeting_id", "note_id", "decision_id",
    # Engineering
    "infrastructure_id", "incident_id", "prototype_id", "experiment_id",
]

# The 74 node labels the schema allows. Any other label found in the database
# is reported as unexpected. Rows follow the same grouping as the constraints.
EXPECTED_LABELS = {
    "Person", "Location", "Event", "Topic", "Goal",
    "Trip", "Destination", "Activity",
    "Book", "Author", "LearningPath", "Concept", "Quote",
    "Training", "Exercise", "Program", "PersonalRecord", "BodyMetric",
    "Reflection", "Value", "Habit", "LifeEvent", "Intention",
    "Recipe", "Restaurant", "Ingredient", "Meal", "Technique",
    "Music", "Film", "Artwork", "Playlist", "Artist", "Style",
    "Species", "Plant", "Tank", "Garden", "Ecosystem", "Observation",
    "Account", "Investment", "Asset", "Liability", "Budget", "FinancialGoal",
    "Match", "Team", "League", "Tournament", "Player", "Season",
    "Client", "Contact", "Opportunity", "Proposal", "Project",
    "Vendor", "Competitor", "MarketTrend", "Technology",
    "Content", "Publication",
    "Skill", "Certification", "Relationship",
    "Task", "Meeting", "Note", "Decision",
    "Infrastructure", "Incident", "Prototype", "Experiment",
}

# (label, id) pairs of the sample nodes that must exist.
EXPECTED_SAMPLE_NODES = [
    ("Person", "user_main"),
    ("Location", "location_home"),
    ("Trip", "trip_sample_2025"),
    ("Book", "book_meditations_aurelius"),
    ("Goal", "goal_sample_2025"),
    ("Topic", "topic_stoicism"),
    ("Topic", "topic_ai_in_cx"),
    ("Account", "account_tfsa_sample"),
    ("Client", "client_sample_corp"),
    ("Skill", "skill_cx_strategy"),
    ("Infrastructure", "infra_neo4j_prod"),
    ("Team", "team_arsenal"),
]

# (from_label, from_id, rel_type, to_label, to_id) tuples of the sample
# relationships that must exist, directed from the first node to the second.
EXPECTED_SAMPLE_RELS = [
    ("Person", "user_main", "SUPPORTS", "Team", "team_arsenal"),
    ("Person", "user_main", "COMPLETED", "Book", "book_meditations_aurelius"),
    ("Person", "user_main", "PURSUING", "Goal", "goal_sample_2025"),
    ("Book", "book_meditations_aurelius", "EXPLORES", "Topic", "topic_stoicism"),
    ("Person", "user_main", "OWNS", "Account", "account_tfsa_sample"),
]

# Spot-check list of index names: a sample of key indexes, not the full set.
EXPECTED_INDEX_SAMPLES = [
    "person_name", "book_title", "client_name", "event_date",
    "training_date", "client_status", "task_status", "event_domain",
    "team_name", "player_name", "match_competition",
]
def get_credentials(args):
    """Resolve the connection URI, username, and password.

    The URI and username come from the first non-empty source in this order:
    the parsed CLI argument, the matching environment variable, an interactive
    prompt, and finally a built-in default when the prompt is left blank.
    The password is read from ``NEO4J_PASSWORD`` or, failing that, from a
    hidden prompt.

    Args:
        args: Parsed arguments exposing ``uri`` and ``user`` attributes.

    Returns:
        A ``(uri, user, password)`` tuple.

    Raises:
        SystemExit: With status 1 when no password was supplied.
    """
    # Short-circuit evaluation means a prompt only appears when every earlier
    # source came back empty.
    uri = (
        args.uri
        or os.environ.get("NEO4J_URI")
        or input("Neo4j URI [bolt://localhost:7687]: ").strip()
        or "bolt://localhost:7687"
    )
    user = (
        args.user
        or os.environ.get("NEO4J_USER")
        or input("Neo4j username [neo4j]: ").strip()
        or "neo4j"
    )
    password = os.environ.get("NEO4J_PASSWORD") or getpass.getpass("Neo4j password: ")
    if not password:
        print("ERROR: Password is required")
        sys.exit(1)
    return uri, user, password
def validate(driver, uri):
    """Run all validation checks against the graph and print the report.

    The report covers connectivity, APOC availability, constraints, indexes,
    node labels, sample nodes, sample relationships, and relationship and
    node totals. Every result is printed as it is produced.

    Args:
        driver: Neo4j driver; only ``driver.session()`` is used here.
        uri: Connection URI, shown in the report header only.

    Returns:
        True when no check failed. False when at least one check failed,
        including the case where the initial connectivity query raised (the
        remaining sections are skipped in that case).
    """
    passed = 0
    failed = 0
    warnings = 0
    rule = "=" * 65  # visible separator line for the report header/footer

    def check(ok, label):
        """Record one pass/fail result and print it with a status marker."""
        nonlocal passed, failed
        if ok:
            passed += 1
            print(f"  ✓ {label}")
        else:
            failed += 1
            print(f"  ✗ {label}")

    def warn(label):
        """Record and print a non-fatal warning."""
        nonlocal warnings
        warnings += 1
        print(f"  ⚠ {label}")

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print()
    print(rule)
    print(" VALIDATION REPORT — Koios Unified Knowledge Graph")
    print(rule)
    print(" Schema Version: 2.1.0")
    print(f" Database: {uri}")
    print(f" Timestamp: {now}")
    print(rule)

    with driver.session() as session:
        # ── 1. CONNECTION ────────────────────────────────────────────
        print("\n[CONNECTION]")
        try:
            probe = session.run("RETURN 1 AS test").single()["test"]
            check(probe == 1, "Database reachable")
        except Exception as e:
            check(False, f"Database reachable — {e}")
            print("\nCannot proceed without a connection.")
            return False

        # APOC is optional: any failure here is reported as a warning only.
        try:
            version = session.run("RETURN apoc.version() AS v").single()["v"]
            check(True, f"APOC available (v{version})")
        except Exception:
            warn("APOC not available — some assistants may need it")

        # ── 2. CONSTRAINTS ───────────────────────────────────────────
        print(f"\n[CONSTRAINTS] (expecting {len(EXPECTED_CONSTRAINTS)})")
        existing_constraints = {
            rec["name"]
            for rec in session.run("SHOW CONSTRAINTS YIELD name RETURN name")
        }
        missing_constraints = [
            name for name in EXPECTED_CONSTRAINTS
            if name not in existing_constraints
        ]
        if not missing_constraints:
            check(True, f"All {len(EXPECTED_CONSTRAINTS)} constraints present")
        else:
            # Only the first ten names are listed to keep the line readable.
            check(
                False,
                f"Missing {len(missing_constraints)} constraints: "
                f"{', '.join(missing_constraints[:10])}",
            )
            if len(missing_constraints) > 10:
                print(f" ... and {len(missing_constraints) - 10} more")
        extra_constraints = existing_constraints - set(EXPECTED_CONSTRAINTS)
        if extra_constraints:
            warn(f"Extra constraints not in schema: {', '.join(sorted(extra_constraints))}")

        # ── 3. INDEXES ───────────────────────────────────────────────
        print("\n[INDEXES]")
        existing_indexes = {
            rec["name"]
            for rec in session.run(
                "SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"
            )
        }
        total_indexes = len(existing_indexes)
        check(
            total_indexes >= len(EXPECTED_CONSTRAINTS),
            f"Total indexes: {total_indexes} "
            f"(includes {len(EXPECTED_CONSTRAINTS)} from constraints)",
        )
        missing_index_samples = [
            name for name in EXPECTED_INDEX_SAMPLES if name not in existing_indexes
        ]
        if not missing_index_samples:
            check(True, f"Key index spot-check passed ({len(EXPECTED_INDEX_SAMPLES)} sampled)")
        else:
            check(False, f"Missing indexes: {', '.join(missing_index_samples)}")

        # ── 4. NODE LABELS ───────────────────────────────────────────
        print("\n[NODE LABELS]")
        # Read the full label list before issuing the per-label count queries
        # so no new query starts while this result is still being iterated.
        db_labels = [
            rec["label"]
            for rec in session.run("CALL db.labels() YIELD label RETURN label")
        ]
        label_counts = {}
        for label in db_labels:
            # Labels come from the database and may be junk; double any
            # backtick so the quoted identifier stays well-formed.
            quoted = label.replace("`", "``")
            count_rec = session.run(f"MATCH (n:`{quoted}`) RETURN count(n) AS c").single()
            label_counts[label] = count_rec["c"]
        labels_in_db = set(label_counts)
        # _Test is from the test suite and is tolerated.
        unexpected_labels = labels_in_db - EXPECTED_LABELS - {"_Test"}
        if not unexpected_labels:
            check(True, "No unexpected labels (no junk from Memory server)")
        else:
            check(False, f"Unexpected labels found: {', '.join(sorted(unexpected_labels))}")
            for junk in sorted(unexpected_labels):
                print(f"      - {junk}: {label_counts.get(junk, '?')} nodes — SHOULD BE REMOVED")
        labels_with_data = {name for name, count in label_counts.items() if count > 0}
        print(f" Labels with data: {len(labels_with_data)} of {len(EXPECTED_LABELS)} schema types")

        # ── 5. SAMPLE NODES ──────────────────────────────────────────
        print(f"\n[SAMPLE NODES] (expecting {len(EXPECTED_SAMPLE_NODES)})")
        for label, node_id in EXPECTED_SAMPLE_NODES:
            result = session.run(
                f"MATCH (n:`{label}` {{id: $id}}) RETURN n.id AS id, n.name AS name, "
                f"n.title AS title, n.updated_at AS updated",
                id=node_id,
            ).single()
            if result:
                # Show the most descriptive property available.
                display = result["name"] or result["title"] or result["id"]
                check(True, f"{label}:{node_id} — {display}")
            else:
                check(False, f"{label}:{node_id} — NOT FOUND")

        # ── 6. SAMPLE RELATIONSHIPS ──────────────────────────────────
        print(f"\n[SAMPLE RELATIONSHIPS] (expecting {len(EXPECTED_SAMPLE_RELS)})")
        for from_label, from_id, rel_type, to_label, to_id in EXPECTED_SAMPLE_RELS:
            result = session.run(
                f"MATCH (a:`{from_label}` {{id: $from_id}})-[r:`{rel_type}`]->"
                f"(b:`{to_label}` {{id: $to_id}}) "
                f"RETURN type(r) AS rel",
                from_id=from_id,
                to_id=to_id,
            ).single()
            if result:
                check(True, f"({from_id})-[:{rel_type}]->({to_id})")
            else:
                check(False, f"({from_id})-[:{rel_type}]->({to_id}) — NOT FOUND")

        # ── 7. RELATIONSHIP SUMMARY ──────────────────────────────────
        print("\n[RELATIONSHIP SUMMARY]")
        total_rels = session.run("MATCH ()-[r]->() RETURN count(r) AS c").single()["c"]
        print(f" Total relationships: {total_rels}")
        if total_rels > 0:
            for rec in session.run(
                "MATCH ()-[r]->() RETURN type(r) AS type, count(r) AS count ORDER BY count DESC"
            ):
                print(f" {rec['type']}: {rec['count']}")
        else:
            check(False, "No relationships exist — sample data may not have been created")

        # ── 8. TOTAL NODE COUNT ──────────────────────────────────────
        print("\n[NODE SUMMARY]")
        total_nodes = session.run("MATCH (n) RETURN count(n) AS c").single()["c"]
        print(f" Total nodes: {total_nodes}")
        for label in sorted(label_counts):
            if label_counts[label] > 0:
                print(f" {label}: {label_counts[label]}")

    # ── SUMMARY ──────────────────────────────────────────────────────
    print()
    print(rule)
    if failed == 0:
        print(f" RESULT: ALL {passed} CHECKS PASSED ✓")
    else:
        total = passed + failed
        print(f" RESULT: {failed} FAILED / {passed} passed / {total} total")
    if warnings:
        print(f" ({warnings} warning{'s' if warnings != 1 else ''})")
    print(rule)
    print()
    return failed == 0
def main():
    """Parse CLI options, connect to Neo4j, run the validation, and exit.

    The process exits with status 0 when every check passes. It exits with
    status 1 when authentication fails, the server cannot be reached, any
    check fails, or validation raises an unexpected exception.
    """
    parser = argparse.ArgumentParser(
        description="Validate Koios Neo4j knowledge graph schema and data"
    )
    parser.add_argument("--uri", "-u", help="Neo4j Bolt URI")
    parser.add_argument("--user", "-U", help="Neo4j username")
    args = parser.parse_args()
    uri, user, password = get_credentials(args)

    driver = None
    failure = None
    try:
        driver = GraphDatabase.driver(uri, auth=(user, password))
        # Cheap probe so credential/connectivity problems get a short,
        # specific message instead of surfacing inside the report.
        with driver.session() as session:
            session.run("RETURN 1")
    except AuthError:
        failure = f"✗ Authentication failed for {uri}"
    except ServiceUnavailable:
        failure = f"✗ Cannot connect to {uri}"
    if failure is not None:
        # Release the driver before exiting; otherwise it is never closed
        # on this path.
        if driver is not None:
            driver.close()
        print(failure)
        sys.exit(1)
    print(f"✓ Connected to {uri}")

    try:
        success = validate(driver, uri)
        # SystemExit is not an Exception subclass, so it passes through the
        # handler below while still triggering the finally clause.
        sys.exit(0 if success else 1)
    except Exception as e:
        print(f"ERROR: {e}")
        sys.exit(1)
    finally:
        driver.close()
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()