# demeter/esp/microcoapy/microcoapy.py (513 lines, 18 KiB, Python)
"""
microCoAPy - Extended for Demeter with CoAP Observe (RFC 7641)
Changes from upstream microCoAPy:
- Observer registration/deregistration on incoming GET with Option 6
- notifyObservers() method for server-side Observe notifications
- observeGet() client method for subscribing to observable resources
- RST handling to remove observers
- Per-resource Max-Age support in notifications
"""
try:
import socket
except ImportError:
import usocket as socket
try:
import os
except ImportError:
import uos as os
try:
import time
except ImportError:
import utime as time
import binascii
from . import coap_macros as macros
from .coap_packet import CoapPacket
from .coap_reader import parsePacketHeaderInfo
from .coap_reader import parsePacketOptionsAndPayload
from .coap_writer import writePacketHeaderInfo
from .coap_writer import writePacketOptions
from .coap_writer import writePacketPayload
from .observe_manager import ObserveManager
# CoAP endpoint over UDP: client request helpers (get/put/post and their
# non-confirmable variants), server-side per-URL request callbacks, and
# RFC 7641 Observe support through the ObserveManager held in self.observe.
class Coap:
# Receive-side state used by loop(). STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA
# is entered when an empty ACK arrives and is reset to STATE_IDLE when the
# next message that is not an empty ACK is processed (or when sendEx() runs).
TRANSMISSION_STATE = macros.enum(
STATE_IDLE=0,
STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA=1
)
def __init__(self):
    """Set up an idle endpoint: no socket, no handlers, logging enabled."""
    # Diagnostics: log() prints only while this is truthy.
    self.debug = True

    # Transport. The socket is created by start() or injected through
    # setCustomSocket().
    self.sock = None
    self.port = 0
    self.isCustomSocket = False

    # Dispatch tables: URL -> request callback, plus one response callback.
    self.callbacks = {}
    self.responseCallback = None
    self.isServer = False

    # Separate-response tracking used by loop().
    self.state = self.TRANSMISSION_STATE.STATE_IDLE

    # Observe manager (RFC 7641)
    self.observe = ObserveManager(debug=True)

    # Beta flags: optional suppression of back-to-back identical packets.
    self.discardRetransmissions = False
    self.lastPacketStr = ""
def log(self, s):
    """Print *s* with the library prefix, but only while debugging is on."""
    if not self.debug:
        return
    print("[microcoapy]: " + s)
# ── Socket Management ──
def start(self, port=macros._COAP_DEFAULT_PORT):
    """Create a UDP socket and bind it to *port* on all interfaces.

    Any socket this instance already holds is closed first (the same thing
    setCustomSocket() does), so calling start() twice does not leak the
    earlier socket.

    Args:
        port: Local UDP port to bind; defaults to macros._COAP_DEFAULT_PORT.

    Raises:
        Whatever socket creation or bind raises. On a bind failure the
        new socket is closed and self.sock is left as None.
    """
    self.stop()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind(("", port))
    except Exception:
        # Do not keep (or leak) a socket that never got bound.
        sock.close()
        raise
    self.sock = sock
def stop(self):
    """Close the current socket, if any, and forget it."""
    activeSocket = self.sock
    if activeSocket is None:
        return
    activeSocket.close()
    self.sock = None
def setCustomSocket(self, custom_socket):
    """Replace the transport with a caller-supplied UDP socket object.

    The socket currently held (if any) is closed through stop() before
    *custom_socket* is installed.
    """
    self.stop()
    self.sock = custom_socket
    self.isCustomSocket = True
# ── Callback Registration ──
def addIncomingRequestCallback(self, requestUrl, callback):
    """Attach *callback* to incoming requests for *requestUrl*.

    Registering a callback also switches the instance into server mode.

    The callback is invoked as:
        callback(packet, senderIp, senderPort)

    Return value contract:
        (payload, content_format) -- when an Observe registration is
            accepted, this tuple is used to build the initial response.
        None -- the callback has sent (or will send) its own response.
    """
    self.callbacks.update({requestUrl: callback})
    self.isServer = True
