Initial commit: pallas package extracted from mentor

This commit is contained in:
2026-04-02 12:41:53 +00:00
commit 9092afb532
8 changed files with 1094 additions and 0 deletions

31
.gitignore vendored Normal file
View File

@@ -0,0 +1,31 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
# Distribution / packaging
build/
dist/
*.egg-info/
.eggs/
MANIFEST
# Virtual environments
.venv/
venv/
env/
ENV/
# Environments
.env
# FastAgent runtime
fastagent.secrets.yaml
fastagent.jsonl
.fast-agent/sessions/
# Editor
.ruff_cache/
.mypy_cache/
.pypirc

114
README.md Normal file
View File

@@ -0,0 +1,114 @@
# Pallas — FastAgent MCP Bridge
Pallas is the generic runtime that turns [fast-agent](https://github.com/evalstate/fast-agent) agent definitions into StreamableHTTP MCP servers.
It is **completely deployment-agnostic**: all environment-specific values (agent names, ports, hosts, model) live in the calling project's `agents.yaml` and `fastagent.config.yaml`.
---
## Installation
```bash
pip install git+ssh://git@git.helu.ca:22022/r/pallas.git
```
Or as a project dependency in `pyproject.toml`:
```toml
dependencies = [
"pallas-mcp @ git+ssh://git@git.helu.ca:22022/r/pallas.git",
]
```
---
## Usage
Pallas reads configuration from the **working directory** at runtime.
```
my-project/
├── agents/
│ ├── __init__.py
│ └── jarvis.py # FastAgent definitions
├── agents.yaml # Deployment topology
├── fastagent.config.yaml # FastAgent + model config
└── fastagent.secrets.yaml # API keys (gitignored)
```
Run from your project root:
```bash
pallas # start all agents + registry
pallas --agent jarvis # start a single agent
```
Or via `python -m`:
```bash
python -m pallas.server
```
---
## `agents.yaml` format
```yaml
name: my-project # used in log prefixes and registry names
version: "1.0.0"
host: my-host.example.com # hostname for registry URLs
namespace: com.example.my-project
registry_port: 8200
agents:
jarvis:
module: agents.jarvis # importable Python module path
port: 8201
title: Jarvis
description: "My assistant agent"
depends_on: [research] # optional: start these first
research:
module: agents.research
port: 8250
title: Research Agent
description: "Web search and knowledge graph"
```
---
## `fastagent.config.yaml` extensions
Pallas reads two extra keys beyond the standard fast-agent config:
```yaml
default_model: openai.my-custom-model-name
# Explicit capability declarations — avoids brittle name-regex heuristics
model_capabilities:
vision: false
context_window: 200000
max_output_tokens: 32000
```
Capabilities are published in the registry and used to register unknown models
with fast-agent's `ModelDatabase`.
---
## Environment variable
| Variable | Default | Purpose |
|---|---|---|
| `PALLAS_AGENTS_CONFIG` | `agents.yaml` | Override path to deployment config |
---
## What Pallas provides
| Module | Purpose |
|---|---|
| `pallas.server` | CLI entry point and agent orchestration |
| `pallas.registry` | `GET /.well-known/mcp/server.json` registry server |
| `pallas.multimodal_server` | `MultimodalAgentMCPServer` — an `AgentMCPServer` subclass with image support |
| `pallas.health` | LLM preflight validation + `get_health` MCP tool |

8
pallas/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
"""
Pallas — FastAgent MCP Bridge
Generic runtime for serving FastAgent agents over StreamableHTTP.
Reads deployment topology from agents.yaml in the working directory.
"""
__version__ = "0.1.0"

320
pallas/health.py Normal file
View File

@@ -0,0 +1,320 @@
"""
Health check module for Pallas.
Probes downstream MCP server connectivity and exposes a get_health MCP tool.
Validates LLM provider API keys and model availability at startup.
"""
import asyncio
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
import httpx
import yaml
def _config_root() -> Path:
"""Return the working directory where agents.yaml and fastagent configs live."""
return Path.cwd()
def _load_deployment_name() -> str:
    """Read the deployment name from agents.yaml (or PALLAS_AGENTS_CONFIG override).

    Falls back to "pallas" when the config file is absent or has no `name` key.
    """
    cfg_file = _config_root() / os.environ.get("PALLAS_AGENTS_CONFIG", "agents.yaml")
    if not cfg_file.exists():
        return "pallas"
    loaded = yaml.safe_load(cfg_file.read_text()) or {}
    return loaded.get("name", "pallas")
# Deployment name is resolved once at import time; used only for log prefixes.
_DEPLOY_NAME = _load_deployment_name()
_PREFIX = f"[{_DEPLOY_NAME}]"
# ── Provider API endpoints ───────────────────────────────────────────────────
_ANTHROPIC_API = "https://api.anthropic.com/v1"
_OPENAI_DEFAULT_API = "https://api.openai.com/v1"
# Populated by validate_llm_providers() at startup, read by get_health()
_llm_status: dict[str, dict] = {}
_active_provider: str = ""
def _load_dotenv() -> None:
    """Load .env file into os.environ (without overwriting existing vars).

    Blank lines, comment lines, and lines without '=' are skipped.
    """
    env_file = _config_root() / ".env"
    if not env_file.exists():
        return
    for raw in env_file.read_text().splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        name, _, val = entry.partition("=")
        name, val = name.strip(), val.strip()
        # First definition wins; pre-existing environment always wins.
        if name and name not in os.environ:
            os.environ[name] = val
def _expand_env(value: str) -> str:
"""Replace ${VAR} placeholders with environment variable values."""
return re.sub(
r"\$\{([^}]+)\}",
lambda m: os.environ.get(m.group(1), ""),
value,
)
def _load_config() -> tuple[dict, dict]:
    """Load fastagent config and secrets YAML from the working directory.

    Returns:
        (config, secrets) — both always dicts. A missing or *empty* secrets
        file yields {}; yaml.safe_load returns None for an empty document,
        and callers immediately call .get(...) on the result, so the
        ``or {}`` guard is required on both loads.
    """
    root = _config_root()
    config = yaml.safe_load((root / "fastagent.config.yaml").read_text()) or {}
    secrets_path = root / "fastagent.secrets.yaml"
    # Bug fix: previously an existing-but-empty secrets file produced None,
    # crashing validate_llm_providers() with AttributeError on .get().
    secrets = (yaml.safe_load(secrets_path.read_text()) or {}) if secrets_path.exists() else {}
    return config, secrets
async def _check_anthropic(client: httpx.AsyncClient, api_key: str, model_id: str) -> str | None:
    """Validate an Anthropic model. Returns None on success, error message on failure."""
    auth_headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
    }
    try:
        resp = await client.get(f"{_ANTHROPIC_API}/models/{model_id}", headers=auth_headers)
    except Exception as exc:
        return f"API unreachable ({type(exc).__name__})"
    status = resp.status_code
    if status == 200:
        return None
    if status == 404:
        return f"model '{model_id}' not found"
    return f"API request failed ({status})"
