diff --git a/mnemosyne/mcp_server/admin.py b/mnemosyne/mcp_server/admin.py index 15c839e..41e0a64 100644 --- a/mnemosyne/mcp_server/admin.py +++ b/mnemosyne/mcp_server/admin.py @@ -16,14 +16,23 @@ class MCPTokenAdmin(admin.ModelAdmin): ] list_filter = ["is_active"] search_fields = ["name", "user__email", "user__username"] - readonly_fields = ["token", "last_used_at", "created_at", "updated_at"] + readonly_fields = ["token_hash", "last_used_at", "created_at", "updated_at"] fieldsets = ( (None, {"fields": ("user", "name", "is_active")}), ("Restrictions", {"fields": ("allowed_tools", "expires_at")}), - ("Token (shown once at creation)", {"fields": ("token",)}), + ( + "Token (hashed at rest — plaintext is shown only once at creation)", + {"fields": ("token_hash",)}, + ), ("Audit", {"fields": ("last_used_at", "created_at", "updated_at")}), ) @admin.display(description="Token") def masked_token(self, obj): return obj.get_masked_token() + + def has_add_permission(self, request): + # Tokens must be created via the dashboard or management command + # so the plaintext can be surfaced to the user. Adding via admin + # would persist a hash with no plaintext ever shown. + return False diff --git a/mnemosyne/mcp_server/auth.py b/mnemosyne/mcp_server/auth.py index 546c4c6..075e287 100644 --- a/mnemosyne/mcp_server/auth.py +++ b/mnemosyne/mcp_server/auth.py @@ -12,7 +12,7 @@ from fastmcp.server.dependencies import get_http_request from fastmcp.server.middleware import Middleware, MiddlewareContext from .metrics import mcp_auth_failures_total -from .models import MCPToken +from .models import MCPToken, hash_token logger = logging.getLogger(__name__) @@ -25,9 +25,17 @@ class MCPAuthError(Exception): def resolve_mcp_user(token_string: str): - """Resolve a bearer token to (user, MCPToken). Raises MCPAuthError on any failure.""" + """Resolve a bearer token to (user, MCPToken). Raises MCPAuthError on any failure. + + Hashes the incoming bearer and looks up by the hash — plaintext is never + stored or compared directly. + """ try: - token = MCPToken.objects.select_related("user").get(token=token_string) + token = ( + MCPToken.objects + .select_related("user") + .get(token_hash=hash_token(token_string)) + ) except MCPToken.DoesNotExist: raise MCPAuthError("Invalid MCP token.") diff --git a/mnemosyne/mcp_server/forms.py b/mnemosyne/mcp_server/forms.py new file mode 100644 index 0000000..b691892 --- /dev/null +++ b/mnemosyne/mcp_server/forms.py @@ -0,0 +1,85 @@ +"""Forms for the MCP token self-service dashboard.""" + +from __future__ import annotations + +import asyncio +import functools + +from django import forms + +from .models import MCPToken + + +@functools.lru_cache(maxsize=1) +def _registered_tool_names() -> list[str]: + """Pull the list of registered MCP tool names from the FastMCP instance. + + Cached at module level — the tool registry is fixed at server startup. + Importing here (rather than at module import) avoids circulars when the + server module imports the dashboard pieces transitively. + """ + from .server import mcp + + tools = asyncio.run(mcp.get_tools()) + return sorted(tools.keys()) + + +def _tool_choices() -> list[tuple[str, str]]: + return [(name, name) for name in _registered_tool_names()] + + +class MCPTokenCreateForm(forms.Form): + """Generate a new bearer token. The token value itself is server-generated.""" + + name = forms.CharField( + max_length=100, + widget=forms.TextInput(attrs={ + "class": "input input-bordered w-full", + "placeholder": "e.g. Claude Desktop, CI script", + }), + ) + expires_at = forms.DateTimeField( + required=False, + widget=forms.DateTimeInput(attrs={ + "class": "input input-bordered w-full", + "type": "datetime-local", + }), + help_text="Leave blank for no expiry.", + ) + allowed_tools = forms.MultipleChoiceField( + required=False, + widget=forms.CheckboxSelectMultiple(attrs={"class": "checkbox checkbox-sm"}), + help_text="Leave all unchecked to permit every tool.", + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["allowed_tools"].choices = _tool_choices() + + +class MCPTokenEditForm(forms.ModelForm): + """Edit token metadata. The hashed token itself cannot be edited.""" + + allowed_tools = forms.MultipleChoiceField( + required=False, + widget=forms.CheckboxSelectMultiple(attrs={"class": "checkbox checkbox-sm"}), + help_text="Leave all unchecked to permit every tool.", + ) + + class Meta: + model = MCPToken + fields = ["name", "is_active", "expires_at", "allowed_tools"] + widgets = { + "name": forms.TextInput(attrs={"class": "input input-bordered w-full"}), + "is_active": forms.CheckboxInput(attrs={"class": "toggle toggle-primary"}), + "expires_at": forms.DateTimeInput(attrs={ + "class": "input input-bordered w-full", + "type": "datetime-local", + }), + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["allowed_tools"].choices = _tool_choices() + if self.instance and self.instance.pk: + self.fields["allowed_tools"].initial = self.instance.allowed_tools or [] diff --git a/mnemosyne/mcp_server/management/commands/create_mcp_token.py b/mnemosyne/mcp_server/management/commands/create_mcp_token.py index b39d853..6438a60 100644 --- a/mnemosyne/mcp_server/management/commands/create_mcp_token.py +++ b/mnemosyne/mcp_server/management/commands/create_mcp_token.py @@ -57,7 +57,7 @@ class Command(BaseCommand): raise CommandError("--expires-days must be at least 1.") expires_at = timezone.now() + timedelta(days=options["expires_days"]) - token = MCPToken.objects.create( + token, plaintext = MCPToken.objects.create_token( user=user, name=options["name"], allowed_tools=allowed_tools, @@ -73,5 +73,5 @@ class Command(BaseCommand): self.stdout.write(" Tools: (all)") if expires_at: self.stdout.write(f" Expires: {expires_at.isoformat()}") - self.stdout.write(self.style.WARNING(" Token (shown once):")) - self.stdout.write(f" {token.token}") + self.stdout.write(self.style.WARNING(" Token (shown once — store it now):")) + self.stdout.write(f" {plaintext}") diff --git a/mnemosyne/mcp_server/migrations/0002_hash_token.py b/mnemosyne/mcp_server/migrations/0002_hash_token.py new file mode 100644 index 0000000..55a8aab --- /dev/null +++ b/mnemosyne/mcp_server/migrations/0002_hash_token.py @@ -0,0 +1,43 @@ +"""Hash MCPToken values at rest. + +Renames ``token`` → ``token_hash`` and rewrites any pre-existing plaintext +values into SHA-256 hex digests in-place. Forward-only: hashing is one-way, +so no reverse migration is provided. + +Existing tokens issued before this migration keep working only because +``resolve_mcp_user`` hashes the incoming bearer before lookup; the original +plaintext the client holds still hashes to what we just wrote. +""" + +import hashlib + +from django.db import migrations, models + + +def hash_existing_tokens(apps, schema_editor): + MCPToken = apps.get_model("mcp_server", "MCPToken") + for token in MCPToken.objects.all(): + plaintext = token.token_hash # post-rename, still holds original plaintext + token.token_hash = hashlib.sha256(plaintext.encode("utf-8")).hexdigest() + token.save(update_fields=["token_hash"]) + + +def noop_reverse(apps, schema_editor): + # Cannot reverse a hash. Leaving as no-op so the schema can be rolled + # back, but operators must understand any hashed rows are unrecoverable. + pass + + +class Migration(migrations.Migration): + dependencies = [ + ("mcp_server", "0001_initial"), + ] + + operations = [ + migrations.RenameField( + model_name="mcptoken", + old_name="token", + new_name="token_hash", + ), + migrations.RunPython(hash_existing_tokens, noop_reverse), + ] diff --git a/mnemosyne/mcp_server/models.py b/mnemosyne/mcp_server/models.py index 29d6113..aaeeb57 100644 --- a/mnemosyne/mcp_server/models.py +++ b/mnemosyne/mcp_server/models.py @@ -1,3 +1,4 @@ +import hashlib import secrets from django.conf import settings @@ -5,15 +6,44 @@ from django.db import models from django.utils import timezone +def hash_token(plaintext: str) -> str: + """SHA-256 hex digest of an MCP bearer token. 64 chars.""" + return hashlib.sha256(plaintext.encode("utf-8")).hexdigest() + + +class MCPTokenManager(models.Manager): + def create_token(self, *, user, name, allowed_tools=None, expires_at=None): + """Generate a new bearer token, store its hash, and return (instance, plaintext). + + The plaintext is returned exactly once and is never persisted. Callers + must surface it to the human and rely on the user to copy it; after + this method returns, the plaintext is unrecoverable from the database. + """ + plaintext = secrets.token_urlsafe(48) + instance = self.create( + user=user, + name=name, + token_hash=hash_token(plaintext), + allowed_tools=list(allowed_tools or []), + expires_at=expires_at, + ) + return instance, plaintext + + class MCPToken(models.Model): - """Bearer token for authenticating MCP tool calls. See docs/Pattern_Django-MCP_V1-00.md.""" + """Bearer token for authenticating MCP tool calls. + + Tokens are hashed at rest (SHA-256, 64-char hex). Plaintext exists only in + memory at creation time, on the wire to the client, and in the user's own + storage. A leaked database backup discloses no usable credentials. + """ user = models.ForeignKey( settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name="mcp_tokens", ) - token = models.CharField(max_length=64, unique=True, db_index=True) + token_hash = models.CharField(max_length=64, unique=True, db_index=True) name = models.CharField(max_length=100) is_active = models.BooleanField(default=True) expires_at = models.DateTimeField(null=True, blank=True) @@ -22,17 +52,14 @@ class MCPToken(models.Model): created_at = models.DateTimeField(auto_now_add=True) updated_at = models.DateTimeField(auto_now=True) + objects = MCPTokenManager() + class Meta: ordering = ["-created_at"] def __str__(self): return f"{self.name} ({self.user})" - def save(self, **kwargs): - if not self.token: - self.token = secrets.token_urlsafe(48) - super().save(**kwargs) - @property def is_valid(self) -> bool: if not self.is_active: @@ -51,6 +78,9 @@ class MCPToken(models.Model): self.save(update_fields=["last_used_at"]) def get_masked_token(self) -> str: - if len(self.token) > 8: - return f"{'*' * (len(self.token) - 8)}{self.token[-8:]}" - return "*" * len(self.token) + """Token-id-style display for admin and dashboard. + + Plaintext is unrecoverable, so we display the first 8 chars of the + hash prefixed with `mcp_…`. Stable per token, never reveals plaintext. + """ + return f"mcp_…{self.token_hash[:8]}" diff --git a/mnemosyne/mcp_server/templates/mcp_server/tokens/create.html b/mnemosyne/mcp_server/templates/mcp_server/tokens/create.html new file mode 100644 index 0000000..855eb6e --- /dev/null +++ b/mnemosyne/mcp_server/templates/mcp_server/tokens/create.html @@ -0,0 +1,63 @@ +{% extends "themis/base.html" %} + +{% block title %}Generate MCP Token — {{ themis_app_name }}{% endblock %} + +{% block content %} +
+

