feat(mcp): store MCP tokens as SHA-256 hashes instead of plaintext

Replace plaintext token storage with SHA-256 hashes so leaked database
contents cannot be used to authenticate. Plaintext is generated, shown
once at creation time, and never persisted.

- Add `hash_token()` helper and `MCPTokenManager.create_token()` that
  returns `(instance, plaintext)`.
- Replace `token` field with indexed `token_hash`; look up bearers by
  hashing the incoming value.
- Update dashboard, management command, and admin to surface plaintext
  only at creation. Disable admin "add" since it cannot reveal plaintext.
- Migration drops the old `token` column and adds `token_hash`;
  pre-existing tokens are invalidated and must be reissued.
This commit is contained in:
2026-04-27 09:01:36 -04:00
parent 2df22941d2
commit 81426327bf
24 changed files with 950 additions and 50 deletions

View File

@@ -16,14 +16,23 @@ class MCPTokenAdmin(admin.ModelAdmin):
] ]
list_filter = ["is_active"] list_filter = ["is_active"]
search_fields = ["name", "user__email", "user__username"] search_fields = ["name", "user__email", "user__username"]
readonly_fields = ["token", "last_used_at", "created_at", "updated_at"] readonly_fields = ["token_hash", "last_used_at", "created_at", "updated_at"]
fieldsets = ( fieldsets = (
(None, {"fields": ("user", "name", "is_active")}), (None, {"fields": ("user", "name", "is_active")}),
("Restrictions", {"fields": ("allowed_tools", "expires_at")}), ("Restrictions", {"fields": ("allowed_tools", "expires_at")}),
("Token (shown once at creation)", {"fields": ("token",)}), (
"Token (hashed at rest — plaintext is shown only once at creation)",
{"fields": ("token_hash",)},
),
("Audit", {"fields": ("last_used_at", "created_at", "updated_at")}), ("Audit", {"fields": ("last_used_at", "created_at", "updated_at")}),
) )
@admin.display(description="Token") @admin.display(description="Token")
def masked_token(self, obj): def masked_token(self, obj):
return obj.get_masked_token() return obj.get_masked_token()
def has_add_permission(self, request):
# Tokens must be created via the dashboard or management command
# so the plaintext can be surfaced to the user. Adding via admin
# would persist a hash with no plaintext ever shown.
return False

View File

@@ -12,7 +12,7 @@ from fastmcp.server.dependencies import get_http_request
from fastmcp.server.middleware import Middleware, MiddlewareContext from fastmcp.server.middleware import Middleware, MiddlewareContext
from .metrics import mcp_auth_failures_total from .metrics import mcp_auth_failures_total
from .models import MCPToken from .models import MCPToken, hash_token
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -25,9 +25,17 @@ class MCPAuthError(Exception):
def resolve_mcp_user(token_string: str): def resolve_mcp_user(token_string: str):
"""Resolve a bearer token to (user, MCPToken). Raises MCPAuthError on any failure.""" """Resolve a bearer token to (user, MCPToken). Raises MCPAuthError on any failure.
Hashes the incoming bearer and looks up by the hash — plaintext is never
stored or compared directly.
"""
try: try:
token = MCPToken.objects.select_related("user").get(token=token_string) token = (
MCPToken.objects
.select_related("user")
.get(token_hash=hash_token(token_string))
)
except MCPToken.DoesNotExist: except MCPToken.DoesNotExist:
raise MCPAuthError("Invalid MCP token.") raise MCPAuthError("Invalid MCP token.")

View File