async def _check_openai(
client: httpx.AsyncClient, api_key: str, model_id: str, base_url: str
) -> str | None:
"""Validate an OpenAI-compatible model. Returns None on success, error message on failure."""
try:
resp = await client.get(
f"{base_url.rstrip('/')}/models/{model_id}",
headers={"Authorization": f"Bearer {api_key}"},
)
except Exception as exc:
return f"API unreachable ({type(exc).__name__})"
if resp.status_code == 200:
return None
if resp.status_code == 404:
return f"model '{model_id}' not found"
return f"API request failed ({resp.status_code})"
async def _list_openai_models(
    client: httpx.AsyncClient, api_key: str, base_url: str
) -> tuple[str | None, list[str]]:
    """List models from an OpenAI-compatible API. Returns (error, model_ids)."""
    endpoint = f"{base_url.rstrip('/')}/models"
    try:
        resp = await client.get(endpoint, headers={"Authorization": f"Bearer {api_key}"})
    except Exception as exc:
        return f"API unreachable ({type(exc).__name__})", []
    if resp.status_code != 200:
        return f"API request failed ({resp.status_code})", []
    # Entries without an "id" key are silently skipped.
    ids = [entry["id"] for entry in resp.json().get("data", []) if "id" in entry]
    return None, ids
async def validate_llm_providers(timeout: float = 5.0) -> dict[str, dict]:
    """
    Validate configured LLM provider API keys and model availability.

    Reads fastagent.config.yaml for default_model and fastagent.secrets.yaml
    for API keys. Checks all providers that have keys configured.

    Args:
        timeout: Per-HTTP-request timeout in seconds.

    Returns:
        A dict keyed by provider name ("anthropic", "openai") with validation
        results of the shape {"status": "ok"|"error", ...}. The same results
        are stored in the module-level _llm_status (and _active_provider is
        updated) for later use by get_health().
    """
    _load_dotenv()
    config, secrets = _load_config()
    default_model = config.get("default_model", "")
    # Parse provider and model from "provider.model-name" format; a bare model
    # name (no dot) yields an empty active_provider.
    active_provider = default_model.split(".")[0] if "." in default_model else ""
    active_model = default_model.split(".", 1)[1] if "." in default_model else default_model
    # Resolve API keys from secrets (expanding ${ENV_VAR} references), falling
    # back to env vars directly so that .env alone is sufficient.
    anthropic_key = _expand_env(secrets.get("anthropic", {}).get("api_key", "")) or os.environ.get("ANTHROPIC_API_KEY", "")
    openai_key = _expand_env(secrets.get("openai", {}).get("api_key", "")) or os.environ.get("OPENAI_API_KEY", "")
    # Base URL precedence: secrets > config > env > public OpenAI endpoint.
    openai_base = (
        _expand_env(secrets.get("openai", {}).get("base_url", ""))
        or config.get("openai", {}).get("base_url", "")
        or os.environ.get("OPENAI_BASE_URL", "")
        or _OPENAI_DEFAULT_API
    )
    results: dict[str, dict] = {}
    async with httpx.AsyncClient(timeout=timeout) as client:
        # ── Anthropic ────────────────────────────────────────────────────
        if anthropic_key:
            # Only verify the specific model when Anthropic is the active provider.
            model_id = active_model if active_provider == "anthropic" else None
            if model_id:
                err = await _check_anthropic(client, anthropic_key, model_id)
                if err:
                    results["anthropic"] = {"status": "error", "model": model_id, "message": err}
                    print(f"{_PREFIX} WARNING: anthropic: {err}")
                else:
                    results["anthropic"] = {"status": "ok", "model": model_id}
                    print(f"{_PREFIX} anthropic: {model_id}")
            else:
                # Key is set but Anthropic isn't the active provider — just verify API
                # access by probing a known model name; a 404 ("not found") still
                # proves the key was accepted, so it is treated as ok.
                err = await _check_anthropic(client, anthropic_key, "claude-sonnet-4-5")
                if err and "not found" not in err:
                    results["anthropic"] = {"status": "error", "message": err}
                    print(f"{_PREFIX} WARNING: anthropic: {err}")
                else:
                    results["anthropic"] = {"status": "ok"}
                    print(f"{_PREFIX} anthropic: API key valid ✓")
        elif active_provider == "anthropic":
            # Active provider with no key is an error; an unused provider with
            # no key is simply omitted from the results.
            results["anthropic"] = {"status": "error", "message": "API key not configured"}
            print(f"{_PREFIX} WARNING: anthropic: API key not configured")
        # ── OpenAI ───────────────────────────────────────────────────────
        if openai_key:
            model_id = active_model if active_provider == "openai" else None
            # The model list doubles as the key/API check for OpenAI-compatible hosts.
            err, models = await _list_openai_models(client, openai_key, openai_base)
            if err:
                results["openai"] = {"status": "error", "message": err}
                print(f"{_PREFIX} WARNING: openai ({openai_base}): {err}")
            elif model_id:
                if model_id in models:
                    results["openai"] = {"status": "ok", "model": model_id}
                    print(f"{_PREFIX} openai ({openai_base}): {model_id}")
                else:
                    label = ", ".join(models) if models else "none"
                    results["openai"] = {"status": "error", "model": model_id, "message": f"model '{model_id}' not found (available: {label})"}
                    print(f"{_PREFIX} WARNING: openai ({openai_base}): model '{model_id}' not found (available: {label})")
            else:
                results["openai"] = {"status": "ok", "models": models}
                label = ", ".join(models) if models else "no models loaded"
                print(f"{_PREFIX} openai ({openai_base}): {label}")
        elif active_provider == "openai":
            results["openai"] = {"status": "error", "message": "API key not configured"}
            print(f"{_PREFIX} WARNING: openai: API key not configured")
    # Publish results for get_health() (see register_health_tool).
    _llm_status.clear()
    _llm_status.update(results)
    global _active_provider
    _active_provider = active_provider
    return results
