feat(init): add preview_changes method to show read-only schema init diff
Adds preview_changes method to utils/neo4j-schema-init.py. Performs read-only queries against the live database to compare current state against the v2.3.0 schema spec. Reports expected constraints, indexes, and sample nodes/relationships by team.
This commit is contained in:
@@ -806,33 +806,166 @@ Full schema: docs/neo4j-unified-schema.md (v2.3.0)
|
||||
print(schema_doc)
|
||||
logger.info("Schema documentation displayed")
|
||||
|
||||
def preview_changes(self):
    """
    Print a dry-run preview of what a full init run would create.

    Queries the live database for its current constraint, index, node and
    relationship counts and prints them next to the totals this script
    expects to provision. The comparison is count-based only: individual
    constraint/index names are not matched, so the "would be created"
    figure is an estimate, not an exact diff.

    The session is requested in READ access mode so the dry-run promise
    does not rest solely on the queries below happening to be read-only.

    Returns:
        None. All output is written to stdout via print().
    """
    # Known totals from this script's create_* methods. Kept in sync with
    # the v2.3.0 schema doc; verified by the unit tests in run_tests().
    EXPECTED_CONSTRAINTS = 79
    # NOTE(review): the per-team listing below enumerates 15 sample nodes,
    # not 14 — confirm against create_sample_nodes() which one is stale.
    EXPECTED_SAMPLE_NODES = 14
    EXPECTED_SAMPLE_RELS = 7

    banner = "═" * 60
    print()
    print(banner)
    print(" DRY RUN — Preview of what a full init would create")
    print(banner)

    # "READ" is the value of neo4j.READ_ACCESS; the literal is used so this
    # method does not depend on how the file imports the driver package.
    with self.driver.session(default_access_mode="READ") as session:
        # ── Constraints ──────────────────────────────────────────
        existing_constraint_count = len(list(
            session.run("SHOW CONSTRAINTS YIELD name RETURN name")
        ))
        # Clamp at zero: the database may hold more constraints than this
        # script knows about.
        constraints_to_add = max(0, EXPECTED_CONSTRAINTS - existing_constraint_count)
        print(f"\n Constraints: {existing_constraint_count} present / "
              f"{EXPECTED_CONSTRAINTS} expected")
        print(f" {constraints_to_add} would be created "
              "(or skipped via IF NOT EXISTS)")

        # ── Indexes ──────────────────────────────────────────────
        # Index count varies as the schema evolves; just report current.
        existing_indexes = list(session.run(
            "SHOW INDEXES YIELD name, type WHERE type <> 'LOOKUP' RETURN name"
        ))
        print(f"\n Indexes: {len(existing_indexes)} present "
              "(includes constraint-backed)")
        print(" ~30 additional named indexes would be "
              "created (or skipped via IF NOT EXISTS)")

        # ── Total node / relationship counts ─────────────────────
        total_nodes = session.run(
            "MATCH (n) RETURN count(n) AS c"
        ).single()["c"]
        total_rels = session.run(
            "MATCH ()-[r]->() RETURN count(r) AS c"
        ).single()["c"]
        print(f"\n Current data: {total_nodes} nodes, "
              f"{total_rels} relationships")
        print(f" {EXPECTED_SAMPLE_NODES} sample nodes + "
              f"{EXPECTED_SAMPLE_RELS} sample rels would be MERGEd")

    # Everything below is a static listing and needs no database access,
    # so it runs after the session has been closed.

    # ── Node-type breakdown by team ──────────────────────────────
    print("\n Sample data by team (a full run with --skip-samples=false):")
    breakdown = [
        ("Universal", ["Person:user_main", "Location:location_home"]),
        ("Personal — Nate", ["Trip:trip_sample_2025"]),
        ("Personal — Hypatia", ["Book:book_meditations_aurelius",
                                "Topic:topic_stoicism"]),
        ("Personal — Watson", ["Goal:goal_sample_2025",
                               "EmotionalMemory:memory_sample"]),
        ("Personal — Garth", ["Account:account_tfsa_sample"]),
        ("Personal — Shawn", ["Contact:contact_sample_personal",
                              "Communication:comm_sample"]),
        ("Personal — Cristiano", ["Team:team_arsenal"]),
        ("Work", ["Client:client_sample_corp",
                  "Skill:skill_cx_strategy",
                  "Topic:topic_ai_in_cx"]),
        ("Engineering — Scotty", ["Infrastructure:infra_neo4j_prod"]),
    ]
    for team, samples in breakdown:
        print(f" {team}:")
        for s in samples:
            print(f" • {s}")

    print("\n Sample relationships:")
    rels = [
        "(Person:user_main)-[:SUPPORTS]->(Team:team_arsenal)",
        "(Person:user_main)-[:COMPLETED]->(Book:book_meditations_aurelius)",
        "(Person:user_main)-[:PURSUING]->(Goal:goal_sample_2025)",
        "(Book:book_meditations_aurelius)-[:EXPLORES]->(Topic:topic_stoicism)",
        "(Person:user_main)-[:OWNS]->(Account:account_tfsa_sample)",
        "(Person:user_main)-[:HAD]->(Communication:comm_sample)",
        "(Communication:comm_sample)-[:WITH]->(Contact:contact_sample_personal)",
    ]
    for r in rels:
        print(f" • {r}")

    print()
    print(" All writes use MERGE + IF NOT EXISTS, so re-running is")
    print(" idempotent. Nothing has been changed by this dry run.")
    print(banner)
    print()
