Files
pallas/pallas/server.py
Robert Helewka f92f3fc710 feat(log): add service/project/component labels for Loki
Inject stable JSON fields (`service`, `project`, `component`) on every
log record via a `logging.Filter` so Alloy-forwarded logs share the
same label shape as Mnemosyne/Daedalus. The `project` label is set
once from `agents.yaml`, and `component` is stored in a ContextVar
so each agent's asyncio task carries its own value without leaking
across sibling agents.

Also switch the opt-in console sink from `sys.__stderr__` to
`sys.__stdout__` (`PALLAS_LOG_STDOUT=1`) for cleaner journald capture
in systemd deployments.
2026-05-11 13:52:25 -04:00

373 lines
14 KiB
Python

"""
Pallas — FastAgent MCP Bridge
Reads agent topology from agents.yaml in the working directory and exposes
each agent as a StreamableHTTP MCP endpoint with an optional registry server.
Usage:
pallas # all agents + registry
pallas --agent jarvis # single agent mode
"""
import argparse
import asyncio
import importlib
import logging
import os
from pathlib import Path
import yaml
from pallas.log import set_agent_component, set_project, setup_logging
from pallas.multimodal_server import MultimodalAgentMCPServer
logger = logging.getLogger(__name__)
def _config_root() -> Path:
"""Return the working directory where agents.yaml and fastagent configs live."""
return Path.cwd()
# ── Configuration ─────────────────────────────────────────────────────────────
def _load_deployment_config() -> dict:
    """Load agents.yaml — single source of truth for deployment topology.

    The file name defaults to ``agents.yaml`` and can be overridden through
    the ``PALLAS_AGENTS_CONFIG`` environment variable. It is joined onto
    ``_config_root()``, so an absolute override replaces the root entirely
    (standard ``pathlib`` join semantics).

    Returns:
        The parsed YAML document, guaranteed to be a mapping whose
        ``agents`` entry is itself a mapping.

    Raises:
        SystemExit: if the file is missing, the document is not a mapping,
            or it has no usable ``agents`` section.
    """
    config_path = _config_root() / os.environ.get("PALLAS_AGENTS_CONFIG", "agents.yaml")
    # is_file() also rejects a directory that happens to carry the config name.
    if not config_path.is_file():
        raise SystemExit(f"deployment config not found: {config_path}")

    # Read as UTF-8 explicitly instead of relying on the platform locale.
    with open(config_path, encoding="utf-8") as f:
        config = yaml.safe_load(f) or {}

    if not isinstance(config, dict) or "agents" not in config:
        raise SystemExit(f"no 'agents' section in {config_path}")
    # ``agents:`` with no body parses as None; fail here with a clear message
    # rather than later with an AttributeError on ``.items()``.
    if not isinstance(config["agents"], dict):
        raise SystemExit(
            f"'agents' section in {config_path} must be a mapping of agent names"
        )
    return config
