132 lines
4.0 KiB
Python
132 lines
4.0 KiB
Python
"""
|
|
Demeter Server — Prometheus Metrics
|
|
|
|
Defines gauges and counters for sensor readings, device status,
|
|
and CoAP communication. Exposes a /metrics endpoint for Prometheus
|
|
scraping.
|
|
|
|
Uses a dedicated CollectorRegistry to avoid conflicts when creating
|
|
multiple app instances (e.g., in tests).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any
|
|
|
|
from prometheus_client import (
|
|
CollectorRegistry,
|
|
Counter,
|
|
Gauge,
|
|
Info,
|
|
generate_latest,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MetricsCollector:
|
|
"""Manages all Prometheus metrics for the Demeter server."""
|
|
|
|
def __init__(self, registry: CollectorRegistry | None = None) -> None:
|
|
self.registry = registry or CollectorRegistry()
|
|
|
|
# Server info
|
|
self.server_info = Info(
|
|
"demeter_server",
|
|
"Demeter IoT management server information",
|
|
registry=self.registry,
|
|
)
|
|
|
|
# Sensor readings (the core metric)
|
|
self.sensor_reading = Gauge(
|
|
"demeter_sensor_reading",
|
|
"Latest sensor reading value",
|
|
["device", "resource", "unit"],
|
|
registry=self.registry,
|
|
)
|
|
|
|
# Device online status
|
|
self.device_online = Gauge(
|
|
"demeter_device_online",
|
|
"Whether the device is online (1) or offline (0)",
|
|
["device"],
|
|
registry=self.registry,
|
|
)
|
|
|
|
# Last reading timestamp (unix epoch)
|
|
self.last_reading_ts = Gauge(
|
|
"demeter_last_reading_timestamp_seconds",
|
|
"Unix timestamp of the last reading received",
|
|
["device", "resource"],
|
|
registry=self.registry,
|
|
)
|
|
|
|
# CoAP request counters
|
|
self.coap_requests = Counter(
|
|
"demeter_coap_requests_total",
|
|
"Total CoAP requests/notifications received",
|
|
["device"],
|
|
registry=self.registry,
|
|
)
|
|
|
|
self.coap_errors = Counter(
|
|
"demeter_coap_errors_total",
|
|
"Total CoAP communication errors",
|
|
["device", "error_type"],
|
|
registry=self.registry,
|
|
)
|
|
|
|
# Active subscriptions
|
|
self.active_subscriptions = Gauge(
|
|
"demeter_active_subscriptions",
|
|
"Number of active CoAP Observe subscriptions",
|
|
registry=self.registry,
|
|
)
|
|
|
|
def set_server_info(self, version: str) -> None:
|
|
"""Set server version info."""
|
|
self.server_info.info({"version": version})
|
|
|
|
def update_sensor_metric(
|
|
self,
|
|
device_id: str,
|
|
resource_uri: str,
|
|
value: Any,
|
|
unit: str,
|
|
) -> None:
|
|
"""Update the sensor reading gauge from a notification."""
|
|
try:
|
|
numeric = float(value)
|
|
except (TypeError, ValueError):
|
|
# Non-numeric values (e.g., trigger state) — store as 0/1
|
|
numeric = float(value) if isinstance(value, (int, bool)) else 0.0
|
|
|
|
self.sensor_reading.labels(
|
|
device=device_id, resource=resource_uri, unit=unit
|
|
).set(numeric)
|
|
|
|
self.last_reading_ts.labels(
|
|
device=device_id, resource=resource_uri
|
|
).set_to_current_time()
|
|
|
|
def set_device_online(self, device_id: str, online: bool) -> None:
|
|
"""Update device online/offline status."""
|
|
self.device_online.labels(device=device_id).set(1 if online else 0)
|
|
|
|
def record_coap_request(self, device_id: str) -> None:
|
|
"""Increment CoAP request counter."""
|
|
self.coap_requests.labels(device=device_id).inc()
|
|
|
|
def record_coap_error(self, device_id: str, error_type: str) -> None:
|
|
"""Increment CoAP error counter."""
|
|
self.coap_errors.labels(device=device_id, error_type=error_type).inc()
|
|
|
|
def set_active_subscriptions(self, count: int) -> None:
|
|
"""Update the active subscription gauge."""
|
|
self.active_subscriptions.set(count)
|
|
|
|
def generate(self) -> bytes:
|
|
"""Generate Prometheus metrics output from this collector's registry."""
|
|
return generate_latest(self.registry)
|