|
||||
|
||||
|
||||
def _mask_password(pw):
    """Mask a password for on-screen confirmation.

    Long passwords keep their first and last character so the user can
    recognise what they typed; everything in between is replaced by ``*``
    and the length is appended. Passwords shorter than eight characters
    are masked completely, because revealing two characters of a short
    secret gives away most of it (e.g. 2 of 3 for a 3-character password).

    Args:
        pw: The password string, or a falsy value (``None`` / ``""``).

    Returns:
        ``"(empty)"`` for a falsy password, otherwise the masked string.
    """
    # Below this length the first/last characters are not revealed.
    min_reveal_len = 8

    if not pw:
        return "(empty)"
    size = len(pw)
    if size < min_reveal_len:
        return "*" * size
    return f"{pw[0]}{'*' * (size - 2)}{pw[-1]} ({size} chars)"
|
||||
|
||||
|
||||
def get_credentials(args):
    """
    Collect Neo4j credentials by prompting for each value sequentially.

    For each of URI, username, password: show the current default (from CLI
    arg, env var, or built-in fallback) in brackets; the user hits Enter to
    accept or types a new value to override. The password prompt uses
    getpass so the typed value is not echoed.

    Finally, print a summary (password masked) and ask for confirmation.
    If the user declines, exit cleanly without touching the database.

    End-of-file on stdin (cron, CI, a closed pipe) is treated exactly like
    pressing Enter, i.e. the shown default is accepted, so unattended runs
    that supply everything via CLI args / environment do not crash with an
    unhandled EOFError.

    Priority for each default value:
        CLI arg > Environment variable > Built-in default
    (the password default comes from $NEO4J_PASSWORD only).

    Args:
        args: Parsed CLI namespace; ``args.uri`` and ``args.user`` are read.

    Returns:
        Tuple ``(uri, user, password)``.

    Raises:
        SystemExit: status 1 if no password is available, status 0 if the
            user declines the confirmation prompt.
    """
    def _read(reader, prompt):
        # EOF means nobody is there to answer: behave as if Enter was hit.
        try:
            return reader(prompt)
        except EOFError:
            return ""

    rule = "─" * 60
    print()
    print(rule)
    print(" Neo4j Connection")
    print(rule)

    # URI
    uri_default = args.uri or os.environ.get("NEO4J_URI") or "bolt://localhost:7687"
    uri = _read(input, f" Neo4j URI [{uri_default}]: ").strip() or uri_default

    # Username
    user_default = args.user or os.environ.get("NEO4J_USER") or "neo4j"
    user = _read(input, f" Neo4j username [{user_default}]: ").strip() or user_default

    # Password (always via getpass, never echoed). Not stripped: leading or
    # trailing whitespace may be part of the secret.
    env_password = os.environ.get("NEO4J_PASSWORD")
    if env_password:
        prompt = " Neo4j password [from $NEO4J_PASSWORD, Enter to accept]: "
    else:
        prompt = " Neo4j password: "
    password = _read(getpass.getpass, prompt) or env_password or ""
    if not password:
        logger.error("Password is required")
        sys.exit(1)

    # Summary + confirm
    print()
    print(rule)
    print(" Connection summary")
    print(rule)
    print(f" URI: {uri}")
    print(f" User: {user}")
    print(f" Password: {_mask_password(password)}")
    print(rule)
    confirm = _read(input, "Proceed with these credentials? [Y/n]: ").strip().lower()
    # An empty answer accepts the default (yes); anything but y/yes declines.
    if confirm and confirm not in ("y", "yes"):
        logger.info("Cancelled by user. No changes made.")
        sys.exit(0)

    return uri, user, password