def _build_agents_table(config: dict) -> dict[str, dict]:
"""Build {name: {module, port, model?, model_capabilities?}} from agents.yaml.
The ``model`` and ``model_capabilities`` fields are optional. When ``model``
is set, Pallas overrides the agent's ``AgentConfig.model`` at startup so
fast-agent routes that agent to the specified model/provider. When
``model_capabilities`` is set, those capabilities are used to register the
model with fast-agent's ``ModelDatabase`` instead of the top-level defaults
from ``fastagent.config.yaml``.
"""
return {
name: {
"module": agent["module"],
"port": agent["port"],
"model": agent.get("model"),
"model_capabilities": agent.get("model_capabilities"),
}
for name, agent in config["agents"].items()
}
def _build_agent_deps(config: dict) -> dict[str, list[str]]:
"""Build dependency graph from agents.yaml depends_on fields."""
agents = config["agents"]
deps: dict[str, list[str]] = {}
for name, agent in agents.items():
dep_list = agent.get("depends_on", [])
if dep_list:
deps[name] = list(dep_list)
return deps
# ── Downstream MCP server helpers ─────────────────────────────────────────────
def _resolve_downstream_servers(fast_instance) -> dict[str, dict]:
"""Collect downstream MCP server configs referenced by any agent."""
server_names = set()
for agent_data in fast_instance.agents.values():
config = agent_data.get("config")
if config:
for s in getattr(config, "servers", []):
server_names.add(s)
mcp_servers = fast_instance.config.get("mcp", {}).get("servers", {})
servers = {}
for name in sorted(server_names):
cfg = mcp_servers.get(name, {})
if isinstance(cfg, dict):
url = cfg.get("url")
headers = cfg.get("headers") or {}
else:
url = getattr(cfg, "url", None)
headers = getattr(cfg, "headers", None) or {}
if url:
servers[name] = {"url": str(url), "headers": dict(headers)}
return servers
def _preflight_mcp_servers(agent_name: str, servers: dict[str, dict]) -> None:
    """Warn at startup if MCP server auth headers have unresolved env vars.

    Scans every header value for ``${VAR}`` placeholders and logs one warning
    per placeholder whose environment variable is unset or empty. Purely
    advisory: nothing is raised and ``servers`` is not modified.

    Args:
        agent_name: agent the servers belong to (used in the log message).
        servers: ``{server_name: {"url": ..., "headers": {...}}}``.
    """
    import re

    # Compiled once instead of once per header value.
    placeholder = re.compile(r"\$\{([^}]+)\}")

    for server_name, cfg in servers.items():
        for header_key, header_val in (cfg.get("headers") or {}).items():
            for ref in placeholder.findall(str(header_val)):
                var, has_default, _default = ref.partition(":")
                # NOTE(review): assumes the ``${VAR:default}`` form carries a
                # fallback value (fast-agent's config substitution syntax) —
                # confirm. Such a reference is never "unresolved", and
                # looking up the literal "VAR:default" would always warn.
                if has_default:
                    continue
                if not os.environ.get(var, ""):
                    # Agent and server are separated by "/" so the two names
                    # do not run together in the log line.
                    logger.warning(
                        "%s/%s: %s references ${%s} but it is not set",
                        agent_name, server_name, header_key, var,
                    )
# ── Model registration ────────────────────────────────────────────────────────
def _register_one_model(model_spec: str, capabilities: dict) -> None:
    """Register a single model with fast-agent's ModelDatabase if unknown.

    Args:
        model_spec: model string; everything up to and including the first
            ``.`` is dropped before the lookup/registration.
        capabilities: optional overrides — ``vision`` (default ``False``),
            ``context_window`` (default 131072) and ``max_output_tokens``
            (default 16384). ``None`` is treated as an empty mapping.

    Does nothing when ``ModelDatabase`` already has parameters for the model.
    """
    from fast_agent.llm.model_database import ModelDatabase, ModelParameters

    # NOTE(review): the first dot is always taken as the provider separator,
    # so an un-prefixed name that itself contains a dot (e.g. "qwen2.5-vl")
    # would be truncated — confirm callers always pass "provider.model".
    model_name = model_spec.split(".", 1)[-1]
    if ModelDatabase.get_model_params(model_name) is not None:
        return

    # An empty ``model_capabilities:`` key in YAML arrives here as None.
    capabilities = capabilities or {}
    is_vision = capabilities.get("vision", False)
    tokenizes = list(
        ModelDatabase.QWEN_MULTIMODAL if is_vision else ModelDatabase.TEXT_ONLY
    )

    ModelDatabase.register_runtime_model_params(
        model_name,
        ModelParameters(
            context_window=capabilities.get("context_window", 131072),
            max_output_tokens=capabilities.get("max_output_tokens", 16384),
            tokenizes=tokenizes,
        ),
    )

    # Log only after registration has succeeded, so the message is never a lie.
    if is_vision:
        logger.info("Registered model '%s' with vision capabilities", model_name)
    else:
        logger.info("Registered model '%s' as text-only", model_name)
def _register_unknown_models(deployment_config: dict) -> None:
    """Register runtime model params for models not in fast-agent's ModelDatabase.

    Registers the ``default_model`` from ``fastagent.config.yaml`` plus every
    per-agent ``model`` declared in ``agents.yaml``. Capabilities are resolved
    per model: if the agent carries its own ``model_capabilities`` block, those
    take effect; otherwise the top-level ``model_capabilities`` from
    ``fastagent.config.yaml`` apply.

    Each distinct model string is registered at most once; the first
    occurrence wins. Returns silently when ``fastagent.config.yaml`` does not
    exist in the config root.
    """
    fastagent_config_path = _config_root() / "fastagent.config.yaml"
    if not fastagent_config_path.exists():
        return

    # Read as UTF-8 explicitly instead of relying on the platform locale.
    with open(fastagent_config_path, encoding="utf-8") as f:
        fa_config = yaml.safe_load(f) or {}

    # Keys that are present but empty in YAML parse as None, so ``or`` is used
    # instead of a ``.get`` default to normalise them.
    default_model = fa_config.get("default_model") or ""
    default_capabilities = fa_config.get("model_capabilities") or {}

    seen: set[str] = set()
    if default_model:
        _register_one_model(default_model, default_capabilities)
        seen.add(default_model)

    for agent in (deployment_config.get("agents") or {}).values():
        agent_model = agent.get("model")
        if not agent_model or agent_model in seen:
            continue
        agent_caps = agent.get("model_capabilities") or default_capabilities
        _register_one_model(agent_model, agent_caps)
        seen.add(agent_model)
