""" Demeter Server — Prometheus Metrics Defines gauges and counters for sensor readings, device status, and CoAP communication. Exposes a /metrics endpoint for Prometheus scraping. Uses a dedicated CollectorRegistry to avoid conflicts when creating multiple app instances (e.g., in tests). """ from __future__ import annotations import logging from typing import Any from prometheus_client import ( CollectorRegistry, Counter, Gauge, Info, generate_latest, ) logger = logging.getLogger(__name__) class MetricsCollector: """Manages all Prometheus metrics for the Demeter server.""" def __init__(self, registry: CollectorRegistry | None = None) -> None: self.registry = registry or CollectorRegistry() # Server info self.server_info = Info( "demeter_server", "Demeter IoT management server information", registry=self.registry, ) # Sensor readings (the core metric) self.sensor_reading = Gauge( "demeter_sensor_reading", "Latest sensor reading value", ["device", "resource", "unit"], registry=self.registry, ) # Device online status self.device_online = Gauge( "demeter_device_online", "Whether the device is online (1) or offline (0)", ["device"], registry=self.registry, ) # Last reading timestamp (unix epoch) self.last_reading_ts = Gauge( "demeter_last_reading_timestamp_seconds", "Unix timestamp of the last reading received", ["device", "resource"], registry=self.registry, ) # CoAP request counters self.coap_requests = Counter( "demeter_coap_requests_total", "Total CoAP requests/notifications received", ["device"], registry=self.registry, ) self.coap_errors = Counter( "demeter_coap_errors_total", "Total CoAP communication errors", ["device", "error_type"], registry=self.registry, ) # Active subscriptions self.active_subscriptions = Gauge( "demeter_active_subscriptions", "Number of active CoAP Observe subscriptions", registry=self.registry, ) def set_server_info(self, version: str) -> None: """Set server version info.""" self.server_info.info({"version": version}) def update_sensor_metric( self, device_id: str, resource_uri: str, value: Any, unit: str, ) -> None: """Update the sensor reading gauge from a notification.""" try: numeric = float(value) except (TypeError, ValueError): # Non-numeric values (e.g., trigger state) — store as 0/1 numeric = float(value) if isinstance(value, (int, bool)) else 0.0 self.sensor_reading.labels( device=device_id, resource=resource_uri, unit=unit ).set(numeric) self.last_reading_ts.labels( device=device_id, resource=resource_uri ).set_to_current_time() def set_device_online(self, device_id: str, online: bool) -> None: """Update device online/offline status.""" self.device_online.labels(device=device_id).set(1 if online else 0) def record_coap_request(self, device_id: str) -> None: """Increment CoAP request counter.""" self.coap_requests.labels(device=device_id).inc() def record_coap_error(self, device_id: str, error_type: str) -> None: """Increment CoAP error counter.""" self.coap_errors.labels(device=device_id, error_type=error_type).inc() def set_active_subscriptions(self, count: int) -> None: """Update the active subscription gauge.""" self.active_subscriptions.set(count) def generate(self) -> bytes: """Generate Prometheus metrics output from this collector's registry.""" return generate_latest(self.registry)