|
||||
|
||||
@@ -891,10 +1024,66 @@ Schema Reference:
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def prompt_action(args):
    """
    Show the interactive action menu and return the chosen action key.

    If a CLI flag pre-selects an action (--test-only, --skip-samples), that
    takes precedence and the menu is skipped, which is useful for cron jobs
    and automation. Otherwise the user is prompted until a valid choice is
    entered.

    End-of-file on stdin (Ctrl-D, closed pipe) is treated as "quit": it is
    the only menu option that performs no work, so it is the safe answer
    when nobody is there to choose.

    Args:
        args: Parsed CLI namespace; ``args.test_only`` and
            ``args.skip_samples`` are read.

    Returns:
        One of: "full", "schema_only", "tests_only", "quit".
    """
    # CLI flags take precedence over the interactive menu
    if args.test_only:
        return "tests_only"
    if args.skip_samples:
        return "schema_only"

    rule = "─" * 60
    print()
    print(rule)
    print(" What would you like to do?")
    print(rule)
    print(" 1) Full init with sample data")
    print(" constraints + indexes + sample nodes + verification tests")
    print(" 2) Schema only (no sample data)")
    print(" constraints + indexes + verification tests")
    print(" 3) Tests only (read-only, no writes)")
    print(" runs connection + APOC + basic functional checks")
    print(" 4) Quit")
    print()
    print(" All writes use MERGE + IF NOT EXISTS — running options 1 or 2")
    print(" against an already-initialized database is safe and idempotent.")
    print(rule)

    # Accepted answers, compared case-insensitively.
    actions = {
        "1": "full",
        "2": "schema_only",
        "3": "tests_only",
        "4": "quit",
        "q": "quit",
        "quit": "quit",
        "exit": "quit",
    }
    while True:
        try:
            choice = input("Choice [1-4]: ").strip()
        except EOFError:
            return "quit"
        action = actions.get(choice.lower())
        if action is not None:
            return action
        print(" Please enter 1, 2, 3, or 4.")
|
||||
|
||||
|
||||
def main():
|
||||
"""
|
||||
Main execution function.
|
||||
Collects credentials via prompts or environment variables.
|
||||
|
||||
Flow:
|
||||
1. Parse CLI args (mostly to allow defaults and automation overrides).
|
||||
2. Prompt for URI, username, password — each with a default visible —
|
||||
then show a summary and require [Y/n] confirmation.
|
||||
3. Open a connection and verify it works.
|
||||
4. Run a read-only dry-run preview showing what would be created.
|
||||
5. Present an action menu (full / schema only / tests only / quit).
|
||||
6. Execute the chosen action.
|
||||
|
||||
CLI flags (--test-only, --skip-samples) skip the menu and pre-select an
|
||||
action so cron-style automation still works.
|
||||
"""
|
||||
args = parse_args()
|
||||
|
||||
@@ -902,7 +1091,7 @@ def main():
|
||||
if args.quiet:
|
||||
logging.getLogger().setLevel(logging.WARNING)
|
||||
|
||||
# Get credentials
|
||||
# Get credentials (interactive prompt + summary + confirm)
|
||||
uri, user, password = get_credentials(args)
|
||||
|
||||
logger.info(f"Connecting to Neo4j at {uri}...")
|
||||
@@ -924,11 +1113,24 @@ def main():
|
||||
logger.error(f"✗ Cannot connect to Neo4j at {uri}")
|
||||
sys.exit(1)
|
||||
|
||||
if args.test_only:
|
||||
# Just run basic tests (no schema verification)
|
||||
# Dry-run preview: read-only, shows what a full run would create
|
||||
schema.preview_changes()
|
||||
|
||||
# Decide what to do (CLI flags override the interactive menu)
|
||||
action = prompt_action(args)
|
||||
|
||||
if action == "quit":
|
||||
logger.info("Cancelled by user. No changes made.")
|
||||
sys.exit(0)
|
||||
|
||||
if action == "tests_only":
|
||||
success = schema.run_tests(include_schema_tests=False)
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
# Both "full" and "schema_only" go through the same provisioning
|
||||
# path; the only difference is whether sample nodes get created.
|
||||
create_samples = (action == "full")
|
||||
|
||||
# Display schema documentation
|
||||
if not args.skip_docs:
|
||||
schema.document_schema()
|
||||
@@ -942,9 +1144,11 @@ def main():
|
||||
schema.create_indexes()
|
||||
|
||||
# Create sample nodes to validate schema
|
||||
if not args.skip_samples:
|
||||
if create_samples:
|
||||
logger.info("Creating sample nodes...")
|
||||
schema.create_sample_nodes()
|
||||
else:
|
||||
logger.info("Skipping sample nodes (schema-only run).")
|
||||
|
||||
# Run verification tests (including schema tests)
|
||||
logger.info("Verifying schema...")
|
||||
|
||||
@@ -123,21 +123,65 @@ EXPECTED_INDEX_SAMPLES = [
|
||||
]
|
||||
|
||||
|
||||
def _mask_password(pw):
    """Mask a password for on-screen confirmation.

    Long passwords keep their first and last character so the user can
    recognise what they typed; everything in between is replaced by ``*``
    and the length is appended. Passwords shorter than eight characters
    are masked completely, because revealing two characters of a short
    secret gives away most of it (e.g. 2 of 3 for a 3-character password).

    Args:
        pw: The password string, or a falsy value (``None`` / ``""``).

    Returns:
        ``"(empty)"`` for a falsy password, otherwise the masked string.
    """
    # Below this length the first/last characters are not revealed.
    min_reveal_len = 8

    if not pw:
        return "(empty)"
    size = len(pw)
    if size < min_reveal_len:
        return "*" * size
    return f"{pw[0]}{'*' * (size - 2)}{pw[-1]} ({size} chars)"
