513 lines
18 KiB
Python
513 lines
18 KiB
Python
"""
|
|
microCoAPy - Extended for Demeter with CoAP Observe (RFC 7641)
|
|
|
|
Changes from upstream microCoAPy:
|
|
- Observer registration/deregistration on incoming GET with Option 6
|
|
- notifyObservers() method for server-side Observe notifications
|
|
- observeGet() client method for subscribing to observable resources
|
|
- RST handling to remove observers
|
|
- Per-resource Max-Age support in notifications
|
|
"""
|
|
|
|
try:
|
|
import socket
|
|
except ImportError:
|
|
import usocket as socket
|
|
|
|
try:
|
|
import os
|
|
except ImportError:
|
|
import uos as os
|
|
|
|
try:
|
|
import time
|
|
except ImportError:
|
|
import utime as time
|
|
|
|
import binascii
|
|
|
|
from . import coap_macros as macros
|
|
from .coap_packet import CoapPacket
|
|
from .coap_reader import parsePacketHeaderInfo
|
|
from .coap_reader import parsePacketOptionsAndPayload
|
|
from .coap_writer import writePacketHeaderInfo
|
|
from .coap_writer import writePacketOptions
|
|
from .coap_writer import writePacketPayload
|
|
from .observe_manager import ObserveManager
|
|
|
|
|
|
class Coap:
|
|
TRANSMISSION_STATE = macros.enum(
|
|
STATE_IDLE=0,
|
|
STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA=1
|
|
)
|
|
|
|
def __init__(self):
|
|
self.debug = True
|
|
self.sock = None
|
|
self.callbacks = {}
|
|
self.responseCallback = None
|
|
self.port = 0
|
|
self.isServer = False
|
|
self.state = self.TRANSMISSION_STATE.STATE_IDLE
|
|
self.isCustomSocket = False
|
|
|
|
# Observe manager (RFC 7641)
|
|
self.observe = ObserveManager(debug=True)
|
|
|
|
# Beta flags
|
|
self.discardRetransmissions = False
|
|
self.lastPacketStr = ""
|
|
|
|
def log(self, s):
|
|
if self.debug:
|
|
print("[microcoapy]: " + s)
|
|
|
|
# ── Socket Management ──
|
|
|
|
def start(self, port=macros._COAP_DEFAULT_PORT):
|
|
"""Create and bind a UDP socket."""
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
self.sock.bind(("", port))
|
|
|
|
def stop(self):
|
|
"""Close the socket."""
|
|
if self.sock is not None:
|
|
self.sock.close()
|
|
self.sock = None
|
|
|
|
def setCustomSocket(self, custom_socket):
|
|
"""Use a custom UDP socket implementation."""
|
|
self.stop()
|
|
self.isCustomSocket = True
|
|
self.sock = custom_socket
|
|
|
|
# ── Callback Registration ──
|
|
|
|
def addIncomingRequestCallback(self, requestUrl, callback):
|
|
"""Register a callback for incoming requests to a URL.
|
|
|
|
The callback signature is:
|
|
callback(packet, senderIp, senderPort)
|
|
|
|
For observable resources, the callback should return a tuple:
|
|
(payload_str, content_format)
|
|
This allows notifyObservers to retrieve the current value.
|
|
|
|
If the callback returns None, the server handles it as before
|
|
(callback is responsible for sending its own response).
|
|
"""
|
|
self.callbacks[requestUrl] = callback
|
|
self.isServer = True
|
|
|
|
# ── Packet Sending ──
|
|
|
|
def sendPacket(self, ip, port, coapPacket):
|
|
"""Serialize and send a CoAP packet."""
|
|
if coapPacket.content_format != macros.COAP_CONTENT_FORMAT.COAP_NONE:
|
|
optionBuffer = bytearray(2)
|
|
optionBuffer[0] = (coapPacket.content_format & 0xFF00) >> 8
|
|
optionBuffer[1] = coapPacket.content_format & 0x00FF
|
|
coapPacket.addOption(
|
|
macros.COAP_OPTION_NUMBER.COAP_CONTENT_FORMAT, optionBuffer
|
|
)
|
|
|
|
if (coapPacket.query is not None) and (len(coapPacket.query) > 0):
|
|
coapPacket.addOption(
|
|
macros.COAP_OPTION_NUMBER.COAP_URI_QUERY, coapPacket.query
|
|
)
|
|
|
|
buffer = bytearray()
|
|
writePacketHeaderInfo(buffer, coapPacket)
|
|
writePacketOptions(buffer, coapPacket)
|
|
writePacketPayload(buffer, coapPacket)
|
|
|
|
status = 0
|
|
try:
|
|
sockaddr = (ip, port)
|
|
try:
|
|
sockaddr = socket.getaddrinfo(ip, port)[0][-1]
|
|
except Exception:
|
|
pass
|
|
|
|
status = self.sock.sendto(buffer, sockaddr)
|
|
|
|
if status > 0:
|
|
status = coapPacket.messageid
|
|
|
|
self.log("Packet sent. messageid: " + str(status))
|
|
except Exception as e:
|
|
status = 0
|
|
print("Exception while sending packet...")
|
|
import sys
|
|
sys.print_exception(e)
|
|
|
|
return status
|
|
|
|
def send(self, ip, port, url, type, method, token, payload, content_format, query_option):
|
|
"""Build and send a CoAP request."""
|
|
packet = CoapPacket()
|
|
packet.type = type
|
|
packet.method = method
|
|
packet.token = token
|
|
packet.payload = payload
|
|
packet.content_format = content_format
|
|
packet.query = query_option
|
|
return self.sendEx(ip, port, url, packet)
|
|
|
|
def sendEx(self, ip, port, url, packet):
|
|
"""Send a packet with auto-generated message ID and URI options."""
|
|
self.state = self.TRANSMISSION_STATE.STATE_IDLE
|
|
randBytes = os.urandom(2)
|
|
packet.messageid = (randBytes[0] << 8) | randBytes[1]
|
|
packet.setUriHost(ip)
|
|
packet.setUriPath(url)
|
|
return self.sendPacket(ip, port, packet)
|
|
|
|
def sendResponse(self, ip, port, messageid, payload, method, content_format, token):
|
|
"""Send a response (ACK) packet."""
|
|
packet = CoapPacket()
|
|
packet.type = macros.COAP_TYPE.COAP_ACK
|
|
packet.method = method
|
|
packet.token = token
|
|
packet.payload = payload
|
|
packet.messageid = messageid
|
|
packet.content_format = content_format
|
|
return self.sendPacket(ip, port, packet)
|
|
|
|
# ── Client Methods (Confirmable) ──
|
|
|
|
def get(self, ip, port, url, token=bytearray()):
|
|
return self.send(
|
|
ip, port, url,
|
|
macros.COAP_TYPE.COAP_CON, macros.COAP_METHOD.COAP_GET,
|
|
token, None, macros.COAP_CONTENT_FORMAT.COAP_NONE, None
|
|
)
|
|
|
|
def put(self, ip, port, url, payload=bytearray(), query_option=None,
|
|
content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
|
|
return self.send(
|
|
ip, port, url,
|
|
macros.COAP_TYPE.COAP_CON, macros.COAP_METHOD.COAP_PUT,
|
|
token, payload, content_format, query_option
|
|
)
|
|
|
|
def post(self, ip, port, url, payload=bytearray(), query_option=None,
|
|
content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
|
|
return self.send(
|
|
ip, port, url,
|
|
macros.COAP_TYPE.COAP_CON, macros.COAP_METHOD.COAP_POST,
|
|
token, payload, content_format, query_option
|
|
)
|
|
|
|
# ── Client Methods (Non-Confirmable) ──
|
|
|
|
def getNonConf(self, ip, port, url, token=bytearray()):
|
|
return self.send(
|
|
ip, port, url,
|
|
macros.COAP_TYPE.COAP_NONCON, macros.COAP_METHOD.COAP_GET,
|
|
token, None, macros.COAP_CONTENT_FORMAT.COAP_NONE, None
|
|
)
|
|
|
|
def putNonConf(self, ip, port, url, payload=bytearray(), query_option=None,
|
|
content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
|
|
return self.send(
|
|
ip, port, url,
|
|
macros.COAP_TYPE.COAP_NONCON, macros.COAP_METHOD.COAP_PUT,
|
|
token, payload, content_format, query_option
|
|
)
|
|
|
|
def postNonConf(self, ip, port, url, payload=bytearray(), query_option=None,
|
|
content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()):
|
|
return self.send(
|
|
ip, port, url,
|
|
macros.COAP_TYPE.COAP_NONCON, macros.COAP_METHOD.COAP_POST,
|
|
token, payload, content_format, query_option
|
|
)
|
|
|
|
# ── Observe Client Methods (RFC 7641) ──
|
|
|
|
def observeGet(self, ip, port, url, token=None):
|
|
"""Send a GET with Observe option 0 (register) to subscribe to a resource.
|
|
|
|
Args:
|
|
ip: Server IP
|
|
port: Server port
|
|
url: Resource URI path
|
|
token: Token for matching responses (auto-generated if None)
|
|
|
|
Returns:
|
|
Message ID on success, 0 on failure.
|
|
"""
|
|
if token is None:
|
|
token = bytearray(os.urandom(4))
|
|
|
|
packet = CoapPacket()
|
|
packet.type = macros.COAP_TYPE.COAP_CON
|
|
packet.method = macros.COAP_METHOD.COAP_GET
|
|
packet.token = token
|
|
packet.payload = None
|
|
packet.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE
|
|
|
|
# Set Observe = 0 (register)
|
|
packet.setObserve(macros.COAP_OBSERVE_REGISTER)
|
|
|
|
return self.sendEx(ip, port, url, packet)
|
|
|
|
def observeCancel(self, ip, port, url, token=bytearray()):
|
|
"""Send a GET with Observe option 1 (deregister) to cancel observation.
|
|
|
|
Returns:
|
|
Message ID on success, 0 on failure.
|
|
"""
|
|
packet = CoapPacket()
|
|
packet.type = macros.COAP_TYPE.COAP_CON
|
|
packet.method = macros.COAP_METHOD.COAP_GET
|
|
packet.token = token
|
|
packet.payload = None
|
|
packet.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE
|
|
|
|
# Set Observe = 1 (deregister)
|
|
packet.setObserve(macros.COAP_OBSERVE_DEREGISTER)
|
|
|
|
return self.sendEx(ip, port, url, packet)
|
|
|
|
# ── Observe Server Methods (RFC 7641) ──
|
|
|
|
def notifyObservers(self, resource_url, payload, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE,
|
|
message_type=macros.COAP_TYPE.COAP_NONCON, max_age=None):
|
|
"""Send an Observe notification to all observers of a resource.
|
|
|
|
Args:
|
|
resource_url: The resource URI path (must match what observers subscribed to)
|
|
payload: The current resource representation (str or bytes)
|
|
content_format: Content format of the payload
|
|
message_type: COAP_NON for periodic data, COAP_CON for critical events
|
|
max_age: Optional Max-Age in seconds (freshness lifetime)
|
|
|
|
Returns:
|
|
Number of notifications sent successfully.
|
|
"""
|
|
if not self.observe.has_observers(resource_url):
|
|
return 0
|
|
|
|
observers = self.observe.get_observers(resource_url)
|
|
seq = self.observe.next_sequence(resource_url)
|
|
sent = 0
|
|
|
|
for obs in observers:
|
|
packet = CoapPacket()
|
|
packet.type = message_type
|
|
packet.method = macros.COAP_RESPONSE_CODE.COAP_CONTENT
|
|
packet.token = obs["token"]
|
|
packet.content_format = content_format
|
|
|
|
if isinstance(payload, str):
|
|
packet.payload = bytearray(payload.encode("utf-8"))
|
|
elif payload is not None:
|
|
packet.payload = bytearray(payload)
|
|
else:
|
|
packet.payload = bytearray()
|
|
|
|
# Generate message ID
|
|
randBytes = os.urandom(2)
|
|
packet.messageid = (randBytes[0] << 8) | randBytes[1]
|
|
|
|
# Add Observe sequence number
|
|
packet.setObserve(seq)
|
|
|
|
# Add Max-Age if specified
|
|
if max_age is not None:
|
|
packet.setMaxAge(max_age)
|
|
|
|
status = self.sendPacket(obs["ip"], obs["port"], packet)
|
|
if status > 0:
|
|
sent += 1
|
|
else:
|
|
self.log("Failed to notify observer {}:{}".format(obs["ip"], obs["port"]))
|
|
|
|
self.log("Notified {}/{} observers of {} (seq={})".format(
|
|
sent, len(observers), resource_url, seq
|
|
))
|
|
return sent
|
|
|
|
# ── Incoming Request Handling ──
|
|
|
|
def handleIncomingRequest(self, requestPacket, sourceIp, sourcePort):
|
|
"""Handle an incoming CoAP request, including Observe registration."""
|
|
url = requestPacket.getUriPath()
|
|
|
|
urlCallback = None
|
|
if url != "":
|
|
urlCallback = self.callbacks.get(url)
|
|
|
|
if urlCallback is None:
|
|
if self.responseCallback:
|
|
return False
|
|
print("Callback for url [", url, "] not found")
|
|
self.sendResponse(
|
|
sourceIp, sourcePort, requestPacket.messageid,
|
|
None, macros.COAP_RESPONSE_CODE.COAP_NOT_FOUND,
|
|
macros.COAP_CONTENT_FORMAT.COAP_NONE, requestPacket.token,
|
|
)
|
|
return True
|
|
|
|
# Check for Observe option in GET requests (RFC 7641)
|
|
if requestPacket.method == macros.COAP_METHOD.COAP_GET:
|
|
observeValue = requestPacket.getObserveValue()
|
|
|
|
if observeValue == macros.COAP_OBSERVE_REGISTER:
|
|
# Register observer
|
|
registered = self.observe.register(
|
|
url, sourceIp, sourcePort, requestPacket.token
|
|
)
|
|
|
|
if registered:
|
|
# Send initial response with Observe option (sequence 0)
|
|
result = urlCallback(requestPacket, sourceIp, sourcePort)
|
|
|
|
if result is not None:
|
|
payload_str, cf = result
|
|
response = CoapPacket()
|
|
response.type = macros.COAP_TYPE.COAP_ACK
|
|
response.method = macros.COAP_RESPONSE_CODE.COAP_CONTENT
|
|
response.token = requestPacket.token
|
|
response.messageid = requestPacket.messageid
|
|
response.content_format = cf
|
|
|
|
if isinstance(payload_str, str):
|
|
response.payload = bytearray(payload_str.encode("utf-8"))
|
|
elif payload_str is not None:
|
|
response.payload = bytearray(payload_str)
|
|
|
|
# Include Observe option with sequence 0 in initial response
|
|
response.setObserve(self.observe.next_sequence(url))
|
|
|
|
self.sendPacket(sourceIp, sourcePort, response)
|
|
# else: callback handled its own response
|
|
return True
|
|
|
|
else:
|
|
# Registration failed (limits exceeded) — respond without Observe
|
|
self.log("Observer registration failed for {}:{} on {}".format(
|
|
sourceIp, sourcePort, url
|
|
))
|
|
# Fall through to normal callback handling
|
|
|
|
elif observeValue == macros.COAP_OBSERVE_DEREGISTER:
|
|
# Deregister observer
|
|
self.observe.deregister(url, sourceIp, sourcePort)
|
|
# Fall through to normal GET response
|
|
|
|
# Normal (non-observe) request handling
|
|
urlCallback(requestPacket, sourceIp, sourcePort)
|
|
return True
|
|
|
|
# ── Socket Reading ──
|
|
|
|
def readBytesFromSocket(self, numOfBytes):
|
|
try:
|
|
return self.sock.recvfrom(numOfBytes)
|
|
except Exception:
|
|
return (None, None)
|
|
|
|
def parsePacketToken(self, buffer, packet):
|
|
if packet.tokenLength == 0:
|
|
packet.token = None
|
|
elif packet.tokenLength <= 8:
|
|
packet.token = buffer[4: 4 + packet.tokenLength]
|
|
else:
|
|
(tempBuffer, tempRemoteAddress) = self.readBytesFromSocket(
|
|
macros._BUF_MAX_SIZE
|
|
)
|
|
if tempBuffer is not None:
|
|
buffer.extend(tempBuffer)
|
|
return False
|
|
return True
|
|
|
|
# ── Main Loop ──
|
|
|
|
def loop(self, blocking=True):
|
|
"""Process one incoming packet.
|
|
|
|
Returns True if a packet was processed, False otherwise.
|
|
"""
|
|
if self.sock is None:
|
|
return False
|
|
|
|
self.sock.setblocking(blocking)
|
|
(buffer, remoteAddress) = self.readBytesFromSocket(macros._BUF_MAX_SIZE)
|
|
self.sock.setblocking(True)
|
|
|
|
while (buffer is not None) and (len(buffer) > 0):
|
|
bufferLen = len(buffer)
|
|
if (bufferLen < macros._COAP_HEADER_SIZE) or \
|
|
(((buffer[0] & 0xC0) >> 6) != 1):
|
|
(tempBuffer, tempRemoteAddress) = self.readBytesFromSocket(
|
|
macros._BUF_MAX_SIZE - bufferLen
|
|
)
|
|
if tempBuffer is not None:
|
|
buffer.extend(tempBuffer)
|
|
continue
|
|
|
|
packet = CoapPacket()
|
|
self.log("Incoming bytes: " + str(binascii.hexlify(bytearray(buffer))))
|
|
|
|
parsePacketHeaderInfo(buffer, packet)
|
|
|
|
if not self.parsePacketToken(buffer, packet):
|
|
continue
|
|
|
|
if not parsePacketOptionsAndPayload(buffer, packet):
|
|
return False
|
|
|
|
# Handle RST — deregister observer (RFC 7641 §3.6)
|
|
if packet.type == macros.COAP_TYPE.COAP_RESET:
|
|
self.log("RST received, deregistering any observer with matching token")
|
|
for res_url in self.observe.get_all_resources():
|
|
self.observe.deregister_by_token(res_url, packet.token)
|
|
return True
|
|
|
|
# Beta: discard retransmissions
|
|
if self.discardRetransmissions:
|
|
if packet.toString() == self.lastPacketStr:
|
|
self.log("Discarded retransmission: " + packet.toString())
|
|
return False
|
|
else:
|
|
self.lastPacketStr = packet.toString()
|
|
|
|
if not self.isServer or not self.handleIncomingRequest(
|
|
packet, remoteAddress[0], remoteAddress[1]
|
|
):
|
|
# Separate response handling (RFC 7252 §5.2.2)
|
|
if (packet.type == macros.COAP_TYPE.COAP_ACK and
|
|
packet.method == macros.COAP_METHOD.COAP_EMPTY_MESSAGE):
|
|
self.state = self.TRANSMISSION_STATE.STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA
|
|
return False
|
|
else:
|
|
if self.state == self.TRANSMISSION_STATE.STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA:
|
|
self.state = self.TRANSMISSION_STATE.STATE_IDLE
|
|
self.sendResponse(
|
|
remoteAddress[0], remoteAddress[1],
|
|
packet.messageid, None,
|
|
macros.COAP_TYPE.COAP_ACK,
|
|
macros.COAP_CONTENT_FORMAT.COAP_NONE,
|
|
packet.token,
|
|
)
|
|
if self.responseCallback is not None:
|
|
self.responseCallback(packet, remoteAddress)
|
|
return True
|
|
|
|
return False
|
|
|
|
def poll(self, timeoutMs=-1, pollPeriodMs=500):
|
|
"""Poll for incoming packets for up to timeoutMs milliseconds."""
|
|
start_time = time.ticks_ms()
|
|
status = False
|
|
while not status:
|
|
status = self.loop(False)
|
|
if time.ticks_diff(time.ticks_ms(), start_time) >= timeoutMs:
|
|
break
|
|
time.sleep_ms(pollPeriodMs)
|
|
return status
|