async def check_downstream_health(
    servers: dict[str, dict], timeout: float = 3.0
) -> dict:
    """
    Probe downstream MCP servers and return aggregate health status.

    Each server is probed concurrently with an MCP ``initialize`` request
    over Streamable HTTP; any HTTP status < 400 counts as reachable.

    Args:
        servers: Mapping of server name to {"url": str, "headers": dict}.
            Headers may contain ${ENV_VAR} placeholders which are expanded
            before the request is sent.
        timeout: Per-request timeout in seconds.

    Returns:
        {"status": "ok"|"degraded", "timestamp": "...", "message": "..."}
        ("message" is present only when degraded, listing unreachable servers).
    """
    # Ensure ${ENV_VAR} header placeholders can resolve from .env as well.
    _load_dotenv()
    async def _probe(
        client: httpx.AsyncClient, name: str, cfg: dict
    ) -> tuple[str, bool, str]:
        # Returns (name, reachable, failure-reason); reason is "" on success.
        url = cfg.get("url", "")
        raw_headers = cfg.get("headers", {})
        headers = {k: _expand_env(str(v)) for k, v in raw_headers.items()}
        try:
            # Streamable HTTP servers require both Accept types on POST.
            common_headers = {
                "Accept": "application/json, text/event-stream",
                "Content-Type": "application/json",
                **headers,
            }
            resp = await client.post(
                url,
                headers=common_headers,
                json={
                    "jsonrpc": "2.0",
                    "method": "initialize",
                    "id": 1,
                    "params": {
                        "protocolVersion": "2025-03-26",
                        "capabilities": {},
                        "clientInfo": {
                            "name": f"{_DEPLOY_NAME}-health",
                            "version": "1.0.0",
                        },
                    },
                },
            )
            if resp.status_code >= 400:
                return name, False, f"HTTP {resp.status_code}"
            # Tear down the session so we don't leak server-side state.
            session_id = resp.headers.get("mcp-session-id")
            if session_id:
                try:
                    await client.delete(
                        url,
                        headers={**headers, "mcp-session-id": session_id},
                    )
                except Exception:
                    pass  # best-effort cleanup; DELETE failures don't affect health
            return name, True, ""
        except Exception as exc:
            return name, False, type(exc).__name__
    # One shared client; all probes run concurrently.
    async with httpx.AsyncClient(timeout=timeout) as client:
        results = await asyncio.gather(
            *(_probe(client, name, cfg) for name, cfg in servers.items())
        )
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # Sorted for deterministic message ordering.
    failures = sorted(
        (f"{name} ({reason})" if reason else name)
        for name, ok, reason in results
        if not ok
    )
    if not failures:
        return {"status": "ok", "timestamp": now}
    return {
        "status": "degraded",
        "timestamp": now,
        "message": f"Unreachable: {', '.join(failures)}",
    }