# ── Packet Sending ──
def sendPacket(self, ip, port, coapPacket):
    """Serialize *coapPacket* and send it to (ip, port) over self.sock.

    Content-Format and Uri-Query options are appended to *coapPacket*
    itself before serialization, so the packet object is modified.

    Args:
        ip: Destination host; resolved with getaddrinfo when possible.
        port: Destination UDP port.
        coapPacket: Packet to send.

    Returns:
        The packet's message ID when sendto() reports bytes written,
        otherwise 0. Exceptions are reported on stdout, not raised.
    """
    if coapPacket.content_format != macros.COAP_CONTENT_FORMAT.COAP_NONE:
        # Content-Format travels as a 2-byte big-endian option value.
        optionBuffer = bytearray(2)
        optionBuffer[0] = (coapPacket.content_format & 0xFF00) >> 8
        optionBuffer[1] = coapPacket.content_format & 0x00FF
        coapPacket.addOption(
            macros.COAP_OPTION_NUMBER.COAP_CONTENT_FORMAT, optionBuffer
        )
    if (coapPacket.query is not None) and (len(coapPacket.query) > 0):
        coapPacket.addOption(
            macros.COAP_OPTION_NUMBER.COAP_URI_QUERY, coapPacket.query
        )

    buffer = bytearray()
    writePacketHeaderInfo(buffer, coapPacket)
    writePacketOptions(buffer, coapPacket)
    writePacketPayload(buffer, coapPacket)

    status = 0
    try:
        sockaddr = (ip, port)
        try:
            sockaddr = socket.getaddrinfo(ip, port)[0][-1]
        except Exception:
            # Best effort: fall back to the raw (ip, port) pair, e.g. for
            # custom sockets or platforms without name resolution.
            pass
        status = self.sock.sendto(buffer, sockaddr)
        if status > 0:
            status = coapPacket.messageid
        self.log("Packet sent. messageid: " + str(status))
    except Exception as e:
        status = 0
        print("Exception while sending packet...")
        import sys
        # sys.print_exception exists only on MicroPython. Without this
        # guard the handler itself would raise AttributeError elsewhere
        # and turn a send failure into a crash.
        printException = getattr(sys, "print_exception", None)
        if printException is not None:
            printException(e)
        else:
            print(repr(e))
    return status
def send(self, ip, port, url, type, method, token, payload, content_format, query_option):
    """Assemble a request packet from the given fields and hand it to sendEx().

    Returns:
        Whatever sendEx() returns (message ID on success, 0 on failure).
    """
    request = CoapPacket()
    fields = (
        ("type", type),
        ("method", method),
        ("token", token),
        ("payload", payload),
        ("content_format", content_format),
        ("query", query_option),
    )
    for attribute, value in fields:
        setattr(request, attribute, value)
    return self.sendEx(ip, port, url, request)
def sendEx(self, ip, port, url, packet):
    """Send *packet* after stamping it with a fresh message ID and URI options.

    Also resets the separate-response state to idle.

    Args:
        ip: Destination host, also written as the packet's Uri-Host.
        port: Destination UDP port.
        url: Resource path, written as the packet's Uri-Path.
        packet: Packet to complete and send; it is modified in place.

    Returns:
        The message ID on success, 0 on failure (see sendPacket()).
    """
    self.state = self.TRANSMISSION_STATE.STATE_IDLE

    # The send path reports failure as 0, so never hand out message ID 0:
    # a successful send would otherwise be indistinguishable from an error.
    messageid = 0
    while messageid == 0:
        randBytes = os.urandom(2)
        messageid = (randBytes[0] << 8) | randBytes[1]
    packet.messageid = messageid

    packet.setUriHost(ip)
    packet.setUriPath(url)
    return self.sendPacket(ip, port, packet)
