docs: replace daedalus-service basic auth with per-user DRF tokens
All checks were successful
CVE Scan & Docker Build / security-scan (push) Successful in 56s
CVE Scan & Docker Build / build-and-push (push) Successful in 3m30s

This commit is contained in:
2026-05-22 22:59:59 -04:00
parent 7296b8c42f
commit 409da7d109
17 changed files with 364 additions and 163 deletions

View File

@@ -367,9 +367,12 @@ Mnemosyne validates the JWT against `MCPSigningKey` keyed by `kid`.
## 7. REST API — Mnemosyne team lifecycle
All endpoints live under `/mcp_server/api/teams/` and are protected
by the existing `daedalus-service` HTTP Basic account (same auth as
`/library/api/workspaces/` and `/library/api/ingest/`).
All endpoints live under `/mcp_server/api/teams/` and are authenticated
as the Mnemosyne user the team belongs to via a per-user DRF token
(`Authorization: Token <key>`, surfaced on `/profile/settings/`). Each
team has an `owner` FK; non-owners receive 404 (never 403) so a team's
existence isn't disclosed across users. `/library/api/workspaces/` and
`/library/api/ingest/` use the same per-user auth model.
### 7.1 `POST /mcp_server/api/teams/`
Create a team.
@@ -733,7 +736,8 @@ escape hatch for hard compartmentalization.
* `TeamWorkspaceAssignment` PUT is idempotent and replaces, not
unions.
* `/mcp_server/api/teams/` endpoints: create, delete, rotate,
workspaces PUT, all authenticated as `daedalus-service`.
workspaces PUT, all authenticated with a per-user DRF token and
scoped to the team's `owner` (non-owner requests return 404).
### 14.2 Daedalus test surface
* `on_pallas_registered` populates `team_jwt_encrypted` and transitions

View File