def register_health_tool(mcp_server, servers: dict[str, dict]) -> None:
    """Register a get_health MCP tool on the given FastMCP server instance.

    The tool probes the supplied downstream servers on every call and folds
    in the startup LLM preflight result for the active provider.
    """
    @mcp_server.tool(
        name="get_health",
        description="Returns the health status of this agent and its downstream dependencies.",
    )
    async def get_health() -> str:
        report = await check_downstream_health(servers)
        # Include LLM provider status from startup preflight (active provider only)
        provider_state = _llm_status.get(_active_provider, {})
        if provider_state.get("status") != "ok" and _active_provider:
            llm_error = f"LLM: {_active_provider}: {provider_state.get('message', 'error')}"
            report["status"] = "degraded"
            prior = report.get("message", "")
            report["message"] = f"{prior}; {llm_error}" if prior else llm_error
        return json.dumps(report)

163
pallas/multimodal_server.py Normal file
View File

@@ -0,0 +1,163 @@
"""
MultimodalAgentMCPServer — AgentMCPServer subclass with images support.
Overrides register_agent_tools to accept an optional ``images`` parameter
on each agent's ``send_message`` tool, enabling callers to attach base64-
encoded images alongside the text message.
Drop-in replacement for AgentMCPServer:
from pallas.multimodal_server import MultimodalAgentMCPServer
server = MultimodalAgentMCPServer(
primary_instance=...,
create_instance=...,
dispose_instance=...,
instance_scope="shared",
)
"""
import time
import fast_agent.core.prompt
from fast_agent.core.logging.logger import get_logger
from fast_agent.mcp.auth.context import request_bearer_token
from fast_agent.mcp.server import AgentMCPServer
from fast_agent.mcp.tool_progress import MCPToolProgressManager
from fast_agent.types import PromptMessageExtended, RequestParams
from fastmcp import Context as MCPContext
from fastmcp.prompts import Message
from mcp.types import ImageContent, TextContent
logger = get_logger(__name__)
def _get_request_bearer_token() -> str | None:
"""Return the authenticated bearer token for the current MCP request."""
try:
from fastmcp.server.dependencies import get_access_token
access_token = get_access_token()
if access_token is not None:
return access_token.token
except Exception:
pass
return None
def _history_to_fastmcp_messages(
    message_history: list[PromptMessageExtended],
) -> list[Message]:
    """Convert stored agent history into FastMCP prompt messages."""
    # Imported lazily to match the rest of the module's deferred fast-agent use.
    from fast_agent.mcp.prompts.prompt_server import convert_to_fastmcp_messages

    as_prompts = fast_agent.core.prompt.Prompt.from_multipart(message_history)
    return convert_to_fastmcp_messages(as_prompts)