# ── Agent lifecycle ───────────────────────────────────────────────────────────
async def _preflight(deployment_config: dict) -> None:
    """Run the startup checks shared by single-agent and all-agents modes.

    First registers any models missing from fast-agent's ModelDatabase, then
    awaits ``pallas.health.validate_llm_providers``.
    """
    from pallas.health import validate_llm_providers as check_llm_providers

    _register_unknown_models(deployment_config)
    await check_llm_providers()
async def _start_agent(name: str, agents: dict[str, dict]) -> None:
    """Import one agent module and serve it as an HTTP MCP endpoint.

    Looks up ``agents[name]``, imports its ``module`` and takes that module's
    ``fast`` attribute, optionally forces the configured ``model`` onto every
    agent config, then runs a ``MultimodalAgentMCPServer`` on the entry's
    ``port`` until ``run_async`` returns.
    """
    from pallas.health import register_health_tool

    # Stamp the agent name onto every log record emitted from this asyncio
    # task (the ``component`` field). ContextVar scope is per-task, so each
    # agent started from ``asyncio.gather`` keeps its own value, while the
    # registry's task falls back to ``component="runtime"``. Records from
    # fast-agent, fastmcp, anthropic and openai pick up the same tag because
    # the _StaticFieldsFilter is attached to every handler.
    set_agent_component(name)

    spec = agents[name]
    module_path = spec["module"]
    listen_port = spec["port"]
    forced_model = spec.get("model")

    fast_instance = importlib.import_module(module_path).fast

    if forced_model:
        for agent_data in fast_instance.agents.values():
            cfg = agent_data.get("config")
            if cfg is None:
                continue
            cfg.model = forced_model
        logger.info("%s model override → %s", name, forced_model)

    logger.info("Starting %s agent on port %d", name, listen_port)

    async with fast_instance.run():
        primary = fast_instance._server_managed_instances[0]

        # instance_scope="request" keeps the server stateless: every MCP
        # `tools/call` gets a freshly created agent instance that is disposed
        # right after the response. Conversation history belongs to the
        # caller (Daedalus), which passes it on every turn through the
        # `history` argument of `send_message` — see
        # multimodal_server.MultimodalAgentMCPServer.register_agent_tools.
        #
        # Why not "shared":
        #   * it leaks one conversation's history into the next, because all
        #     callers see the same `agent.message_history`;
        #   * it silently loses everything on process restart, breaking the
        #     "Pallas is ephemeral" contract.
        # With "request" the Pallas process holds no per-conversation state
        # and the LLM sees exactly what Daedalus asks it to see.
        bind_host = "0.0.0.0"
        server = MultimodalAgentMCPServer(
            primary_instance=primary,
            create_instance=fast_instance._server_instance_factory,
            dispose_instance=fast_instance._server_instance_dispose,
            instance_scope="request",
            server_name=f"{fast_instance.name}-MCP-Server",
            host=bind_host,
            get_registry_version=fast_instance._get_registry_version,
        )

        downstream = _resolve_downstream_servers(fast_instance)
        _preflight_mcp_servers(name, downstream)
        register_health_tool(server.mcp_server, downstream)

        await server.run_async(transport="http", host=bind_host, port=listen_port)