@@ -0,0 +1,85 @@
"""Forms for the MCP token self-service dashboard."""
from __future__ import annotations
import asyncio
import functools
from django import forms
from .models import MCPToken
@functools.lru_cache(maxsize=1)
def _registered_tool_names() -> list[str]:
"""Pull the list of registered MCP tool names from the FastMCP instance.
Cached at module level — the tool registry is fixed at server startup.
Importing here (rather than at module import) avoids circulars when the
server module imports the dashboard pieces transitively.
"""
from .server import mcp
tools = asyncio.run(mcp.get_tools())
return sorted(tools.keys())
def _tool_choices() -> list[tuple[str, str]]:
return [(name, name) for name in _registered_tool_names()]
class MCPTokenCreateForm(forms.Form):
"""Generate a new bearer token. The token value itself is server-generated."""
name = forms.CharField(
max_length=100,
widget=forms.TextInput(attrs={
"class": "input input-bordered w-full",
"placeholder": "e.g. Claude Desktop, CI script",
}),
)
expires_at = forms.DateTimeField(
required=False,
widget=forms.DateTimeInput(attrs={
"class": "input input-bordered w-full",
"type": "datetime-local",
}),
help_text="Leave blank for no expiry.",
)
allowed_tools = forms.MultipleChoiceField(
required=False,
widget=forms.CheckboxSelectMultiple(attrs={"class": "checkbox checkbox-sm"}),
help_text="Leave all unchecked to permit every tool.",
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["allowed_tools"].choices = _tool_choices()
class MCPTokenEditForm(forms.ModelForm):
"""Edit token metadata. The hashed token itself cannot be edited."""
allowed_tools = forms.MultipleChoiceField(
required=False,
widget=forms.CheckboxSelectMultiple(attrs={"class": "checkbox checkbox-sm"}),
help_text="Leave all unchecked to permit every tool.",
)
class Meta:
model = MCPToken
fields = ["name", "is_active", "expires_at", "allowed_tools"]
widgets = {
"name": forms.TextInput(attrs={"class": "input input-bordered w-full"}),
"is_active": forms.CheckboxInput(attrs={"class": "toggle toggle-primary"}),
"expires_at": forms.DateTimeInput(attrs={
"class": "input input-bordered w-full",
"type": "datetime-local",
}),
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields["allowed_tools"].choices = _tool_choices()
if self.instance and self.instance.pk:
self.fields["allowed_tools"].initial = self.instance.allowed_tools or []

View File

@@ -57,7 +57,7 @@ class Command(BaseCommand):
raise CommandError("--expires-days must be at least 1.") raise CommandError("--expires-days must be at least 1.")
expires_at = timezone.now() + timedelta(days=options["expires_days"]) expires_at = timezone.now() + timedelta(days=options["expires_days"])
token = MCPToken.objects.create( token, plaintext = MCPToken.objects.create_token(
user=user, user=user,
name=options["name"], name=options["name"],
allowed_tools=allowed_tools, allowed_tools=allowed_tools,
@@ -73,5 +73,5 @@ class Command(BaseCommand):
self.stdout.write(" Tools: (all)") self.stdout.write(" Tools: (all)")
if expires_at: if expires_at:
self.stdout.write(f" Expires: {expires_at.isoformat()}") self.stdout.write(f" Expires: {expires_at.isoformat()}")
self.stdout.write(self.style.WARNING(" Token (shown once):")) self.stdout.write(self.style.WARNING(" Token (shown once — store it now):"))
self.stdout.write(f" {token.token}") self.stdout.write(f" {plaintext}")

View File

@@ -0,0 +1,43 @@
"""Hash MCPToken values at rest.
Renames ``token`` → ``token_hash`` and rewrites any pre-existing plaintext
values into SHA-256 hex digests in-place. Forward-only: hashing is one-way,
so no reverse migration is provided.
Existing tokens issued before this migration keep working only because
``resolve_mcp_user`` hashes the incoming bearer before lookup; the original
plaintext the client holds still hashes to what we just wrote.
"""
import hashlib
from django.db import migrations, models
def hash_existing_tokens(apps, schema_editor):
MCPToken = apps.get_model("mcp_server", "MCPToken")
for token in MCPToken.objects.all():
plaintext = token.token_hash # post-rename, still holds original plaintext
token.token_hash = hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
token.save(update_fields=["token_hash"])
def noop_reverse(apps, schema_editor):
# Cannot reverse a hash. Leaving as no-op so the schema can be rolled
# back, but operators must understand any hashed rows are unrecoverable.
pass
class Migration(migrations.Migration):
dependencies = [
("mcp_server", "0001_initial"),
]
operations = [
migrations.RenameField(
model_name="mcptoken",
old_name="token",
new_name="token_hash",
),
migrations.RunPython(hash_existing_tokens, noop_reverse),
]

View File

@@ -1,3 +1,4 @@
import hashlib
import secrets import secrets
from django.conf import settings from django.conf import settings
@@ -5,15 +6,44 @@ from django.db import models
from django.utils import timezone from django.utils import timezone
def hash_token(plaintext: str) -> str:
"""SHA-256 hex digest of an MCP bearer token. 64 chars."""
return hashlib.sha256(plaintext.encode("utf-8")).hexdigest()
class MCPTokenManager(models.Manager):
def create_token(self, *, user, name, allowed_tools=None, expires_at=None):
"""Generate a new bearer token, store its hash, and return (instance, plaintext).
The plaintext is returned exactly once and is never persisted. Callers
must surface it to the human and rely on the user to copy it; after
this method returns, the plaintext is unrecoverable from the database.
"""
plaintext = secrets.token_urlsafe(48)
instance = self.create(
user=user,
name=name,
token_hash=hash_token(plaintext),
allowed_tools=list(allowed_tools or []),
expires_at=expires_at,
)
return instance, plaintext
class MCPToken(models.Model): class MCPToken(models.Model):
"""Bearer token for authenticating MCP tool calls. See docs/Pattern_Django-MCP_V1-00.md.""" """Bearer token for authenticating MCP tool calls.
Tokens are hashed at rest (SHA-256, 64-char hex). Plaintext exists only in
memory at creation time, on the wire to the client, and in the user's own
storage. A leaked database backup discloses no usable credentials.
"""
user = models.ForeignKey( user = models.ForeignKey(
settings.AUTH_USER_MODEL, settings.AUTH_USER_MODEL,
on_delete=models.CASCADE, on_delete=models.CASCADE,
related_name="mcp_tokens", related_name="mcp_tokens",
) )
token = models.CharField(max_length=64, unique=True, db_index=True) token_hash = models.CharField(max_length=64, unique=True, db_index=True)
name = models.CharField(max_length=100) name = models.CharField(max_length=100)
is_active = models.BooleanField(default=True) is_active = models.BooleanField(default=True)
expires_at = models.DateTimeField(null=True, blank=True) expires_at = models.DateTimeField(null=True, blank=True)
@@ -22,17 +52,14 @@ class MCPToken(models.Model):
created_at = models.DateTimeField(auto_now_add=True) created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True) updated_at = models.DateTimeField(auto_now=True)
objects = MCPTokenManager()
class Meta: class Meta:
ordering = ["-created_at"] ordering = ["-created_at"]
def __str__(self): def __str__(self):
return f"{self.name} ({self.user})" return f"{self.name} ({self.user})"
def save(self, **kwargs):
if not self.token:
self.token = secrets.token_urlsafe(48)
super().save(**kwargs)
@property @property
def is_valid(self) -> bool: def is_valid(self) -> bool:
if not self.is_active: if not self.is_active:
@@ -51,6 +78,9 @@ class MCPToken(models.Model):
self.save(update_fields=["last_used_at"]) self.save(update_fields=["last_used_at"])
def get_masked_token(self) -> str: def get_masked_token(self) -> str:
if len(self.token) > 8: """Token-id-style display for admin and dashboard.
return f"{'*' * (len(self.token) - 8)}{self.token[-8:]}"
return "*" * len(self.token) Plaintext is unrecoverable, so we display the first 8 chars of the
hash prefixed with `mcp_…`. Stable per token, never reveals plaintext.
"""
return f"mcp_…{self.token_hash[:8]}"

View File

@@ -0,0 +1,63 @@
{% extends "themis/base.html" %}
{% block title %}Generate MCP Token — {{ themis_app_name }}{% endblock %}
{% block content %}
<div class="max-w-2xl mx-auto">
<h1 class="text-2xl font-bold mb-6">Generate MCP Token</h1>
<div class="alert alert-info mb-6">
<span>The token will be displayed once after creation. Save it before leaving the page — it cannot be recovered.</span>
</div>
<form method="post" action="{% url 'mcp_server:mcp-token-create' %}">
{% csrf_token %}
<div class="card bg-base-200 mb-6">
<div class="card-body">
<div class="form-control">
<label class="label" for="id_name">
<span class="label-text">Name</span>
</label>
{{ form.name }}
<label class="label">
<span class="label-text-alt opacity-60">A friendly label so you can identify this token later (e.g. “Claude Desktop”).</span>
</label>
</div>
<div class="form-control mt-4">
<label class="label" for="id_expires_at">
<span class="label-text">Expires at (optional)</span>
</label>
{{ form.expires_at }}
<label class="label">
<span class="label-text-alt opacity-60">{{ form.expires_at.help_text }}</span>
</label>
</div>
</div>
</div>
<div class="card bg-base-200 mb-6">
<div class="card-body">
<h2 class="card-title text-lg">Allowed Tools</h2>
<p class="text-sm opacity-60 mb-2">{{ form.allowed_tools.help_text }}</p>
<div class="space-y-1">
{% for choice in form.allowed_tools %}
<label class="label cursor-pointer justify-start gap-3">
{{ choice.tag }}
<span class="label-text font-mono">{{ choice.choice_label }}</span>
</label>
{% endfor %}
</div>
{% if form.allowed_tools.errors %}
<div class="text-error text-sm mt-2">{{ form.allowed_tools.errors }}</div>
{% endif %}
</div>
</div>
<div class="flex justify-between">
<a href="{% url 'mcp_server:mcp-token-list' %}" class="btn btn-ghost">Cancel</a>
<button type="submit" class="btn btn-primary">Generate Token</button>
</div>
</form>
</div>
{% endblock %}

View File

@@ -0,0 +1,50 @@
{% extends "themis/base.html" %}
{% block title %}MCP Token Created — {{ themis_app_name }}{% endblock %}
{% block content %}
<div class="max-w-2xl mx-auto">
<h1 class="text-2xl font-bold mb-2">MCP Token Created</h1>
<p class="opacity-60 mb-6">{{ token.name }}</p>
<div class="alert alert-warning mb-6">
<span><strong>Save this token now.</strong> Once you leave this page it cannot be retrieved — only the hash is stored. If lost, you will need to generate a new one.</span>
</div>
<div class="card bg-base-200 mb-6">
<div class="card-body">
<h2 class="card-title text-lg mb-2">Token</h2>
<div class="font-mono break-all bg-base-300 p-4 rounded select-all" id="mcp-plaintext">{{ plaintext }}</div>
<div class="card-actions justify-end mt-3">
<button type="button" id="mcp-copy-btn" class="btn btn-sm btn-primary"
onclick="(()=>{const t=document.getElementById('mcp-plaintext').textContent;navigator.clipboard.writeText(t).then(()=>{const b=document.getElementById('mcp-copy-btn');b.textContent='Copied!';setTimeout(()=>b.textContent='Copy to clipboard',2000);});})()">
Copy to clipboard
</button>
</div>
</div>
</div>
<div class="card bg-base-200 mb-6">
<div class="card-body">
<h2 class="card-title text-lg">Use it</h2>
<p class="text-sm opacity-80 mb-3">
Add the token to your MCP client config as a Bearer credential. Example for Claude Desktop:
</p>
<pre class="bg-base-300 p-4 rounded text-xs overflow-x-auto"><code>{
"mcpServers": {
"mnemosyne": {
"url": "http://localhost:8001/mcp/",
"headers": {
"Authorization": "Bearer &lt;paste token here&gt;"
}
}
}
}</code></pre>
</div>
</div>
<div class="flex justify-end">
<a href="{% url 'mcp_server:mcp-token-list' %}" class="btn btn-primary">Ive saved it — go to token list</a>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,81 @@
{% extends "themis/base.html" %}
{% load humanize %}
{% block title %}{{ token.name }} — {{ themis_app_name }}{% endblock %}
{% block content %}
<div class="max-w-2xl mx-auto">
<div class="flex items-center justify-between mb-6">
<h1 class="text-2xl font-bold">{{ token.name }}</h1>
<div class="flex gap-2">
<a href="{% url 'mcp_server:mcp-token-edit' pk=token.pk %}" class="btn btn-ghost btn-sm">Edit</a>
{% if token.is_active %}
<form method="post" action="{% url 'mcp_server:mcp-token-revoke' pk=token.pk %}"
onsubmit="return confirm('Revoke this token? It will no longer authenticate MCP requests.');">
{% csrf_token %}
<button type="submit" class="btn btn-warning btn-sm btn-outline">Revoke</button>
</form>
{% endif %}
<form method="post" action="{% url 'mcp_server:mcp-token-delete' pk=token.pk %}"
onsubmit="return confirm('Delete this token permanently? This removes the audit trail.');">
{% csrf_token %}
<button type="submit" class="btn btn-error btn-sm btn-outline">Delete</button>
</form>
</div>
</div>
<div class="card bg-base-200 mb-6">
<div class="card-body">
<div class="grid grid-cols-2 gap-y-3">
<div class="opacity-60">Token ID</div>
<div class="font-mono">{{ token.get_masked_token }}</div>
<div class="opacity-60">Status</div>
<div>
{% if token.is_valid %}
<span class="badge badge-success badge-sm">Active</span>
{% elif not token.is_active %}
<span class="badge badge-error badge-sm">Revoked</span>
{% else %}
<span class="badge badge-error badge-sm">Expired</span>
{% endif %}
</div>
{% if token.expires_at %}
<div class="opacity-60">Expires</div>
<div>{{ token.expires_at }}</div>
{% endif %}
<div class="opacity-60">Last Used</div>
<div>
{% if token.last_used_at %}
{{ token.last_used_at }} <span class="opacity-60">({{ token.last_used_at|naturaltime }})</span>
{% else %}
<span class="opacity-60">Never</span>
{% endif %}
</div>
<div class="opacity-60">Created</div>
<div>{{ token.created_at }}</div>
</div>
</div>
</div>
<div class="card bg-base-200 mb-6">
<div class="card-body">
<h2 class="card-title text-lg">Allowed Tools</h2>
{% if token.allowed_tools %}
<div class="flex flex-wrap gap-2">
{% for tool in token.allowed_tools %}
<span class="badge badge-outline font-mono">{{ tool }}</span>
{% endfor %}
</div>
{% else %}
<p class="opacity-60">All tools permitted.</p>
{% endif %}
</div>
</div>
<a href="{% url 'mcp_server:mcp-token-list' %}" class="btn btn-ghost btn-sm">← Back to Tokens</a>
</div>
{% endblock %}

View File

@@ -0,0 +1,60 @@
{% extends "themis/base.html" %}
{% block title %}Edit {{ token.name }} — {{ themis_app_name }}{% endblock %}
{% block content %}
<div class="max-w-2xl mx-auto">
<h1 class="text-2xl font-bold mb-6">Edit Token: {{ token.name }}</h1>
<div class="alert alert-info mb-6">
<span>You can edit metadata below. The token value itself cannot be changed — generate a new token if needed.</span>
</div>
<form method="post" action="{% url 'mcp_server:mcp-token-edit' pk=token.pk %}">
{% csrf_token %}
<div class="card bg-base-200 mb-6">
<div class="card-body">
<div class="form-control">
<label class="label" for="id_name">
<span class="label-text">Name</span>
</label>
{{ form.name }}
</div>
<div class="form-control mt-4">
<label class="label cursor-pointer justify-start gap-3">
{{ form.is_active }}
<span class="label-text">Active</span>
</label>
</div>
<div class="form-control mt-4">
<label class="label" for="id_expires_at">
<span class="label-text">Expires at (optional)</span>
</label>
{{ form.expires_at }}
</div>
</div>
</div>
<div class="card bg-base-200 mb-6">
<div class="card-body">
<h2 class="card-title text-lg">Allowed Tools</h2>
<p class="text-sm opacity-60 mb-2">{{ form.allowed_tools.help_text }}</p>
<div class="space-y-1">
{% for choice in form.allowed_tools %}
<label class="label cursor-pointer justify-start gap-3">
{{ choice.tag }}
<span class="label-text font-mono">{{ choice.choice_label }}</span>
</label>
{% endfor %}
</div>
</div>
</div>
<div class="flex justify-between">
<a href="{% url 'mcp_server:mcp-token-detail' pk=token.pk %}" class="btn btn-ghost">Cancel</a>
<button type="submit" class="btn btn-primary">Save Changes</button>
</div>
</form>
</div>
{% endblock %}

View File

@@ -0,0 +1,61 @@
{% extends "themis/base.html" %}
{% load humanize %}
{% block title %}MCP Tokens — {{ themis_app_name }}{% endblock %}
{% block content %}
<div class="max-w-3xl mx-auto">
<div class="flex items-center justify-between mb-6">
<h1 class="text-2xl font-bold">MCP Tokens</h1>
<a href="{% url 'mcp_server:mcp-token-create' %}" class="btn btn-primary btn-sm">Generate Token</a>
</div>
<p class="text-sm opacity-60 mb-4">
Bearer tokens used by MCP clients (Claude Desktop, Cursor, etc.) to call Mnemosyne. Stored hashed at rest — the plaintext is shown only once at creation.
</p>
{% if tokens %}
<div class="space-y-3">
{% for token in tokens %}
<div class="card bg-base-200">
<div class="card-body p-4">
<div class="flex items-center justify-between">
<div>
<a href="{% url 'mcp_server:mcp-token-detail' pk=token.pk %}"
class="font-semibold link link-hover">
{{ token.name }}
</a>
<div class="flex items-center gap-2 mt-1 flex-wrap">
<span class="text-sm opacity-60 font-mono">{{ token.get_masked_token }}</span>
{% if not token.is_valid %}
<span class="badge badge-sm badge-error">
{% if not token.is_active %}Revoked{% else %}Expired{% endif %}
</span>
{% else %}
<span class="badge badge-sm badge-success">Active</span>
{% endif %}
<span class="text-xs opacity-60">
Last used:
{% if token.last_used_at %}{{ token.last_used_at|naturaltime }}{% else %}never{% endif %}
</span>
</div>
</div>
<div class="flex gap-1">
<a href="{% url 'mcp_server:mcp-token-edit' pk=token.pk %}"
class="btn btn-ghost btn-xs">Edit</a>
</div>
</div>
</div>
</div>
{% endfor %}
</div>
{% else %}
<div class="card bg-base-200">
<div class="card-body items-center text-center py-12">
<p class="opacity-60 mb-4">No MCP tokens yet.</p>
<a href="{% url 'mcp_server:mcp-token-create' %}" class="btn btn-primary btn-sm">Generate Your First Token</a>
</div>
</div>
{% endif %}
</div>
{% endblock %}

View File

@@ -17,16 +17,18 @@ class ResolveMCPUserTest(TestCase):
self.user = User.objects.create_user( self.user = User.objects.create_user(
username="bob", email="bob@example.com", password="pw" username="bob", email="bob@example.com", password="pw"
) )
self.token = MCPToken.objects.create(user=self.user, name="t") self.token, self.plaintext = MCPToken.objects.create_token(
user=self.user, name="t"
)
def test_resolves_valid_token(self): def test_resolves_valid_token(self):
user, token = resolve_mcp_user(self.token.token) user, token = resolve_mcp_user(self.plaintext)
self.assertEqual(user.pk, self.user.pk) self.assertEqual(user.pk, self.user.pk)
self.assertEqual(token.pk, self.token.pk) self.assertEqual(token.pk, self.token.pk)
def test_records_usage(self): def test_records_usage(self):
self.assertIsNone(self.token.last_used_at) self.assertIsNone(self.token.last_used_at)
resolve_mcp_user(self.token.token) resolve_mcp_user(self.plaintext)
self.token.refresh_from_db() self.token.refresh_from_db()
self.assertIsNotNone(self.token.last_used_at) self.assertIsNotNone(self.token.last_used_at)
@@ -38,16 +40,31 @@ class ResolveMCPUserTest(TestCase):
self.token.is_active = False self.token.is_active = False
self.token.save() self.token.save()
with self.assertRaises(MCPAuthError): with self.assertRaises(MCPAuthError):
resolve_mcp_user(self.token.token) resolve_mcp_user(self.plaintext)
def test_expired_token_raises(self): def test_expired_token_raises(self):
self.token.expires_at = timezone.now() - timedelta(hours=1) self.token.expires_at = timezone.now() - timedelta(hours=1)
self.token.save() self.token.save()
with self.assertRaises(MCPAuthError): with self.assertRaises(MCPAuthError):
resolve_mcp_user(self.token.token) resolve_mcp_user(self.plaintext)
def test_disabled_user_raises(self): def test_disabled_user_raises(self):
self.user.is_active = False self.user.is_active = False
self.user.save() self.user.save()
with self.assertRaises(MCPAuthError): with self.assertRaises(MCPAuthError):
resolve_mcp_user(self.token.token) resolve_mcp_user(self.plaintext)
def test_plaintext_not_in_db(self):
# Defense in depth: scan every column for the plaintext value.
from django.db import connection
plaintext = self.plaintext
with connection.cursor() as cur:
cur.execute("SELECT * FROM mcp_server_mcptoken")
rows = cur.fetchall()
for row in rows:
for value in row:
self.assertNotEqual(
value, plaintext,
f"Plaintext token leaked into the database: {value!r}",
)

View File

@@ -0,0 +1,58 @@
"""Form tests for the MCP token dashboard."""
from django.contrib.auth import get_user_model
from django.test import TestCase
from mcp_server.forms import MCPTokenCreateForm, MCPTokenEditForm
from mcp_server.models import MCPToken
User = get_user_model()
class CreateFormTest(TestCase):
def test_required_fields(self):
form = MCPTokenCreateForm(data={})
self.assertFalse(form.is_valid())
self.assertIn("name", form.errors)
def test_name_only_is_valid(self):
form = MCPTokenCreateForm(data={"name": "Test"})
self.assertTrue(form.is_valid(), form.errors)
def test_tool_choices_match_registered_tools(self):
form = MCPTokenCreateForm()
choices = {value for value, _ in form.fields["allowed_tools"].choices}
# These five must always be present per the FastMCP server.
for expected in {"search", "get_chunk", "list_libraries", "list_collections", "list_items"}:
self.assertIn(expected, choices)
class EditFormTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="alice", password="pw")
self.token, _ = MCPToken.objects.create_token(
user=self.user, name="t", allowed_tools=["search"]
)
def test_initial_allowed_tools_populated(self):
form = MCPTokenEditForm(instance=self.token)
self.assertEqual(form.fields["allowed_tools"].initial, ["search"])
def test_save_updates_metadata(self):
form = MCPTokenEditForm(
data={
"name": "Renamed",
"is_active": False,
"expires_at": "",
"allowed_tools": ["search", "get_chunk"],
},
instance=self.token,
)
self.assertTrue(form.is_valid(), form.errors)
instance = form.save(commit=False)
instance.allowed_tools = form.cleaned_data["allowed_tools"]
instance.save()
self.token.refresh_from_db()
self.assertEqual(self.token.name, "Renamed")
self.assertFalse(self.token.is_active)
self.assertEqual(self.token.allowed_tools, ["search", "get_chunk"])

View File

@@ -6,7 +6,7 @@ from django.contrib.auth import get_user_model
from django.test import TestCase from django.test import TestCase
from django.utils import timezone from django.utils import timezone
from mcp_server.models import MCPToken from mcp_server.models import MCPToken, hash_token
User = get_user_model() User = get_user_model()
@@ -17,21 +17,33 @@ class MCPTokenModelTest(TestCase):
username="alice", email="alice@example.com", password="pw" username="alice", email="alice@example.com", password="pw"
) )
def test_token_auto_generated(self): def test_create_token_returns_plaintext_and_stores_hash(self):
token = MCPToken.objects.create(user=self.user, name="t") token, plaintext = MCPToken.objects.create_token(user=self.user, name="t")
self.assertTrue(token.token) self.assertTrue(plaintext)
self.assertGreater(len(token.token), 20) self.assertGreater(len(plaintext), 20)
# Database stores hash, not plaintext
self.assertEqual(len(token.token_hash), 64)
self.assertNotEqual(token.token_hash, plaintext)
self.assertEqual(token.token_hash, hash_token(plaintext))
def test_token_hash_never_equals_plaintext(self):
# Regression guard: if anyone ever wires plaintext back into token_hash,
# this fails.
token, plaintext = MCPToken.objects.create_token(user=self.user, name="t")
self.assertNotIn(plaintext, token.token_hash)
def test_active_token_is_valid(self): def test_active_token_is_valid(self):
token = MCPToken.objects.create(user=self.user, name="t") token, _ = MCPToken.objects.create_token(user=self.user, name="t")
self.assertTrue(token.is_valid) self.assertTrue(token.is_valid)
def test_inactive_token_not_valid(self): def test_inactive_token_not_valid(self):
token = MCPToken.objects.create(user=self.user, name="t", is_active=False) token, _ = MCPToken.objects.create_token(user=self.user, name="t")
token.is_active = False
token.save()
self.assertFalse(token.is_valid) self.assertFalse(token.is_valid)
def test_expired_token_not_valid(self): def test_expired_token_not_valid(self):
token = MCPToken.objects.create( token, _ = MCPToken.objects.create_token(
user=self.user, user=self.user,
name="t", name="t",
expires_at=timezone.now() - timedelta(hours=1), expires_at=timezone.now() - timedelta(hours=1),
@@ -39,25 +51,27 @@ class MCPTokenModelTest(TestCase):
self.assertFalse(token.is_valid) self.assertFalse(token.is_valid)
def test_unrestricted_permits_all(self): def test_unrestricted_permits_all(self):
token = MCPToken.objects.create(user=self.user, name="t") token, _ = MCPToken.objects.create_token(user=self.user, name="t")
self.assertTrue(token.can_use_tool("anything")) self.assertTrue(token.can_use_tool("anything"))
def test_tool_whitelist(self): def test_tool_whitelist(self):
token = MCPToken.objects.create( token, _ = MCPToken.objects.create_token(
user=self.user, name="t", allowed_tools=["search"] user=self.user, name="t", allowed_tools=["search"]
) )
self.assertTrue(token.can_use_tool("search")) self.assertTrue(token.can_use_tool("search"))
self.assertFalse(token.can_use_tool("get_chunk")) self.assertFalse(token.can_use_tool("get_chunk"))
def test_record_usage(self): def test_record_usage(self):
token = MCPToken.objects.create(user=self.user, name="t") token, _ = MCPToken.objects.create_token(user=self.user, name="t")
self.assertIsNone(token.last_used_at) self.assertIsNone(token.last_used_at)
token.record_usage() token.record_usage()
token.refresh_from_db() token.refresh_from_db()
self.assertIsNotNone(token.last_used_at) self.assertIsNotNone(token.last_used_at)
def test_masked_token(self): def test_masked_token_is_hash_prefix(self):
token = MCPToken.objects.create(user=self.user, name="t") token, plaintext = MCPToken.objects.create_token(user=self.user, name="t")
masked = token.get_masked_token() masked = token.get_masked_token()
self.assertTrue(masked.endswith(token.token[-8:])) self.assertTrue(masked.startswith("mcp_…"))
self.assertIn("*", masked) self.assertIn(token.token_hash[:8], masked)
# Plaintext must never leak through the masked display
self.assertNotIn(plaintext, masked)

View File

@@ -0,0 +1,177 @@
"""View tests for the MCP token self-service dashboard."""
from django.contrib.auth import get_user_model
from django.test import TestCase
from django.urls import reverse
from mcp_server.models import MCPToken
User = get_user_model()
class TokenListViewTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(
username="alice", email="alice@example.com", password="pw"
)
self.url = reverse("mcp_server:mcp-token-list")
def test_login_required(self):
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, 302)
self.assertIn("/login/", resp.url)
def test_list_shows_only_own_tokens(self):
other = User.objects.create_user(username="bob", password="pw")
MCPToken.objects.create_token(user=self.user, name="mine")
MCPToken.objects.create_token(user=other, name="theirs")
self.client.force_login(self.user)
resp = self.client.get(self.url)
self.assertContains(resp, "mine")
self.assertNotContains(resp, "theirs")
def test_empty_state(self):
self.client.force_login(self.user)
resp = self.client.get(self.url)
self.assertContains(resp, "No MCP tokens yet.")
class TokenCreateViewTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="alice", password="pw")
self.client.force_login(self.user)
self.url = reverse("mcp_server:mcp-token-create")
def test_get_renders_form(self):
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, 200)
self.assertContains(resp, "Generate MCP Token")
def test_post_creates_token_and_shows_plaintext_once(self):
resp = self.client.post(self.url, {"name": "Claude Desktop"})
self.assertEqual(resp.status_code, 200)
self.assertContains(resp, "Save this token now")
# Pull the created row, verify the response contained a plaintext that
# is NOT what we stored.
token = MCPToken.objects.get(user=self.user, name="Claude Desktop")
self.assertNotContains(resp, token.token_hash) # hash is not what we display
# And the detail page never renders the plaintext.
body = resp.content.decode()
# Find the plaintext from the response: the only long alphanumeric
# block inside the #mcp-plaintext div.
import re
m = re.search(r'id="mcp-plaintext">([A-Za-z0-9_\-]+)<', body)
self.assertIsNotNone(m, "plaintext block not found in response")
plaintext = m.group(1)
# Sanity: round-tripping the plaintext through hash_token reproduces
# what's stored.
from mcp_server.models import hash_token
self.assertEqual(hash_token(plaintext), token.token_hash)
# Detail page must NOT contain the plaintext.
detail_resp = self.client.get(
reverse("mcp_server:mcp-token-detail", args=[token.pk])
)
self.assertNotContains(detail_resp, plaintext)
def test_post_invalid_renders_form_again(self):
resp = self.client.post(self.url, {"name": ""})
self.assertEqual(resp.status_code, 200)
self.assertContains(resp, "Generate MCP Token")
self.assertEqual(MCPToken.objects.count(), 0)
class TokenDetailViewTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="alice", password="pw")
self.client.force_login(self.user)
self.token, _ = MCPToken.objects.create_token(user=self.user, name="t")
def test_renders_token(self):
resp = self.client.get(
reverse("mcp_server:mcp-token-detail", args=[self.token.pk])
)
self.assertContains(resp, self.token.name)
self.assertContains(resp, self.token.get_masked_token())
def test_cannot_view_other_users_token(self):
other = User.objects.create_user(username="bob", password="pw")
other_token, _ = MCPToken.objects.create_token(user=other, name="theirs")
resp = self.client.get(
reverse("mcp_server:mcp-token-detail", args=[other_token.pk])
)
self.assertEqual(resp.status_code, 404)
class TokenEditViewTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="alice", password="pw")
self.client.force_login(self.user)
self.token, _ = MCPToken.objects.create_token(user=self.user, name="t")
def test_post_updates_metadata(self):
resp = self.client.post(
reverse("mcp_server:mcp-token-edit", args=[self.token.pk]),
{
"name": "Renamed",
"is_active": "on",
"expires_at": "",
"allowed_tools": ["search"],
},
)
self.assertEqual(resp.status_code, 302)
self.token.refresh_from_db()
self.assertEqual(self.token.name, "Renamed")
self.assertEqual(self.token.allowed_tools, ["search"])
def test_cannot_edit_other_users_token(self):
other = User.objects.create_user(username="bob", password="pw")
other_token, _ = MCPToken.objects.create_token(user=other, name="theirs")
resp = self.client.post(
reverse("mcp_server:mcp-token-edit", args=[other_token.pk]),
{"name": "hacked"},
)
self.assertEqual(resp.status_code, 404)
class TokenRevokeViewTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="alice", password="pw")
self.client.force_login(self.user)
self.token, _ = MCPToken.objects.create_token(user=self.user, name="t")
def test_revoke_sets_inactive_keeps_row(self):
url = reverse("mcp_server:mcp-token-revoke", args=[self.token.pk])
resp = self.client.post(url)
self.assertEqual(resp.status_code, 302)
self.token.refresh_from_db()
self.assertFalse(self.token.is_active)
# Row still exists for audit trail.
self.assertTrue(MCPToken.objects.filter(pk=self.token.pk).exists())
def test_get_not_allowed(self):
url = reverse("mcp_server:mcp-token-revoke", args=[self.token.pk])
resp = self.client.get(url)
self.assertEqual(resp.status_code, 405)
class TokenDeleteViewTest(TestCase):
def setUp(self):
self.user = User.objects.create_user(username="alice", password="pw")
self.client.force_login(self.user)
self.token, _ = MCPToken.objects.create_token(user=self.user, name="t")
def test_delete_removes_row(self):
url = reverse("mcp_server:mcp-token-delete", args=[self.token.pk])
resp = self.client.post(url)
self.assertEqual(resp.status_code, 302)
self.assertFalse(MCPToken.objects.filter(pk=self.token.pk).exists())
def test_cannot_delete_other_users_token(self):
other = User.objects.create_user(username="bob", password="pw")
other_token, _ = MCPToken.objects.create_token(user=other, name="theirs")
url = reverse("mcp_server:mcp-token-delete", args=[other_token.pk])
resp = self.client.post(url)
self.assertEqual(resp.status_code, 404)
self.assertTrue(MCPToken.objects.filter(pk=other_token.pk).exists())