class MultimodalAgentMCPServer(AgentMCPServer):
    """AgentMCPServer with optional image attachment support on send_message."""

    def register_agent_tools(self, agent_name: str) -> None:
        """Register a send_message tool that accepts text + optional images.

        Overrides the base-class registration; the tool signature gains an
        optional ``images`` parameter (list of dicts with "data" / "mime_type"
        keys, base64-encoded image payloads).
        """
        # _registered_agents / _tool_description / _tool_name_template come
        # from the AgentMCPServer base class — TODO confirm their contracts.
        self._registered_agents.add(agent_name)
        # Only .format() the description template when it actually contains
        # the {agent} placeholder, so plain descriptions pass through as-is.
        tool_description = (
            self._tool_description.format(agent=agent_name)
            if self._tool_description and "{agent}" in self._tool_description
            else self._tool_description
        )
        # Fall back to the agent's own configured description when no
        # explicit tool description was supplied.
        agent_obj = self.primary_instance.agents.get(agent_name)
        agent_description = None
        if agent_obj is not None:
            config = getattr(agent_obj, "config", None)
            agent_description = getattr(config, "description", None)
        tool_name = self._tool_name_template.format(agent=agent_name)
        @self.mcp_server.tool(
            name=tool_name,
            description=tool_description
            or agent_description
            or f"Send a message to the {agent_name} agent",
        )
        async def send_message(
            message: str,
            ctx: MCPContext,
            images: list[dict] | None = None,
        ) -> str:
            # Bind the caller's bearer token into the context var for the
            # duration of this request; restored in the outer finally.
            saved_token = request_bearer_token.set(_get_request_bearer_token())
            report_progress = self._build_progress_reporter(ctx)
            request_params = RequestParams(
                tool_execution_handler=MCPToolProgressManager(report_progress),
                emit_loop_progress=True,
            )
            try:
                instance = await self._acquire_instance(ctx)
                agent = instance.app[agent_name]
                agent_context = getattr(agent, "context", None)
                if images:
                    # Multimodal path: wrap the text plus each image into a
                    # single extended user message.
                    content: list = [TextContent(type="text", text=message)]
                    for img in images:
                        content.append(
                            ImageContent(
                                type="image",
                                data=img["data"],
                                mimeType=img["mime_type"],
                            )
                        )
                    payload: str | PromptMessageExtended = PromptMessageExtended(
                        role="user", content=content
                    )
                else:
                    # Text-only path: pass the raw string through unchanged.
                    payload = message
                async def execute_send() -> str:
                    # Wraps the actual agent call with timing + structured logs.
                    start = time.perf_counter()
                    logger.info(
                        f"MCP request received for agent '{agent_name}'",
                        name="mcp_request_start",
                        agent=agent_name,
                        session=self._session_identifier(ctx),
                    )
                    response = await agent.send(payload, request_params=request_params)
                    duration = time.perf_counter() - start
                    logger.info(
                        f"Agent '{agent_name}' completed MCP request",
                        name="mcp_request_complete",
                        agent=agent_name,
                        duration=duration,
                        session=self._session_identifier(ctx),
                    )
                    return response
                try:
                    # Bridge the MCP request context into the agent context
                    # when both are available (base-class helper).
                    if agent_context and ctx:
                        return await self.with_bridged_context(
                            agent_context, ctx, execute_send
                        )
                    return await execute_send()
                finally:
                    await self._release_instance(ctx, instance)
            finally:
                # Always restore the previous bearer-token context value.
                request_bearer_token.reset(saved_token)
        # Per-request instances have no durable history to expose.
        if self._instance_scope == "request":
            return
        @self.mcp_server.prompt(
            name=f"{agent_name}_history",
            description=f"Conversation history for the {agent_name} agent",
        )
        async def get_history_prompt(ctx: MCPContext) -> list[Message]:
            instance = await self._acquire_instance(ctx)
            agent = instance.app[agent_name]
            try:
                multipart_history = agent.message_history
                if not multipart_history:
                    return []
                return _history_to_fastmcp_messages(multipart_history)
            finally:
                await self._release_instance(ctx, instance, reuse_connection=True)

135
pallas/registry.py Normal file
View File

@@ -0,0 +1,135 @@
"""
Agent Registry
Serves GET /.well-known/mcp/server.json for agent discovery.
Reads agent topology from agents.yaml and model capabilities from
fastagent.config.yaml in the working directory.
"""
import os
from datetime import datetime, timezone
from pathlib import Path
import yaml
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Route
def _config_root() -> Path:
"""Return the working directory where agents.yaml and fastagent configs live."""
return Path.cwd()
def _load_deployment_config() -> dict:
    """Load agents.yaml — single source of truth for deployment topology.

    Returns {} when the file is missing or empty.
    """
    cfg_name = os.environ.get("PALLAS_AGENTS_CONFIG", "agents.yaml")
    cfg_path = _config_root() / cfg_name
    if not cfg_path.exists():
        return {}
    return yaml.safe_load(cfg_path.read_text()) or {}
