- Rename MCPToken to UserToken across models, views, and tests - Update URL names from mcp-token-* to token-* - Add Daedalus/Pallas integration design doc (v2) - Switch docker-compose to build local mnemosyne:local image via shared build config instead of pulling from git.helu.ca
132 lines
5.1 KiB
Python
132 lines
5.1 KiB
Python
"""Tests for ``mcp_server.drf_auth.UserTokenAuthentication``.
|
|
|
|
``UserTokenAuthentication`` authenticates DRF endpoints using a per-user
``UserToken`` carried as ``Authorization: Bearer <plaintext>``. The class
wraps ``resolve_mcp_user``; these tests assert the DRF-side behaviour
(header parsing, error mapping, integration with ``IsAuthenticated``)
on top of the resolver's own coverage in ``test_auth.py``.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import timedelta
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.test import TestCase, override_settings
|
|
from django.urls import path
|
|
from django.utils import timezone
|
|
from rest_framework import status
|
|
from rest_framework.decorators import api_view, permission_classes
|
|
from rest_framework.permissions import IsAuthenticated
|
|
from rest_framework.response import Response
|
|
from rest_framework.test import APIClient
|
|
|
|
from mcp_server.models import UserToken
|
|
|
|
|
|
# Resolve the user model via ``get_user_model()`` once at import time; the
# test fixtures below create their account through this name.
User = get_user_model()
|
|
|
|
|
|
# A tiny endpoint mounted only for these tests so we can exercise the
|
|
# DRF auth pipeline without coupling to any real app's view contract.
|
|
@api_view(["GET"])
@permission_classes([IsAuthenticated])
def _whoami(request):
    """Return the authenticated caller's username as a JSON object.

    ``IsAuthenticated`` guards the view, so the body only runs once the
    configured authentication classes have produced a user.
    """
    payload = {"username": request.user.username}
    return Response(payload)
|
|
|
|
|
|
# Single route exposing the ``_whoami`` probe; the test class below points
# ROOT_URLCONF at this module so the route is resolvable.
urlpatterns = [path("__test_whoami__/", _whoami)]
|
|
|
|
|
|
@override_settings(ROOT_URLCONF=__name__)
class UserTokenAuthenticationTest(TestCase):
    """Exercise ``UserTokenAuthentication`` through a minimal DRF endpoint.

    Each test issues a GET to ``/__test_whoami__/`` (served by this module's
    own ``urlpatterns``) with a particular ``Authorization`` header and
    asserts on the HTTP status, response headers, or body.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the single user that owns every token in these tests."""
        cls.user = User.objects.create_user(username="alice", password="pw")

    def setUp(self):
        """Give each test a fresh API client and a newly issued token."""
        self.client = APIClient()
        # ``create_token`` hands back the stored token row together with the
        # plaintext secret that goes into the Authorization header.
        self.token, self.plaintext = UserToken.objects.create_token(
            user=self.user, name="t"
        )

    def _get(self, header=None):
        """GET the probe endpoint, optionally sending an Authorization header.

        Args:
            header: Full ``Authorization`` header value, or ``None`` to send
                the request without that header.

        Returns:
            The response returned by the test client.
        """
        kwargs = {}
        if header is not None:
            kwargs["HTTP_AUTHORIZATION"] = header
        return self.client.get("/__test_whoami__/", **kwargs)

    def test_no_header_returns_401_with_bearer_challenge(self):
        """A request without credentials gets 401 plus a Bearer challenge."""
        resp = self._get()
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)
        # RFC 7235: anonymous request must include WWW-Authenticate so the
        # client knows how to authenticate.
        self.assertEqual(resp["WWW-Authenticate"], "Bearer")

    def test_valid_bearer_authenticates(self):
        """A valid Bearer token yields 200 and the owning user's username."""
        resp = self._get(f"Bearer {self.plaintext}")
        self.assertEqual(resp.status_code, status.HTTP_200_OK)
        self.assertEqual(resp.json(), {"username": "alice"})

    def test_invalid_bearer_returns_401(self):
        """An unknown token value is rejected with 401."""
        resp = self._get("Bearer not-a-real-token")
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_inactive_token_returns_401(self):
        """A token flagged ``is_active=False`` is rejected with 401."""
        self.token.is_active = False
        self.token.save(update_fields=["is_active"])
        resp = self._get(f"Bearer {self.plaintext}")
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_expired_token_returns_401(self):
        """A token whose ``expires_at`` lies in the past is rejected with 401."""
        self.token.expires_at = timezone.now() - timedelta(hours=1)
        self.token.save(update_fields=["expires_at"])
        resp = self._get(f"Bearer {self.plaintext}")
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_disabled_user_returns_401(self):
        """A valid token owned by a deactivated user is rejected with 401."""
        self.user.is_active = False
        self.user.save(update_fields=["is_active"])
        resp = self._get(f"Bearer {self.plaintext}")
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_wrong_keyword_falls_through(self):
        """A ``Token``-keyword header is not consumed and ends in 401."""
        # ``Token <plaintext>`` is the old DRF authtoken keyword. The new
        # class only accepts ``Bearer``; a stale ``Token`` header is not
        # ours to consume — we return None and let the next auth class
        # try. SessionAuthentication doesn't accept it either, so the
        # request lands anonymous and IsAuthenticated returns 401.
        resp = self._get(f"Token {self.plaintext}")
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_malformed_header_too_many_parts_returns_401(self):
        """A Bearer header with an extra trailing part is rejected with 401."""
        resp = self._get(f"Bearer {self.plaintext} extra")
        self.assertEqual(resp.status_code, status.HTTP_401_UNAUTHORIZED)

    def test_request_auth_stashes_token(self):
        """``authenticate`` returns the ``(user, token)`` pair for a valid header.

        DRF places the second element of that pair on ``request.auth``, so
        the auth class is called directly here rather than mounting a
        second endpoint just to echo the token back.
        """
        # Function-scope imports: both names are needed by this test only.
        from django.test import RequestFactory

        from mcp_server.drf_auth import UserTokenAuthentication

        request = RequestFactory().get(
            "/__test_whoami__/",
            HTTP_AUTHORIZATION=f"Bearer {self.plaintext}",
        )
        result = UserTokenAuthentication().authenticate(request)
        # Assert before unpacking so a ``None`` result fails as a readable
        # assertion instead of a TypeError from tuple unpacking.
        self.assertIsNotNone(result)
        user, token = result
        self.assertEqual(user.pk, self.user.pk)
        self.assertEqual(token.pk, self.token.pk)
|