View File

@@ -0,0 +1,16 @@
"""URL routes for the MCP token self-service dashboard."""
from django.urls import path
from . import views
app_name = "mcp_server"
urlpatterns = [
path("profile/mcp-tokens/", views.mcp_token_list, name="mcp-token-list"),
path("profile/mcp-tokens/add/", views.mcp_token_create, name="mcp-token-create"),
path("profile/mcp-tokens/<int:pk>/", views.mcp_token_detail, name="mcp-token-detail"),
path("profile/mcp-tokens/<int:pk>/edit/", views.mcp_token_edit, name="mcp-token-edit"),
path("profile/mcp-tokens/<int:pk>/revoke/", views.mcp_token_revoke, name="mcp-token-revoke"),
path("profile/mcp-tokens/<int:pk>/delete/", views.mcp_token_delete, name="mcp-token-delete"),
]

View File

@@ -0,0 +1,95 @@
"""Self-service dashboard for MCP bearer tokens.
Mirrors the Themis API-keys flow visually but stores hashed tokens. Plaintext
is shown to the user exactly once (on the create-success page) and never
persisted.
"""
from __future__ import annotations
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.http import HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404, redirect, render
from django.views.decorators.http import require_GET, require_http_methods, require_POST
from .forms import MCPTokenCreateForm, MCPTokenEditForm
from .models import MCPToken
@login_required
@require_GET
def mcp_token_list(request: HttpRequest) -> HttpResponse:
tokens = MCPToken.objects.filter(user=request.user).order_by("-created_at")
return render(request, "mcp_server/tokens/list.html", {"tokens": tokens})
@login_required
@require_http_methods(["GET", "POST"])
def mcp_token_create(request: HttpRequest) -> HttpResponse:
if request.method == "POST":
form = MCPTokenCreateForm(request.POST)
if form.is_valid():
token, plaintext = MCPToken.objects.create_token(
user=request.user,
name=form.cleaned_data["name"],
allowed_tools=form.cleaned_data.get("allowed_tools") or [],
expires_at=form.cleaned_data.get("expires_at") or None,
)
return render(
request,
"mcp_server/tokens/created.html",
{"token": token, "plaintext": plaintext},
)
else:
form = MCPTokenCreateForm()
return render(request, "mcp_server/tokens/create.html", {"form": form})
@login_required
@require_GET
def mcp_token_detail(request: HttpRequest, pk: int) -> HttpResponse:
token = get_object_or_404(MCPToken, pk=pk, user=request.user)
return render(request, "mcp_server/tokens/detail.html", {"token": token})
@login_required
@require_http_methods(["GET", "POST"])
def mcp_token_edit(request: HttpRequest, pk: int) -> HttpResponse:
token = get_object_or_404(MCPToken, pk=pk, user=request.user)
if request.method == "POST":
form = MCPTokenEditForm(request.POST, instance=token)
if form.is_valid():
instance = form.save(commit=False)
instance.allowed_tools = form.cleaned_data.get("allowed_tools") or []
instance.save()
messages.success(request, "MCP token updated.")
return redirect("mcp_server:mcp-token-detail", pk=token.pk)
else:
form = MCPTokenEditForm(instance=token)
return render(
request, "mcp_server/tokens/edit.html", {"form": form, "token": token}
)
@login_required
@require_POST
def mcp_token_revoke(request: HttpRequest, pk: int) -> HttpResponse:
token = get_object_or_404(MCPToken, pk=pk, user=request.user)
token.is_active = False
token.save(update_fields=["is_active", "updated_at"])
messages.success(request, f"Revoked “{token.name}”. The token can no longer be used.")
return redirect("mcp_server:mcp-token-detail", pk=token.pk)
@login_required
@require_POST
def mcp_token_delete(request: HttpRequest, pk: int) -> HttpResponse:
token = get_object_or_404(MCPToken, pk=pk, user=request.user)
name = token.name
token.delete()
messages.success(request, f"Deleted “{name}”.")
return redirect("mcp_server:mcp-token-list")