def _load_model_capabilities() -> dict:
    """Read model info and capabilities from the active fastagent.config.yaml.

    Returns {} when the config file is absent or declares neither a
    default_model nor any model_capabilities.
    """
    cfg_path = _config_root() / "fastagent.config.yaml"
    if not cfg_path.exists():
        return {}
    cfg = yaml.safe_load(cfg_path.read_text()) or {}
    default_model = cfg.get("default_model", "")
    caps = cfg.get("model_capabilities", {})
    if not (default_model or caps):
        return {}
    # Strip the leading "provider." prefix from "provider.model-name".
    model_name = (
        default_model.split(".", 1)[-1] if "." in default_model else default_model
    )
    return {
        "model": model_name or None,
        "vision": caps.get("vision", False),
        "context_window": caps.get("context_window", None),
        "max_output_tokens": caps.get("max_output_tokens", None),
    }
def _build_registry(config: dict) -> dict:
    """Build the registry JSON from agents.yaml + fastagent.config.yaml.

    Each agent becomes one server entry; model capabilities, when declared,
    are attached to every entry.
    """
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    capabilities = _load_model_capabilities()
    host = config.get("host", "localhost")
    namespace = config.get("namespace", "")
    version = config.get("version", "1.0.0")
    entries = []
    for agent_name, spec in config.get("agents", {}).items():
        # Build registry name: namespace/slug (e.g. ca.helu.mentor/jarvis)
        slug = agent_name.replace("_", "-")
        qualified_name = f"{namespace}/{slug}" if namespace else slug
        server_entry: dict = {
            "$schema": "https://static.modelcontextprotocol.io/schemas/2025-12-11/server.schema.json",
            "name": qualified_name,
            "title": spec.get("title", agent_name.title()),
            "description": spec.get("description", ""),
            "version": version,
            "remotes": [
                {
                    "type": "streamable-http",
                    "url": f"http://{host}:{spec['port']}/mcp",
                }
            ],
        }
        if capabilities:
            server_entry["capabilities"] = capabilities
        entries.append(
            {
                "server": server_entry,
                "_meta": {
                    "io.modelcontextprotocol.registry/official": {
                        "status": "active",
                        "updatedAt": timestamp,
                        "isLatest": True,
                    }
                },
            }
        )
    return {"servers": entries}
# ── Starlette app ─────────────────────────────────────────────────────────────
# Topology is loaded once at import time; the registry payload is rebuilt on
# every request so the updatedAt timestamps stay fresh.
_deployment_config = _load_deployment_config()
async def server_json(request):
    """Serve the registry document for GET /.well-known/mcp/server.json."""
    payload = _build_registry(_deployment_config)
    return JSONResponse(payload)
app = Starlette(
    routes=[Route("/.well-known/mcp/server.json", server_json)],
)
async def run_registry(
    host: str = "0.0.0.0",
    port: int = 24200,
) -> None:
    """Run the registry server (blocks until shutdown)."""
    import uvicorn

    deploy_name = _deployment_config.get("name", "pallas")
    print(f"[{deploy_name}] Registry on port {port}")
    server = uvicorn.Server(
        uvicorn.Config(app, host=host, port=port, log_level="info")
    )
    await server.serve()

301
pallas/server.py Normal file
View File

@@ -0,0 +1,301 @@
"""
Pallas — FastAgent MCP Bridge
Reads agent topology from agents.yaml in the working directory and exposes
each agent as a StreamableHTTP MCP endpoint with an optional registry server.
Usage:
pallas # all agents + registry
pallas --agent jarvis # single agent mode
"""
import argparse
import asyncio
import importlib
import logging
import os
from pathlib import Path
import yaml
from pallas.multimodal_server import MultimodalAgentMCPServer
def _config_root() -> Path:
"""Return the working directory where agents.yaml and fastagent configs live."""
return Path.cwd()
# ── Configuration ─────────────────────────────────────────────────────────────
def _load_deployment_config() -> dict:
    """Load agents.yaml — single source of truth for deployment topology.

    Raises:
        SystemExit: when the config file is missing or lacks an 'agents' section.
    """
    path = _config_root() / os.environ.get("PALLAS_AGENTS_CONFIG", "agents.yaml")
    if not path.exists():
        raise SystemExit(f"[pallas] ERROR: deployment config not found: {path}")
    loaded = yaml.safe_load(path.read_text()) or {}
    if "agents" not in loaded:
        raise SystemExit(f"[pallas] ERROR: no 'agents' section in {path}")
    return loaded
