""" microCoAPy - Extended for Demeter with CoAP Observe (RFC 7641) Changes from upstream microCoAPy: - Observer registration/deregistration on incoming GET with Option 6 - notifyObservers() method for server-side Observe notifications - observeGet() client method for subscribing to observable resources - RST handling to remove observers - Per-resource Max-Age support in notifications """ try: import socket except ImportError: import usocket as socket try: import os except ImportError: import uos as os try: import time except ImportError: import utime as time import binascii from . import coap_macros as macros from .coap_packet import CoapPacket from .coap_reader import parsePacketHeaderInfo from .coap_reader import parsePacketOptionsAndPayload from .coap_writer import writePacketHeaderInfo from .coap_writer import writePacketOptions from .coap_writer import writePacketPayload from .observe_manager import ObserveManager class Coap: TRANSMISSION_STATE = macros.enum( STATE_IDLE=0, STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA=1 ) def __init__(self): self.debug = True self.sock = None self.callbacks = {} self.responseCallback = None self.port = 0 self.isServer = False self.state = self.TRANSMISSION_STATE.STATE_IDLE self.isCustomSocket = False # Observe manager (RFC 7641) self.observe = ObserveManager(debug=True) # Beta flags self.discardRetransmissions = False self.lastPacketStr = "" def log(self, s): if self.debug: print("[microcoapy]: " + s) # ── Socket Management ── def start(self, port=macros._COAP_DEFAULT_PORT): """Create and bind a UDP socket.""" self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.sock.bind(("", port)) def stop(self): """Close the socket.""" if self.sock is not None: self.sock.close() self.sock = None def setCustomSocket(self, custom_socket): """Use a custom UDP socket implementation.""" self.stop() self.isCustomSocket = True self.sock = custom_socket # ── Callback Registration ── def addIncomingRequestCallback(self, requestUrl, callback): """Register a callback for incoming requests to a URL. The callback signature is: callback(packet, senderIp, senderPort) For observable resources, the callback should return a tuple: (payload_str, content_format) This allows notifyObservers to retrieve the current value. If the callback returns None, the server handles it as before (callback is responsible for sending its own response). """ self.callbacks[requestUrl] = callback self.isServer = True # ── Packet Sending ── def sendPacket(self, ip, port, coapPacket): """Serialize and send a CoAP packet.""" if coapPacket.content_format != macros.COAP_CONTENT_FORMAT.COAP_NONE: optionBuffer = bytearray(2) optionBuffer[0] = (coapPacket.content_format & 0xFF00) >> 8 optionBuffer[1] = coapPacket.content_format & 0x00FF coapPacket.addOption( macros.COAP_OPTION_NUMBER.COAP_CONTENT_FORMAT, optionBuffer ) if (coapPacket.query is not None) and (len(coapPacket.query) > 0): coapPacket.addOption( macros.COAP_OPTION_NUMBER.COAP_URI_QUERY, coapPacket.query ) buffer = bytearray() writePacketHeaderInfo(buffer, coapPacket) writePacketOptions(buffer, coapPacket) writePacketPayload(buffer, coapPacket) status = 0 try: sockaddr = (ip, port) try: sockaddr = socket.getaddrinfo(ip, port)[0][-1] except Exception: pass status = self.sock.sendto(buffer, sockaddr) if status > 0: status = coapPacket.messageid self.log("Packet sent. messageid: " + str(status)) except Exception as e: status = 0 print("Exception while sending packet...") import sys sys.print_exception(e) return status def send(self, ip, port, url, type, method, token, payload, content_format, query_option): """Build and send a CoAP request.""" packet = CoapPacket() packet.type = type packet.method = method packet.token = token packet.payload = payload packet.content_format = content_format packet.query = query_option return self.sendEx(ip, port, url, packet) def sendEx(self, ip, port, url, packet): """Send a packet with auto-generated message ID and URI options.""" self.state = self.TRANSMISSION_STATE.STATE_IDLE randBytes = os.urandom(2) packet.messageid = (randBytes[0] << 8) | randBytes[1] packet.setUriHost(ip) packet.setUriPath(url) return self.sendPacket(ip, port, packet) def sendResponse(self, ip, port, messageid, payload, method, content_format, token): """Send a response (ACK) packet.""" packet = CoapPacket() packet.type = macros.COAP_TYPE.COAP_ACK packet.method = method packet.token = token packet.payload = payload packet.messageid = messageid packet.content_format = content_format return self.sendPacket(ip, port, packet) # ── Client Methods (Confirmable) ── def get(self, ip, port, url, token=bytearray()): return self.send( ip, port, url, macros.COAP_TYPE.COAP_CON, macros.COAP_METHOD.COAP_GET, token, None, macros.COAP_CONTENT_FORMAT.COAP_NONE, None ) def put(self, ip, port, url, payload=bytearray(), query_option=None, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()): return self.send( ip, port, url, macros.COAP_TYPE.COAP_CON, macros.COAP_METHOD.COAP_PUT, token, payload, content_format, query_option ) def post(self, ip, port, url, payload=bytearray(), query_option=None, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()): return self.send( ip, port, url, macros.COAP_TYPE.COAP_CON, macros.COAP_METHOD.COAP_POST, token, payload, content_format, query_option ) # ── Client Methods (Non-Confirmable) ── def getNonConf(self, ip, port, url, token=bytearray()): return self.send( ip, port, url, macros.COAP_TYPE.COAP_NONCON, macros.COAP_METHOD.COAP_GET, token, None, macros.COAP_CONTENT_FORMAT.COAP_NONE, None ) def putNonConf(self, ip, port, url, payload=bytearray(), query_option=None, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()): return self.send( ip, port, url, macros.COAP_TYPE.COAP_NONCON, macros.COAP_METHOD.COAP_PUT, token, payload, content_format, query_option ) def postNonConf(self, ip, port, url, payload=bytearray(), query_option=None, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, token=bytearray()): return self.send( ip, port, url, macros.COAP_TYPE.COAP_NONCON, macros.COAP_METHOD.COAP_POST, token, payload, content_format, query_option ) # ── Observe Client Methods (RFC 7641) ── def observeGet(self, ip, port, url, token=None): """Send a GET with Observe option 0 (register) to subscribe to a resource. Args: ip: Server IP port: Server port url: Resource URI path token: Token for matching responses (auto-generated if None) Returns: Message ID on success, 0 on failure. """ if token is None: token = bytearray(os.urandom(4)) packet = CoapPacket() packet.type = macros.COAP_TYPE.COAP_CON packet.method = macros.COAP_METHOD.COAP_GET packet.token = token packet.payload = None packet.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE # Set Observe = 0 (register) packet.setObserve(macros.COAP_OBSERVE_REGISTER) return self.sendEx(ip, port, url, packet) def observeCancel(self, ip, port, url, token=bytearray()): """Send a GET with Observe option 1 (deregister) to cancel observation. Returns: Message ID on success, 0 on failure. """ packet = CoapPacket() packet.type = macros.COAP_TYPE.COAP_CON packet.method = macros.COAP_METHOD.COAP_GET packet.token = token packet.payload = None packet.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE # Set Observe = 1 (deregister) packet.setObserve(macros.COAP_OBSERVE_DEREGISTER) return self.sendEx(ip, port, url, packet) # ── Observe Server Methods (RFC 7641) ── def notifyObservers(self, resource_url, payload, content_format=macros.COAP_CONTENT_FORMAT.COAP_NONE, message_type=macros.COAP_TYPE.COAP_NONCON, max_age=None): """Send an Observe notification to all observers of a resource. Args: resource_url: The resource URI path (must match what observers subscribed to) payload: The current resource representation (str or bytes) content_format: Content format of the payload message_type: COAP_NON for periodic data, COAP_CON for critical events max_age: Optional Max-Age in seconds (freshness lifetime) Returns: Number of notifications sent successfully. """ if not self.observe.has_observers(resource_url): return 0 observers = self.observe.get_observers(resource_url) seq = self.observe.next_sequence(resource_url) sent = 0 for obs in observers: packet = CoapPacket() packet.type = message_type packet.method = macros.COAP_RESPONSE_CODE.COAP_CONTENT packet.token = obs["token"] packet.content_format = content_format if isinstance(payload, str): packet.payload = bytearray(payload.encode("utf-8")) elif payload is not None: packet.payload = bytearray(payload) else: packet.payload = bytearray() # Generate message ID randBytes = os.urandom(2) packet.messageid = (randBytes[0] << 8) | randBytes[1] # Add Observe sequence number packet.setObserve(seq) # Add Max-Age if specified if max_age is not None: packet.setMaxAge(max_age) status = self.sendPacket(obs["ip"], obs["port"], packet) if status > 0: sent += 1 else: self.log("Failed to notify observer {}:{}".format(obs["ip"], obs["port"])) self.log("Notified {}/{} observers of {} (seq={})".format( sent, len(observers), resource_url, seq )) return sent # ── Incoming Request Handling ── def handleIncomingRequest(self, requestPacket, sourceIp, sourcePort): """Handle an incoming CoAP request, including Observe registration.""" url = requestPacket.getUriPath() urlCallback = None if url != "": urlCallback = self.callbacks.get(url) if urlCallback is None: if self.responseCallback: return False print("Callback for url [", url, "] not found") self.sendResponse( sourceIp, sourcePort, requestPacket.messageid, None, macros.COAP_RESPONSE_CODE.COAP_NOT_FOUND, macros.COAP_CONTENT_FORMAT.COAP_NONE, requestPacket.token, ) return True # Check for Observe option in GET requests (RFC 7641) if requestPacket.method == macros.COAP_METHOD.COAP_GET: observeValue = requestPacket.getObserveValue() if observeValue == macros.COAP_OBSERVE_REGISTER: # Register observer registered = self.observe.register( url, sourceIp, sourcePort, requestPacket.token ) if registered: # Send initial response with Observe option (sequence 0) result = urlCallback(requestPacket, sourceIp, sourcePort) if result is not None: payload_str, cf = result response = CoapPacket() response.type = macros.COAP_TYPE.COAP_ACK response.method = macros.COAP_RESPONSE_CODE.COAP_CONTENT response.token = requestPacket.token response.messageid = requestPacket.messageid response.content_format = cf if isinstance(payload_str, str): response.payload = bytearray(payload_str.encode("utf-8")) elif payload_str is not None: response.payload = bytearray(payload_str) # Include Observe option with sequence 0 in initial response response.setObserve(self.observe.next_sequence(url)) self.sendPacket(sourceIp, sourcePort, response) # else: callback handled its own response return True else: # Registration failed (limits exceeded) — respond without Observe self.log("Observer registration failed for {}:{} on {}".format( sourceIp, sourcePort, url )) # Fall through to normal callback handling elif observeValue == macros.COAP_OBSERVE_DEREGISTER: # Deregister observer self.observe.deregister(url, sourceIp, sourcePort) # Fall through to normal GET response # Normal (non-observe) request handling urlCallback(requestPacket, sourceIp, sourcePort) return True # ── Socket Reading ── def readBytesFromSocket(self, numOfBytes): try: return self.sock.recvfrom(numOfBytes) except Exception: return (None, None) def parsePacketToken(self, buffer, packet): if packet.tokenLength == 0: packet.token = None elif packet.tokenLength <= 8: packet.token = buffer[4: 4 + packet.tokenLength] else: (tempBuffer, tempRemoteAddress) = self.readBytesFromSocket( macros._BUF_MAX_SIZE ) if tempBuffer is not None: buffer.extend(tempBuffer) return False return True # ── Main Loop ── def loop(self, blocking=True): """Process one incoming packet. Returns True if a packet was processed, False otherwise. """ if self.sock is None: return False self.sock.setblocking(blocking) (buffer, remoteAddress) = self.readBytesFromSocket(macros._BUF_MAX_SIZE) self.sock.setblocking(True) while (buffer is not None) and (len(buffer) > 0): bufferLen = len(buffer) if (bufferLen < macros._COAP_HEADER_SIZE) or \ (((buffer[0] & 0xC0) >> 6) != 1): (tempBuffer, tempRemoteAddress) = self.readBytesFromSocket( macros._BUF_MAX_SIZE - bufferLen ) if tempBuffer is not None: buffer.extend(tempBuffer) continue packet = CoapPacket() self.log("Incoming bytes: " + str(binascii.hexlify(bytearray(buffer)))) parsePacketHeaderInfo(buffer, packet) if not self.parsePacketToken(buffer, packet): continue if not parsePacketOptionsAndPayload(buffer, packet): return False # Handle RST — deregister observer (RFC 7641 §3.6) if packet.type == macros.COAP_TYPE.COAP_RESET: self.log("RST received, deregistering any observer with matching token") for res_url in self.observe.get_all_resources(): self.observe.deregister_by_token(res_url, packet.token) return True # Beta: discard retransmissions if self.discardRetransmissions: if packet.toString() == self.lastPacketStr: self.log("Discarded retransmission: " + packet.toString()) return False else: self.lastPacketStr = packet.toString() if not self.isServer or not self.handleIncomingRequest( packet, remoteAddress[0], remoteAddress[1] ): # Separate response handling (RFC 7252 §5.2.2) if (packet.type == macros.COAP_TYPE.COAP_ACK and packet.method == macros.COAP_METHOD.COAP_EMPTY_MESSAGE): self.state = self.TRANSMISSION_STATE.STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA return False else: if self.state == self.TRANSMISSION_STATE.STATE_SEPARATE_ACK_RECEIVED_WAITING_DATA: self.state = self.TRANSMISSION_STATE.STATE_IDLE self.sendResponse( remoteAddress[0], remoteAddress[1], packet.messageid, None, macros.COAP_TYPE.COAP_ACK, macros.COAP_CONTENT_FORMAT.COAP_NONE, packet.token, ) if self.responseCallback is not None: self.responseCallback(packet, remoteAddress) return True return False def poll(self, timeoutMs=-1, pollPeriodMs=500): """Poll for incoming packets for up to timeoutMs milliseconds.""" start_time = time.ticks_ms() status = False while not status: status = self.loop(False) if time.ticks_diff(time.ticks_ms(), start_time) >= timeoutMs: break time.sleep_ms(pollPeriodMs) return status