View File

@@ -25,4 +25,6 @@ urlpatterns = [
path("library/", include("library.urls")), path("library/", include("library.urls")),
# LLM Manager # LLM Manager
path("llm/", include("llm_manager.urls")), path("llm/", include("llm_manager.urls")),
# MCP server (token dashboard at /profile/mcp-tokens/)
path("", include("mcp_server.urls")),
] ]

View File

@@ -52,15 +52,20 @@ class PostgreSQLTestRunner(DiscoverRunner):
from django.db import connections from django.db import connections
db_cfg = self.pg_manager.get_django_database_config() db_cfg = self.pg_manager.get_django_database_config()
# Preserve Django's defaulted TEST sub-dict (CHARSET/MIRROR/MIGRATE…). # Preserve Django's defaulted top-level keys (ATOMIC_REQUESTS,
existing_test = connections["default"].settings_dict.get("TEST", {}) # AUTOCOMMIT, OPTIONS, …) and the TEST sub-dict (CHARSET, MIRROR,
merged_test = {**existing_test, **db_cfg.get("TEST", {})} # MIGRATE, …) — these are populated lazily by Django and absent from
db_cfg["TEST"] = merged_test # the user's raw settings.DATABASES, so a naive overwrite breaks
settings.DATABASES["default"] = db_cfg # request handling that consults them.
existing = connections["default"].settings_dict
existing_test = existing.get("TEST", {})
merged = {**existing, **db_cfg}
merged["TEST"] = {**existing_test, **db_cfg.get("TEST", {})}
settings.DATABASES["default"] = merged
# The default connection was instantiated at Django bootstrap; its # The default connection was instantiated at Django bootstrap; its
# settings_dict is independent of settings.DATABASES. Sync it # settings_dict is independent of settings.DATABASES. Sync it
# manually so test code talks to the container, not the dev DB. # manually so test code talks to the container, not the dev DB.
connections["default"].settings_dict.update(db_cfg) connections["default"].settings_dict.update(merged)
logger.info("PostgreSQL test DB ready on port %s", self.pg_manager.assigned_port) logger.info("PostgreSQL test DB ready on port %s", self.pg_manager.assigned_port)
# ── Neo4j ────────────────────────────────────────────────────── # ── Neo4j ──────────────────────────────────────────────────────