def _build_agents_table(config: dict) -> dict[str, tuple[str, int]]:
"""Build {name: (module_path, port)} from agents.yaml."""
return {
name: (agent["module"], agent["port"])
for name, agent in config["agents"].items()
}
def _build_agent_deps(config: dict) -> dict[str, list[str]]:
"""Build dependency graph from agents.yaml depends_on fields."""
agents = config["agents"]
deps: dict[str, list[str]] = {}
for name, agent in agents.items():
dep_list = agent.get("depends_on", [])
if dep_list:
deps[name] = list(dep_list)
return deps
# ── Downstream MCP server helpers ─────────────────────────────────────────────
def _resolve_downstream_servers(fast_instance) -> dict[str, dict]:
"""Collect downstream MCP server configs referenced by any agent."""
server_names = set()
for agent_data in fast_instance.agents.values():
config = agent_data.get("config")
if config:
for s in getattr(config, "servers", []):
server_names.add(s)
mcp_servers = fast_instance.config.get("mcp", {}).get("servers", {})
servers = {}
for name in sorted(server_names):
cfg = mcp_servers.get(name, {})
if isinstance(cfg, dict):
url = cfg.get("url")
headers = cfg.get("headers") or {}
else:
url = getattr(cfg, "url", None)
headers = getattr(cfg, "headers", None) or {}
if url:
servers[name] = {"url": str(url), "headers": dict(headers)}
return servers
def _preflight_mcp_servers(agent_name: str, servers: dict[str, dict]) -> None:
"""Warn at startup if MCP server auth headers have unresolved env vars."""
import re
for server_name, cfg in servers.items():
for header_key, header_val in cfg.get("headers", {}).items():
unresolved = re.findall(r"\$\{([^}]+)\}", str(header_val))
for var in unresolved:
val = os.environ.get(var, "")
if not val:
print(
f"[pallas] WARNING: {agent_name}{server_name}: "
f"{header_key} references ${{{var}}} but it is not set"
)
# ── Model registration ────────────────────────────────────────────────────────
def _register_unknown_models() -> None:
    """Register runtime model params for models not in fast-agent's ModelDatabase.

    Reads ``default_model`` and ``model_capabilities`` from the active
    fastagent.config.yaml. Capabilities (vision, context_window,
    max_output_tokens) are declared explicitly in the config rather than
    inferred from the model name, since naming conventions vary across
    model families.
    """
    cfg_file = _config_root() / "fastagent.config.yaml"
    if not cfg_file.exists():
        return
    from fast_agent.llm.model_database import ModelDatabase, ModelParameters

    cfg = yaml.safe_load(cfg_file.read_text()) or {}
    default_model = cfg.get("default_model", "")
    if not default_model:
        return
    # Strip the "provider." prefix; a bare name passes through unchanged.
    model_name = default_model.split(".", 1)[-1]
    if ModelDatabase.get_model_params(model_name) is not None:
        return  # already known to fast-agent
    caps = cfg.get("model_capabilities", {})
    if caps.get("vision", False):
        token_types = list(ModelDatabase.QWEN_MULTIMODAL)
        print(f"[pallas] Registered model '{model_name}' with vision capabilities")
    else:
        token_types = list(ModelDatabase.TEXT_ONLY)
        print(f"[pallas] Registered model '{model_name}' as text-only")
    ModelDatabase.register_runtime_model_params(
        model_name,
        ModelParameters(
            context_window=caps.get("context_window", 131072),
            max_output_tokens=caps.get("max_output_tokens", 16384),
            tokenizes=token_types,
        ),
    )
# ── Agent lifecycle ───────────────────────────────────────────────────────────
async def _preflight() -> None:
    """Run startup checks: register custom models, then validate LLM providers."""
    from pallas.health import validate_llm_providers
    _register_unknown_models()
    await validate_llm_providers()
async def _start_agent(name: str, agents: dict[str, tuple[str, int]]) -> None:
    """Import an agent module and serve it over StreamableHTTP (blocks until shutdown).

    The agent module must expose a FastAgent instance as a module-level
    attribute named ``fast``.
    """
    from pallas.health import register_health_tool
    module_path, port = agents[name]
    module = importlib.import_module(module_path)
    fast_instance = module.fast
    print(f"[pallas] Starting {name} agent on port {port} ...")
    async with fast_instance.run():
        # NOTE(review): relies on fast-agent private attributes
        # (_server_managed_instances, _server_instance_factory, ...) — may
        # break across fast-agent versions; confirm on upgrade.
        primary_instance = fast_instance._server_managed_instances[0]
        server = MultimodalAgentMCPServer(
            primary_instance=primary_instance,
            create_instance=fast_instance._server_instance_factory,
            dispose_instance=fast_instance._server_instance_dispose,
            instance_scope="shared",
            server_name=f"{fast_instance.name}-MCP-Server",
            host="0.0.0.0",
            get_registry_version=fast_instance._get_registry_version,
        )
        # Attach the get_health tool, warning early about unresolved headers.
        downstream_servers = _resolve_downstream_servers(fast_instance)
        _preflight_mcp_servers(name, downstream_servers)
        register_health_tool(server.mcp_server, downstream_servers)
        await server.run_async(transport="http", host="0.0.0.0", port=port)
