Production-Ready Standards for Building Django-Backed MCP Servers
📘 Purpose: This document defines standards for building MCP servers that integrate with Django REST APIs, using the Python MCP SDK (FastMCP) for consistent, maintainable, production-ready implementations.
Audience: Python developers building MCP servers that connect to Django backends.
AI Client (Claude Desktop)
↓ JWT Bearer Token
CORS Middleware
↓
Correlation ID Middleware (adds X-Correlation-ID)
↓
Rate Limit Middleware (per-client IP throttling)
↓
FastMCP Server (tools/resources routing)
↓ Context object (correlation_id, session, request_id)
Tool/Resource Handler
↓ API Key + Correlation ID
Django API Client (async httpx)
↓ Authorization: Api-Key {key}
Django REST API
↓
Database
from mcp.server.fastmcp import FastMCP
from mcp.server.auth.settings import AuthSettings
from pydantic import AnyHttpUrl
from security import SecureTokenVerifier
# Create the JWT token verifier from the loaded configuration.
# NOTE(review): `config` is not defined in this snippet -- presumably the
# object returned by load_config(); confirm.
token_verifier = SecureTokenVerifier(
jwt_secret=config.security.jwt_secret,
jwt_algorithm=config.security.jwt_algorithm
)
# Option A: FastMCP server with authentication enabled.
mcp = FastMCP(
name="my-django-mcp-server",
token_verifier=token_verifier,
auth=AuthSettings(
issuer_url=AnyHttpUrl("https://auth.mydomain.com"),
# NOTE(review): this URL is built from the bind host/port (ServerConfig
# defaults the host to 0.0.0.0) -- presumably it should be the externally
# reachable URL of this server; confirm.
resource_server_url=AnyHttpUrl(f"http://{config.server.host}:{config.server.port}"),
required_scopes=["read", "write"]
),
stateless_http=True, # Enable Streamable HTTP transport
json_response=True # Return JSON responses (not SSE streams)
)
# Option B: no authentication -- for testing in trusted networks only.
# This is an alternative to Option A: the assignment below rebinds `mcp`,
# so keep only one of the two constructions in real code.
mcp = FastMCP(
name="my-django-mcp-server",
stateless_http=True,
json_response=True
)
from mcp.server.fastmcp import Context
from pydantic import Field
from typing import Optional, Dict, Any
@mcp.tool()
async def get_opportunities(
    ctx: Context,
    status: Optional[str] = None,
    client_id: Optional[int] = None,
    limit: int = Field(default=10, ge=1, le=100)
) -> Dict[str, Any]:
    """Retrieve business opportunities with optional filtering.

    Args:
        ctx: MCP request context; used here only to look up the correlation ID.
        status: Filter by status (active, won, lost, dropped). Any other
            value is ignored and no status filter is sent.
        client_id: Filter by client ID; ignored unless positive.
        limit: Maximum number of results (1-100).

    Returns:
        Dictionary with ``count`` and ``opportunities``. On any failure the
        error is logged and a dictionary with an ``error`` message, a zero
        count and an empty list is returned instead of raising.
    """
    # Prefer an ID attached directly to the MCP request context, then fall
    # back to the one a correlation middleware stores on the HTTP request state.
    # NOTE(review): assumes the SDK exposes the Starlette request as
    # ``request_context.request`` -- confirm for the SDK version in use.
    request_context = ctx.request_context
    correlation_id = getattr(request_context, "correlation_id", None)
    if correlation_id is None:
        http_request = getattr(request_context, "request", None)
        request_state = getattr(http_request, "state", None)
        correlation_id = getattr(request_state, "correlation_id", None)

    try:
        # Build query parameters, dropping filters that fail validation.
        params = {"limit": limit}
        if status and status in ("active", "won", "lost", "dropped"):
            params["status"] = status
        if client_id and client_id > 0:
            params["client"] = client_id

        # NOTE(review): `self` is only in scope when this tool is defined
        # inside a method of the server class -- confirm where it is registered.
        result = await self.api_client.request(
            "GET",
            "/api/v1/orbit/opportunities/",
            params=params,
            correlation_id=correlation_id
        )
        opportunities = result.get("results", [])
        return {
            "count": len(opportunities),
            "opportunities": [
                {
                    "id": opp["id"],
                    "name": opp["name"],
                    "status": opp["status"],
                    # `client` may be present but null; treat that like a
                    # missing client instead of failing the whole call.
                    "client": (opp.get("client") or {}).get("name"),
                    "value": opp.get("value"),
                    "stage": opp.get("stage")
                }
                for opp in opportunities
            ]
        }
    except Exception as e:
        # Tool boundary: log with traceback and return a structured error.
        logger.error(
            f"Error retrieving opportunities: {str(e)}",
            exc_info=True,
            extra={"correlation_id": correlation_id}
        )
        return {
            "error": "Failed to retrieve opportunities",
            "count": 0,
            "opportunities": []
        }
ALWAYS include ctx: Context as the first parameter in tool functions to access:
- ctx.request_context.correlation_id - Request tracking ID
- ctx.request_id - MCP request ID
- ctx.session - Session for sending notifications
- ctx.request_context - Full request context
@mcp.resource("client://{client_id}")
async def get_client_details(client_id: int) -> str:
    """Return a plain-text summary of one client.

    Non-positive IDs and any failure while fetching or formatting produce a
    short ``Error: ...`` string instead of an exception.
    """
    try:
        if not client_id > 0:
            return "Error: Invalid client ID"

        client = await self.api_client.request(
            "GET",
            f"/api/v1/orbit/clients/{client_id}/"
        )

        name = client['name']
        parts = [
            f"Client Details: {name}\n",
            # Underline: name length plus the 16-character heading prefix.
            "=" * (len(name) + 16) + "\n\n",
            f"ID: {client['id']}\n",
            f"Legal Name: {client.get('legal_name', 'N/A')}\n",
            f"Type: {client.get('client_type', 'N/A')}\n",
            f"Vertical: {client.get('vertical', 'N/A')}\n",
        ]
        if client.get('overview'):
            parts.append(f"\nOverview:\n{client['overview']}\n")
        return "".join(parts)
    except Exception as e:
        logger.error(f"Error retrieving client {client_id}: {str(e)}")
        return "Error: Unable to retrieve client details"
"""Django API client with retry logic and correlation tracking."""
import logging
import time
from typing import Any, Dict, Optional
import httpx
logger = logging.getLogger(__name__)
class SecureAPIClient:
    """Async HTTP client for Django REST API communication.

    Use as an async context manager: the underlying ``httpx.AsyncClient`` is
    created on entry and closed on exit. ``request()`` supports GET and POST,
    retries server errors and transport failures with exponential backoff, and
    forwards an optional correlation ID in the ``X-Correlation-ID`` header.
    """

    # HTTP methods request() knows how to dispatch.
    _SUPPORTED_METHODS = ("GET", "POST")

    def __init__(
        self,
        base_url: str,
        api_key: str,
        timeout: int = 30,
        max_retries: int = 3,
        connection_pool_size: int = 20
    ):
        """Store connection settings; no network resources are created yet.

        Args:
            base_url: API root; a trailing slash is stripped.
            api_key: Key sent as ``Authorization: Api-Key <key>``.
            timeout: Per-request timeout in seconds.
            max_retries: Extra attempts after the first failed one.
            connection_pool_size: Maximum number of pooled connections.
        """
        self.base_url = base_url.rstrip('/')
        self.api_key = api_key
        self.timeout = timeout
        self.max_retries = max_retries
        self.connection_pool_size = connection_pool_size
        self.client: Optional[httpx.AsyncClient] = None
        self._health_status = True
        self._last_health_check = 0.0

    async def __aenter__(self):
        """Create the pooled HTTP client with the default headers."""
        self.client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=httpx.Timeout(self.timeout),
            limits=httpx.Limits(max_connections=self.connection_pool_size),
            headers={
                "Authorization": f"Api-Key {self.api_key}",
                "User-Agent": "My-MCP-Server/1.0.0",
                "Accept": "application/json",
                "Content-Type": "application/json"
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP client and forget it."""
        if self.client:
            await self.client.aclose()
            # Drop the closed client so later calls fail with the explicit
            # "Client not initialized" error instead of using a closed pool.
            self.client = None

    async def health_check(self) -> bool:
        """Probe a lightweight endpoint and record the result.

        Returns:
            True when the stats endpoint answers 200; False when the client
            is not initialized, the status differs, or the request fails.
        """
        try:
            if not self.client:
                return False
            # Use Django's stats or health endpoint
            response = await self.client.get("/api/v1/core/stats/", timeout=5.0)
            self._health_status = response.status_code == 200
            self._last_health_check = time.time()
            return self._health_status
        except Exception as e:
            logger.error(f"Health check failed: {str(e)}")
            self._health_status = False
            return False

    async def request(
        self,
        method: str,
        endpoint: str,
        params: Optional[Dict] = None,
        json_data: Optional[Dict] = None,
        correlation_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """Make an authenticated request to the Django API with retry logic.

        Args:
            method: "GET" or "POST" (case-insensitive).
            endpoint: Path relative to the base URL.
            params: Query parameters; sent for GET only.
            json_data: JSON body; sent for POST only.
            correlation_id: Forwarded as ``X-Correlation-ID`` when truthy.

        Returns:
            The decoded JSON response body.

        Raises:
            RuntimeError: If the client has not been initialized, or no
                attempt could be made (negative ``max_retries``).
            ValueError: If ``method`` is unsupported (raised before any
                request or retry), or the response body is not valid JSON.
            httpx.HTTPStatusError: For 4xx responses (immediately) and for
                5xx responses once retries are exhausted.
        """
        import asyncio

        if not self.client:
            raise RuntimeError("Client not initialized")

        # Validate up front: a programming error must not go through the
        # retry/backoff path.
        verb = method.upper()
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method}")

        # Add correlation ID to headers
        headers = {}
        if correlation_id:
            headers["X-Correlation-ID"] = correlation_id

        # Retry with exponential backoff
        for attempt in range(self.max_retries + 1):
            try:
                if verb == "GET":
                    response = await self.client.get(
                        endpoint,
                        params=params,
                        headers=headers
                    )
                else:
                    response = await self.client.post(
                        endpoint,
                        json=json_data,
                        headers=headers
                    )
                response.raise_for_status()
            except httpx.HTTPStatusError as e:
                logger.error(
                    f"HTTP {e.response.status_code} from {endpoint}",
                    extra={"correlation_id": correlation_id}
                )
                if e.response.status_code < 500:
                    # Client error - don't retry
                    raise
                if attempt == self.max_retries:
                    raise
            except Exception as e:
                logger.error(
                    f"Request failed (attempt {attempt + 1}/{self.max_retries + 1}): {str(e)}",
                    extra={"correlation_id": correlation_id}
                )
                if attempt == self.max_retries:
                    raise
            else:
                # Decode outside the retried section: the server already
                # processed the request, so a bad body must not re-send it.
                return response.json()

            # Exponential backoff: 1s, 2s, 4s, ...
            if attempt < self.max_retries:
                await asyncio.sleep(2 ** attempt)

        # Only reachable when max_retries is negative and the loop never ran.
        raise RuntimeError(
            f"No request attempt was made to {endpoint} (max_retries={self.max_retries})"
        )
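Authenticate MCP-to-Django calls with an API key sent in the Authorization header. Note that Django REST Framework's built-in TokenAuthentication (configured below) expects the header `Authorization: Token {key}`; the `Api-Key {key}` format shown here and sent by SecureAPIClient requires an API-key scheme such as the djangorestframework-api-key package (HasAPIKey permission) or a custom authentication class, so make the Django settings and the header format agree: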
# In Django settings.py
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework.authentication.TokenAuthentication',
],
}
# Authorization header format
Authorization: Api-Key {django_api_key}
Propagate correlation IDs from MCP context to Django API for end-to-end tracing:
from starlette.middleware.base import BaseHTTPMiddleware
import uuid
class CorrelationMiddleware(BaseHTTPMiddleware):
    """Add correlation IDs to all requests for tracing.

    The ID is taken from the incoming ``X-Correlation-ID`` header when it is
    present and non-empty; otherwise a new UUID4 is generated. It is stored
    on ``request.state.correlation_id`` and echoed in the response header.
    """

    async def dispatch(self, request, call_next):
        """Attach the correlation ID to the request and to its response."""
        # `or` (not a .get default) so an empty header value also gets a
        # generated ID, and the UUID is only created when actually needed.
        correlation_id = request.headers.get("x-correlation-id") or str(uuid.uuid4())

        # Add to request state for tool handlers
        request.state.correlation_id = correlation_id

        response = await call_next(request)

        # Add to response headers
        response.headers["X-Correlation-ID"] = correlation_id
        return response
"""JWT token verifier for FastMCP authentication."""
import jwt
from typing import Set
from datetime import datetime, UTC
from mcp.server.auth.provider import AccessToken, TokenVerifier
class SecureTokenVerifier(TokenVerifier):
    """Secure JWT token verifier implementation.

    Verifies the signature with the configured secret and algorithm and
    rejects tokens listed in ``blocked_tokens``.
    """

    def __init__(self, jwt_secret: str, jwt_algorithm: str = "HS256"):
        """Store the signing secret/algorithm and start with an empty blocklist."""
        self.jwt_secret = jwt_secret
        self.jwt_algorithm = jwt_algorithm
        self.blocked_tokens: Set[str] = set()

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify a JWT and return an access token if it is valid.

        Args:
            token: The raw bearer token.

        Returns:
            An ``AccessToken`` for a valid token, or ``None`` when the token
            is blocked, invalid, expired, has no client/subject claim, or
            verification fails unexpectedly.
        """
        try:
            # Check if token is blocked
            if token in self.blocked_tokens:
                return None

            # Decode and verify. PyJWT validates `exp` itself when the claim
            # is present and raises a subclass of InvalidTokenError for
            # expired tokens, so no manual expiry comparison is needed.
            payload = jwt.decode(
                token,
                self.jwt_secret,
                algorithms=[self.jwt_algorithm]
            )

            # The SDK's AccessToken needs a client identifier.
            client_id = payload.get("client_id") or payload.get("sub")
            if not client_id:
                logger.warning("Token rejected: no client_id or sub claim")
                return None

            # Accept a `scopes` list or a space-delimited `scope` string.
            raw_scopes = payload.get("scopes")
            if raw_scopes is None:
                raw_scopes = payload.get("scope", [])
            if isinstance(raw_scopes, str):
                scopes = raw_scopes.split()
            else:
                scopes = list(raw_scopes)

            exp = payload.get("exp")
            # NOTE(review): field names follow the MCP SDK's AccessToken
            # model (token, client_id, scopes, expires_at) -- confirm against
            # the installed SDK version.
            return AccessToken(
                token=token,
                client_id=str(client_id),
                scopes=scopes,
                expires_at=int(exp) if exp is not None else None
            )
        except jwt.InvalidTokenError:
            return None
        except Exception as e:
            logger.error(f"Token verification error: {str(e)}")
            return None
"""Rate limiter with middleware integration."""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
@dataclass
class RateLimitEntry:
"""Rate limiting entry for tracking requests."""
count: int = 0
window_start: float = field(default_factory=time.time)
def is_expired(self, window_size: int) -> bool:
return time.time() - self.window_start > window_size
def increment(self) -> None:
self.count += 1
class RateLimiter:
"""Thread-safe rate limiter."""
def __init__(self, max_requests: int, window_size: int):
self.max_requests = max_requests
self.window_size = window_size
self.clients: Dict[str, RateLimitEntry] = {}
self._lock = asyncio.Lock()
async def is_allowed(self, client_id: str) -> bool:
"""Check if client is allowed to make a request."""
async with self._lock:
now = time.time()
# Clean up expired entries
expired = [
cid for cid, entry in self.clients.items()
if entry.is_expired(self.window_size)
]
for cid in expired:
del self.clients[cid]
# Check current client
if client_id not in self.clients:
self.clients[client_id] = RateLimitEntry()
entry = self.clients[client_id]
# Reset window if expired
if entry.is_expired(self.window_size):
entry.count = 0
entry.window_start = now
# Check limit
if entry.count >= self.max_requests:
return False
entry.increment()
return True
class RateLimitMiddleware(BaseHTTPMiddleware):
    """Starlette middleware that throttles requests per client IP."""

    def __init__(self, app, rate_limiter: RateLimiter):
        """Wrap ``app`` and keep the limiter consulted on each request."""
        super().__init__(app)
        self.rate_limiter = rate_limiter

    async def dispatch(self, request, call_next):
        """Pass the request on, or answer 429 when the client is over its limit."""
        # Health and metrics endpoints are never throttled.
        is_exempt = request.url.path in ["/health", "/live/", "/ready/", "/metrics"]
        if is_exempt:
            return await call_next(request)

        # Clients are keyed by peer IP; "unknown" when no peer is available.
        peer = request.client
        client_ip = peer.host if peer else "unknown"

        allowed = await self.rate_limiter.is_allowed(client_ip)
        if allowed:
            return await call_next(request)

        body = {
            "error": "Rate limit exceeded",
            "message": "Too many requests. Please try again later."
        }
        return JSONResponse(body, status_code=429)
"""Configuration models with validation."""
from typing import List
from pydantic import BaseModel, Field, AnyHttpUrl, field_validator
class SecurityConfig(BaseModel):
    """JWT, rate-limit and CORS settings."""

    # Required; must be at least 32 characters long.
    jwt_secret: str = Field(min_length=32, description="JWT signing secret")
    jwt_algorithm: str = "HS256"
    # Requests allowed per window (at least 1).
    rate_limit_requests: int = Field(100, ge=1)
    # Window length; validated to be at least 60.
    rate_limit_window: int = Field(3600, ge=60)
    # Defaults to a single wildcard entry.
    allowed_origins: List[str] = ["*"]
class DatabaseConfig(BaseModel):
    """Connection settings for the Django API."""

    base_url: AnyHttpUrl = Field(description="Django API base URL")
    api_key: str = Field(description="Django API key")
    timeout: int = Field(30, ge=5, le=300)
    max_retries: int = Field(3, ge=0, le=10)
    connection_pool_size: int = Field(20, ge=1, le=100)

    @field_validator('base_url')
    @classmethod
    def validate_base_url(cls, v):
        """Return the validated URL as a string without trailing slashes."""
        url_text = str(v)
        return url_text.rstrip('/')
class ServerConfig(BaseModel):
    """Identity, bind address and runtime settings of the MCP server."""

    name: str = "my-mcp-server"
    version: str = "1.0.0"
    # Default bind address: all interfaces.
    host: str = "0.0.0.0"
    # Restricted to ports 1024-65535.
    port: int = Field(8080, ge=1024, le=65535)
    log_level: str = "INFO"
    environment: str = "production"
class Config(BaseModel):
    """Top-level configuration grouping security, Django API and server settings."""

    # Required section.
    security: SecurityConfig
    # Required section.
    database: DatabaseConfig
    # Optional section; falls back to the ServerConfig defaults.
    server: ServerConfig = ServerConfig()
"""Load configuration from environment variables."""
import os
import secrets
from pathlib import Path
def load_config(config_path: str | None = None) -> Config:
    """Load configuration from a JSON file or from environment variables.

    Args:
        config_path: Optional path to a JSON file. When given and the file
            exists, it is the only source used.

    Returns:
        The validated ``Config``.

    Environment variables read otherwise: JWT_SECRET, JWT_ALGORITHM,
    RATE_LIMIT_REQUESTS, RATE_LIMIT_WINDOW, DJANGO_BASE_URL, DJANGO_API_KEY,
    MCP_HOST, PORT, LOG_LEVEL, ENVIRONMENT.
    """
    # Imported here so the function does not rely on module-level imports
    # that this module does not declare.
    import json
    import logging

    # Try loading from JSON file first
    if config_path and Path(config_path).exists():
        with open(config_path, encoding="utf-8") as f:
            return Config(**json.load(f))

    jwt_secret = os.getenv("JWT_SECRET")
    if jwt_secret is None:
        # Keep the fallback (the server can still start with auth disabled)
        # but make it visible: a random per-process secret cannot verify
        # tokens signed elsewhere and changes on every restart.
        jwt_secret = secrets.token_hex(32)
        logging.getLogger(__name__).warning(
            "JWT_SECRET is not set; using a random ephemeral secret"
        )

    # Load from environment variables
    return Config(
        security=SecurityConfig(
            jwt_secret=jwt_secret,
            jwt_algorithm=os.getenv("JWT_ALGORITHM", "HS256"),
            rate_limit_requests=int(os.getenv("RATE_LIMIT_REQUESTS", "100")),
            rate_limit_window=int(os.getenv("RATE_LIMIT_WINDOW", "3600")),
        ),
        database=DatabaseConfig(
            base_url=os.getenv("DJANGO_BASE_URL", "http://localhost:8000"),
            api_key=os.getenv("DJANGO_API_KEY", ""),
        ),
        server=ServerConfig(
            host=os.getenv("MCP_HOST", "0.0.0.0"),
            port=int(os.getenv("PORT", "8080")),
            log_level=os.getenv("LOG_LEVEL", "INFO"),
            environment=os.getenv("ENVIRONMENT", "production"),
        )
    )
# Required Variables
DJANGO_BASE_URL=https://api.mydomain.com
DJANGO_API_KEY=your-django-api-key
JWT_SECRET=minimum-32-character-secret-here
# Standard MCP Docker Spec Variables
PORT=8080
LOG_LEVEL=INFO
LOG_FORMAT=json
ENVIRONMENT=production
# Optional: Security
RATE_LIMIT_REQUESTS=100
RATE_LIMIT_WINDOW=3600
AUTH_ENABLED=true
# Optional: Testing/Development
DEBUG=false
"""Health check endpoints following Kubernetes standards."""
from datetime import datetime, UTC
from starlette.responses import JSONResponse
from starlette.routing import Route
async def live_endpoint(request):
    """Liveness probe: report that the process is up, with a UTC timestamp."""
    payload = {
        "status": "alive",
        "timestamp": datetime.now(UTC).isoformat(),
        "version": "1.0.0",
    }
    return JSONResponse(payload)
async def ready_endpoint(request):
    """Readiness probe: 200 when every dependency check passes, 503 otherwise."""
    # Django API health; a missing client counts as unhealthy.
    backend = request.app.state.api_client
    backend_ok = False
    if backend:
        backend_ok = await backend.health_check()

    # Add more checks as needed (database, cache, etc.)
    checks = {"api_backend": "ok" if backend_ok else "error"}

    ready = all(result == "ok" for result in checks.values())
    body = {
        "status": "ready" if ready else "not_ready",
        "checks": checks,
        "timestamp": datetime.now(UTC).isoformat(),
    }
    return JSONResponse(body, status_code=200 if ready else 503)
async def health_endpoint(request):
    """Combined health check: 200/"healthy" when the Django API responds, else 503/"degraded"."""
    backend = request.app.state.api_client
    if backend:
        backend_ok = await backend.health_check()
    else:
        backend_ok = False

    if backend_ok:
        overall, api_status, code = "healthy", "healthy", 200
    else:
        overall, api_status, code = "degraded", "unhealthy", 503

    body = {
        "status": overall,
        "timestamp": datetime.now(UTC).isoformat(),
        "version": "1.0.0",
        "api_status": api_status,
    }
    return JSONResponse(body, status_code=code)
"""Prometheus metrics endpoint."""
from starlette.responses import PlainTextResponse
async def metrics_endpoint(request):
    """Expose counters as Prometheus text or JSON.

    The text exposition format is returned when the Accept header contains
    ``text/plain``; otherwise the raw metrics dictionary is returned as JSON.
    """
    state = request.app.state
    accept_header = request.headers.get("accept", "")
    timestamp = int(datetime.now(UTC).timestamp())

    metrics = {
        "requests_total": getattr(state, "request_count", 0),
        "errors_total": getattr(state, "error_count", 0),
        "rate_limit_clients": len(state.rate_limiter.clients),
    }

    if "text/plain" not in accept_header:
        return JSONResponse(metrics)

    # (metric name, help text, type, value) in output order.
    series = (
        ("mcp_requests_total", "Total HTTP requests processed", "counter",
         metrics["requests_total"]),
        ("mcp_errors_total", "Total HTTP errors", "counter",
         metrics["errors_total"]),
        ("mcp_rate_limit_clients", "Number of rate limited clients", "gauge",
         metrics["rate_limit_clients"]),
        ("mcp_metrics_timestamp", "Metrics timestamp", "gauge", timestamp),
    )
    lines = []
    for metric, help_text, metric_type, value in series:
        lines.append(f"# HELP {metric} {help_text}")
        lines.append(f"# TYPE {metric} {metric_type}")
        lines.append(f"{metric} {value}")
    return PlainTextResponse("\n".join(lines) + "\n", media_type="text/plain")
"""Structured JSON logging for production."""
import json
import logging
from datetime import datetime, UTC
class JSONFormatter(logging.Formatter):
"""JSON formatter for structured logging."""
def __init__(self, service_name="mcp-server", version="1.0.0"):
super().__init__()
self.service_name = service_name
self.version = version
self.environment = os.getenv("ENVIRONMENT", "unknown")
def format(self, record):
"""Format log record as JSON."""
log_data = {
"timestamp": datetime.fromtimestamp(record.created, tz=UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"environment": self.environment,
"service": self.service_name,
"version": self.version,
}
# Add correlation_id if available
if hasattr(record, 'correlation_id'):
log_data["correlation_id"] = record.correlation_id
# Add exception info if present
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
# Configure logging
import os

log_format = os.getenv("LOG_FORMAT", "json").lower()
# Honor the LOG_LEVEL variable; unknown names fall back to INFO.
log_level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)
if not isinstance(log_level, int):
    log_level = logging.INFO

if log_format == "json":
    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logging.root.addHandler(handler)
    logging.root.setLevel(log_level)
else:
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
# In tool handlers: pass the correlation ID through `extra` so that
# JSONFormatter can copy it into the structured log entry.
# NOTE(review): `e` and `correlation_id` are not defined in this fragment --
# presumably it sits inside an `except Exception as e:` block; confirm.
logger.error(
f"Error retrieving data: {str(e)}",
extra={"correlation_id": correlation_id}
)
# Production-ready Dockerfile for Django MCP Server
FROM python:3.12-slim
WORKDIR /app
# Install runtime dependencies (curl is used by the HEALTHCHECK below)
RUN apt-get update --yes --quiet && \
apt-get install --yes --quiet --no-install-recommends curl && \
rm -rf /var/lib/apt/lists/*
# Create non-root user and give it ownership of the application directory
RUN useradd --system --create-home --uid 1000 mcpuser && \
chown -R mcpuser:mcpuser /app
# Copy and install dependencies
# NOTE(review): only pyproject.toml is present when `pip install .` runs;
# depending on the build backend this may fail or install without the
# package sources -- confirm.
COPY --chown=mcpuser:mcpuser pyproject.toml .
RUN pip install --upgrade pip setuptools wheel && \
pip install --no-cache-dir .
# Copy application
COPY --chown=mcpuser:mcpuser . .
# Create /tmp volume for MCP spec compliance
RUN mkdir -p /tmp/mcp_server && \
chown mcpuser:mcpuser /tmp/mcp_server && \
chmod 755 /tmp/mcp_server
# Declare volume
VOLUME ["/tmp/mcp_server"]
# Drop privileges for everything that follows, including the server process
USER mcpuser
EXPOSE 8080
# Health check
# NOTE(review): /health answers 503 whenever the Django API check fails, so
# a backend outage marks this container unhealthy; consider probing /live/.
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8080/health || exit 1
# Run application
CMD ["python", "server.py"]
# Compose file for running the MCP server container.
version: '3.8'

services:
  mcp-server:
    build: .
    image: my-django-mcp-server:latest
    container_name: mcp-server
    ports:
      - "8080:8080"
    # Environment variables are read from the local .env file.
    env_file:
      - .env
    # Named volume mounted at the server's temporary-file directory.
    volumes:
      - mcp_tmp:/tmp/mcp_server
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 10s

volumes:
  mcp_tmp:
"""Background task for /tmp cleanup."""
import asyncio
import shutil
import time
from pathlib import Path
def _sweep_tmp_dir(tmp_dir: Path, cutoff_time: float) -> tuple[int, int]:
    """Delete top-level entries of ``tmp_dir`` older than ``cutoff_time``.

    Returns:
        ``(removed_count, removed_bytes)``, where the byte figure is the
        ``st_size`` of each removed entry itself (for a directory, not the
        size of its contents). Entries that cannot be inspected or removed
        are skipped.
    """
    removed_count = 0
    removed_bytes = 0
    for item in tmp_dir.iterdir():
        try:
            info = item.stat()
            if info.st_mtime < cutoff_time:
                if item.is_file():
                    item.unlink()
                elif item.is_dir():
                    shutil.rmtree(item)
                removed_count += 1
                removed_bytes += info.st_size
        except (PermissionError, OSError):
            # Best effort: leave entries we cannot inspect or delete.
            continue
    return removed_count, removed_bytes


async def cleanup_tmp_files(
    tmp_dir: Path,
    max_age_hours: int = 24,
    interval_hours: int = 6
):
    """Background task that periodically removes old temporary files.

    Runs until cancelled. Each cycle sleeps for ``interval_hours`` first and
    then removes entries of ``tmp_dir`` whose modification time is more than
    ``max_age_hours`` old. Errors during a cycle are logged and the loop
    continues.

    Args:
        tmp_dir: Directory whose top-level entries are swept.
        max_age_hours: Minimum age, in hours, for an entry to be removed.
        interval_hours: Hours to wait before each sweep.
    """
    # Local import/logger: this module defines no module-level `logger`.
    import logging

    log = logging.getLogger(__name__)

    while True:
        try:
            await asyncio.sleep(interval_hours * 3600)
            cutoff_time = time.time() - (max_age_hours * 3600)

            # stat/unlink/rmtree block; run them in a worker thread so the
            # event loop keeps serving requests during a sweep.
            cleaned_count, cleaned_size = await asyncio.to_thread(
                _sweep_tmp_dir, tmp_dir, cutoff_time
            )

            if cleaned_count > 0:
                log.info(
                    f"Cleaned {cleaned_count} items from {tmp_dir} "
                    f"({cleaned_size / 1024 / 1024:.2f} MB)"
                )
        except Exception as e:
            # Cancellation is not an Exception subclass, so it still stops
            # the task.
            log.error(f"Cleanup error: {str(e)}")
#!/usr/bin/env python3
"""Django MCP Server - Production Implementation."""
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional
import uvicorn
from pydantic import AnyHttpUrl
from starlette.applications import Starlette
from starlette.middleware.cors import CORSMiddleware
from starlette.routing import Route
from mcp.server.fastmcp import FastMCP, Context
from mcp.server.auth.settings import AuthSettings
from config import load_config
from security import (
RateLimiter,
SecureTokenVerifier,
CorrelationMiddleware,
RateLimitMiddleware
)
from api_client import SecureAPIClient
from logging_config import setup_logging
logger = logging.getLogger(__name__)
class DjangoMCPServer:
    """Django-backed MCP Server implementation.

    Builds the FastMCP server (with or without JWT auth depending on the
    AUTH_ENABLED environment variable), owns the Django API client and the
    temp-file cleanup task, and assembles the Starlette application.
    """

    def __init__(self, config):
        """Create security components and the FastMCP server.

        Args:
            config: Loaded configuration with ``security``, ``database`` and
                ``server`` sections.
        """
        self.config = config
        self.api_client: Optional[SecureAPIClient] = None
        self._cleanup_task: Optional[asyncio.Task] = None

        # Security components
        self.rate_limiter = RateLimiter(
            config.security.rate_limit_requests,
            config.security.rate_limit_window
        )
        self.token_verifier = SecureTokenVerifier(
            config.security.jwt_secret,
            config.security.jwt_algorithm
        )

        # Create FastMCP server
        auth_enabled = os.getenv("AUTH_ENABLED", "true").lower() == "true"
        if auth_enabled:
            self.mcp = FastMCP(
                name=config.server.name,
                token_verifier=self.token_verifier,
                auth=AuthSettings(
                    issuer_url=AnyHttpUrl("https://auth.mydomain.com"),
                    # NOTE(review): built from the bind host/port --
                    # presumably this should be the public URL; confirm.
                    resource_server_url=AnyHttpUrl(
                        f"http://{config.server.host}:{config.server.port}"
                    ),
                    required_scopes=["read", "write"]
                ),
                stateless_http=True,
                json_response=True
            )
        else:
            logger.warning("AUTH DISABLED - Testing mode only!")
            self.mcp = FastMCP(
                name=config.server.name,
                stateless_http=True,
                json_response=True
            )

        self._setup_tools()
        self._setup_resources()

    def _setup_tools(self):
        """Register MCP tools."""
        # Tools implementation here...
        pass

    def _setup_resources(self):
        """Register MCP resources."""
        # Resources implementation here...
        pass

    @asynccontextmanager
    async def lifespan(self, app: Starlette):
        """Manage application lifecycle.

        Startup opens the Django API client, publishes it on ``app.state``,
        runs a health check, starts the cleanup task and then the MCP session
        manager. Shutdown always cancels the task and closes the client, even
        when startup fails part-way.
        """
        logger.info(f"Starting {self.config.server.name}...")
        self.api_client = SecureAPIClient(
            base_url=self.config.database.base_url,
            api_key=self.config.database.api_key,
            timeout=self.config.database.timeout,
            max_retries=self.config.database.max_retries,
            connection_pool_size=self.config.database.connection_pool_size
        )
        await self.api_client.__aenter__()
        try:
            # create_app() ran before the client existed, so publish it now;
            # the health endpoints read it from app.state.
            app.state.api_client = self.api_client

            # Health check (a failure is logged, not fatal)
            if not await self.api_client.health_check():
                logger.warning("Django API health check failed")

            # Start cleanup task
            tmp_dir = Path("/tmp/mcp_server")
            tmp_dir.mkdir(parents=True, exist_ok=True)
            self._cleanup_task = asyncio.create_task(
                cleanup_tmp_files(tmp_dir)
            )

            async with self.mcp.session_manager.run():
                logger.info(f"Server ready on {self.config.server.host}:{self.config.server.port}")
                yield
        finally:
            # Shutdown: runs after the session manager has stopped, so no
            # handler is still using the client when it closes.
            logger.info("Shutting down...")
            if self._cleanup_task:
                self._cleanup_task.cancel()
                try:
                    await self._cleanup_task
                except asyncio.CancelledError:
                    pass
                self._cleanup_task = None
            if self.api_client:
                await self.api_client.__aexit__(None, None, None)
            app.state.api_client = None
            logger.info("Shutdown complete")

    def create_app(self) -> Starlette:
        """Create the Starlette application with probes, lifespan and middleware.

        Returns:
            The app from FastMCP's Streamable HTTP transport, extended with
            /live/, /ready/, /health and /metrics routes.
        """
        app = self.mcp.streamable_http_app()

        # Probe/metrics handlers are module-level functions, not methods of
        # this class.
        # NOTE(review): they must be importable/in scope in this module --
        # confirm the import for the module that defines them.
        probe_routes = (
            ("/live/", live_endpoint),
            ("/ready/", ready_endpoint),
            ("/health", health_endpoint),
            ("/metrics", metrics_endpoint),
        )
        for path, handler in probe_routes:
            app.routes.insert(0, Route(path, handler, methods=["GET"]))

        # Replace lifespan
        app.router.lifespan_context = self.lifespan

        # Starlette runs the LAST added middleware first. Adding in this
        # order yields CORS -> Correlation -> RateLimit -> app, so throttled
        # (429) responses also carry X-Correlation-ID.
        app.add_middleware(RateLimitMiddleware, rate_limiter=self.rate_limiter)
        app.add_middleware(CorrelationMiddleware)
        origins = self.config.security.allowed_origins
        app.add_middleware(
            CORSMiddleware,
            allow_origins=origins,
            # Never combine credentialed CORS with a wildcard origin list.
            allow_credentials="*" not in origins,
            allow_methods=["GET", "POST", "OPTIONS"],
            allow_headers=["*"]
        )

        # Store references; api_client is still None here and is replaced by
        # lifespan() on startup.
        app.state.api_client = self.api_client
        app.state.rate_limiter = self.rate_limiter
        return app
def main():
    """Main entry point.

    Loads the .env file and configuration, sets up logging, builds the
    server and runs it under uvicorn.

    Returns:
        Process exit code: 1 when DJANGO_API_KEY is missing, 0 after the
        server stops normally.
    """
    from dotenv import load_dotenv
    load_dotenv()

    # Load configuration
    config = load_config()

    # Setup logging
    setup_logging(config)

    # Validate required variables
    if not config.database.api_key:
        logger.error("DJANGO_API_KEY is required")
        return 1

    # Create server
    server = DjangoMCPServer(config)
    app = server.create_app()

    # Run server (blocks until shutdown)
    uvicorn.run(
        app,
        host=config.server.host,
        port=config.server.port,
        log_level=config.server.log_level.lower(),
        access_log=False
    )
    return 0


if __name__ == "__main__":
    # SystemExit instead of exit(): the `exit` builtin is injected by the
    # `site` module for interactive use and is not guaranteed to exist.
    raise SystemExit(main())