View File

@@ -0,0 +1,18 @@
# Generated by Django 5.2.13 on 2026-04-27 11:30
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('themis', '0003_alter_userprofile_current_timezone_and_more'),
]
operations = [
migrations.AlterField(
model_name='userapikey',
name='key_type',
field=models.CharField(choices=[('api', 'API Key'), ('dav', 'DAV Credentials'), ('token', 'Access Token'), ('secret', 'Secret Key'), ('other', 'Other')], default='api', help_text='Type of credential', max_length=30),
),
]

View File

@@ -228,7 +228,6 @@ class UserAPIKey(models.Model):
KEY_TYPE_CHOICES = [ KEY_TYPE_CHOICES = [
("api", "API Key"), ("api", "API Key"),
("mcp", "MCP Server"),
("dav", "DAV Credentials"), ("dav", "DAV Credentials"),
("token", "Access Token"), ("token", "Access Token"),
("secret", "Secret Key"), ("secret", "Secret Key"),

View File

@@ -38,6 +38,16 @@
API Keys API Keys
</a> </a>
</li> </li>
<li>
<a href="{% url 'mcp_server:mcp-token-list' %}">
<svg xmlns="http://www.w3.org/2000/svg" class="h-4 w-4" fill="none"
viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M9 12l2 2 4-4m5.618-4.016A11.955 11.955 0 0112 2.944a11.955 11.955 0 01-8.618 3.04A12.02 12.02 0 003 9c0 5.591 3.824 10.29 9 11.622 5.176-1.332 9-6.03 9-11.622 0-1.042-.133-2.052-.382-3.016z" />
</svg>
MCP Tokens
</a>
</li>
<div class="divider my-0"></div> <div class="divider my-0"></div>
<li> <li>
<form method="post" action="{% url 'logout' %}"> <form method="post" action="{% url 'logout' %}">

View File

@@ -248,7 +248,7 @@ class APIKeyEditFormTest(TestCase):
form = APIKeyEditForm( form = APIKeyEditForm(
data={ data={
"service_name": "Updated Service", "service_name": "Updated Service",
"key_type": "mcp", "key_type": "token",
"label": "Updated", "label": "Updated",
"instructions": "", "instructions": "",
"help_url": "", "help_url": "",
@@ -260,6 +260,6 @@ class APIKeyEditFormTest(TestCase):
form.save() form.save()
self.key.refresh_from_db() self.key.refresh_from_db()
self.assertEqual(self.key.service_name, "Updated Service") self.assertEqual(self.key.service_name, "Updated Service")
self.assertEqual(self.key.key_type, "mcp") self.assertEqual(self.key.key_type, "token")
self.assertFalse(self.key.is_active) self.assertFalse(self.key.is_active)
self.assertEqual(self.key.encrypted_value, original_encrypted) self.assertEqual(self.key.encrypted_value, original_encrypted)

View File

@@ -209,7 +209,6 @@ class UserAPIKeyModelTest(TestCase):
"""KEY_TYPE_CHOICES contains expected types.""" """KEY_TYPE_CHOICES contains expected types."""
type_keys = [t[0] for t in UserAPIKey.KEY_TYPE_CHOICES] type_keys = [t[0] for t in UserAPIKey.KEY_TYPE_CHOICES]
self.assertIn("api", type_keys) self.assertIn("api", type_keys)
self.assertIn("mcp", type_keys)
self.assertIn("dav", type_keys) self.assertIn("dav", type_keys)
self.assertIn("token", type_keys) self.assertIn("token", type_keys)
self.assertIn("secret", type_keys) self.assertIn("secret", type_keys)