Generate MCP Token

+ +
+ The token will be displayed once after creation. Save it before leaving the page — it cannot be recovered. +
+ +
+ {% csrf_token %} + +
+
+
+ + {{ form.name }} + +
+
+ + {{ form.expires_at }} + +
+
+
+ +
+
+

Allowed Tools

+

{{ form.allowed_tools.help_text }}

+
+ {% for choice in form.allowed_tools %} + + {% endfor %} +
+ {% if form.allowed_tools.errors %} +
{{ form.allowed_tools.errors }}
+ {% endif %} +
+
+ +
+ Cancel + +
+
+
+{% endblock %} diff --git a/mnemosyne/mcp_server/templates/mcp_server/tokens/created.html b/mnemosyne/mcp_server/templates/mcp_server/tokens/created.html new file mode 100644 index 0000000..6ba101d --- /dev/null +++ b/mnemosyne/mcp_server/templates/mcp_server/tokens/created.html @@ -0,0 +1,50 @@ +{% extends "themis/base.html" %} + +{% block title %}MCP Token Created — {{ themis_app_name }}{% endblock %} + +{% block content %} +
+

MCP Token Created

+

{{ token.name }}

+ +
+ Save this token now. Once you leave this page it cannot be retrieved — only the hash is stored. If lost, you will need to generate a new one. +
+ +
+
+

