Refine the phase-2 integration spec to reflect implementation details: - Change `resolved_libraries` from `set[str]` to ordered `list[str]` - Document `MCPToken.allowed_libraries` as JSONField (not M2M) since Library lives in Neo4j, not Django's ORM - Clarify that `Library.workspace_id` is a content-routing attribute, not an authorization axis - Describe retirement of the three-branch `_WORKSPACE_SCOPE_CLAUSE` in favor of a single `lib.uid IN $resolved_libraries` check - Specify team JWT resolution via `TeamWorkspaceAssignment` DB join - Note admin UI materializes full Library UID list explicitly
102 lines
3.2 KiB
Python
102 lines
3.2 KiB
Python
"""Team JWT minting.
|
|
|
|
Team tokens are the ``typ=team`` variant described in §5.2 of
|
|
``docs/DAEDALUS_PALLAS_INTEGRATION_v1.md``: long-lived (10 years),
|
|
no per-request scope claims, revocable via ``Team.active`` and
|
|
``Team.active_jti``. They are signed with whichever
|
|
:class:`~mcp_server.models.MCPSigningKey` is currently
|
|
``MCPSigningKey.objects.current()`` — the same keyring the legacy
|
|
per-turn path uses.
|
|
|
|
Typical flow from the REST API:
|
|
|
|
team = Team.objects.get(pk=...)
|
|
team.rotate_jti() # installs a fresh active_jti
|
|
jwt_string = mint_team_jwt(team) # signs with current kid + new jti
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
|
|
import jwt as pyjwt
|
|
|
|
from .models import MCPSigningKey, Team
|
|
|
|
logger = logging.getLogger(__name__)


# Lifetime of a team JWT, expressed in seconds: ten 365-day years (leap days
# are not counted). The long lifetime follows the "operator cannot tolerate
# a silent expiry-induced outage" rationale in §13.1.
_TEAM_TOKEN_LIFETIME_SECONDS = 60 * 60 * 24 * 365 * 10
|
|
|
|
|
|
class TeamJWTError(Exception):
    """Signal that minting a team JWT failed (for example, no active signing key)."""
|
|
|
|
|
|
def mint_team_jwt(team: Team) -> str:
    """Mint a team JWT for ``team`` using the current active signing key.

    Requires:

    * An active :class:`MCPSigningKey` to exist (seed one via
      ``python manage.py seed_signing_key``).
    * ``team.active_jti`` to be set — callers should invoke
      :meth:`Team.rotate_jti` immediately before minting so each
      issuance ties to a fresh UUID, invalidating whatever token
      preceded it.

    The claim shape mirrors §5.2:

    * ``iss`` = ``"mnemosyne"`` — distinguishes from Daedalus per-turn.
    * ``aud`` = ``"mnemosyne"`` — informational; not enforced at verify.
    * ``typ`` = ``"team"`` — the single discriminator the middleware
      uses to pick the team branch.
    * ``sub`` = ``"team:<uuid>"`` — carries the team's primary key.
    * ``jti`` = ``str(active_jti)`` — validated against the DB on every
      request, so rotate → old token dies.
    * ``iat`` / ``exp`` — the issue time and the issue time plus
      ``_TEAM_TOKEN_LIFETIME_SECONDS``.

    The token is signed with HS256 using the hex-decoded secret of the
    current key, and the key's ``kid`` is placed in the JWT header.

    :param team: Team to mint for; ``team.id`` and ``team.active_jti``
        are read.
    :returns: The encoded token as produced by ``pyjwt.encode``.
    :raises TeamJWTError: If there is no current signing key, the team
        has no ``active_jti``, or the stored secret is unusable (not a
        valid hex string, or empty).
    """
    key = MCPSigningKey.objects.current()
    if key is None:
        raise TeamJWTError(
            "No active MCPSigningKey to sign the team JWT. "
            "Seed one via `python manage.py seed_signing_key`."
        )
    if team.active_jti is None:
        raise TeamJWTError(
            f"Team {team.id} has no active_jti; call rotate_jti() before mint."
        )

    try:
        secret = bytes.fromhex(key.secret_hex)
    except (TypeError, ValueError) as exc:
        # ValueError: malformed hex. TypeError: the stored value is not a
        # string at all (e.g. None). Both surface as TeamJWTError, with the
        # original exception chained as the explicit cause.
        raise TeamJWTError(
            f"Stored secret for kid={key.kid!r} is not valid hex: {exc}"
        ) from exc
    if not secret:
        # bytes.fromhex("") succeeds and yields b""; an HMAC signature made
        # with a zero-length key can be reproduced by anyone, so refuse.
        raise TeamJWTError(
            f"Stored secret for kid={key.kid!r} is empty; refusing to sign."
        )

    now = int(time.time())
    payload = {
        "iss": "mnemosyne",
        "aud": "mnemosyne",
        "sub": f"team:{team.id}",
        "typ": "team",
        "iat": now,
        "exp": now + _TEAM_TOKEN_LIFETIME_SECONDS,
        "jti": str(team.active_jti),
    }
    token = pyjwt.encode(
        payload,
        secret,
        algorithm="HS256",
        headers={"kid": key.kid},
    )
    logger.info(
        "team_jwt_minted team_id=%s kid=%s jti=%s",
        team.id,
        key.kid,
        team.active_jti,
    )
    return token
|