async def _wait_for_agent(
    name: str,
    agents: dict[str, tuple[str, int]],
    timeout: float = 60.0,
) -> None:
    """Poll an agent's local HTTP endpoint until it answers or the timeout elapses.

    Any HTTP response (even an error status) counts as ready; only connection
    failures keep the poll going. Prints a warning instead of raising on timeout.
    """
    import httpx

    _, port = agents[name]
    endpoint = f"http://127.0.0.1:{port}/mcp"
    loop = asyncio.get_event_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        try:
            async with httpx.AsyncClient(timeout=2.0) as probe:
                await probe.get(endpoint)
        except Exception:
            await asyncio.sleep(1.0)
        else:
            print(f"[pallas] {name} is ready ✓")
            return
    print(f"[pallas] WARNING: {name} did not become ready within {timeout}s")
async def _run_single(name: str, agents: dict[str, tuple[str, int]]) -> None:
    """Run preflight checks, then serve a single agent (blocks until shutdown)."""
    await _preflight()
    await _start_agent(name, agents)
async def _start_all(config: dict) -> None:
    """Start the registry plus every agent, honouring depends_on ordering.

    Agents that appear in another agent's depends_on list ("subagents") are
    started first and awaited until reachable before the remaining agents
    launch. Blocks for the lifetime of all servers.
    """
    from pallas.registry import run_registry
    agents = _build_agents_table(config)
    agent_deps = _build_agent_deps(config)
    registry_port = config.get("registry_port", 24200)
    await _preflight()
    # Identify subagents that must start first.
    subagents: set[str] = set()
    for dep_name, dep_list in agent_deps.items():
        if dep_name in agents:
            for sub in dep_list:
                # Dependencies on agents not in this deployment are ignored.
                if sub in agents:
                    subagents.add(sub)
    long_running: list[asyncio.Task] = []
    long_running.append(
        asyncio.create_task(run_registry(port=registry_port))
    )
    for name in sorted(subagents):
        long_running.append(asyncio.create_task(_start_agent(name, agents)))
    # Block until every subagent answers HTTP before starting dependents.
    await asyncio.gather(*[_wait_for_agent(n, agents) for n in subagents])
    for name in agents:
        if name not in subagents:
            long_running.append(asyncio.create_task(_start_agent(name, agents)))
    # All tasks are long-lived servers; this gather blocks until shutdown.
    await asyncio.gather(*long_running)
# ── CLI ───────────────────────────────────────────────────────────────────────
def main() -> None:
    """CLI entry point: parse --agent and launch one agent or the full deployment."""
    config = _load_deployment_config()
    agents = _build_agents_table(config)
    registry_port = config.get("registry_port", 24200)
    deploy_name = config.get("name", "pallas")
    parser = argparse.ArgumentParser(
        description=f"{deploy_name.title()} — FastAgent MCP Bridge",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Port assignments:\n"
            + "\n".join(
                f"  {name:16s} port {port}"
                for name, (_, port) in agents.items()
            )
            + f"\n  {'registry':16s} port {registry_port}"
        ),
    )
    parser.add_argument(
        "--agent",
        choices=list(agents.keys()),
        metavar="AGENT",
        help="Start a specific agent (default: all). Choices: %(choices)s",
    )
    args = parser.parse_args()
    # NOTE(review): DEBUG level on httpx/httpcore looks like a debugging
    # leftover — it makes every outbound request extremely verbose; confirm
    # whether WARNING was intended.
    logging.getLogger("httpx").setLevel(logging.DEBUG)
    logging.getLogger("httpcore").setLevel(logging.DEBUG)
    if args.agent:
        _, port = agents[args.agent]
        print(f"[{deploy_name}] Starting {args.agent} agent on port {port} ...")
        asyncio.run(_run_single(args.agent, agents))
    else:
        print(f"[{deploy_name}] Starting all agents + registry ...")
        print(f"  {'registry':16s} → http://0.0.0.0:{registry_port}/.well-known/mcp/server.json")
        for name, (_, port) in agents.items():
            print(f"  {name:16s} → http://0.0.0.0:{port}/mcp")
        asyncio.run(_start_all(config))
if __name__ == "__main__":
    main()

22
pyproject.toml Normal file
View File

@@ -0,0 +1,22 @@
[project]
name = "pallas-mcp"
version = "0.1.0"
description = "FastAgent MCP Bridge — generic runtime for serving FastAgent agents over StreamableHTTP"
requires-python = ">=3.13"
dependencies = [
"fast-agent-mcp>=0.6.10",
"httpx",
"pyyaml",
"starlette",
"uvicorn",
]
[project.scripts]
pallas = "pallas.server:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["pallas"]