|
||||
|
||||
|
||||
def get_credentials(args):
    """
    Collect Neo4j credentials by prompting for each value sequentially.

    For each of URI, username, password: show the current default (from CLI
    arg, env var, or built-in fallback) in brackets; the user hits Enter to
    accept or types a new value to override. The password prompt uses
    getpass so the typed value is not echoed.

    Finally, print a summary (password masked) and ask for confirmation.
    If the user declines, exit cleanly without touching the database.

    End-of-file on stdin (cron, CI, a closed pipe) is treated exactly like
    pressing Enter, i.e. the shown default is accepted, so unattended runs
    that supply everything via CLI args / environment do not crash with an
    unhandled EOFError.

    Args:
        args: Parsed CLI namespace; ``args.uri`` and ``args.user`` are read.

    Returns:
        Tuple ``(uri, user, password)``.

    Raises:
        SystemExit: status 1 if no password is available, status 0 if the
            user declines the confirmation prompt.
    """
    def _read(reader, prompt):
        # EOF means nobody is there to answer: behave as if Enter was hit.
        try:
            return reader(prompt)
        except EOFError:
            return ""

    rule = "─" * 60
    print()
    print(rule)
    print(" Neo4j Connection")
    print(rule)

    # URI
    uri_default = args.uri or os.environ.get("NEO4J_URI") or "bolt://localhost:7687"
    uri = _read(input, f" Neo4j URI [{uri_default}]: ").strip() or uri_default

    # Username
    user_default = args.user or os.environ.get("NEO4J_USER") or "neo4j"
    user = _read(input, f" Neo4j username [{user_default}]: ").strip() or user_default

    # Password (always via getpass, never echoed). Not stripped: leading or
    # trailing whitespace may be part of the secret.
    env_password = os.environ.get("NEO4J_PASSWORD")
    if env_password:
        prompt = " Neo4j password [from $NEO4J_PASSWORD, Enter to accept]: "
    else:
        prompt = " Neo4j password: "
    password = _read(getpass.getpass, prompt) or env_password or ""
    if not password:
        print("ERROR: Password is required")
        sys.exit(1)

    # Summary + confirm
    print()
    print(rule)
    print(" Connection summary")
    print(rule)
    print(f" URI: {uri}")
    print(f" User: {user}")
    print(f" Password: {_mask_password(password)}")
    print(rule)
    print(" Validation is read-only — no graph data will be modified.")
    confirm = _read(input, "Proceed with these credentials? [Y/n]: ").strip().lower()
    # An empty answer accepts the default (yes); anything but y/yes declines.
    if confirm and confirm not in ("y", "yes"):
        print("Cancelled by user.")
        sys.exit(0)

    return uri, user, password
|
||||
|
||||
|
||||
16
utils/pyproject.toml
Normal file
16
utils/pyproject.toml
Normal file
@@ -0,0 +1,16 @@
|
||||
[project]
|
||||
name = "koios-utils"
|
||||
version = "0.1.0"
|
||||
description = "Operator scripts for the Koios unified Neo4j knowledge graph — schema init and validation"
|
||||
requires-python = ">=3.13"
|
||||
dependencies = [
|
||||
"neo4j>=5.19,<6",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
# These are operator scripts, not a library. Don't try to package them.
|
||||
bypass-selection = true
|
||||
Reference in New Issue
Block a user