@@ -92,14 +92,6 @@ docker compose -f /srv/mnemosyne/docker-compose.yaml \
docker compose -f /srv/mnemosyne/docker-compose.yaml \
run --rm app setup
# Create the daedalus-service user (HTTP Basic auth for ingest API)
# Pass --password from vault; idempotent if user already exists.
docker compose -f /srv/mnemosyne/docker-compose.yaml \
run --rm app \
python manage.py ensure_service_user \
--username daedalus-service \
--password "{{ vault_mnemosyne_daedalus_service_password }}"
# Seed the MCPSigningKey used to sign long-lived Pallas team JWTs.
# --retire-other deactivates any previously-active key. The hex
# emitted to stdout is persisted in Mnemosyne's database and is
@@ -321,13 +313,16 @@ curl -f http://puck.incus:23181/healthz
curl http://puck.incus:23181/metrics | head -5
```
### Verify the daedalus-service account
### Verify Daedalus auth (per-user API token)
Daedalus now authenticates as a Mnemosyne user via the DRF token shown
on `/profile/settings/`. To smoke-test from a deploy host:
```bash
curl -u daedalus-service:<password> \
https://mnemosyne.ouranos.helu.ca/library/api/workspaces/ \
curl -H "Authorization: Token <user-api-token>" \
https://mnemosyne.ouranos.helu.ca/library/api/workspaces/ws_smoke/ \
-o /dev/null -w "%{http_code}"
# Expect: 200
# Expect: 200 if the workspace exists for that user, 404 otherwise.
```
### Verify MCP connectivity (from a client with a valid MCPToken)
@@ -401,6 +396,5 @@ will report as a failure.
| `vault_daedalus_s3_read_secret` | `DAEDALUS_S3_SECRET_ACCESS_KEY` |
| `vault_rabbitmq_password` | embedded in `CELERY_BROKER_URL` |
| `vault_mnemosyne_llm_encryption_key` | `LLM_API_SECRETS_ENCRYPTION_KEY` |
| `vault_mnemosyne_daedalus_service_password` | passed to `ensure_service_user --password` |
| `vault_mnemosyne_casdoor_client_id` | `CASDOOR_CLIENT_ID` |
| `vault_mnemosyne_casdoor_client_secret` | `CASDOOR_CLIENT_SECRET` |

View File

@@ -8,7 +8,7 @@ This document describes Mnemosyne's role in the Daedalus + Pallas architecture a
Mnemosyne exposes two interfaces for the wider Ouranos ecosystem:
1. **REST API** (`/library/api/*`) — consumed by the Daedalus backend (HTTP Basic auth, service account `daedalus-service`) for workspace lifecycle and asynchronous file ingestion. Phase 1, **implemented**.
1. **REST API** (`/library/api/*`) — consumed by the Daedalus backend authenticated as the owning Mnemosyne user via a per-user DRF token (`Authorization: Token <key>`, surfaced on `/profile/settings/`) for workspace lifecycle and asynchronous file ingestion. Phase 1, **implemented**.
2. **MCP Server** (port 22091 internal, `/mcp/` via nginx on 23090) — exposes search, browse, and retrieval tools. Phase 5 of Mnemosyne's own roadmap, **implemented** with workspace-scoped access control via long-lived team JWTs. Consumed by Pallas FastAgents in production (Daedalus integration Phase 2, **implemented** — see [Phase 3 of this doc](#3-phase-3-long-lived-team-jwt-access-control-for-pallas-instances)).
### Phase status
@@ -105,7 +105,7 @@ Auth is controlled by `MCP_REQUIRE_AUTH` in `.env`. Production sets it to `True`
## 2. REST API for Daedalus
All endpoints require HTTP Basic auth as `daedalus-service`. They are consumed by the Daedalus FastAPI backend only — not by any frontend.
All endpoints require an `Authorization: Token <key>` header carrying the DRF token of the Mnemosyne user the workspace belongs to (surfaced on `/profile/settings/`). Workspaces are scoped to their creating user via the `Library.owner_username` property; cross-user access returns 404. They are consumed by the Daedalus FastAPI backend only — not by any frontend.
### Workspace lifecycle
@@ -354,7 +354,7 @@ mnemosyne_s3_operations_total{operation,status} counter
- [x] `GET /library/api/jobs/{job_id}/`, `POST .../retry/`, `GET /library/api/jobs/`
- [x] `library.tasks.ingest_from_daedalus` Celery task with content-hash-aware supersede logic
- [x] `library.services.daedalus_s3` cross-bucket fetch + copy
- [x] HTTP Basic auth via `daedalus-service` user
- [x] Per-user DRF token auth (`Authorization: Token <key>`); workspaces scoped to the owning user via `Library.owner_username`
### Phase 2 — MCP Server (Mnemosyne roadmap Phase 5) ✅ Implemented
- [x] `mcp_server/` module following the [Django MCP Pattern](Pattern_Django-MCP_V1-00.md)

View File

@@ -5,9 +5,12 @@ A "workspace" in Mnemosyne is a Library scoped to a Daedalus workspace UUID.
It uses the same Library node as a global library; the difference is that
`workspace_id` is set, and search must filter on it.
These endpoints are called by the Daedalus backend (HTTP Basic auth as
the `daedalus-service` user). Daedalus owns the workspace_id; Mnemosyne
just persists what Daedalus tells it.
These endpoints are called by the Daedalus backend authenticated as the
Mnemosyne user the workspace belongs to (per-user DRF token). The
workspace's owning user is recorded on the Library node as
``owner_username``; every read and mutation is scoped to that user.
Non-owners receive 404 so a workspace's existence isn't disclosed
across users.
"""
import logging
@@ -72,6 +75,17 @@ def workspace_create(request):
existing = None
if existing is not None:
if existing.owner_username and existing.owner_username != request.user.username:
# Same workspace_id under a different owner. Don't leak the
# collision shape; surface a generic conflict.
logger.warning(
"workspace_create owner_conflict workspace_id=%s caller=%s",
data["workspace_id"], request.user.username,
)
return Response(
{"detail": "Workspace id is already in use."},
status=status.HTTP_409_CONFLICT,
)
if existing.library_type != data["library_type"]:
return Response(
{
@@ -98,6 +112,7 @@ def workspace_create(request):
library_type=data["library_type"],
description=data.get("description", ""),
workspace_id=data["workspace_id"],
owner_username=request.user.username,
chunking_config=defaults["chunking_config"],
embedding_instruction=defaults["embedding_instruction"],
reranker_instruction=defaults["reranker_instruction"],
@@ -127,21 +142,26 @@ def workspace_detail_or_delete(request, workspace_id):
"""
from library.models import Library
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
lib = None
# Cross-user reads/writes look like "not found" — don't disclose
# existence across users.
if lib is not None and lib.owner_username != request.user.username:
lib = None
if request.method == "GET":
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
if lib is None:
return Response(
{"detail": "Workspace not found."},
status=status.HTTP_404_NOT_FOUND,
)
return Response(WorkspaceStatusSerializer(_serialize_workspace(lib)).data)
# DELETE — idempotent: a missing workspace returns 204.
try:
lib = Library.nodes.get(workspace_id=workspace_id)
except Library.DoesNotExist:
# DELETE — idempotent: a missing (or unowned) workspace returns 204.
if lib is None:
return Response(status=status.HTTP_204_NO_CONTENT)
library_uid = lib.uid

View File

@@ -87,6 +87,11 @@ class Library(StructuredNode):
# libraries. Unique-indexed so a workspace cannot have two libraries.
workspace_id = StringProperty(unique_index=True, required=False)
# For workspace-scoped libraries: the Mnemosyne username that owns
# the workspace. Mutations via the workspaces API are restricted to
# this user. Null for global libraries.
owner_username = StringProperty(required=False, index=True)
# Content-type configuration
chunking_config = JSONProperty(default={})
embedding_instruction = StringProperty(default="")

View File

@@ -2,8 +2,8 @@
These endpoints are the Daedalus → Mnemosyne control plane described
in §7 of ``docs/DAEDALUS_PALLAS_INTEGRATION_v1.md``. They are called
by the ``daedalus-service`` account, not by end users or bearer
tokens.
by Daedalus authenticated as the Mnemosyne user the team belongs to
(per-user DRF token).
"""
from __future__ import annotations

View File

@@ -2,9 +2,11 @@
Mounted under ``/mcp_server/api/teams/`` — see §7 of
``docs/DAEDALUS_PALLAS_INTEGRATION_v1.md``. Every endpoint is
``IsAuthenticated``-gated against the ``daedalus-service`` HTTP Basic
account (same surface as ``/library/api/workspaces/``). Endpoints are
designed to be idempotent where possible:
``IsAuthenticated``-gated and scoped to ``request.user``: the user that
created a team via ``POST /`` is the only user that can read, mutate,
or rotate it. Non-owners receive 404 (not 403) so a team's existence
isn't disclosed across users. Endpoints are designed to be idempotent
where possible:
* ``POST /`` — create a team by UUID; a second POST with
the same id returns 200 without a new JWT.
@@ -46,9 +48,14 @@ logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
def _get_team(team_id):
"""Load a team by UUID or None. Callers 404 on None."""
return Team.objects.filter(pk=team_id).first()
def _get_team(team_id, user):
"""Load a team by UUID *for this user* or None.
Returns None both when the team does not exist and when it exists
but is owned by another user. Callers 404 on None — they must not
distinguish the two cases.
"""
return Team.objects.filter(pk=team_id, owner=user).first()
def _mint_with_fresh_jti(team: Team) -> str:
@@ -75,8 +82,19 @@ def team_create(request):
serializer.is_valid(raise_exception=True)
data = serializer.validated_data
team = _get_team(data["id"])
if team is not None:
existing = Team.objects.filter(pk=data["id"]).first()
if existing is not None:
if existing.owner_id != request.user.id:
# Same id, different owner. Surfacing existence would leak
# one user's team id to another; treat as a generic conflict.
logger.warning(
"team_create owner_conflict team_id=%s caller=%s",
existing.id, request.user.id,
)
return Response(
{"detail": "Team id is already in use."},
status=status.HTTP_409_CONFLICT,
)
# Idempotent: surface current state without a new JWT. If the
# caller wants to reactivate a soft-deleted team they must do
# so explicitly via the admin or a future endpoint; re-POST is
@@ -84,15 +102,17 @@ def team_create(request):
# on every retry storm.
logger.info(
"team_create idempotent_hit team_id=%s name=%s active=%s",
team.id, team.name, team.active,
existing.id, existing.name, existing.active,
)
return Response(
TeamPublicSerializer(team).data, status=status.HTTP_200_OK
TeamPublicSerializer(existing).data, status=status.HTTP_200_OK
)
try:
with transaction.atomic():
team = Team.objects.create(id=data["id"], name=data["name"])
team = Team.objects.create(
id=data["id"], name=data["name"], owner=request.user
)
jwt_string = _mint_with_fresh_jti(team)
except TeamJWTError as exc:
# Rolling back the create is fine — we have no signing key yet
@@ -130,7 +150,7 @@ def team_detail(request, team_id):
in the database for audit; call POST ``/`` with the same id to
re-materialize a fresh team if needed (operator decision).
"""
team = _get_team(team_id)
team = _get_team(team_id, request.user)
if team is None:
return Response(
{"detail": "Team not found."}, status=status.HTTP_404_NOT_FOUND
@@ -163,7 +183,7 @@ def team_workspaces(request, team_id):
Diff is computed in-DB so we don't thrash rows that already match —
only the net-add and net-remove rows are touched.
"""
team = _get_team(team_id)
team = _get_team(team_id, request.user)
if team is None:
return Response(
{"detail": "Team not found."}, status=status.HTTP_404_NOT_FOUND
@@ -221,7 +241,7 @@ def team_rotate(request, team_id):
returns 409 so the operator is forced to go through the explicit
create/readd flow rather than quietly resurrecting a team.
"""
team = _get_team(team_id)
team = _get_team(team_id, request.user)
if team is None:
return Response(
{"detail": "Team not found."}, status=status.HTTP_404_NOT_FOUND

View File

@@ -1,8 +1,10 @@
"""URL patterns for the ``/mcp_server/api/`` DRF control-plane API.
These endpoints are called by the Daedalus backend (HTTP Basic auth
as ``daedalus-service``). End-user MCP traffic does NOT go through
this surface — that's ``mnemosyne.asgi`` / ``mcp_server/server.py``.
These endpoints are called by the Daedalus backend authenticated as
the owning Mnemosyne user (per-user DRF token). Every team is scoped
to its ``owner`` — cross-user access returns 404. End-user MCP traffic
does NOT go through this surface — that's ``mnemosyne.asgi`` /
``mcp_server/server.py``.
"""
from __future__ import annotations

View File

@@ -34,7 +34,6 @@ from collections import OrderedDict
import jwt as pyjwt
from asgiref.sync import sync_to_async
from django.conf import settings
from django.utils import timezone
from fastmcp.server.dependencies import get_http_request
from fastmcp.server.middleware import Middleware, MiddlewareContext
@@ -558,29 +557,39 @@ class MCPAuthMiddleware(Middleware):
def _resolve_jwt_actor(claims: dict):
"""Resolve the synthetic actor for a JWT-authenticated turn.
"""Resolve the acting user for a JWT-authenticated turn.
Returns the system service user (``MCP_JWT_SERVICE_USERNAME``, default
``daedalus-service``). The user must exist and be active. JWT tokens
are not tied to per-user accounts — claims encode all authorization.
For ``typ=team`` JWTs (the only kind we mint), the actor is the
``Team.owner`` — the Mnemosyne user that created the team. Usage
accounting and the audit trail attribute the turn to that user.
Used for both per-turn and team JWTs. The service user is a hook for
usage accounting (LLMUsage / search metrics) and for the audit trail;
authorization does not depend on it.
For legacy per-turn JWTs (``iss=daedalus``, retiring in Phase 4), no
user binding exists in the claims; we cannot attribute the turn to a
Mnemosyne user and the path is rejected. If a deployment still
accepts per-turn JWTs, that work needs to land first.
"""
from django.contrib.auth import get_user_model
User = get_user_model()
username = getattr(settings, "MCP_JWT_SERVICE_USERNAME", "daedalus-service")
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
if claims.get("typ") != "team":
raise MCPAuthError(
f"JWT service user {username!r} does not exist; provision via management command."
"Per-turn JWTs are no longer accepted; mint a team JWT."
)
if not user.is_active:
raise MCPAuthError(f"JWT service user {username!r} is disabled.")
return user
# resolve_mcp_jwt has already parsed the team UUID out of ``sub`` and
# stashed it as ``team_id`` for the team branch.
team_id = claims.get("team_id")
if team_id is None:
raise MCPAuthError("Team JWT missing team_id claim.")
try:
team = Team.objects.select_related("owner").get(pk=team_id)
except Team.DoesNotExist:
raise MCPAuthError("Team JWT references a team that no longer exists.")
if not team.active:
raise MCPAuthError("Team JWT references an inactive team.")
if not team.owner.is_active:
raise MCPAuthError(
f"Team owner {team.owner.username!r} is disabled."
)
return team.owner
def _resolved_libraries_for_jwt(claims: dict) -> list[str]:

View File

@@ -1,59 +0,0 @@
"""Idempotently create the service user that JWT-authenticated MCP requests act as.
Daedalus mints per-turn JWTs whose claims encode all authorization (workspace,
allowed libraries). The Django ``user`` field on the request still needs to
point at *something* — the service user is that something. It owns no data
and does not log in via the dashboard.
"""
import secrets
from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand
class Command(BaseCommand):
help = "Idempotently create or reactivate the JWT service user (default 'daedalus-service')."
def add_arguments(self, parser):
parser.add_argument("--username", default="daedalus-service")
parser.add_argument("--email", default="daedalus-service@local")
parser.add_argument(
"--password",
default=None,
help=(
"Password for HTTP Basic auth (Daedalus REST calls). "
"Omit to set a random unusable password (JWT-only mode)."
),
)
def handle(self, *args, **options):
User = get_user_model()
username = options["username"]
email = options["email"]
password = options["password"] or secrets.token_urlsafe(32)
user, created = User.objects.get_or_create(
username=username,
defaults={"email": email, "is_active": True},
)
if created:
user.set_password(password)
user.save(update_fields=["password"])
self.stdout.write(self.style.SUCCESS(f"Created service user {username!r}"))
else:
changed = False
if not user.is_active:
user.is_active = True
changed = True
if user.email != email:
user.email = email
changed = True
if options["password"]:
user.set_password(password)
changed = True
if changed:
user.save(update_fields=["is_active", "email", "password"])
self.stdout.write(self.style.SUCCESS(f"Updated service user {username!r}"))
else:
self.stdout.write(f"Service user {username!r} already provisioned")

View File

@@ -0,0 +1,36 @@
"""Add ``owner`` FK to ``Team``.
The Daedalus integration moved from a shared ``daedalus-service`` HTTP Basic
account to per-user DRF tokens. Each team is now scoped to the Mnemosyne
user that created it.
No production Team rows exist at the time of this migration, so the FK is
added as non-null without a backfill. Any pre-existing row in a dev /
staging database will break this migration — drop the ``mcp_server_team``
table (and its dependent ``mcp_server_teamworkspaceassignment``) before
applying, or recreate the database from scratch.
"""
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
("mcp_server", "0001_initial"),
]
operations = [
migrations.AddField(
model_name="team",
name="owner",
field=models.ForeignKey(
on_delete=django.db.models.deletion.PROTECT,
related_name="teams",
to=settings.AUTH_USER_MODEL,
),
),
]

View File

@@ -266,12 +266,21 @@ class Team(models.Model):
auth middleware compares the incoming ``jti`` against this value.
``active=False`` soft-deletes the team — every validation will
reject tokens whose ``sub`` resolves to this row, so revocation
survives restart without needing a cache purge.
reject tokens for an inactive team, so revocation survives restart
without needing a cache purge.
``owner`` is the Mnemosyne user that created the team. Team
management endpoints scope by ``owner`` so that one user cannot
manage another user's teams.
"""
id = models.UUIDField(primary_key=True, editable=False)
name = models.CharField(max_length=200)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.PROTECT,
related_name="teams",
)
active = models.BooleanField(default=True)
active_jti = models.UUIDField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)

View File

@@ -34,6 +34,7 @@ from mcp_server.auth import (
MCPAuthError,
_libraries_for_team,
_remember_jti,
_resolve_jwt_actor,
_resolved_libraries_for_jwt,
looks_like_jwt,
resolve_mcp_jwt,
@@ -314,11 +315,16 @@ class PerTurnReplayCacheTest(TestCase):
class ResolveTeamJWTTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = User.objects.create_user(username="alice", password="pw")
def setUp(self):
self.key = _make_signing_key()
self.team = Team.objects.create(
id=uuid.uuid4(),
name="Pallas-Harper",
owner=self.owner,
active=True,
active_jti=uuid.uuid4(),
)
@@ -362,10 +368,15 @@ class ResolveTeamJWTTest(TestCase):
class LibrariesForTeamTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = User.objects.create_user(username="alice", password="pw")
def setUp(self):
self.team = Team.objects.create(
id=uuid.uuid4(),
name="T",
owner=self.owner,
active=True,
active_jti=uuid.uuid4(),
)
@@ -415,6 +426,10 @@ class LibrariesForTeamTest(TestCase):
class ResolvedLibrariesForJWTDispatcherTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = User.objects.create_user(username="alice", password="pw")
def test_per_turn_claims_use_libs(self):
claims = {"libs": ["one", "two"]}
self.assertEqual(
@@ -425,6 +440,7 @@ class ResolvedLibrariesForJWTDispatcherTest(TestCase):
team = Team.objects.create(
id=uuid.uuid4(),
name="T",
owner=self.owner,
active=True,
active_jti=uuid.uuid4(),
)
@@ -442,3 +458,51 @@ class ResolvedLibrariesForJWTDispatcherTest(TestCase):
)
out = _resolved_libraries_for_jwt(claims)
self.assertEqual(out, ["lib-team"])
# ---------------------------------------------------------------------------
# _resolve_jwt_actor — team JWTs resolve to the team's owner
# ---------------------------------------------------------------------------
class ResolveJWTActorTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = User.objects.create_user(username="alice", password="pw")
def setUp(self):
self.team = Team.objects.create(
id=uuid.uuid4(),
name="T",
owner=self.owner,
active=True,
active_jti=uuid.uuid4(),
)
def test_team_jwt_resolves_to_team_owner(self):
claims = {"typ": "team", "team_id": self.team.id}
self.assertEqual(_resolve_jwt_actor(claims), self.owner)
def test_inactive_team_rejected(self):
self.team.deactivate()
claims = {"typ": "team", "team_id": self.team.id}
with self.assertRaises(MCPAuthError):
_resolve_jwt_actor(claims)
def test_disabled_owner_rejected(self):
self.owner.is_active = False
self.owner.save(update_fields=["is_active"])
claims = {"typ": "team", "team_id": self.team.id}
with self.assertRaises(MCPAuthError):
_resolve_jwt_actor(claims)
def test_per_turn_jwt_rejected(self):
# No more service-user fallback: per-turn JWTs can't be attributed
# to a Mnemosyne user, so the resolver refuses them.
with self.assertRaises(MCPAuthError):
_resolve_jwt_actor({"libs": ["x"]})
def test_unknown_team_rejected(self):
claims = {"typ": "team", "team_id": uuid.uuid4()}
with self.assertRaises(MCPAuthError):
_resolve_jwt_actor(claims)

View File

@@ -128,15 +128,21 @@ class MCPTokenAllowedLibrariesTest(TestCase):
class TeamTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = get_user_model().objects.create_user(
username="alice", password="pw"
)
def test_create_with_explicit_uuid(self):
tid = uuid.uuid4()
team = Team.objects.create(id=tid, name="Harper")
team = Team.objects.create(id=tid, name="Harper", owner=self.owner)
self.assertEqual(team.id, tid)
self.assertTrue(team.active)
self.assertIsNone(team.active_jti)
def test_rotate_jti_installs_fresh_uuid(self):
team = Team.objects.create(id=uuid.uuid4(), name="t")
team = Team.objects.create(id=uuid.uuid4(), name="t", owner=self.owner)
first = team.rotate_jti()
self.assertIsInstance(first, uuid.UUID)
self.assertEqual(team.active_jti, first)
@@ -146,14 +152,14 @@ class TeamTest(TestCase):
self.assertEqual(team.active_jti, second)
def test_rotate_jti_persists(self):
team = Team.objects.create(id=uuid.uuid4(), name="t")
team = Team.objects.create(id=uuid.uuid4(), name="t", owner=self.owner)
team.rotate_jti()
# Reload from DB and make sure the UUID was committed.
reloaded = Team.objects.get(pk=team.id)
self.assertEqual(reloaded.active_jti, team.active_jti)
def test_deactivate_clears_active_jti(self):
team = Team.objects.create(id=uuid.uuid4(), name="t")
team = Team.objects.create(id=uuid.uuid4(), name="t", owner=self.owner)
team.rotate_jti()
self.assertTrue(team.active)
team.deactivate()
@@ -171,8 +177,16 @@ class TeamTest(TestCase):
class TeamWorkspaceAssignmentTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = get_user_model().objects.create_user(
username="alice", password="pw"
)
def setUp(self):
self.team = Team.objects.create(id=uuid.uuid4(), name="t")
self.team = Team.objects.create(
id=uuid.uuid4(), name="t", owner=self.owner
)
def test_unique_team_workspace_pair(self):
TeamWorkspaceAssignment.objects.create(
@@ -185,7 +199,9 @@ class TeamWorkspaceAssignmentTest(TestCase):
)
def test_same_workspace_different_teams_allowed(self):
other = Team.objects.create(id=uuid.uuid4(), name="t2")
other = Team.objects.create(
id=uuid.uuid4(), name="t2", owner=self.owner
)
TeamWorkspaceAssignment.objects.create(
team=self.team, workspace_id="ws-a"
)

View File

@@ -1,10 +1,11 @@
"""Tests for the ``/mcp_server/api/teams/`` REST control plane.
This is the Daedalus-facing surface described in §7 of
``docs/DAEDALUS_PALLAS_INTEGRATION_v1.md``. We do NOT exercise HTTP
Basic auth here (that's part of DRF / the project's session auth
stack); instead we use :meth:`APIClient.force_authenticate` to focus
on the endpoints' own idempotence and state-transition rules.
``docs/DAEDALUS_PALLAS_INTEGRATION_v1.md``. We do NOT exercise the
DRF Token / Session auth machinery here (that's covered by DRF
itself); instead we use :meth:`APIClient.force_authenticate` to focus
on the endpoints' own idempotence, ownership, and state-transition
rules.
"""
from __future__ import annotations
@@ -36,20 +37,23 @@ def _seed_signing_key() -> MCPSigningKey:
class _AuthenticatedAPITest(TestCase):
"""Shared ``APIClient`` authenticated as the service user.
"""Shared ``APIClient`` authenticated as a regular user.
The real deployment has Daedalus hit these endpoints as
``daedalus-service`` over HTTP Basic, but the view decorator is a
plain ``IsAuthenticated`` — so for unit-test purposes we use
``force_authenticate`` with any active user.
The endpoints scope by ``request.user`` (every team has an
``owner``); ``self.user`` is the team owner and ``self.other_user``
is used for cross-user access tests.
"""
def setUp(self):
self.service_user = User.objects.create_user(
username="daedalus-service", password="pw"
@classmethod
def setUpTestData(cls):
cls.user = User.objects.create_user(username="alice", password="pw")
cls.other_user = User.objects.create_user(
username="bob", password="pw"
)
def setUp(self):
self.client = APIClient()
self.client.force_authenticate(user=self.service_user)
self.client.force_authenticate(user=self.user)
# ---------------------------------------------------------------------------
@@ -106,6 +110,29 @@ class TeamCreateTest(_AuthenticatedAPITest):
self.assertEqual(decoded["sub"], f"team:{tid}")
self.assertEqual(decoded["jti"], str(team.active_jti))
def test_create_sets_request_user_as_owner(self):
tid = uuid.uuid4()
self.client.post(
self.url, {"id": str(tid), "name": "Harper"}, format="json"
)
self.assertEqual(Team.objects.get(pk=tid).owner_id, self.user.id)
def test_same_id_under_other_owner_409s(self):
tid = uuid.uuid4()
# Alice creates the team first.
self.client.post(
self.url, {"id": str(tid), "name": "Harper"}, format="json"
)
# Bob then tries to create with the same id — must be a generic
# conflict, not idempotent and not 200.
self.client.force_authenticate(user=self.other_user)
resp = self.client.post(
self.url, {"id": str(tid), "name": "Bob's"}, format="json"
)
self.assertEqual(resp.status_code, status.HTTP_409_CONFLICT)
# Owner unchanged.
self.assertEqual(Team.objects.get(pk=tid).owner_id, self.user.id)
def test_idempotent_on_same_id_returns_200_without_jwt(self):
tid = uuid.uuid4()
first = self.client.post(
@@ -152,7 +179,11 @@ class TeamDetailTest(_AuthenticatedAPITest):
def setUp(self):
super().setUp()
self.team = Team.objects.create(
id=uuid.uuid4(), name="t", active=True, active_jti=uuid.uuid4()
id=uuid.uuid4(),
name="t",
owner=self.user,
active=True,
active_jti=uuid.uuid4(),
)
TeamWorkspaceAssignment.objects.create(
team=self.team, workspace_id="ws-a"
@@ -197,6 +228,18 @@ class TeamDetailTest(_AuthenticatedAPITest):
resp = self.client.delete(self.url)
self.assertEqual(resp.status_code, status.HTTP_204_NO_CONTENT)
def test_get_by_non_owner_returns_404(self):
self.client.force_authenticate(user=self.other_user)
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
def test_delete_by_non_owner_returns_404_and_no_op(self):
self.client.force_authenticate(user=self.other_user)
resp = self.client.delete(self.url)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
# Original team is still active — non-owner couldn't soft-delete it.
self.assertTrue(Team.objects.get(pk=self.team.id).active)
# ---------------------------------------------------------------------------
# PUT /mcp_server/api/teams/{id}/workspaces/
@@ -207,7 +250,11 @@ class TeamWorkspacesTest(_AuthenticatedAPITest):
def setUp(self):
super().setUp()
self.team = Team.objects.create(
id=uuid.uuid4(), name="t", active=True, active_jti=uuid.uuid4()
id=uuid.uuid4(),
name="t",
owner=self.user,
active=True,
active_jti=uuid.uuid4(),
)
self.url = reverse(
"mcp-server-api:team-workspaces",
@@ -308,6 +355,14 @@ class TeamWorkspacesTest(_AuthenticatedAPITest):
)
self.assertEqual(resp.status_code, status.HTTP_400_BAD_REQUEST)
def test_put_by_non_owner_returns_404(self):
self.client.force_authenticate(user=self.other_user)
resp = self.client.put(
self.url, {"workspace_ids": ["ws-x"]}, format="json"
)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(self._ws_ids(), [])
# ---------------------------------------------------------------------------
# POST /mcp_server/api/teams/{id}/rotate/
@@ -319,7 +374,11 @@ class TeamRotateTest(_AuthenticatedAPITest):
super().setUp()
_seed_signing_key()
self.team = Team.objects.create(
id=uuid.uuid4(), name="t", active=True, active_jti=uuid.uuid4()
id=uuid.uuid4(),
name="t",
owner=self.user,
active=True,
active_jti=uuid.uuid4(),
)
self.url = reverse(
"mcp-server-api:team-rotate",
@@ -357,3 +416,11 @@ class TeamRotateTest(_AuthenticatedAPITest):
self.assertEqual(
resp.status_code, status.HTTP_503_SERVICE_UNAVAILABLE
)
def test_rotate_by_non_owner_returns_404(self):
before = self.team.active_jti
self.client.force_authenticate(user=self.other_user)
resp = self.client.post(self.url)
self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND)
self.team.refresh_from_db()
self.assertEqual(self.team.active_jti, before)

View File

@@ -12,6 +12,7 @@ import time
import uuid
import jwt as pyjwt
from django.contrib.auth import get_user_model
from django.test import TestCase
from mcp_server.models import MCPSigningKey, Team
@@ -24,10 +25,11 @@ def _make_key(kid: str = "k", is_active: bool = True) -> MCPSigningKey:
)
def _make_team(**overrides) -> Team:
def _make_team(owner, **overrides) -> Team:
data = dict(
id=uuid.uuid4(),
name="t",
owner=owner,
active=True,
active_jti=uuid.uuid4(),
)
@@ -36,9 +38,15 @@ def _make_team(**overrides) -> Team:
class MintTeamJWTHappyPathTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = get_user_model().objects.create_user(
username="alice", password="pw"
)
def setUp(self):
self.key = _make_key("k-1")
self.team = _make_team()
self.team = _make_team(self.owner)
def test_returns_signed_jwt_string(self):
token = mint_team_jwt(self.team)
@@ -88,15 +96,21 @@ class MintTeamJWTHappyPathTest(TestCase):
class MintTeamJWTFailureModesTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.owner = get_user_model().objects.create_user(
username="alice", password="pw"
)
def test_no_signing_key_raises(self):
team = _make_team()
team = _make_team(self.owner)
with self.assertRaises(TeamJWTError) as ctx:
mint_team_jwt(team)
self.assertIn("signing", str(ctx.exception).lower())
def test_missing_active_jti_raises(self):
_make_key()
team = _make_team(active_jti=None)
team = _make_team(self.owner, active_jti=None)
with self.assertRaises(TeamJWTError) as ctx:
mint_team_jwt(team)
# Should name the thing the caller forgot to do.
@@ -106,6 +120,6 @@ class MintTeamJWTFailureModesTest(TestCase):
MCPSigningKey.objects.create(
kid="broken", secret_hex="not-hex!!", is_active=True
)
team = _make_team()
team = _make_team(self.owner)
with self.assertRaises(TeamJWTError):
mint_team_jwt(team)

View File

@@ -141,14 +141,14 @@
</div>
</form>
<!-- Daedalus API Token — separate form, outside the settings form -->
<!-- API Token — separate form, outside the settings form -->
<div class="card bg-base-200 mb-6">
<div class="card-body">
<h2 class="card-title text-lg">Daedalus API Token</h2>
<h2 class="card-title text-lg">API Token</h2>
<p class="text-sm opacity-70 mb-4">
Used by Daedalus to authenticate with Mnemosyne. Set
<code class="font-mono text-xs">DAEDALUS_MNEMOSYNE_API_KEY</code>
in your Daedalus environment to this value.
Authenticates programmatic clients (Daedalus, scripts, IDE
integrations) to Mnemosyne. Has the same access as your web
session — keep it secret.
</p>
<div class="flex items-center gap-3">
<code class="font-mono bg-base-300 px-3 py-2 rounded flex-1 break-all select-all text-sm">{{ api_token.key }}</code>
@@ -160,7 +160,7 @@
</div>
<div class="mt-3">
<form method="post" action="{% url 'themis:api-token-regenerate' %}"
onsubmit="return confirm('Regenerate token? Daedalus will stop working until you update DAEDALUS_MNEMOSYNE_API_KEY.')">
onsubmit="return confirm('Regenerate token? Any client using the current token will stop working until updated.')">
{% csrf_token %}
<button type="submit" class="btn btn-warning btn-sm">Regenerate</button>
</form>