def sendResponse(self, ip, port, messageid, payload, method, content_format, token):
    """Build an ACK-typed packet from the arguments and send it.

    *messageid* and *token* are copied into the packet as given (callers
    pass the values of the message being answered); *method* becomes the
    packet's code.

    Returns:
        Whatever sendPacket() returns (message ID on success, 0 on failure).
    """
    ack = CoapPacket()
    ack.type = macros.COAP_TYPE.COAP_ACK
    ack.method = method
    ack.token = token
    ack.payload = payload
    ack.messageid = messageid
    ack.content_format = content_format
    status = self.sendPacket(ip, port, ack)
    return status
# ── Client Methods (Confirmable) ──
def get(self, ip, port, url, token=bytearray()):
    """Send a confirmable GET for *url* with no payload and no query.

    Returns:
        Whatever send() returns (message ID on success, 0 on failure).
    """
    messageType = macros.COAP_TYPE.COAP_CON
    code = macros.COAP_METHOD.COAP_GET
    noFormat = macros.COAP_CONTENT_FORMAT.COAP_NONE
    return self.send(ip, port, url, messageType, code, token, None, noFormat, None)
def put(self, ip, port, url, payload=bytearray(), query_option=None,
        content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
    """Send a confirmable PUT carrying *payload* to *url*.

    Returns:
        Whatever send() returns (message ID on success, 0 on failure).
    """
    messageType = macros.COAP_TYPE.COAP_CON
    code = macros.COAP_METHOD.COAP_PUT
    return self.send(
        ip, port, url, messageType, code,
        token, payload, content_format, query_option,
    )
def post(self, ip, port, url, payload=bytearray(), query_option=None,
         content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
    """Send a confirmable POST carrying *payload* to *url*.

    Returns:
        Whatever send() returns (message ID on success, 0 on failure).
    """
    messageType = macros.COAP_TYPE.COAP_CON
    code = macros.COAP_METHOD.COAP_POST
    return self.send(
        ip, port, url, messageType, code,
        token, payload, content_format, query_option,
    )
# ── Client Methods (Non-Confirmable) ──
def getNonConf(self, ip, port, url, token=bytearray()):
    """Send a non-confirmable GET for *url* with no payload and no query.

    Returns:
        Whatever send() returns (message ID on success, 0 on failure).
    """
    messageType = macros.COAP_TYPE.COAP_NONCON
    code = macros.COAP_METHOD.COAP_GET
    noFormat = macros.COAP_CONTENT_FORMAT.COAP_NONE
    return self.send(ip, port, url, messageType, code, token, None, noFormat, None)
def putNonConf(self, ip, port, url, payload=bytearray(), query_option=None,
               content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
    """Send a non-confirmable PUT carrying *payload* to *url*.

    Returns:
        Whatever send() returns (message ID on success, 0 on failure).
    """
    messageType = macros.COAP_TYPE.COAP_NONCON
    code = macros.COAP_METHOD.COAP_PUT
    return self.send(
        ip, port, url, messageType, code,
        token, payload, content_format, query_option,
    )
def postNonConf(self, ip, port, url, payload=bytearray(), query_option=None,
                content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
    """Send a non-confirmable POST carrying *payload* to *url*.

    Returns:
        Whatever send() returns (message ID on success, 0 on failure).
    """
    messageType = macros.COAP_TYPE.COAP_NONCON
    code = macros.COAP_METHOD.COAP_POST
    return self.send(
        ip, port, url, messageType, code,
        token, payload, content_format, query_option,
    )
# ── Observe Client Methods (RFC 7641) ──
def observeGet(self, ip, port, url, token=None):
    """Subscribe to a resource: confirmable GET carrying Observe = register.

    Args:
        ip: Server IP
        port: Server port
        url: Resource URI path
        token: Token to send with the request; when None, four random
            bytes are generated.

    Returns:
        Message ID on success, 0 on failure.
    """
    request = CoapPacket()
    request.type = macros.COAP_TYPE.COAP_CON
    request.method = macros.COAP_METHOD.COAP_GET
    request.token = bytearray(os.urandom(4)) if token is None else token
    request.payload = None
    request.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE
    # Observe option value "register" asks the server to add us as observer.
    request.setObserve(macros.COAP_OBSERVE_REGISTER)
    return self.sendEx(ip, port, url, request)
def observeCancel(self, ip, port, url, token=bytearray()):
    """Cancel an observation: confirmable GET carrying Observe = deregister.

    Args:
        ip: Server IP
        port: Server port
        url: Resource URI path
        token: Token to send with the request.

    Returns:
        Message ID on success, 0 on failure.
    """
    request = CoapPacket()
    request.type = macros.COAP_TYPE.COAP_CON
    request.method = macros.COAP_METHOD.COAP_GET
    request.token = token
    request.payload = None
    request.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE
    # Observe option value "deregister" asks the server to drop us.
    request.setObserve(macros.COAP_OBSERVE_DEREGISTER)
    return self.sendEx(ip, port, url, request)
# ── Observe Server Methods (RFC 7641) ──
def notifyObservers(self, resource_url, payload, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE,
                    message_type=macros.COAP_TYPE.COAP_NONCON, max_age=None):
    """Send an Observe notification to every observer of a resource.

    All observers receive the same Observe sequence number (one is drawn
    per call); each notification gets its own message ID and its own copy
    of the payload.

    Args:
        resource_url: The resource URI path (must match what observers subscribed to)
        payload: The current resource representation (str, bytes-like or None)
        content_format: Content format of the payload
        message_type: macros.COAP_TYPE.COAP_NONCON (default) or
            macros.COAP_TYPE.COAP_CON
        max_age: Optional Max-Age in seconds (freshness lifetime)

    Returns:
        Number of notifications sent successfully.
    """
    if not self.observe.has_observers(resource_url):
        return 0
    observers = self.observe.get_observers(resource_url)
    seq = self.observe.next_sequence(resource_url)

    # Encode once; every packet still gets a private bytearray copy below.
    if isinstance(payload, str):
        body = payload.encode("utf-8")
    elif payload is not None:
        body = payload
    else:
        body = b""

    sent = 0
    for obs in observers:
        packet = CoapPacket()
        packet.type = message_type
        packet.method = macros.COAP_RESPONSE_CODE.COAP_CONTENT
        packet.token = obs["token"]
        packet.content_format = content_format
        packet.payload = bytearray(body)

        # sendPacket() returns the message ID and uses 0 for failure, so an
        # ID of 0 would make a successful send count as failed. Skip it.
        messageid = 0
        while messageid == 0:
            randBytes = os.urandom(2)
            messageid = (randBytes[0] << 8) | randBytes[1]
        packet.messageid = messageid

        # Add Observe sequence number
        packet.setObserve(seq)
        # Add Max-Age if specified
        if max_age is not None:
            packet.setMaxAge(max_age)

        status = self.sendPacket(obs["ip"], obs["port"], packet)
        if status > 0:
            sent += 1
        else:
            self.log("Failed to notify observer {}:{}".format(obs["ip"], obs["port"]))

    self.log("Notified {}/{} observers of {} (seq={})".format(
        sent, len(observers), resource_url, seq
    ))
    return sent
# ── Incoming Request Handling ──
def handleIncomingRequest(self, requestPacket, sourceIp, sourcePort):
    """Dispatch an incoming request to its URL callback, with Observe support.

    Flow:
      1. No callback for the URL: return False when a response callback is
         installed (the caller then treats the packet as a response),
         otherwise answer 4.04 Not Found and return True.
      2. GET with Observe=register: try to register the sender as observer.
         GET with Observe=deregister: remove the sender.
      3. Invoke the callback. If it returns None it has handled its own
         response. If it returns (payload, content_format), a 2.05 Content
         ACK is sent on its behalf; the Observe option is included only
         when the registration in step 2 succeeded.

    The tuple return value is honoured for plain GETs, rejected
    registrations and deregistrations as well. Previously those paths
    discarded it, so the client never received a response.

    Args:
        requestPacket: Parsed incoming packet.
        sourceIp: Sender address.
        sourcePort: Sender UDP port.

    Returns:
        True when the packet was handled as a request, False when it should
        be treated as a response by the caller.
    """
    url = requestPacket.getUriPath()
    urlCallback = self.callbacks.get(url) if url != "" else None

    if urlCallback is None:
        if self.responseCallback:
            return False
        print("Callback for url [", url, "] not found")
        self.sendResponse(
            sourceIp, sourcePort, requestPacket.messageid,
            None, macros.COAP_RESPONSE_CODE.COAP_NOT_FOUND,
            macros.COAP_CONTENT_FORMAT.COAP_NONE, requestPacket.token,
        )
        return True

    # Check for Observe option in GET requests (RFC 7641)
    isGet = requestPacket.method == macros.COAP_METHOD.COAP_GET
    registered = False
    if isGet:
        observeValue = requestPacket.getObserveValue()
        if observeValue == macros.COAP_OBSERVE_REGISTER:
            registered = self.observe.register(
                url, sourceIp, sourcePort, requestPacket.token
            )
            if not registered:
                # Registration failed (limits exceeded) - respond without Observe
                self.log("Observer registration failed for {}:{} on {}".format(
                    sourceIp, sourcePort, url
                ))
        elif observeValue == macros.COAP_OBSERVE_DEREGISTER:
            self.observe.deregister(url, sourceIp, sourcePort)

    result = urlCallback(requestPacket, sourceIp, sourcePort)
    if result is None:
        # Callback handled its own response.
        return True

    if not registered:
        # Outside an accepted registration, only a well-formed
        # (payload, content_format) pair on a GET triggers a server-built
        # response. Any other return value is ignored, as it always was.
        wellFormed = isinstance(result, (tuple, list)) and len(result) == 2
        if not (isGet and wellFormed):
            return True

    payload, cf = result
    response = CoapPacket()
    response.type = macros.COAP_TYPE.COAP_ACK
    response.method = macros.COAP_RESPONSE_CODE.COAP_CONTENT
    response.token = requestPacket.token
    response.messageid = requestPacket.messageid
    response.content_format = cf
    if isinstance(payload, str):
        response.payload = bytearray(payload.encode("utf-8"))
    elif payload is not None:
        response.payload = bytearray(payload)
    if registered:
        # The initial response to an accepted registration carries Observe.
        response.setObserve(self.observe.next_sequence(url))
    self.sendPacket(sourceIp, sourcePort, response)
    return True
# ── Socket Reading ──
def readBytesFromSocket(self, numOfBytes):
    """Read up to *numOfBytes* from the socket via recvfrom().

    Returns:
        The recvfrom() result, or (None, None) when the read raises for any
        reason (for example, nothing available on a non-blocking socket).
    """
    try:
        received = self.sock.recvfrom(numOfBytes)
    except Exception:
        received = (None, None)
    return received
def parsePacketToken(self, buffer, packet):
    """Extract the token from *buffer* into packet.token.

    Token lengths 0..8 are handled directly: 0 stores None, otherwise the
    bytes following the 4-byte header are sliced out. For any larger
    length, more bytes are read from the socket and appended to *buffer*.

    Returns:
        True when packet.token was set, False when the length exceeded 8
        and the caller is expected to re-examine the (extended) buffer.
    """
    tokenLength = packet.tokenLength
    if tokenLength == 0:
        packet.token = None
        return True
    if tokenLength <= 8:
        packet.token = buffer[4: 4 + tokenLength]
        return True

    # NOTE(review): buffer.extend() requires a mutable buffer (bytearray);
    # confirm what the socket's recvfrom() returns.
    (extraBytes, _unusedAddress) = self.readBytesFromSocket(
        macros._BUF_MAX_SIZE
    )
    if extraBytes is not None:
        buffer.extend(extraBytes)
    return False
# ── Main Loop ──
def loop(self, blocking=True):
    """Receive and process at most one incoming datagram.

    Malformed datagrams (shorter than a CoAP header, wrong protocol
    version, or a reserved token length above 8) are dropped. The previous
    "read more and retry" handling could never make such a datagram valid
    and ended in an endless loop or an error.

    Args:
        blocking: Socket blocking mode used for the single receive call.
            The socket is switched back to blocking afterwards.

    Returns:
        True if a packet was processed (request handled, RST handled, or
        response delivered); False otherwise (no socket, nothing received,
        malformed or discarded packet, or an empty ACK announcing a
        separate response).
    """
    if self.sock is None:
        return False

    self.sock.setblocking(blocking)
    (buffer, remoteAddress) = self.readBytesFromSocket(macros._BUF_MAX_SIZE)
    self.sock.setblocking(True)

    if buffer is None or len(buffer) == 0:
        return False

    # Each receive yields one datagram; appending later datagrams cannot
    # repair a short or wrong-version header, so ignore it.
    if (len(buffer) < macros._COAP_HEADER_SIZE) or \
            (((buffer[0] & 0xC0) >> 6) != 1):
        self.log("Dropping malformed datagram (short header or unsupported version)")
        return False

    packet = CoapPacket()
    self.log("Incoming bytes: " + str(binascii.hexlify(bytearray(buffer))))
    parsePacketHeaderInfo(buffer, packet)

    # Token lengths 9-15 are reserved; reject before parsePacketToken()
    # tries to pull more data from the socket for them.
    if packet.tokenLength > 8:
        self.log("Dropping datagram with reserved token length")
        return False
    if not self.parsePacketToken(buffer, packet):
        return False

    if not parsePacketOptionsAndPayload(buffer, packet):
        return False

    # Handle RST: deregister observer (RFC 7641 §3.6)
    if packet.type == macros.COAP_TYPE.COAP_RESET:
        self.log("RST received, deregistering any observer with matching token")
        # Snapshot the resource list in case deregistration mutates it.
        for res_url in list(self.observe.get_all_resources()):
            self.observe.deregister_by_token(res_url, packet.token)
        return True

    # Beta: discard retransmissions
    if self.discardRetransmissions:
        packetStr = packet.toString()
        if packetStr == self.lastPacketStr:
            self.log("Discarded retransmission: " + packetStr)
            return False
        self.lastPacketStr = packetStr

    if not self.isServer or not self.handleIncomingRequest(
        packet, remoteAddress[0], remoteAddress[1]
    ):
        # Separate response handling (RFC 7252 §5.2.2)
        if (packet.type == macros.COAP_TYPE.COAP_ACK and
                packet.method == macros.COAP_METHOD.COAP_EMPTY_MESSAGE):
            self.state = self.TRANSMISSION_STATE.STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA
            return False

        if self.state == self.TRANSMISSION_STATE.STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA:
            self.state = self.TRANSMISSION_STATE.STATE_IDLE
            # Acknowledge the separate response with an Empty message:
            # code 0.00 and no token. The fifth argument of sendResponse()
            # is the code; the message *type* constant was being passed
            # there before.
            self.sendResponse(
                remoteAddress[0], remoteAddress[1],
                packet.messageid, None,
                macros.COAP_METHOD.COAP_EMPTY_MESSAGE,
                macros.COAP_CONTENT_FORMAT.COAP_NONE,
                bytearray(),
            )
        if self.responseCallback is not None:
            self.responseCallback(packet, remoteAddress)

    return True
def poll(self, timeoutMs=-1, pollPeriodMs=500):
    """Call loop() in non-blocking mode until a packet is processed or time is up.

    loop() is always tried at least once. With the default timeoutMs of -1
    the deadline check is true immediately, so exactly one attempt is made.

    Args:
        timeoutMs: Give up once this many milliseconds have elapsed.
        pollPeriodMs: Pause between unsuccessful attempts, in milliseconds.

    Returns:
        True if loop() processed a packet, False otherwise.
    """
    start_time = time.ticks_ms()
    status = False
    while not status:
        status = self.loop(False)
        if status:
            # Return right away; sleeping here would only delay the
            # caller after the packet has already been handled.
            break
        if time.ticks_diff(time.ticks_ms(), start_time) >= timeoutMs:
            break
        time.sleep_ms(pollPeriodMs)
    return status