145 lines
4.9 KiB
Python
145 lines
4.9 KiB
Python
"""
|
|
CoAP Observe Manager (RFC 7641)
|
|
|
|
Manages observer registrations for CoAP resources on the ESP server side.
|
|
Each resource can have multiple observers. When the resource state changes,
|
|
all registered observers receive a notification.
|
|
|
|
Observer entry structure:
|
|
{
|
|
"ip": str, # Observer IP address
|
|
"port": int, # Observer UDP port
|
|
"token": bytearray, # Token from the original GET request (for matching)
|
|
}
|
|
"""
|
|
|
|
try:
|
|
import time
|
|
except ImportError:
|
|
import utime as time
|
|
|
|
|
|
class ObserveManager:
|
|
"""Manages CoAP Observe subscriptions per resource URI."""
|
|
|
|
# Maximum observers per resource (memory constrained on ESP)
|
|
MAX_OBSERVERS_PER_RESOURCE = 4
|
|
# Maximum total observers across all resources
|
|
MAX_TOTAL_OBSERVERS = 8
|
|
|
|
def __init__(self, debug=True):
|
|
# Dict of resource_url -> list of observer entries
|
|
self._observers = {}
|
|
# Per-resource sequence counter (24-bit, wraps at 0xFFFFFF)
|
|
self._sequence = {}
|
|
self.debug = debug
|
|
|
|
def log(self, s):
|
|
if self.debug:
|
|
print("[observe]: " + s)
|
|
|
|
def register(self, resource_url, ip, port, token):
|
|
"""Register an observer for a resource.
|
|
|
|
Returns True if successfully registered, False if limits exceeded.
|
|
"""
|
|
if resource_url not in self._observers:
|
|
self._observers[resource_url] = []
|
|
self._sequence[resource_url] = 0
|
|
|
|
observers = self._observers[resource_url]
|
|
|
|
# Check if this observer is already registered (same ip+port+token)
|
|
for obs in observers:
|
|
if obs["ip"] == ip and obs["port"] == port:
|
|
# Update the token (re-registration)
|
|
obs["token"] = token
|
|
self.log("Re-registered observer {}:{} for {}".format(ip, port, resource_url))
|
|
return True
|
|
|
|
# Check limits
|
|
total = sum(len(v) for v in self._observers.values())
|
|
if total >= self.MAX_TOTAL_OBSERVERS:
|
|
self.log("Max total observers reached, rejecting registration")
|
|
return False
|
|
|
|
if len(observers) >= self.MAX_OBSERVERS_PER_RESOURCE:
|
|
self.log("Max observers for {} reached, rejecting".format(resource_url))
|
|
return False
|
|
|
|
observers.append({
|
|
"ip": ip,
|
|
"port": port,
|
|
"token": token,
|
|
})
|
|
|
|
self.log("Registered observer {}:{} for {} (token={})".format(
|
|
ip, port, resource_url, token
|
|
))
|
|
return True
|
|
|
|
def deregister(self, resource_url, ip, port):
|
|
"""Remove an observer for a resource."""
|
|
if resource_url not in self._observers:
|
|
return
|
|
|
|
observers = self._observers[resource_url]
|
|
self._observers[resource_url] = [
|
|
obs for obs in observers
|
|
if not (obs["ip"] == ip and obs["port"] == port)
|
|
]
|
|
|
|
self.log("Deregistered observer {}:{} from {}".format(ip, port, resource_url))
|
|
|
|
def deregister_by_token(self, resource_url, token):
|
|
"""Remove an observer by token (used when RST is received)."""
|
|
if resource_url not in self._observers:
|
|
return
|
|
|
|
observers = self._observers[resource_url]
|
|
self._observers[resource_url] = [
|
|
obs for obs in observers
|
|
if obs["token"] != token
|
|
]
|
|
|
|
def deregister_all(self, resource_url):
|
|
"""Remove all observers for a resource."""
|
|
if resource_url in self._observers:
|
|
del self._observers[resource_url]
|
|
del self._sequence[resource_url]
|
|
|
|
def get_observers(self, resource_url):
|
|
"""Get the list of observers for a resource."""
|
|
return self._observers.get(resource_url, [])
|
|
|
|
def has_observers(self, resource_url):
|
|
"""Check if a resource has any registered observers."""
|
|
return len(self._observers.get(resource_url, [])) > 0
|
|
|
|
def next_sequence(self, resource_url):
|
|
"""Get and increment the sequence number for a resource (24-bit wrap)."""
|
|
if resource_url not in self._sequence:
|
|
self._sequence[resource_url] = 0
|
|
|
|
seq = self._sequence[resource_url]
|
|
self._sequence[resource_url] = (seq + 1) & 0xFFFFFF
|
|
return seq
|
|
|
|
def get_all_resources(self):
|
|
"""Get all resource URLs that have observers."""
|
|
return list(self._observers.keys())
|
|
|
|
def observer_count(self, resource_url=None):
|
|
"""Get observer count. If resource_url is None, returns total count."""
|
|
if resource_url:
|
|
return len(self._observers.get(resource_url, []))
|
|
return sum(len(v) for v in self._observers.values())
|
|
|
|
def summary(self):
|
|
"""Return a summary string of all observer registrations."""
|
|
parts = []
|
|
for url, observers in self._observers.items():
|
|
addrs = ["{}:{}".format(o["ip"], o["port"]) for o in observers]
|
|
parts.append("{} -> [{}]".format(url, ", ".join(addrs)))
|
|
return "; ".join(parts) if parts else "(no observers)"
|