refactor: remove forward_inbound_auth, add traceback capture patches
Retire the per-turn bearer-token forwarding mechanism in favor of transparent authentication via operator-configured headers in fastagent.secrets.yaml. Agents now rely on long-lived team JWTs configured per downstream MCP server. Replace the token-forwarding patches with debug-only traceback-capture wrappers around three opaque fast-agent catch-sites that previously flattened exceptions to bare strings, making downstream transport errors diagnosable. Update README with authentication guidance and deprecation notice for the retired `forward_inbound_auth: true` flag (now silently ignored).
This commit is contained in:
40
README.md
40
README.md
@@ -110,5 +110,43 @@ with fast-agent's `ModelDatabase`.
|
|||||||
|---|---|
|
|---|---|
|
||||||
| `pallas.server` | CLI entry point and agent orchestration |
|
| `pallas.server` | CLI entry point and agent orchestration |
|
||||||
| `pallas.registry` | `GET /.well-known/mcp/server.json` registry server |
|
| `pallas.registry` | `GET /.well-known/mcp/server.json` registry server |
|
||||||
| `pallas.multimodal_server` | `MultimodalAgentMCPServer` — `AgentMCPServer` subclass with image support |
|
| `pallas.multimodal_server` | `MultimodalAgentMCPServer` — `AgentMCPServer` subclass with image + history support |
|
||||||
| `pallas.health` | LLM preflight validation + `get_health` MCP tool |
|
| `pallas.health` | LLM preflight validation + `get_health` MCP tool |
|
||||||
|
| `pallas._fastagent_patch` | Traceback-capture wrappers around three opaque fast-agent catch-sites (debug-only) |
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Authentication
|
||||||
|
|
||||||
|
Pallas is **transparent** to downstream authentication. Whatever the operator
|
||||||
|
places under each downstream MCP server's `headers:` block in
|
||||||
|
`fastagent.config.yaml` (typically loaded from `fastagent.secrets.yaml`) is what
|
||||||
|
fast-agent sends — Pallas does not intercept, rewrite, or forward the inbound
|
||||||
|
`Authorization` header of the MCP request that triggered the agent turn.
|
||||||
|
|
||||||
|
For agents that talk to Mnemosyne, the convention is a long-lived team JWT
|
||||||
|
minted from Mnemosyne's admin UI and pasted into the agent project's
|
||||||
|
`fastagent.secrets.yaml`:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
mcp:
|
||||||
|
servers:
|
||||||
|
mnemosyne:
|
||||||
|
transport: http
|
||||||
|
url: https://mnemosyne.example.com/mcp/
|
||||||
|
headers:
|
||||||
|
Authorization: "Bearer eyJ…team-jwt…"
|
||||||
|
```
|
||||||
|
|
||||||
|
See
|
||||||
|
[`mnemosyne/docs/DAEDALUS_PALLAS_INTEGRATION_v1.md`](https://git.helu.ca/r/mnemosyne/src/branch/main/docs/DAEDALUS_PALLAS_INTEGRATION_v1.md)
|
||||||
|
for the three credential types Mnemosyne recognises, how team JWTs are
|
||||||
|
minted and rotated, and the data model that ties a team to a set of
|
||||||
|
libraries.
|
||||||
|
|
||||||
|
> Earlier versions of Pallas shipped a `forward_inbound_auth: true`
|
||||||
|
> mechanism that captured the per-turn `Authorization` header and
|
||||||
|
> propagated it to opted-in downstream servers. That mechanism has been
|
||||||
|
> retired — opt-in flags in old `fastagent.config.yaml` files are now
|
||||||
|
> silently ignored and can be removed at your convenience.
|
||||||
|
|
||||||
|
|||||||
@@ -1,349 +1,50 @@
|
|||||||
"""Forward the inbound bearer token to opted-in downstream MCP servers.
|
"""fast-agent runtime patches — traceback capture on three opaque catch-sites.
|
||||||
|
|
||||||
fast-agent (≤0.6.19) captures the inbound ``Authorization: Bearer <X>`` into
|
fast-agent's transport layer catches every downstream-transport exception at
|
||||||
the ``request_bearer_token`` ContextVar, but does NOT propagate that value to
|
several nesting levels, logs only ``str(exc)`` (no ``exc_info=True``), and
|
||||||
outgoing MCP transport calls — ``_prepare_headers_and_auth`` only reads
|
re-raises. By the time the exception surfaces to the MCP tool result, the
|
||||||
``server_config.headers``. This module patches ``_prepare_headers_and_auth``
|
traceback has been flattened to a bare string — the canonical symptom being
|
||||||
so a downstream server marked ``forward_inbound_auth: true`` in
|
``"object NoneType can't be used in 'await' expression"`` with no stack
|
||||||
``fastagent.config.yaml`` receives the same bearer the FastAgent itself was
|
attached. This module wraps three of those catch-sites so Pallas emits
|
||||||
called with.
|
``logger.exception(...)`` with the full frame before fast-agent's swallowing
|
||||||
|
``except`` runs. Behaviour is otherwise unchanged: every wrapper re-raises
|
||||||
|
the exception it caught.
|
||||||
|
|
||||||
Opt-in is per-server because a FastAgent with multiple downstream MCP
|
The three wrapped entry points are:
|
||||||
attachments (e.g. Mnemosyne + a public weather server) must not leak its
|
|
||||||
credentials to every endpoint.
|
|
||||||
|
|
||||||
Why the simple "read from ``request_bearer_token``" approach does NOT work
|
1. ``MCPAgentClientSession.send_request`` — the lowest-level send call;
|
||||||
----------------------------------------------------------------------------
|
2. ``MCPAgentClientSession.call_tool`` — the session-side wrapper around
|
||||||
``MCPConnectionManager.launch_server`` spawns the server's transport task in
|
meta merge, permission handling, progress callback factory, and the
|
||||||
``self._tg`` — a long-lived ``anyio.TaskGroup`` created at manager startup.
|
send_request invocation itself;
|
||||||
``TaskGroup.start_soon`` copies the owning task's ``contextvars.Context`` at
|
3. ``MCPAggregator._execute_on_server`` — the aggregator's setup around
|
||||||
spawn time, which is the *startup* context, not the per-request context.
|
the client call (server lookup, session factory, tracer span,
|
||||||
The transport-preparation code therefore sees ``request_bearer_token.get()``
|
``try_execute`` harness).
|
||||||
as ``None`` even when the MCP request handler has just ``set`` it a few
|
|
||||||
frames up. Worse, ``launch_server`` runs once per downstream and the
|
|
||||||
persistent connection is reused, so the very first request's (often
|
|
||||||
empty) context is cached forever.
|
|
||||||
|
|
||||||
The fix is to hand the bearer through the only object that *is* shared
|
Any one wrapper being triggered while the other two stay silent pinpoints
|
||||||
between the two tasks: the ``MCPServerSettings`` instance that both paths
|
which frame is swallowing the exception, which is how we debug opaque
|
||||||
pass into ``_prepare_headers_and_auth``. ``pallas.multimodal_server``
|
transport failures.
|
||||||
registers the inbound bearer against ``id(server_config)`` in a
|
|
||||||
process-wide registry for the duration of each MCP request; this patch
|
|
||||||
reads it there and forges an ``Authorization`` header onto the outgoing
|
|
||||||
transport. Cleanup is guaranteed in the request handler's ``finally``.
|
|
||||||
|
|
||||||
TODO: drop after the equivalent change lands in fast-agent upstream.
|
Historical note: this file used to also carry the bearer-forwarding patch
|
||||||
|
that propagated inbound ``Authorization`` headers to opted-in downstream
|
||||||
|
MCP servers. That mechanism was retired once Mnemosyne moved to static
|
||||||
|
team JWTs carried in ``fastagent.config.yaml`` ``headers:`` entries — see
|
||||||
|
``mnemosyne/docs/DAEDALUS_PALLAS_INTEGRATION_v1.md``. Pallas is now
|
||||||
|
transparent to auth: whatever the operator places in each downstream
|
||||||
|
server's ``headers.Authorization`` is what fast-agent sends, full stop.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
|
||||||
import threading
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
|
|
||||||
from fast_agent.mcp import mcp_connection_manager as _mcm
|
|
||||||
from fast_agent.mcp import mcp_agent_client_session as _macs
|
|
||||||
from fast_agent.mcp import mcp_aggregator as _magg
|
from fast_agent.mcp import mcp_aggregator as _magg
|
||||||
from fast_agent.mcp.auth.context import request_bearer_token
|
from fast_agent.mcp import mcp_agent_client_session as _macs
|
||||||
|
|
||||||
logger = logging.getLogger("pallas.forward")
|
logger = logging.getLogger("pallas.forward")
|
||||||
_trace_logger = logging.getLogger("pallas.forward.trace")
|
_trace_logger = logging.getLogger("pallas.forward.trace")
|
||||||
|
|
||||||
|
|
||||||
class _DynamicBearerAuth(httpx.Auth):
|
|
||||||
"""Per-request ``Authorization`` injection for persistent MCP connections.
|
|
||||||
|
|
||||||
fast-agent's ``create_mcp_http_client(headers=..., auth=...)`` snapshots
|
|
||||||
the ``headers`` dict at client construction time — every subsequent
|
|
||||||
``tools/call`` reuses the *same* open connection, so a static
|
|
||||||
``Authorization`` header set at handshake is the only one the downstream
|
|
||||||
server ever sees. For workspace-scoped forwarding that's fatal: the
|
|
||||||
first request (often a startup probe) has no bearer, and every later
|
|
||||||
request that *does* carry a bearer inherits the probe's empty header.
|
|
||||||
|
|
||||||
httpx's ``auth`` parameter, however, is consulted on **every** outgoing
|
|
||||||
request via ``Auth.sync_auth_flow`` / ``async_auth_flow``. We use that
|
|
||||||
to look up the current ``_pending_bearers`` entry for ``server_config``
|
|
||||||
and stamp ``Authorization`` onto each request individually — no stale
|
|
||||||
caching, no handshake/tool-call skew.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Per-connection-reuse so ``httpx.AsyncClient`` can share us across
|
|
||||||
# streams; the lookup is keyed by ``id(server_config)`` so different
|
|
||||||
# servers (even same-named clones) stay isolated.
|
|
||||||
requires_request_body = False
|
|
||||||
requires_response_body = False
|
|
||||||
|
|
||||||
def __init__(self, server_config: Any) -> None:
|
|
||||||
self._server_config = server_config
|
|
||||||
self._server_name = getattr(server_config, "name", "?")
|
|
||||||
|
|
||||||
def _current_token(self) -> str | None:
|
|
||||||
return _lookup_bearer(self._server_config)
|
|
||||||
|
|
||||||
def _inject(self, request: httpx.Request) -> None:
|
|
||||||
token = self._current_token()
|
|
||||||
if token:
|
|
||||||
request.headers["Authorization"] = f"Bearer {token}"
|
|
||||||
logger.debug(
|
|
||||||
"forward.applied server=%s token_len=%d prefix=%s via=auth_flow",
|
|
||||||
self._server_name,
|
|
||||||
len(token),
|
|
||||||
token[:8],
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.debug(
|
|
||||||
"forward.skipped server=%s reason=no_inbound_bearer via=auth_flow",
|
|
||||||
self._server_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
def auth_flow(self, request: httpx.Request):
|
|
||||||
# Both ``sync_auth_flow`` and ``async_auth_flow`` on httpx.Auth
|
|
||||||
# delegate to ``auth_flow`` when subclasses override only the
|
|
||||||
# generic path, which is exactly what we want: one implementation
|
|
||||||
# that works for both sync and async clients. httpx drives this
|
|
||||||
# as a *plain* generator (the async side resolves the yielded
|
|
||||||
# request via its own await machinery), so do NOT mark this
|
|
||||||
# ``async def`` — that triggers
|
|
||||||
# ``object NoneType can't be used in 'await' expression``.
|
|
||||||
self._inject(request)
|
|
||||||
yield request
|
|
||||||
|
|
||||||
# ── Opt-in server names discovered from raw YAML ──────────────────────────────
|
|
||||||
# Fast-agent's ``Settings(**merged)`` pipeline silently discards unknown keys
|
|
||||||
# on nested ``MCPServerSettings`` instances — even with ``extra="allow"`` set
|
|
||||||
# on the parent and the model rebuilt — because ``nested_model_default_partial_update``
|
|
||||||
# takes a path through ``model_construct`` that drops ``model_extra``.
|
|
||||||
#
|
|
||||||
# Rather than fight pydantic's nested-model plumbing, we parse the YAML
|
|
||||||
# directly ourselves at patch-install time and build a set of server names
|
|
||||||
# that carry ``forward_inbound_auth: true``. The patched
|
|
||||||
# ``_prepare_headers_and_auth`` looks up the name (stable and authoritative
|
|
||||||
# regardless of Pydantic gymnastics) instead of asking the config object.
|
|
||||||
_FORWARD_SERVERS: set[str] = set()
|
|
||||||
|
|
||||||
_original_prepare = _mcm._prepare_headers_and_auth
|
|
||||||
|
|
||||||
# ── Per-request bearer registry ──────────────────────────────────────────────
|
|
||||||
# Keyed by ``id(server_config)`` so a request handler can publish the bearer
|
|
||||||
# that applies to each opted-in downstream server. The registry survives the
|
|
||||||
# context-var-loss hop across anyio task groups because ``id()`` is stable and
|
|
||||||
# the config object itself is held by fast-agent's ServerRegistry.
|
|
||||||
#
|
|
||||||
# A threading.Lock (not asyncio) is used because both the publishing side
|
|
||||||
# (request handler) and the reading side (``_prepare_headers_and_auth``, run
|
|
||||||
# inside the connection manager's task group) may execute on different anyio
|
|
||||||
# worker threads under uvicorn's default thread-portal setup. Access is
|
|
||||||
# microsecond-scoped — no contention concerns.
|
|
||||||
_pending_bearers: dict[int, str] = {}
|
|
||||||
_pending_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def publish_bearer(server_config: Any, token: str) -> None:
|
|
||||||
"""Register ``token`` as the inbound bearer to forward to this server.
|
|
||||||
|
|
||||||
Called by ``pallas.multimodal_server.send_message`` for every downstream
|
|
||||||
whose config carries ``forward_inbound_auth: true``. Must be paired with
|
|
||||||
``revoke_bearer`` in the same ``try/finally``.
|
|
||||||
"""
|
|
||||||
if not token:
|
|
||||||
return
|
|
||||||
with _pending_lock:
|
|
||||||
_pending_bearers[id(server_config)] = token
|
|
||||||
logger.debug(
|
|
||||||
"forward.published server=%s token_len=%d prefix=%s",
|
|
||||||
getattr(server_config, "name", "?"),
|
|
||||||
len(token),
|
|
||||||
token[:8],
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def revoke_bearer(server_config: Any) -> None:
|
|
||||||
"""Clear any bearer previously published for ``server_config``.
|
|
||||||
|
|
||||||
Always safe to call — a missing key is silently ignored, so request
|
|
||||||
handlers can ``finally: revoke_bearer(cfg)`` without pre-checks.
|
|
||||||
"""
|
|
||||||
with _pending_lock:
|
|
||||||
_pending_bearers.pop(id(server_config), None)
|
|
||||||
logger.debug(
|
|
||||||
"forward.revoked server=%s",
|
|
||||||
getattr(server_config, "name", "?"),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _lookup_bearer(server_config: Any) -> str | None:
|
|
||||||
"""Resolve the bearer to forward for ``server_config``.
|
|
||||||
|
|
||||||
Tries the per-request registry first (works across task groups) and
|
|
||||||
falls back to the ContextVar for cases where the caller lives in the
|
|
||||||
same task (e.g. fast-agent's own non-persistent probe path).
|
|
||||||
"""
|
|
||||||
with _pending_lock:
|
|
||||||
token = _pending_bearers.get(id(server_config))
|
|
||||||
if token:
|
|
||||||
return token
|
|
||||||
try:
|
|
||||||
return request_bearer_token.get()
|
|
||||||
except LookupError:
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _prepare_headers_and_auth_with_forward(server_config, **kwargs):
|
|
||||||
headers, oauth_auth, user_auth_keys = _original_prepare(server_config, **kwargs)
|
|
||||||
|
|
||||||
server_name = getattr(server_config, "name", None) or "?"
|
|
||||||
forward_flag = server_name in _FORWARD_SERVERS
|
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
"forward.check server=%s forward_inbound_auth=%s",
|
|
||||||
server_name,
|
|
||||||
forward_flag,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not forward_flag:
|
|
||||||
return headers, oauth_auth, user_auth_keys
|
|
||||||
|
|
||||||
if user_auth_keys:
|
|
||||||
logger.debug(
|
|
||||||
"forward.skipped server=%s reason=user_auth_present keys=%s",
|
|
||||||
server_name,
|
|
||||||
sorted(user_auth_keys),
|
|
||||||
)
|
|
||||||
return headers, oauth_auth, user_auth_keys
|
|
||||||
|
|
||||||
if oauth_auth is not None:
|
|
||||||
logger.debug(
|
|
||||||
"forward.skipped server=%s reason=oauth_active",
|
|
||||||
server_name,
|
|
||||||
)
|
|
||||||
return headers, oauth_auth, user_auth_keys
|
|
||||||
|
|
||||||
# Install a dynamic ``httpx.Auth`` instead of baking a static header into
|
|
||||||
# the returned ``headers`` dict. Fast-agent passes the auth object to
|
|
||||||
# ``create_mcp_http_client(auth=...)`` which forwards to
|
|
||||||
# ``httpx.AsyncClient(auth=...)``; httpx then consults it on every
|
|
||||||
# outgoing request via ``async_auth_flow``, reading the *current*
|
|
||||||
# ``_pending_bearers`` entry.
|
|
||||||
#
|
|
||||||
# This dodges the fatal "first handshake wins forever" problem:
|
|
||||||
# persistent MCP connections reuse the open socket across hundreds of
|
|
||||||
# tool-call requests, but the auth flow re-runs per request, so we can
|
|
||||||
# stamp the correct per-turn bearer onto each ``tools/call`` even though
|
|
||||||
# the initial ``initialize`` ran with no bearer at startup.
|
|
||||||
#
|
|
||||||
# We also report it through ``user_auth_keys`` so OAuth scrubbing (see
|
|
||||||
# ``_prepare_headers_and_auth`` upstream) treats Authorization as
|
|
||||||
# caller-owned and doesn't try to kick off an OAuth flow.
|
|
||||||
auth = _DynamicBearerAuth(server_config)
|
|
||||||
user_auth_keys = set(user_auth_keys) | {"Authorization"}
|
|
||||||
logger.debug(
|
|
||||||
"forward.bound server=%s auth=%s",
|
|
||||||
server_name,
|
|
||||||
type(auth).__name__,
|
|
||||||
)
|
|
||||||
# Current token may or may not be set — we don't require one at bind
|
|
||||||
# time because the auth flow will resolve it per-request; logging a
|
|
||||||
# preview when available helps trace the startup probe path.
|
|
||||||
inbound = _lookup_bearer(server_config)
|
|
||||||
if inbound:
|
|
||||||
logger.debug(
|
|
||||||
"forward.applied server=%s token_len=%d prefix=%s via=bind",
|
|
||||||
server_name,
|
|
||||||
len(inbound),
|
|
||||||
inbound[:8],
|
|
||||||
)
|
|
||||||
return headers, auth, user_auth_keys
|
|
||||||
|
|
||||||
|
|
||||||
def _candidate_config_paths() -> list[Path]:
|
|
||||||
"""Paths to scan for ``fastagent.config.yaml``.
|
|
||||||
|
|
||||||
Order matters: the first existing file wins. We mirror fast-agent's
|
|
||||||
``find_config`` discovery rule (cwd then ancestors) and additionally
|
|
||||||
honour a ``FASTAGENT_CONFIG_PATH`` override so tests / ansible-managed
|
|
||||||
deployments can point Pallas at a specific file.
|
|
||||||
"""
|
|
||||||
override = os.environ.get("FASTAGENT_CONFIG_PATH")
|
|
||||||
if override:
|
|
||||||
return [Path(override).expanduser()]
|
|
||||||
|
|
||||||
paths: list[Path] = []
|
|
||||||
cwd = Path.cwd()
|
|
||||||
for p in (cwd, *cwd.parents):
|
|
||||||
paths.append(p / "fastagent.config.yaml")
|
|
||||||
return paths
|
|
||||||
|
|
||||||
|
|
||||||
def _refresh_forward_servers() -> None:
|
|
||||||
"""Populate ``_FORWARD_SERVERS`` from the raw YAML config.
|
|
||||||
|
|
||||||
Parses the YAML ourselves (bypassing fast-agent's ``Settings`` pipeline)
|
|
||||||
because nested pydantic validation silently drops unknown keys on
|
|
||||||
``MCPServerSettings`` — so by the time we'd see the config object,
|
|
||||||
``forward_inbound_auth`` is gone.
|
|
||||||
|
|
||||||
Called both at ``install()`` time and lazily from
|
|
||||||
``_prepare_headers_and_auth_with_forward`` so hot-reloaded configs or
|
|
||||||
late-bound working directories still work. Failure is non-fatal: we
|
|
||||||
simply log and leave ``_FORWARD_SERVERS`` unchanged.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
import yaml
|
|
||||||
except ImportError:
|
|
||||||
logger.warning("pyyaml not available; cannot scan forward_inbound_auth opt-ins")
|
|
||||||
return
|
|
||||||
|
|
||||||
for path in _candidate_config_paths():
|
|
||||||
if not path.exists():
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
with open(path) as fh:
|
|
||||||
data = yaml.safe_load(fh) or {}
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("forward.config_parse_failed path=%s error=%s", path, exc)
|
|
||||||
continue
|
|
||||||
|
|
||||||
servers = (data.get("mcp") or {}).get("servers") or {}
|
|
||||||
if not isinstance(servers, dict):
|
|
||||||
return
|
|
||||||
|
|
||||||
names: set[str] = set()
|
|
||||||
for server_name, server_cfg in servers.items():
|
|
||||||
if not isinstance(server_cfg, dict):
|
|
||||||
continue
|
|
||||||
if server_cfg.get("forward_inbound_auth"):
|
|
||||||
names.add(server_name)
|
|
||||||
|
|
||||||
if names != _FORWARD_SERVERS:
|
|
||||||
_FORWARD_SERVERS.clear()
|
|
||||||
_FORWARD_SERVERS.update(names)
|
|
||||||
logger.info(
|
|
||||||
"forward.opt_in servers=%s source=%s",
|
|
||||||
sorted(_FORWARD_SERVERS),
|
|
||||||
path,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.debug("forward.no_config_found searched=%s", _candidate_config_paths())
|
|
||||||
|
|
||||||
|
|
||||||
# ── send_request traceback capture ───────────────────────────────────────────
|
# ── send_request traceback capture ───────────────────────────────────────────
|
||||||
# fast-agent's ``MCPAgentClientSession.send_request`` catches every
|
|
||||||
# downstream-transport exception, logs ``"send_request failed: <str(e)>"``
|
|
||||||
# *without* ``exc_info=True``, and re-raises — which means the exception
|
|
||||||
# propagates up to the agent loop where it is serialised as a tool result
|
|
||||||
# string (``"object NoneType can't be used in 'await' expression"`` is the
|
|
||||||
# canonical symptom) with no traceback anywhere.
|
|
||||||
#
|
|
||||||
# We wrap ``send_request`` so Pallas can emit ``logger.exception(...)`` with
|
|
||||||
# the full stack before re-raising. The original logger still fires its
|
|
||||||
# one-line summary; our wrapper adds the frames next to it in pallas.log.
|
|
||||||
# No behavioural change — we re-raise the same exception.
|
|
||||||
_original_send_request = _macs.MCPAgentClientSession.send_request
|
_original_send_request = _macs.MCPAgentClientSession.send_request
|
||||||
|
|
||||||
|
|
||||||
@@ -373,30 +74,7 @@ def _patch_send_request() -> None:
|
|||||||
logger.info("send_request traceback-capture patch installed")
|
logger.info("send_request traceback-capture patch installed")
|
||||||
|
|
||||||
|
|
||||||
# ── call_tool / _execute_on_server traceback capture ─────────────────────────
|
# ── call_tool traceback capture ──────────────────────────────────────────────
|
||||||
# The "object NoneType can't be used in 'await' expression" error surfaces
|
|
||||||
# via ``EnrichedMCPToolProgressManager.on_tool_complete`` (message=error),
|
|
||||||
# which is driven by ``MCPAggregator`` at line 2287 catching a generic
|
|
||||||
# ``Exception`` and passing ``str(e)`` downstream. The ``send_request``
|
|
||||||
# wrapper above proved — by its silence — that the exception is NOT raised
|
|
||||||
# inside ``send_request``, so the failing ``await X()`` (where X returns
|
|
||||||
# ``None``) must live in one of the frames between:
|
|
||||||
# * ``MCPAgentClientSession.call_tool`` (override, ~985)
|
|
||||||
# * ``MCPAggregator._execute_on_server.try_execute`` (~1612)
|
|
||||||
# * anything between call_tool and send_request (``_merge_experimental_session_meta``,
|
|
||||||
# the permission handler, the progress-callback factory, span creation, …)
|
|
||||||
#
|
|
||||||
# We install two outer wrappers to triangulate:
|
|
||||||
# 1. ``MCPAgentClientSession.call_tool`` — catches anything raised in the
|
|
||||||
# session's override (meta merge, params construction, send_request invocation
|
|
||||||
# itself, ...);
|
|
||||||
# 2. ``MCPAggregator._execute_on_server`` — catches everything the aggregator
|
|
||||||
# sets up around the client call (get_server, session factory, permission
|
|
||||||
# check, tracer span, progress callback, ``try_execute`` itself).
|
|
||||||
#
|
|
||||||
# Both emit ``logger.exception(...)`` (full stack) before re-raising; the
|
|
||||||
# original control flow is untouched. Once the offending frame is identified
|
|
||||||
# from the resulting traceback, these wrappers can be removed.
|
|
||||||
_original_session_call_tool = _macs.MCPAgentClientSession.call_tool
|
_original_session_call_tool = _macs.MCPAgentClientSession.call_tool
|
||||||
|
|
||||||
|
|
||||||
@@ -425,6 +103,7 @@ def _patch_session_call_tool() -> None:
|
|||||||
logger.info("session.call_tool traceback-capture patch installed")
|
logger.info("session.call_tool traceback-capture patch installed")
|
||||||
|
|
||||||
|
|
||||||
|
# ── _execute_on_server traceback capture ─────────────────────────────────────
|
||||||
_original_execute_on_server = _magg.MCPAggregator._execute_on_server
|
_original_execute_on_server = _magg.MCPAggregator._execute_on_server
|
||||||
|
|
||||||
|
|
||||||
@@ -464,25 +143,13 @@ def _patch_execute_on_server() -> None:
|
|||||||
|
|
||||||
|
|
||||||
def install() -> None:
|
def install() -> None:
|
||||||
# NOTE: we do NOT short-circuit on "already patched" at the top of this
|
"""Install all three trace-capture wrappers.
|
||||||
# function — each individual ``_patch_*`` helper owns its own idempotency
|
|
||||||
# guard, and we want all three trace-capture patches to be applied even
|
Each ``_patch_*`` helper is individually idempotent (guarded on a
|
||||||
# when the bearer-forwarding patch was installed in a previous reload.
|
``_pallas_trace_patched`` attribute), so ``install()`` is safe to call
|
||||||
# Previously a top-level guard on ``_prepare_headers_and_auth`` would
|
repeatedly — e.g. from ``pallas/__init__.py`` on import + again from
|
||||||
# return immediately on a reinstall, leaving the trace wrappers missing
|
a test harness — without stacking wrappers.
|
||||||
# silently — which is exactly the failure we chased.
|
"""
|
||||||
if not getattr(
|
|
||||||
_mcm._prepare_headers_and_auth, "_pallas_forward_patched", False
|
|
||||||
):
|
|
||||||
_refresh_forward_servers()
|
|
||||||
_prepare_headers_and_auth_with_forward._pallas_forward_patched = True # type: ignore[attr-defined]
|
|
||||||
_mcm._prepare_headers_and_auth = _prepare_headers_and_auth_with_forward
|
|
||||||
# INFO so it always appears in the journal at boot — greppable proof
|
|
||||||
# that the patch ran before any agent started.
|
|
||||||
logger.info(
|
|
||||||
"bearer-forwarding patch installed "
|
|
||||||
"(forward_inbound_auth-aware _prepare_headers_and_auth)"
|
|
||||||
)
|
|
||||||
_patch_send_request()
|
_patch_send_request()
|
||||||
_patch_session_call_tool()
|
_patch_session_call_tool()
|
||||||
_patch_execute_on_server()
|
_patch_execute_on_server()
|
||||||
|
|||||||
@@ -34,7 +34,9 @@ class _JSONFormatter(logging.Formatter):
|
|||||||
``traceback`` field. Without this, every ``logger.error`` in
|
``traceback`` field. Without this, every ``logger.error`` in
|
||||||
fast-agent / fastmcp / the MCP SDK loses its stack trace and we end
|
fast-agent / fastmcp / the MCP SDK loses its stack trace and we end
|
||||||
up guessing from the single-line message — which is exactly the
|
up guessing from the single-line message — which is exactly the
|
||||||
rabbit hole we spent hours in during the bearer-forwarding debug.
|
rabbit hole we spent hours in chasing opaque MCP transport failures
|
||||||
|
(see ``pallas._fastagent_patch`` for the trace-capture wrappers that
|
||||||
|
grew out of the same debugging session).
|
||||||
|
|
||||||
Also pulls in ``record.exc_text`` if the formatter was already
|
Also pulls in ``record.exc_text`` if the formatter was already
|
||||||
populated upstream (e.g. by another handler), avoiding duplicate
|
populated upstream (e.g. by another handler), avoiding duplicate
|
||||||
@@ -126,7 +128,7 @@ def _resolve_log_file() -> Path:
|
|||||||
The parent directory is created lazily (``mkdir -p``) so a fresh host
|
The parent directory is created lazily (``mkdir -p``) so a fresh host
|
||||||
doesn't need any prep work. We avoid ``/tmp`` because systemd's
|
doesn't need any prep work. We avoid ``/tmp`` because systemd's
|
||||||
``PrivateTmp=yes`` makes it per-unit-invisible — learned the hard way
|
``PrivateTmp=yes`` makes it per-unit-invisible — learned the hard way
|
||||||
during the bearer-forwarding debug saga.
|
during the MCP-transport debug saga.
|
||||||
"""
|
"""
|
||||||
override = os.environ.get("PALLAS_LOG_FILE")
|
override = os.environ.get("PALLAS_LOG_FILE")
|
||||||
if override:
|
if override:
|
||||||
@@ -143,13 +145,13 @@ def setup_logging() -> None:
|
|||||||
|
|
||||||
1. ``PALLAS_LOG_LEVEL`` environment variable — explicit override.
|
1. ``PALLAS_LOG_LEVEL`` environment variable — explicit override.
|
||||||
2. ``logger.level`` in ``fastagent.config.yaml`` — unified control knob
|
2. ``logger.level`` in ``fastagent.config.yaml`` — unified control knob
|
||||||
so bumping fast-agent's level also flips on Pallas's bearer-forwarding
|
so bumping fast-agent's level also flips on Pallas's own
|
||||||
diagnostics.
|
diagnostics.
|
||||||
3. ``INFO``.
|
3. ``INFO``.
|
||||||
|
|
||||||
``DEBUG`` unlocks diagnostics on the ``pallas.forward`` and
|
``DEBUG`` unlocks traceback capture on the ``pallas.forward.trace``
|
||||||
``pallas.auth`` loggers — essential when troubleshooting Mnemosyne /
|
logger (see ``pallas._fastagent_patch``) — essential when
|
||||||
workspace-scoped agent calls.
|
troubleshooting opaque MCP transport failures.
|
||||||
|
|
||||||
Scope of handlers:
|
Scope of handlers:
|
||||||
|
|
||||||
|
|||||||
@@ -19,17 +19,14 @@ fast-agent instance whose ``message_history`` is seeded from the caller's
|
|||||||
memory, no restart amnesia.
|
memory, no restart amnesia.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
|
||||||
import time
|
import time
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import fast_agent.core.prompt
|
import fast_agent.core.prompt
|
||||||
from fast_agent.core.logging.logger import get_logger
|
from fast_agent.core.logging.logger import get_logger
|
||||||
from fast_agent.mcp.auth.context import request_bearer_token
|
|
||||||
from fast_agent.mcp.server import AgentMCPServer
|
from fast_agent.mcp.server import AgentMCPServer
|
||||||
from fast_agent.types import PromptMessageExtended, RequestParams
|
from fast_agent.types import PromptMessageExtended, RequestParams
|
||||||
|
|
||||||
from pallas._fastagent_patch import _FORWARD_SERVERS, publish_bearer, revoke_bearer
|
|
||||||
from pallas.progress import EnrichedMCPToolProgressManager
|
from pallas.progress import EnrichedMCPToolProgressManager
|
||||||
from fastmcp import Context as MCPContext
|
from fastmcp import Context as MCPContext
|
||||||
from fastmcp.prompts import Message
|
from fastmcp.prompts import Message
|
||||||
@@ -39,79 +36,6 @@ from starlette.responses import JSONResponse, Response
|
|||||||
|
|
||||||
logger = get_logger(__name__)
|
logger = get_logger(__name__)
|
||||||
|
|
||||||
# Separate stdlib logger for bearer-token diagnostics — routed through
|
|
||||||
# ``pallas.log`` JSON handler to stdout / systemd journal. Gated at DEBUG so
|
|
||||||
# it is off by default in production but trivially flipped on via
|
|
||||||
# ``PALLAS_LOG_LEVEL=DEBUG`` for troubleshooting agent auth issues.
|
|
||||||
_auth_log = logging.getLogger("pallas.auth")
|
|
||||||
|
|
||||||
|
|
||||||
def _get_request_bearer_token() -> str | None:
|
|
||||||
"""Return the raw bearer token from the current MCP request's Authorization header.
|
|
||||||
|
|
||||||
Reads the header directly rather than going through get_access_token() because
|
|
||||||
Pallas runs without FastMCP auth middleware — there is no AuthenticatedUser in
|
|
||||||
the request scope, so get_access_token() always returns None here. The token
|
|
||||||
is an opaque string forwarded to opted-in downstream servers by
|
|
||||||
``pallas._fastagent_patch``.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
from fastmcp.server.dependencies import get_http_request
|
|
||||||
|
|
||||||
request = get_http_request()
|
|
||||||
auth = request.headers.get("authorization", "")
|
|
||||||
if auth.lower().startswith("bearer "):
|
|
||||||
token = auth[7:]
|
|
||||||
_auth_log.debug(
|
|
||||||
"bearer.captured len=%d prefix=%s", len(token), token[:8]
|
|
||||||
)
|
|
||||||
return token
|
|
||||||
_auth_log.debug("bearer.absent has_auth=%s", bool(auth))
|
|
||||||
except Exception as exc:
|
|
||||||
_auth_log.debug("bearer.error %s", exc)
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _forwardable_server_configs(agent) -> list[Any]:
|
|
||||||
"""Return the ``MCPServerSettings`` objects the agent is entitled to and
|
|
||||||
which are listed in ``_FORWARD_SERVERS`` (opt-in via
|
|
||||||
``forward_inbound_auth: true`` in ``fastagent.config.yaml``).
|
|
||||||
|
|
||||||
Restricting to the intersection of (agent-attached-servers ∩
|
|
||||||
opted-in-servers) ensures a request never publishes its bearer against
|
|
||||||
a server the calling agent does not use — e.g. a Harper→Mnemosyne call
|
|
||||||
must not flag Scotty→Mnemosyne's config.
|
|
||||||
|
|
||||||
We read the opt-in set from ``pallas._fastagent_patch._FORWARD_SERVERS``
|
|
||||||
rather than a pydantic attribute on the config object because fast-agent's
|
|
||||||
``Settings(**merged)`` validation silently drops unknown keys on nested
|
|
||||||
``MCPServerSettings`` instances (see ``_fastagent_patch._refresh_forward_servers``
|
|
||||||
for the gory details).
|
|
||||||
|
|
||||||
Safe to call before the agent is constructed: returns an empty list if
|
|
||||||
any attribute lookup fails.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
agent_servers = set(getattr(agent.config, "servers", []) or [])
|
|
||||||
if not agent_servers:
|
|
||||||
return []
|
|
||||||
opt_in = agent_servers & _FORWARD_SERVERS
|
|
||||||
if not opt_in:
|
|
||||||
return []
|
|
||||||
registry = agent.context.server_registry
|
|
||||||
if registry is None:
|
|
||||||
return []
|
|
||||||
configs: list[Any] = []
|
|
||||||
for name in opt_in:
|
|
||||||
cfg = registry.registry.get(name)
|
|
||||||
if cfg is not None:
|
|
||||||
configs.append(cfg)
|
|
||||||
return configs
|
|
||||||
except Exception as exc:
|
|
||||||
_auth_log.debug("bearer.registry_lookup_failed %s", exc)
|
|
||||||
return []
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def _history_to_fastmcp_messages(
|
def _history_to_fastmcp_messages(
|
||||||
message_history: list[PromptMessageExtended],
|
message_history: list[PromptMessageExtended],
|
||||||
@@ -277,34 +201,15 @@ class MultimodalAgentMCPServer(AgentMCPServer):
|
|||||||
Optional opaque identifier, logged for trace correlation.
|
Optional opaque identifier, logged for trace correlation.
|
||||||
Pallas does not interpret it.
|
Pallas does not interpret it.
|
||||||
"""
|
"""
|
||||||
inbound_bearer = _get_request_bearer_token()
|
|
||||||
saved_token = request_bearer_token.set(inbound_bearer)
|
|
||||||
report_progress = self._build_progress_reporter(ctx)
|
report_progress = self._build_progress_reporter(ctx)
|
||||||
request_params = RequestParams(
|
request_params = RequestParams(
|
||||||
tool_execution_handler=EnrichedMCPToolProgressManager(report_progress),
|
tool_execution_handler=EnrichedMCPToolProgressManager(report_progress),
|
||||||
emit_loop_progress=True,
|
emit_loop_progress=True,
|
||||||
)
|
)
|
||||||
# Track which downstream server configs we publish the bearer
|
instance = await self._acquire_instance(ctx)
|
||||||
# against so the ``finally`` block below can revoke every one of
|
agent = instance.app[agent_name]
|
||||||
# them even if the agent send raises halfway through.
|
agent_context = getattr(agent, "context", None)
|
||||||
published_configs: list[Any] = []
|
|
||||||
try:
|
try:
|
||||||
instance = await self._acquire_instance(ctx)
|
|
||||||
agent = instance.app[agent_name]
|
|
||||||
agent_context = getattr(agent, "context", None)
|
|
||||||
|
|
||||||
# Register the inbound bearer against each downstream server
|
|
||||||
# config the agent is allowed to reach and which opts-in via
|
|
||||||
# ``forward_inbound_auth: true``. This is how the bearer
|
|
||||||
# crosses the anyio task-group boundary that ContextVars
|
|
||||||
# cannot hop — see ``pallas._fastagent_patch`` for the
|
|
||||||
# full explanation.
|
|
||||||
if inbound_bearer:
|
|
||||||
for srv_cfg in _forwardable_server_configs(agent):
|
|
||||||
publish_bearer(srv_cfg, inbound_bearer)
|
|
||||||
published_configs.append(srv_cfg)
|
|
||||||
|
|
||||||
|
|
||||||
# Seed the freshly-created instance's message_history from the
|
# Seed the freshly-created instance's message_history from the
|
||||||
# caller-supplied history so the agent sees the full
|
# caller-supplied history so the agent sees the full
|
||||||
# conversation the caller is tracking. Safe no-op when the
|
# conversation the caller is tracking. Safe no-op when the
|
||||||
@@ -359,21 +264,13 @@ class MultimodalAgentMCPServer(AgentMCPServer):
|
|||||||
)
|
)
|
||||||
return response
|
return response
|
||||||
|
|
||||||
try:
|
if agent_context and ctx:
|
||||||
if agent_context and ctx:
|
return await self.with_bridged_context(
|
||||||
return await self.with_bridged_context(
|
agent_context, ctx, execute_send
|
||||||
agent_context, ctx, execute_send
|
)
|
||||||
)
|
return await execute_send()
|
||||||
return await execute_send()
|
|
||||||
finally:
|
|
||||||
await self._release_instance(ctx, instance)
|
|
||||||
finally:
|
finally:
|
||||||
# Always revoke every bearer we published, then restore the
|
await self._release_instance(ctx, instance)
|
||||||
# ContextVar — order matters only for tidiness; a revoke that
|
|
||||||
# finds nothing is a no-op.
|
|
||||||
for srv_cfg in published_configs:
|
|
||||||
revoke_bearer(srv_cfg)
|
|
||||||
request_bearer_token.reset(saved_token)
|
|
||||||
|
|
||||||
|
|
||||||
if self._instance_scope == "request":
|
if self._instance_scope == "request":
|
||||||
|
|||||||
Reference in New Issue
Block a user