Files
demeter/esp/microcoapy/coap_packet.py

84 lines
3.0 KiB
Python

from . import coap_macros as macros
from .coap_option import CoapOption
class CoapPacket:
def __init__(self):
self.version = macros.COAP_VERSION.COAP_VERSION_UNSUPPORTED
self.type = macros.COAP_TYPE.COAP_CON
self.method = macros.COAP_METHOD.COAP_GET
self.token = bytearray()
self.payload = bytearray()
self.messageid = 0
self.content_format = macros.COAP_CONTENT_FORMAT.COAP_NONE
self.query = bytearray()
self.options = []
def addOption(self, number, opt_payload):
if len(self.options) >= macros._MAX_OPTION_NUM:
return
self.options.append(CoapOption(number, opt_payload))
def setUriHost(self, address):
self.addOption(macros.COAP_OPTION_NUMBER.COAP_URI_HOST, address)
def setUriPath(self, url):
for subPath in url.split('/'):
self.addOption(macros.COAP_OPTION_NUMBER.COAP_URI_PATH, subPath)
def setObserve(self, value):
"""Set the Observe option (RFC 7641).
For requests:
value=0: register as observer
value=1: deregister
For notifications:
value=sequence number (24-bit, 0-16777215)
"""
if value < 256:
buf = bytearray([value & 0xFF])
elif value < 65536:
buf = bytearray([(value >> 8) & 0xFF, value & 0xFF])
else:
buf = bytearray([(value >> 16) & 0xFF, (value >> 8) & 0xFF, value & 0xFF])
self.addOption(macros.COAP_OPTION_NUMBER.COAP_OBSERVE, buf)
def setMaxAge(self, seconds):
"""Set the Max-Age option (seconds until resource is considered stale)."""
if seconds < 256:
buf = bytearray([seconds & 0xFF])
elif seconds < 65536:
buf = bytearray([(seconds >> 8) & 0xFF, seconds & 0xFF])
else:
buf = bytearray([
(seconds >> 24) & 0xFF, (seconds >> 16) & 0xFF,
(seconds >> 8) & 0xFF, seconds & 0xFF
])
self.addOption(macros.COAP_OPTION_NUMBER.COAP_MAX_AGE, buf)
def getObserveValue(self):
"""Extract the Observe option value from the packet, or None if absent."""
for opt in self.options:
if opt.number == macros.COAP_OPTION_NUMBER.COAP_OBSERVE:
val = 0
for b in opt.buffer:
val = (val << 8) | b
return val
return None
def getUriPath(self):
"""Reconstruct the URI path from URI_PATH options."""
parts = []
for opt in self.options:
if opt.number == macros.COAP_OPTION_NUMBER.COAP_URI_PATH and len(opt.buffer) > 0:
parts.append(opt.buffer.decode("utf-8"))
return "/".join(parts)
def toString(self):
class_, detail = macros.CoapResponseCode.decode(self.method)
return "type: {}, method: {}.{:02d}, messageid: {}, payload: {}".format(
macros.coapTypeToString(self.type), class_, detail,
self.messageid, self.payload
)