Token

+
{{ plaintext }}
+
+ +
+
+
+ +
+
+

Use it

+

+ Add the token to your MCP client config as a Bearer credential. Example for Claude Desktop: +

+
{
+  "mcpServers": {
+    "mnemosyne": {
+      "url": "http://localhost:8001/mcp/",
+      "headers": {
+        "Authorization": "Bearer <paste token here>"
+      }
+    }
+  }
+}
+
+
+ +
+ I’ve saved it — go to token list +
+
+{% endblock %} diff --git a/mnemosyne/mcp_server/templates/mcp_server/tokens/detail.html b/mnemosyne/mcp_server/templates/mcp_server/tokens/detail.html new file mode 100644 index 0000000..0ec8cbb --- /dev/null +++ b/mnemosyne/mcp_server/templates/mcp_server/tokens/detail.html @@ -0,0 +1,81 @@ +{% extends "themis/base.html" %} +{% load humanize %} + +{% block title %}{{ token.name }} — {{ themis_app_name }}{% endblock %} + +{% block content %} +
+
+

{{ token.name }}

+
+ Edit + {% if token.is_active %} +
+ {% csrf_token %} + +
+ {% endif %} +
+ {% csrf_token %} + +
+
+
+ +
+
+
+
Token ID
+
{{ token.get_masked_token }}
+ +
Status
+
+ {% if token.is_valid %} + Active + {% elif not token.is_active %} + Revoked + {% else %} + Expired + {% endif %} +
+ + {% if token.expires_at %} +
Expires
+
{{ token.expires_at }}
+ {% endif %} + +
Last Used
+
+ {% if token.last_used_at %} + {{ token.last_used_at }} ({{ token.last_used_at|naturaltime }}) + {% else %} + Never + {% endif %} +
+ +
Created
+
{{ token.created_at }}
+
+
+
+ +
+
+

Allowed Tools

+ {% if token.allowed_tools %} +
+ {% for tool in token.allowed_tools %} + {{ tool }} + {% endfor %} +
+ {% else %} +

All tools permitted.

+ {% endif %} +
+
+ + ← Back to Tokens +
+{% endblock %} diff --git a/mnemosyne/mcp_server/templates/mcp_server/tokens/edit.html b/mnemosyne/mcp_server/templates/mcp_server/tokens/edit.html new file mode 100644 index 0000000..3f566e4 --- /dev/null +++ b/mnemosyne/mcp_server/templates/mcp_server/tokens/edit.html @@ -0,0 +1,60 @@ +{% extends "themis/base.html" %} + +{% block title %}Edit {{ token.name }} — {{ themis_app_name }}{% endblock %} + +{% block content %} +
+

Edit Token: {{ token.name }}

+ +
+ You can edit metadata below. The token value itself cannot be changed — generate a new token if needed. +
+ +
+ {% csrf_token %} + +
+
+
+ + {{ form.name }} +
+
+ +
+
+ + {{ form.expires_at }} +
+
+
+ +
+
+

Allowed Tools

+

{{ form.allowed_tools.help_text }}

