- Rename MCPToken to UserToken across models, views, and tests - Update URL names from mcp-token-* to token-* - Add Daedalus/Pallas integration design doc (v2) - Switch docker-compose to build local mnemosyne:local image via shared build config instead of pulling from git.helu.ca
268 lines
9.3 KiB
Python
268 lines
9.3 KiB
Python
"""Tests for the Team / LibraryMembership / TeamWorkspaceAssignment models.
|
|
|
|
``UserToken``'s hash-at-rest semantics live in ``test_token.py``; this
|
|
module exercises the new Phase 2 tables introduced by
|
|
``docs/DAEDALUS_PALLAS_INTEGRATION_v1.md`` §4 plus the
|
|
``allowed_libraries`` JSONField attached to the existing
|
|
:class:`~mcp_server.models.UserToken`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import uuid
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.db import IntegrityError, transaction
|
|
from django.test import TestCase
|
|
|
|
from mcp_server.models import (
|
|
LibraryMembership,
|
|
MCPSigningKey,
|
|
UserToken,
|
|
Team,
|
|
TeamWorkspaceAssignment,
|
|
)
|
|
|
|
# Resolve the active user model once at import time; the test classes
# below use it (or call ``get_user_model()`` again) to create fixture users.
User = get_user_model()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# LibraryMembership
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class LibraryMembershipTest(TestCase):
    """Role constants and uniqueness rules for ``LibraryMembership`` rows."""

    def setUp(self):
        """Create the single user most tests attach memberships to."""
        self.user = User.objects.create_user(username="u", password="pw")

    def test_role_choices_exposed(self):
        """Each ``Role`` member compares equal to its lowercase string value."""
        self.assertEqual(LibraryMembership.Role.OWNER, "owner")
        self.assertEqual(LibraryMembership.Role.MANAGER, "manager")
        self.assertEqual(LibraryMembership.Role.READER, "reader")

    def test_unique_per_user_and_library(self):
        """A second row for the same (user, library_uid) pair is rejected."""
        LibraryMembership.objects.create(
            user=self.user,
            library_uid="lib-1",
            role=LibraryMembership.Role.OWNER,
        )
        # Second membership on the same (user, library_uid) must fail
        # even if the role differs — callers consolidate to the
        # higher role rather than stacking rows.
        #
        # ``atomic()`` is nested *inside* ``assertRaises`` on purpose: the
        # IntegrityError has to propagate out of the atomic block so Django
        # rolls the savepoint back. Catching it inside the atomic block
        # leaves the transaction in a failed state on backends that abort
        # on error, and the savepoint release then raises on exit.
        with self.assertRaises(IntegrityError):
            with transaction.atomic():
                LibraryMembership.objects.create(
                    user=self.user,
                    library_uid="lib-1",
                    role=LibraryMembership.Role.READER,
                )
        # The connection is usable again after the rollback, and the
        # rejected insert left only the original row behind.
        self.assertEqual(
            LibraryMembership.objects.filter(
                user=self.user, library_uid="lib-1"
            ).count(),
            1,
        )

    def test_same_library_different_users_allowed(self):
        """Two different users may each hold a membership on one library."""
        other = User.objects.create_user(username="u2", password="pw")
        LibraryMembership.objects.create(
            user=self.user,
            library_uid="lib-1",
            role=LibraryMembership.Role.OWNER,
        )
        LibraryMembership.objects.create(
            user=other,
            library_uid="lib-1",
            role=LibraryMembership.Role.READER,
        )
        self.assertEqual(LibraryMembership.objects.count(), 2)

    def test_same_user_different_libraries_allowed(self):
        """One user may hold memberships on two distinct libraries."""
        LibraryMembership.objects.create(
            user=self.user,
            library_uid="lib-1",
            role=LibraryMembership.Role.OWNER,
        )
        LibraryMembership.objects.create(
            user=self.user,
            library_uid="lib-2",
            role=LibraryMembership.Role.MANAGER,
        )
        self.assertEqual(
            set(
                LibraryMembership.objects.filter(user=self.user)
                .values_list("library_uid", flat=True)
            ),
            {"lib-1", "lib-2"},
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# UserToken.allowed_libraries
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class UserTokenAllowedLibrariesTest(TestCase):
    """Checks on the ``allowed_libraries`` value carried by ``UserToken``."""

    def setUp(self):
        """Create the user every token in this class is issued to."""
        self.user = User.objects.create_user(username="u", password="pw")

    def test_defaults_to_empty_list(self):
        """Omitting ``allowed_libraries`` yields an empty list on the token."""
        issued, _unused = UserToken.objects.create_token(
            user=self.user,
            name="t",
        )
        self.assertEqual(issued.allowed_libraries, [])

    def test_create_token_accepts_allowed_libraries(self):
        """``create_token`` stores the list handed to it."""
        library_uids = ("lib-a", "lib-b")
        # Pass and compare separate list objects so the check does not
        # depend on object identity.
        issued, _unused = UserToken.objects.create_token(
            user=self.user,
            name="t",
            allowed_libraries=list(library_uids),
        )
        self.assertEqual(issued.allowed_libraries, list(library_uids))

    def test_allowed_libraries_round_trips(self):
        """The stored list is unchanged after reloading the row."""
        library_uids = ("lib-a", "lib-b", "lib-c")
        issued, _unused = UserToken.objects.create_token(
            user=self.user,
            name="t",
            allowed_libraries=list(library_uids),
        )
        issued.refresh_from_db()
        self.assertEqual(issued.allowed_libraries, list(library_uids))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Team + rotate_jti / deactivate
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TeamTest(TestCase):
    """Creation, ``rotate_jti`` and ``deactivate`` behaviour of ``Team``."""

    @classmethod
    def setUpTestData(cls):
        """Create the owning user shared by every test in the class."""
        user_model = get_user_model()
        cls.owner = user_model.objects.create_user(
            username="alice",
            password="pw",
        )

    def _new_team(self):
        """Create a team named ``"t"`` with a random id, owned by ``owner``."""
        return Team.objects.create(
            id=uuid.uuid4(),
            name="t",
            owner=self.owner,
        )

    def test_create_with_explicit_uuid(self):
        """A caller-supplied id is kept; new teams are active with no jti."""
        explicit_id = uuid.uuid4()
        created = Team.objects.create(
            id=explicit_id,
            name="Harper",
            owner=self.owner,
        )
        self.assertEqual(created.id, explicit_id)
        self.assertTrue(created.active)
        self.assertIsNone(created.active_jti)

    def test_rotate_jti_installs_fresh_uuid(self):
        """Each rotation returns a new UUID that ``active_jti`` then holds."""
        subject = self._new_team()

        jti_one = subject.rotate_jti()
        self.assertIsInstance(jti_one, uuid.UUID)
        self.assertEqual(subject.active_jti, jti_one)

        jti_two = subject.rotate_jti()
        self.assertNotEqual(jti_one, jti_two)
        self.assertEqual(subject.active_jti, jti_two)

    def test_rotate_jti_persists(self):
        """The rotated value is visible on a freshly loaded instance."""
        subject = self._new_team()
        subject.rotate_jti()
        # Reload from DB and make sure the UUID was committed.
        from_db = Team.objects.get(pk=subject.id)
        self.assertEqual(from_db.active_jti, subject.active_jti)

    def test_deactivate_clears_active_jti(self):
        """``deactivate`` clears ``active`` and ``active_jti`` and saves both."""
        subject = self._new_team()
        subject.rotate_jti()
        self.assertTrue(subject.active)

        subject.deactivate()
        self.assertFalse(subject.active)
        self.assertIsNone(subject.active_jti)

        # And it persisted.
        from_db = Team.objects.get(pk=subject.id)
        self.assertFalse(from_db.active)
        self.assertIsNone(from_db.active_jti)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# TeamWorkspaceAssignment
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TeamWorkspaceAssignmentTest(TestCase):
    """Uniqueness, cascade and reverse-accessor checks for assignments."""

    @classmethod
    def setUpTestData(cls):
        """Create the owning user shared by every test in the class."""
        cls.owner = get_user_model().objects.create_user(
            username="alice", password="pw"
        )

    def setUp(self):
        """Give each test its own team so deletes do not leak across tests."""
        self.team = Team.objects.create(
            id=uuid.uuid4(), name="t", owner=self.owner
        )

    def test_unique_team_workspace_pair(self):
        """A second row for the same (team, workspace_id) pair is rejected."""
        TeamWorkspaceAssignment.objects.create(
            team=self.team, workspace_id="ws-a"
        )
        # ``atomic()`` is nested *inside* ``assertRaises`` on purpose: the
        # IntegrityError has to propagate out of the atomic block so Django
        # rolls the savepoint back. Catching it inside the atomic block
        # leaves the transaction in a failed state on backends that abort
        # on error, and the savepoint release then raises on exit.
        with self.assertRaises(IntegrityError):
            with transaction.atomic():
                TeamWorkspaceAssignment.objects.create(
                    team=self.team, workspace_id="ws-a"
                )
        # The connection is usable again after the rollback, and the
        # rejected insert left only the original row behind.
        self.assertEqual(
            TeamWorkspaceAssignment.objects.filter(
                team=self.team, workspace_id="ws-a"
            ).count(),
            1,
        )

    def test_same_workspace_different_teams_allowed(self):
        """Two different teams may both be assigned the same workspace id."""
        other = Team.objects.create(
            id=uuid.uuid4(), name="t2", owner=self.owner
        )
        TeamWorkspaceAssignment.objects.create(
            team=self.team, workspace_id="ws-a"
        )
        TeamWorkspaceAssignment.objects.create(
            team=other, workspace_id="ws-a"
        )
        self.assertEqual(TeamWorkspaceAssignment.objects.count(), 2)

    def test_cascade_on_team_delete(self):
        """Deleting a team removes its assignment rows as well."""
        TeamWorkspaceAssignment.objects.create(
            team=self.team, workspace_id="ws-a"
        )
        self.team.delete()
        self.assertEqual(TeamWorkspaceAssignment.objects.count(), 0)

    def test_related_name_workspace_assignments(self):
        """``team.workspace_assignments`` lists the team's assignments."""
        # auth._libraries_for_team reaches through ``team.workspace_assignments``;
        # make sure that attribute actually works.
        TeamWorkspaceAssignment.objects.create(
            team=self.team, workspace_id="ws-a"
        )
        TeamWorkspaceAssignment.objects.create(
            team=self.team, workspace_id="ws-b"
        )
        ws_ids = sorted(
            self.team.workspace_assignments.values_list(
                "workspace_id", flat=True
            )
        )
        self.assertEqual(ws_ids, ["ws-a", "ws-b"])
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# MCPSigningKey.objects.current() — used by mint_team_jwt
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class MCPSigningKeyCurrentTest(TestCase):
    """What ``MCPSigningKey.objects.current()`` returns for each key state."""

    @staticmethod
    def _store_key(kid, fill, is_active):
        """Insert a key whose ``secret_hex`` is *fill* repeated 64 times."""
        return MCPSigningKey.objects.create(
            kid=kid,
            secret_hex=fill * 64,
            is_active=is_active,
        )

    def test_none_when_no_keys(self):
        """With no rows at all, ``current()`` returns ``None``."""
        self.assertIsNone(MCPSigningKey.objects.current())

    def test_returns_active_not_retired(self):
        """The active key is returned; the inactive one is not."""
        retired_key = self._store_key("old", "a", False)
        active_key = self._store_key("new", "b", True)
        self.assertEqual(MCPSigningKey.objects.current().pk, active_key.pk)
        self.assertNotEqual(MCPSigningKey.objects.current().pk, retired_key.pk)

    def test_returns_newest_when_multiple_active(self):
        """With two active keys, the one created second is returned."""
        first_key = self._store_key("older", "a", True)
        second_key = self._store_key("newer", "b", True)
        self.assertEqual(MCPSigningKey.objects.current().pk, second_key.pk)
        # Sanity: older is still active, just older.
        still_active = MCPSigningKey.objects.filter(
            pk=first_key.pk,
            is_active=True,
        ).exists()
        self.assertTrue(still_active)
|