async def _wait_for_agent(
    name: str,
    agents: dict[str, dict],
    timeout: float = 60.0,
) -> None:
    """Poll an agent's local MCP endpoint until it answers or *timeout* elapses.

    Issues ``GET http://127.0.0.1:<port>/mcp`` roughly once per second. Any
    HTTP response, whatever its status code, counts as "ready"; only a failed
    request triggers a retry.

    Args:
        name: agent key in ``agents``.
        agents: agents table; ``agents[name]["port"]`` is the port to probe.
        timeout: overall budget in seconds.

    Returns:
        ``None`` in both outcomes. On timeout a warning is logged and the
        caller proceeds; nothing is raised.
    """
    import httpx

    port = agents[name]["port"]
    url = f"http://127.0.0.1:{port}/mcp"

    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; its monotonic clock is used for the deadline.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout

    # One client for the whole polling session instead of one per attempt.
    async with httpx.AsyncClient(timeout=2.0) as client:
        while loop.time() < deadline:
            try:
                await client.get(url)
            except Exception as exc:
                # Deliberately broad: this is a best-effort readiness probe,
                # and any failure simply means "not up yet".
                logger.debug("%s not ready yet: %s", name, exc)
                await asyncio.sleep(1.0)
            else:
                logger.info("%s is ready", name)
                return

    logger.warning("%s did not become ready within %.0fs", name, timeout)
async def _run_single(name: str, agents: dict[str, dict], deployment_config: dict) -> None:
    """Single-agent mode: run the shared preflight, then serve agent *name*.

    The preflight is awaited to completion before the agent is started.
    """
    await _preflight(deployment_config)
    await _start_agent(name=name, agents=agents)
async def _start_all(config: dict) -> None:
    """Start the registry and every configured agent, dependencies first.

    Sequence: preflight → registry task → tasks for agents that another
    configured agent depends on → wait for those to answer → tasks for the
    remaining agents → await all long-running tasks together.
    """
    from pallas.registry import run_registry

    agents = _build_agents_table(config)
    agent_deps = _build_agent_deps(config)
    registry_port = config.get("registry_port", 24200)

    await _preflight(config)

    # Subagents: configured agents that some other configured agent lists in
    # its depends_on. These must be listening before their dependents start.
    subagents: set[str] = {
        dep
        for parent, dep_list in agent_deps.items()
        if parent in agents
        for dep in dep_list
        if dep in agents
    }

    long_running: list[asyncio.Task] = [
        asyncio.create_task(run_registry(port=registry_port)),
    ]
    long_running.extend(
        asyncio.create_task(_start_agent(sub, agents)) for sub in sorted(subagents)
    )

    await asyncio.gather(*(_wait_for_agent(sub, agents) for sub in subagents))

    long_running.extend(
        asyncio.create_task(_start_agent(name, agents))
        for name in agents
        if name not in subagents
    )

    await asyncio.gather(*long_running)
# ── CLI ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: load agents.yaml, parse arguments, run the server(s).

    Without ``--agent`` every configured agent plus the registry is started;
    with ``--agent NAME`` only that agent is started (no registry).
    """
    config = _load_deployment_config()
    agents = _build_agents_table(config)
    registry_port = config.get("registry_port", 24200)
    # ``name:`` present but empty parses as None, and a bare number parses as
    # int; either would crash on ``.title()`` below. Fall back to the default
    # and coerce to str.
    deploy_name = str(config.get("name") or "pallas")

    parser = argparse.ArgumentParser(
        description=f"{deploy_name.title()} — FastAgent MCP Bridge",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Port assignments:\n"
            + "\n".join(
                f" {name:16s} port {entry['port']}"
                for name, entry in agents.items()
            )
            + f"\n {'registry':16s} port {registry_port}"
        ),
    )
    parser.add_argument(
        "--agent",
        choices=list(agents.keys()),
        metavar="AGENT",
        help="Start a specific agent (default: all). Choices: %(choices)s",
    )
    args = parser.parse_args()

    # Project label (e.g. "kottos", "mentor", "iolaus") is read from
    # ``agents.yaml`` and stamped onto every log record via
    # ``pallas.log._StaticFieldsFilter``. Must be set before
    # ``setup_logging()`` so the "log file: …" bootstrap record already
    # carries the right project label.
    set_project(deploy_name)
    setup_logging()

    if args.agent:
        port = agents[args.agent]["port"]
        logger.info("Starting %s agent on port %d", args.agent, port)
        asyncio.run(_run_single(args.agent, agents, config))
    else:
        logger.info("Starting all agents + registry for %s", deploy_name)
        logger.info(
            "registry → http://0.0.0.0:%d/.well-known/mcp/server.json", registry_port
        )
        for name, entry in agents.items():
            logger.info("%-16s → http://0.0.0.0:%d/mcp", name, entry["port"])
        asyncio.run(_start_all(config))


if __name__ == "__main__":
    main()