+
+ {% for choice in form.allowed_tools %} + + {% endfor %} +
+
+
+ +
+ Cancel + +
+
+
+{% endblock %} diff --git a/mnemosyne/mcp_server/templates/mcp_server/tokens/list.html b/mnemosyne/mcp_server/templates/mcp_server/tokens/list.html new file mode 100644 index 0000000..6986aa8 --- /dev/null +++ b/mnemosyne/mcp_server/templates/mcp_server/tokens/list.html @@ -0,0 +1,61 @@ +{% extends "themis/base.html" %} +{% load humanize %} + +{% block title %}MCP Tokens — {{ themis_app_name }}{% endblock %} + +{% block content %} +
+
+

MCP Tokens

+ Generate Token +
+ +

+ Bearer tokens used by MCP clients (Claude Desktop, Cursor, etc.) to call Mnemosyne. Stored hashed at rest — the plaintext is shown only once at creation. +

+ + {% if tokens %} +
+ {% for token in tokens %} +
+
+
+
+ + {{ token.name }} + +
+ {{ token.get_masked_token }} + {% if not token.is_valid %} + + {% if not token.is_active %}Revoked{% else %}Expired{% endif %} + + {% else %} + Active + {% endif %} + + Last used: + {% if token.last_used_at %}{{ token.last_used_at|naturaltime }}{% else %}never{% endif %} + +
+
+
+ Edit +
+
+
+
+ {% endfor %} +
+ {% else %} +
+
+

No MCP tokens yet.

+ Generate Your First Token +
+
+ {% endif %} +
+{% endblock %} diff --git a/mnemosyne/mcp_server/tests/test_auth.py b/mnemosyne/mcp_server/tests/test_auth.py index 0fc796d..4e9e6ca 100644 --- a/mnemosyne/mcp_server/tests/test_auth.py +++ b/mnemosyne/mcp_server/tests/test_auth.py @@ -17,16 +17,18 @@ class ResolveMCPUserTest(TestCase): self.user = User.objects.create_user( username="bob", email="bob@example.com", password="pw" ) - self.token = MCPToken.objects.create(user=self.user, name="t") + self.token, self.plaintext = MCPToken.objects.create_token( + user=self.user, name="t" + ) def test_resolves_valid_token(self): - user, token = resolve_mcp_user(self.token.token) + user, token = resolve_mcp_user(self.plaintext) self.assertEqual(user.pk, self.user.pk) self.assertEqual(token.pk, self.token.pk) def test_records_usage(self): self.assertIsNone(self.token.last_used_at) - resolve_mcp_user(self.token.token) + resolve_mcp_user(self.plaintext) self.token.refresh_from_db() self.assertIsNotNone(self.token.last_used_at) @@ -38,16 +40,31 @@ class ResolveMCPUserTest(TestCase): self.token.is_active = False self.token.save() with self.assertRaises(MCPAuthError): - resolve_mcp_user(self.token.token) + resolve_mcp_user(self.plaintext) def test_expired_token_raises(self): self.token.expires_at = timezone.now() - timedelta(hours=1) self.token.save() with self.assertRaises(MCPAuthError): - resolve_mcp_user(self.token.token) + resolve_mcp_user(self.plaintext) def test_disabled_user_raises(self): self.user.is_active = False self.user.save() with self.assertRaises(MCPAuthError): - resolve_mcp_user(self.token.token) + resolve_mcp_user(self.plaintext) + + def test_plaintext_not_in_db(self): + # Defense in depth: scan every column for the plaintext value. + from django.db import connection + + plaintext = self.plaintext + with connection.cursor() as cur: + cur.execute("SELECT * FROM mcp_server_mcptoken") + rows = cur.fetchall() + for row in rows: + for value in row: + self.assertNotEqual( + value, plaintext, + f"Plaintext token leaked into the database: {value!r}", + ) diff --git a/mnemosyne/mcp_server/tests/test_forms.py b/mnemosyne/mcp_server/tests/test_forms.py new file mode 100644 index 0000000..531afe9 --- /dev/null +++ b/mnemosyne/mcp_server/tests/test_forms.py @@ -0,0 +1,58 @@ +"""Form tests for the MCP token dashboard.""" + +from django.contrib.auth import get_user_model +from django.test import TestCase + +from mcp_server.forms import MCPTokenCreateForm, MCPTokenEditForm +from mcp_server.models import MCPToken + +User = get_user_model() + + +class CreateFormTest(TestCase): + def test_required_fields(self): + form = MCPTokenCreateForm(data={}) + self.assertFalse(form.is_valid()) + self.assertIn("name", form.errors) + + def test_name_only_is_valid(self): + form = MCPTokenCreateForm(data={"name": "Test"}) + self.assertTrue(form.is_valid(), form.errors) + + def test_tool_choices_match_registered_tools(self): + form = MCPTokenCreateForm() + choices = {value for value, _ in form.fields["allowed_tools"].choices} + # These five must always be present per the FastMCP server. + for expected in {"search", "get_chunk", "list_libraries", "list_collections", "list_items"}: + self.assertIn(expected, choices) + + +class EditFormTest(TestCase): + def setUp(self): + self.user = User.objects.create_user(username="alice", password="pw") + self.token, _ = MCPToken.objects.create_token( + user=self.user, name="t", allowed_tools=["search"] + ) + + def test_initial_allowed_tools_populated(self): + form = MCPTokenEditForm(instance=self.token) + self.assertEqual(form.fields["allowed_tools"].initial, ["search"]) + + def test_save_updates_metadata(self): + form = MCPTokenEditForm( + data={ + "name": "Renamed", + "is_active": False, + "expires_at": "", + "allowed_tools": ["search", "get_chunk"], + }, + instance=self.token, + ) + self.assertTrue(form.is_valid(), form.errors) + instance = form.save(commit=False) + instance.allowed_tools = form.cleaned_data["allowed_tools"] + instance.save() + self.token.refresh_from_db() + self.assertEqual(self.token.name, "Renamed") + self.assertFalse(self.token.is_active) + self.assertEqual(self.token.allowed_tools, ["search", "get_chunk"]) diff --git a/mnemosyne/mcp_server/tests/test_token.py b/mnemosyne/mcp_server/tests/test_token.py index 99c296a..7e85686 100644 --- a/mnemosyne/mcp_server/tests/test_token.py +++ b/mnemosyne/mcp_server/tests/test_token.py @@ -6,7 +6,7 @@ from django.contrib.auth import get_user_model from django.test import TestCase from django.utils import timezone -from mcp_server.models import MCPToken +from mcp_server.models import MCPToken, hash_token User = get_user_model() @@ -17,21 +17,33 @@ class MCPTokenModelTest(TestCase): username="alice", email="alice@example.com", password="pw" ) - def test_token_auto_generated(self): - token = MCPToken.objects.create(user=self.user, name="t") - self.assertTrue(token.token) - self.assertGreater(len(token.token), 20) + def test_create_token_returns_plaintext_and_stores_hash(self): + token, plaintext = MCPToken.objects.create_token(user=self.user, name="t") + self.assertTrue(plaintext) + self.assertGreater(len(plaintext), 20) + # Database stores hash, not plaintext + self.assertEqual(len(token.token_hash), 64) + self.assertNotEqual(token.token_hash, plaintext) + self.assertEqual(token.token_hash, hash_token(plaintext)) + + def test_token_hash_never_equals_plaintext(self): + # Regression guard: if anyone ever wires plaintext back into token_hash, + # this fails. + token, plaintext = MCPToken.objects.create_token(user=self.user, name="t") + self.assertNotIn(plaintext, token.token_hash) def test_active_token_is_valid(self): - token = MCPToken.objects.create(user=self.user, name="t") + token, _ = MCPToken.objects.create_token(user=self.user, name="t") self.assertTrue(token.is_valid) def test_inactive_token_not_valid(self): - token = MCPToken.objects.create(user=self.user, name="t", is_active=False) + token, _ = MCPToken.objects.create_token(user=self.user, name="t") + token.is_active = False + token.save() self.assertFalse(token.is_valid) def test_expired_token_not_valid(self): - token = MCPToken.objects.create( + token, _ = MCPToken.objects.create_token( user=self.user, name="t", expires_at=timezone.now() - timedelta(hours=1), @@ -39,25 +51,27 @@ class MCPTokenModelTest(TestCase): self.assertFalse(token.is_valid) def test_unrestricted_permits_all(self): - token = MCPToken.objects.create(user=self.user, name="t") + token, _ = MCPToken.objects.create_token(user=self.user, name="t") self.assertTrue(token.can_use_tool("anything")) def test_tool_whitelist(self): - token = MCPToken.objects.create( + token, _ = MCPToken.objects.create_token( user=self.user, name="t", allowed_tools=["search"] ) self.assertTrue(token.can_use_tool("search")) self.assertFalse(token.can_use_tool("get_chunk")) def test_record_usage(self): - token = MCPToken.objects.create(user=self.user, name="t") + token, _ = MCPToken.objects.create_token(user=self.user, name="t") self.assertIsNone(token.last_used_at) token.record_usage() token.refresh_from_db() self.assertIsNotNone(token.last_used_at) - def test_masked_token(self): - token = MCPToken.objects.create(user=self.user, name="t") + def test_masked_token_is_hash_prefix(self): + token, plaintext = MCPToken.objects.create_token(user=self.user, name="t") masked = token.get_masked_token() - self.assertTrue(masked.endswith(token.token[-8:])) - self.assertIn("*", masked) + self.assertTrue(masked.startswith("mcp_…")) + self.assertIn(token.token_hash[:8], masked) + # Plaintext must never leak through the masked display + self.assertNotIn(plaintext, masked) diff --git a/mnemosyne/mcp_server/tests/test_views.py b/mnemosyne/mcp_server/tests/test_views.py new file mode 100644 index 0000000..f214441 --- /dev/null +++ b/mnemosyne/mcp_server/tests/test_views.py @@ -0,0 +1,177 @@ +"""View tests for the MCP token self-service dashboard.""" + +from django.contrib.auth import get_user_model +from django.test import TestCase +from django.urls import reverse + +from mcp_server.models import MCPToken + +User = get_user_model() + + +class TokenListViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user( + username="alice", email="alice@example.com", password="pw" + ) + self.url = reverse("mcp_server:mcp-token-list") + + def test_login_required(self): + resp = self.client.get(self.url) + self.assertEqual(resp.status_code, 302) + self.assertIn("/login/", resp.url) + + def test_list_shows_only_own_tokens(self): + other = User.objects.create_user(username="bob", password="pw") + MCPToken.objects.create_token(user=self.user, name="mine") + MCPToken.objects.create_token(user=other, name="theirs") + self.client.force_login(self.user) + resp = self.client.get(self.url) + self.assertContains(resp, "mine") + self.assertNotContains(resp, "theirs") + + def test_empty_state(self): + self.client.force_login(self.user) + resp = self.client.get(self.url) + self.assertContains(resp, "No MCP tokens yet.") + + +class TokenCreateViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user(username="alice", password="pw") + self.client.force_login(self.user) + self.url = reverse("mcp_server:mcp-token-create") + + def test_get_renders_form(self): + resp = self.client.get(self.url) + self.assertEqual(resp.status_code, 200) + self.assertContains(resp, "Generate MCP Token") + + def test_post_creates_token_and_shows_plaintext_once(self): + resp = self.client.post(self.url, {"name": "Claude Desktop"}) + self.assertEqual(resp.status_code, 200) + self.assertContains(resp, "Save this token now") + # Pull the created row, verify the response contained a plaintext that + # is NOT what we stored. + token = MCPToken.objects.get(user=self.user, name="Claude Desktop") + self.assertNotContains(resp, token.token_hash) # hash is not what we display + # And the detail page never renders the plaintext. + body = resp.content.decode() + # Find the plaintext from the response: the only long alphanumeric + # block inside the #mcp-plaintext div. + import re + + m = re.search(r'id="mcp-plaintext">([A-Za-z0-9_\-]+)<', body) + self.assertIsNotNone(m, "plaintext block not found in response") + plaintext = m.group(1) + # Sanity: round-tripping the plaintext through hash_token reproduces + # what's stored. + from mcp_server.models import hash_token + + self.assertEqual(hash_token(plaintext), token.token_hash) + # Detail page must NOT contain the plaintext. + detail_resp = self.client.get( + reverse("mcp_server:mcp-token-detail", args=[token.pk]) + ) + self.assertNotContains(detail_resp, plaintext) + + def test_post_invalid_renders_form_again(self): + resp = self.client.post(self.url, {"name": ""}) + self.assertEqual(resp.status_code, 200) + self.assertContains(resp, "Generate MCP Token") + self.assertEqual(MCPToken.objects.count(), 0) + + +class TokenDetailViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user(username="alice", password="pw") + self.client.force_login(self.user) + self.token, _ = MCPToken.objects.create_token(user=self.user, name="t") + + def test_renders_token(self): + resp = self.client.get( + reverse("mcp_server:mcp-token-detail", args=[self.token.pk]) + ) + self.assertContains(resp, self.token.name) + self.assertContains(resp, self.token.get_masked_token()) + + def test_cannot_view_other_users_token(self): + other = User.objects.create_user(username="bob", password="pw") + other_token, _ = MCPToken.objects.create_token(user=other, name="theirs") + resp = self.client.get( + reverse("mcp_server:mcp-token-detail", args=[other_token.pk]) + ) + self.assertEqual(resp.status_code, 404) + + +class TokenEditViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user(username="alice", password="pw") + self.client.force_login(self.user) + self.token, _ = MCPToken.objects.create_token(user=self.user, name="t") + + def test_post_updates_metadata(self): + resp = self.client.post( + reverse("mcp_server:mcp-token-edit", args=[self.token.pk]), + { + "name": "Renamed", + "is_active": "on", + "expires_at": "", + "allowed_tools": ["search"], + }, + ) + self.assertEqual(resp.status_code, 302) + self.token.refresh_from_db() + self.assertEqual(self.token.name, "Renamed") + self.assertEqual(self.token.allowed_tools, ["search"]) + + def test_cannot_edit_other_users_token(self): + other = User.objects.create_user(username="bob", password="pw") + other_token, _ = MCPToken.objects.create_token(user=other, name="theirs") + resp = self.client.post( + reverse("mcp_server:mcp-token-edit", args=[other_token.pk]), + {"name": "hacked"}, + ) + self.assertEqual(resp.status_code, 404) + + +class TokenRevokeViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user(username="alice", password="pw") + self.client.force_login(self.user) + self.token, _ = MCPToken.objects.create_token(user=self.user, name="t") + + def test_revoke_sets_inactive_keeps_row(self): + url = reverse("mcp_server:mcp-token-revoke", args=[self.token.pk]) + resp = self.client.post(url) + self.assertEqual(resp.status_code, 302) + self.token.refresh_from_db() + self.assertFalse(self.token.is_active) + # Row still exists for audit trail. + self.assertTrue(MCPToken.objects.filter(pk=self.token.pk).exists()) + + def test_get_not_allowed(self): + url = reverse("mcp_server:mcp-token-revoke", args=[self.token.pk]) + resp = self.client.get(url) + self.assertEqual(resp.status_code, 405) + + +class TokenDeleteViewTest(TestCase): + def setUp(self): + self.user = User.objects.create_user(username="alice", password="pw") + self.client.force_login(self.user) + self.token, _ = MCPToken.objects.create_token(user=self.user, name="t") + + def test_delete_removes_row(self): + url = reverse("mcp_server:mcp-token-delete", args=[self.token.pk]) + resp = self.client.post(url) + self.assertEqual(resp.status_code, 302) + self.assertFalse(MCPToken.objects.filter(pk=self.token.pk).exists()) + + def test_cannot_delete_other_users_token(self): + other = User.objects.create_user(username="bob", password="pw") + other_token, _ = MCPToken.objects.create_token(user=other, name="theirs") + url = reverse("mcp_server:mcp-token-delete", args=[other_token.pk]) + resp = self.client.post(url) + self.assertEqual(resp.status_code, 404) + self.assertTrue(MCPToken.objects.filter(pk=other_token.pk).exists()) diff --git a/mnemosyne/mcp_server/urls.py b/mnemosyne/mcp_server/urls.py new file mode 100644 index 0000000..ddd8d52 --- /dev/null +++ b/mnemosyne/mcp_server/urls.py @@ -0,0 +1,16 @@ +"""URL routes for the MCP token self-service dashboard.""" + +from django.urls import path + +from . import views + +app_name = "mcp_server" + +urlpatterns = [ + path("profile/mcp-tokens/", views.mcp_token_list, name="mcp-token-list"), + path("profile/mcp-tokens/add/", views.mcp_token_create, name="mcp-token-create"), + path("profile/mcp-tokens//", views.mcp_token_detail, name="mcp-token-detail"), + path("profile/mcp-tokens//edit/", views.mcp_token_edit, name="mcp-token-edit"), + path("profile/mcp-tokens//revoke/", views.mcp_token_revoke, name="mcp-token-revoke"), + path("profile/mcp-tokens//delete/", views.mcp_token_delete, name="mcp-token-delete"), +] diff --git a/mnemosyne/mcp_server/views.py b/mnemosyne/mcp_server/views.py new file mode 100644 index 0000000..30c7cf6 --- /dev/null +++ b/mnemosyne/mcp_server/views.py @@ -0,0 +1,95 @@ +"""Self-service dashboard for MCP bearer tokens. + +Mirrors the Themis API-keys flow visually but stores hashed tokens. Plaintext +is shown to the user exactly once (on the create-success page) and never +persisted. +""" + +from __future__ import annotations + +from django.contrib import messages +from django.contrib.auth.decorators import login_required +from django.http import HttpRequest, HttpResponse +from django.shortcuts import get_object_or_404, redirect, render +from django.views.decorators.http import require_GET, require_http_methods, require_POST + +from .forms import MCPTokenCreateForm, MCPTokenEditForm +from .models import MCPToken + + +@login_required +@require_GET +def mcp_token_list(request: HttpRequest) -> HttpResponse: + tokens = MCPToken.objects.filter(user=request.user).order_by("-created_at") + return render(request, "mcp_server/tokens/list.html", {"tokens": tokens}) + + +@login_required +@require_http_methods(["GET", "POST"]) +def mcp_token_create(request: HttpRequest) -> HttpResponse: + if request.method == "POST": + form = MCPTokenCreateForm(request.POST) + if form.is_valid(): + token, plaintext = MCPToken.objects.create_token( + user=request.user, + name=form.cleaned_data["name"], + allowed_tools=form.cleaned_data.get("allowed_tools") or [], + expires_at=form.cleaned_data.get("expires_at") or None, + ) + return render( + request, + "mcp_server/tokens/created.html", + {"token": token, "plaintext": plaintext}, + ) + else: + form = MCPTokenCreateForm() + + return render(request, "mcp_server/tokens/create.html", {"form": form}) + + +@login_required +@require_GET +def mcp_token_detail(request: HttpRequest, pk: int) -> HttpResponse: + token = get_object_or_404(MCPToken, pk=pk, user=request.user) + return render(request, "mcp_server/tokens/detail.html", {"token": token}) + + +@login_required +@require_http_methods(["GET", "POST"]) +def mcp_token_edit(request: HttpRequest, pk: int) -> HttpResponse: + token = get_object_or_404(MCPToken, pk=pk, user=request.user) + + if request.method == "POST": + form = MCPTokenEditForm(request.POST, instance=token) + if form.is_valid(): + instance = form.save(commit=False) + instance.allowed_tools = form.cleaned_data.get("allowed_tools") or [] + instance.save() + messages.success(request, "MCP token updated.") + return redirect("mcp_server:mcp-token-detail", pk=token.pk) + else: + form = MCPTokenEditForm(instance=token) + + return render( + request, "mcp_server/tokens/edit.html", {"form": form, "token": token} + ) + + +@login_required +@require_POST +def mcp_token_revoke(request: HttpRequest, pk: int) -> HttpResponse: + token = get_object_or_404(MCPToken, pk=pk, user=request.user) + token.is_active = False + token.save(update_fields=["is_active", "updated_at"]) + messages.success(request, f"Revoked “{token.name}”. The token can no longer be used.") + return redirect("mcp_server:mcp-token-detail", pk=token.pk) + + +@login_required +@require_POST +def mcp_token_delete(request: HttpRequest, pk: int) -> HttpResponse: + token = get_object_or_404(MCPToken, pk=pk, user=request.user) + name = token.name + token.delete() + messages.success(request, f"Deleted “{name}”.") + return redirect("mcp_server:mcp-token-list") diff --git a/mnemosyne/mnemosyne/urls.py b/mnemosyne/mnemosyne/urls.py index 0633511..a7ad8fd 100644 --- a/mnemosyne/mnemosyne/urls.py +++ b/mnemosyne/mnemosyne/urls.py @@ -25,4 +25,6 @@ urlpatterns = [ path("library/", include("library.urls")), # LLM Manager path("llm/", include("llm_manager.urls")), + # MCP server (token dashboard at /profile/mcp-tokens/) + path("", include("mcp_server.urls")), ] diff --git a/mnemosyne/test_db_manager/django_integration.py b/mnemosyne/test_db_manager/django_integration.py index d39a2f4..a9b72af 100644 --- a/mnemosyne/test_db_manager/django_integration.py +++ b/mnemosyne/test_db_manager/django_integration.py @@ -52,15 +52,20 @@ class PostgreSQLTestRunner(DiscoverRunner): from django.db import connections db_cfg = self.pg_manager.get_django_database_config() - # Preserve Django's defaulted TEST sub-dict (CHARSET/MIRROR/MIGRATE…). - existing_test = connections["default"].settings_dict.get("TEST", {}) - merged_test = {**existing_test, **db_cfg.get("TEST", {})} - db_cfg["TEST"] = merged_test - settings.DATABASES["default"] = db_cfg + # Preserve Django's defaulted top-level keys (ATOMIC_REQUESTS, + # AUTOCOMMIT, OPTIONS, …) and the TEST sub-dict (CHARSET, MIRROR, + # MIGRATE, …) — these are populated lazily by Django and absent from + # the user's raw settings.DATABASES, so a naive overwrite breaks + # request handling that consults them. + existing = connections["default"].settings_dict + existing_test = existing.get("TEST", {}) + merged = {**existing, **db_cfg} + merged["TEST"] = {**existing_test, **db_cfg.get("TEST", {})} + settings.DATABASES["default"] = merged # The default connection was instantiated at Django bootstrap; its - # settings_dict is independent of settings.DATABASES. Sync it + # settings_dict is independent of settings.DATABASES. Sync it # manually so test code talks to the container, not the dev DB. - connections["default"].settings_dict.update(db_cfg) + connections["default"].settings_dict.update(merged) logger.info("PostgreSQL test DB ready on port %s", self.pg_manager.assigned_port) # ── Neo4j ────────────────────────────────────────────────────── diff --git a/mnemosyne/themis/migrations/0004_alter_userapikey_key_type.py b/mnemosyne/themis/migrations/0004_alter_userapikey_key_type.py new file mode 100644 index 0000000..28c5367 --- /dev/null +++ b/mnemosyne/themis/migrations/0004_alter_userapikey_key_type.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.13 on 2026-04-27 11:30 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('themis', '0003_alter_userprofile_current_timezone_and_more'), + ] + + operations = [ + migrations.AlterField( + model_name='userapikey', + name='key_type', + field=models.CharField(choices=[('api', 'API Key'), ('dav', 'DAV Credentials'), ('token', 'Access Token'), ('secret', 'Secret Key'), ('other', 'Other')], default='api', help_text='Type of credential', max_length=30), + ), + ] diff --git a/mnemosyne/themis/models.py b/mnemosyne/themis/models.py index a373f39..e081eda 100644 --- a/mnemosyne/themis/models.py +++ b/mnemosyne/themis/models.py @@ -228,7 +228,6 @@ class UserAPIKey(models.Model): KEY_TYPE_CHOICES = [ ("api", "API Key"), - ("mcp", "MCP Server"), ("dav", "DAV Credentials"), ("token", "Access Token"), ("secret", "Secret Key"), diff --git a/mnemosyne/themis/templates/themis/includes/user_menu.html b/mnemosyne/themis/templates/themis/includes/user_menu.html index 601e589..2d3d028 100644 --- a/mnemosyne/themis/templates/themis/includes/user_menu.html +++ b/mnemosyne/themis/templates/themis/includes/user_menu.html @@ -38,6 +38,16 @@ API Keys +
  • + + + + + MCP Tokens + +
  • diff --git a/mnemosyne/themis/tests/test_forms.py b/mnemosyne/themis/tests/test_forms.py index 6cc3c9d..3302cb6 100644 --- a/mnemosyne/themis/tests/test_forms.py +++ b/mnemosyne/themis/tests/test_forms.py @@ -248,7 +248,7 @@ class APIKeyEditFormTest(TestCase): form = APIKeyEditForm( data={ "service_name": "Updated Service", - "key_type": "mcp", + "key_type": "token", "label": "Updated", "instructions": "", "help_url": "", @@ -260,6 +260,6 @@ class APIKeyEditFormTest(TestCase): form.save() self.key.refresh_from_db() self.assertEqual(self.key.service_name, "Updated Service") - self.assertEqual(self.key.key_type, "mcp") + self.assertEqual(self.key.key_type, "token") self.assertFalse(self.key.is_active) self.assertEqual(self.key.encrypted_value, original_encrypted) diff --git a/mnemosyne/themis/tests/test_models.py b/mnemosyne/themis/tests/test_models.py index aaf3493..cdb7f7c 100644 --- a/mnemosyne/themis/tests/test_models.py +++ b/mnemosyne/themis/tests/test_models.py @@ -209,7 +209,6 @@ class UserAPIKeyModelTest(TestCase): """KEY_TYPE_CHOICES contains expected types.""" type_keys = [t[0] for t in UserAPIKey.KEY_TYPE_CHOICES] self.assertIn("api", type_keys) - self.assertIn("mcp", type_keys) self.assertIn("dav", type_keys) self.assertIn("token", type_keys) self.assertIn("secret", type_keys)