"""
Tests for microCoAPy Observe Extension

Run with: python -m pytest tests/test_observe.py -v
Or simply: python tests/test_observe.py

When run as a script, the process exits with status 1 if any test failed,
so the result can be checked from a shell or CI job.
"""

import sys
import os

# Add parent directory to path so we can import microcoapy
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from microcoapy.coap_macros import (
    COAP_OPTION_NUMBER,
    COAP_TYPE,
    COAP_METHOD,
    COAP_RESPONSE_CODE,
    COAP_CONTENT_FORMAT,
    COAP_OBSERVE_REGISTER,
    COAP_OBSERVE_DEREGISTER,
)
from microcoapy.coap_packet import CoapPacket
from microcoapy.coap_writer import writePacketHeaderInfo, writePacketOptions, writePacketPayload
from microcoapy.coap_reader import parsePacketHeaderInfo, parsePacketOptionsAndPayload
from microcoapy.observe_manager import ObserveManager


def _options_numbered(pkt, number):
    """Return the list of options on *pkt* whose option number equals *number*."""
    return [o for o in pkt.options if o.number == number]


def test_observe_option_encode_single_byte():
    """Observe value < 256 should encode as 1 byte."""
    pkt = CoapPacket()
    pkt.setObserve(0)
    obs_opts = _options_numbered(pkt, COAP_OPTION_NUMBER.COAP_OBSERVE)
    assert len(obs_opts) == 1
    assert obs_opts[0].buffer == bytearray([0])
    print("PASS: observe option encode single byte (value=0)")

    pkt2 = CoapPacket()
    pkt2.setObserve(42)
    obs_opts2 = _options_numbered(pkt2, COAP_OPTION_NUMBER.COAP_OBSERVE)
    # Same cardinality check as the value=0 case, so a missing or duplicated
    # option fails as an assertion rather than an IndexError / silent pass.
    assert len(obs_opts2) == 1
    assert obs_opts2[0].buffer == bytearray([42])
    print("PASS: observe option encode single byte (value=42)")


def test_observe_option_encode_two_bytes():
    """Observe value 256-65535 should encode as 2 bytes."""
    pkt = CoapPacket()
    pkt.setObserve(1000)
    obs_opts = _options_numbered(pkt, COAP_OPTION_NUMBER.COAP_OBSERVE)
    assert len(obs_opts) == 1
    buf = obs_opts[0].buffer
    assert len(buf) == 2
    # Reassemble the value as big-endian (most significant byte first).
    val = (buf[0] << 8) | buf[1]
    assert val == 1000
    print("PASS: observe option encode two bytes (value=1000)")


def test_observe_option_encode_three_bytes():
    """Observe value 65536+ should encode as 3 bytes."""
    pkt = CoapPacket()
    pkt.setObserve(100000)
    obs_opts = _options_numbered(pkt, COAP_OPTION_NUMBER.COAP_OBSERVE)
    # Consistent with the one- and two-byte tests: exactly one Observe option.
    assert len(obs_opts) == 1
    buf = obs_opts[0].buffer
    assert len(buf) == 3
    # Reassemble the value as big-endian (most significant byte first).
    val = (buf[0] << 16) | (buf[1] << 8) | buf[2]
    assert val == 100000
    print("PASS: observe option encode three bytes (value=100000)")


def test_get_observe_value():
    """getObserveValue() should decode the Observe option."""
    pkt = CoapPacket()
    assert pkt.getObserveValue() is None
    print("PASS: getObserveValue returns None when no option set")

    pkt.setObserve(0)
    assert pkt.getObserveValue() == 0
    print("PASS: getObserveValue returns 0 (register)")

    pkt2 = CoapPacket()
    pkt2.setObserve(1)
    assert pkt2.getObserveValue() == 1
    print("PASS: getObserveValue returns 1 (deregister)")

    pkt3 = CoapPacket()
    pkt3.setObserve(65000)
    assert pkt3.getObserveValue() == 65000
    print("PASS: getObserveValue returns 65000")


def test_packet_roundtrip_with_observe():
    """A packet with Observe option should survive encode -> decode."""
    pkt = CoapPacket()
    pkt.type = COAP_TYPE.COAP_CON
    pkt.method = COAP_METHOD.COAP_GET
    pkt.messageid = 0x1234
    pkt.token = bytearray([0xAA, 0xBB])
    pkt.setObserve(COAP_OBSERVE_REGISTER)
    pkt.setUriPath("sensors/temperature")

    # Encode
    buffer = bytearray()
    writePacketHeaderInfo(buffer, pkt)
    writePacketOptions(buffer, pkt)
    writePacketPayload(buffer, pkt)

    # Decode
    decoded = CoapPacket()
    parsePacketHeaderInfo(buffer, decoded)
    # The token fields are filled in by hand here: length from the low nibble
    # of the first byte, token bytes starting at offset 4.
    decoded.tokenLength = buffer[0] & 0x0F
    decoded.token = buffer[4:4 + decoded.tokenLength]
    assert parsePacketOptionsAndPayload(buffer, decoded)

    # Verify Observe option survived
    obs_val = decoded.getObserveValue()
    assert obs_val == 0, "Expected Observe=0, got {}".format(obs_val)
    print("PASS: packet roundtrip with Observe option")

    # Verify URI path survived
    uri = decoded.getUriPath()
    assert uri == "sensors/temperature", "Expected 'sensors/temperature', got '{}'".format(uri)
    print("PASS: packet roundtrip URI path preserved")


def test_max_age_option():
    """Max-Age option should encode correctly."""
    pkt = CoapPacket()
    pkt.setMaxAge(60)
    max_age_opts = _options_numbered(pkt, COAP_OPTION_NUMBER.COAP_MAX_AGE)
    assert len(max_age_opts) == 1
    assert max_age_opts[0].buffer == bytearray([60])
    print("PASS: Max-Age option encode (value=60)")


def test_observer_manager_register():
    """ObserveManager should register and retrieve observers."""
    mgr = ObserveManager(debug=False)
    assert not mgr.has_observers("test/resource")
    assert mgr.observer_count() == 0

    ok = mgr.register("test/resource", "192.168.1.10", 5683, bytearray([1, 2]))
    assert ok
    assert mgr.has_observers("test/resource")
    assert mgr.observer_count("test/resource") == 1
    assert mgr.observer_count() == 1
    print("PASS: observer registration")


def test_observer_manager_reregister():
    """Re-registering the same observer should update token, not duplicate."""
    mgr = ObserveManager(debug=False)
    mgr.register("test/resource", "192.168.1.10", 5683, bytearray([1, 2]))
    mgr.register("test/resource", "192.168.1.10", 5683, bytearray([3, 4]))

    assert mgr.observer_count("test/resource") == 1
    observers = mgr.get_observers("test/resource")
    assert observers[0]["token"] == bytearray([3, 4])
    print("PASS: observer re-registration updates token")


def test_observer_manager_deregister():
    """Deregistering should remove the observer."""
    mgr = ObserveManager(debug=False)
    mgr.register("test/resource", "192.168.1.10", 5683, bytearray([1]))
    mgr.register("test/resource", "192.168.1.20", 5683, bytearray([2]))
    assert mgr.observer_count("test/resource") == 2

    mgr.deregister("test/resource", "192.168.1.10", 5683)
    assert mgr.observer_count("test/resource") == 1
    assert mgr.get_observers("test/resource")[0]["ip"] == "192.168.1.20"
    print("PASS: observer deregistration")


def test_observer_manager_deregister_by_token():
    """Deregistering by token (RST handling) should work."""
    mgr = ObserveManager(debug=False)
    mgr.register("test/resource", "192.168.1.10", 5683, bytearray([0xAA]))
    mgr.register("test/resource", "192.168.1.20", 5684, bytearray([0xBB]))

    mgr.deregister_by_token("test/resource", bytearray([0xAA]))
    assert mgr.observer_count("test/resource") == 1
    assert mgr.get_observers("test/resource")[0]["token"] == bytearray([0xBB])
    print("PASS: observer deregister by token (RST)")


def test_observer_manager_limits():
    """Observer limits should be enforced."""
    mgr = ObserveManager(debug=False)
    # Shrink the limits on this instance so they can be hit with a few calls.
    mgr.MAX_OBSERVERS_PER_RESOURCE = 2
    mgr.MAX_TOTAL_OBSERVERS = 3

    assert mgr.register("res1", "10.0.0.1", 5683, bytearray([1]))
    assert mgr.register("res1", "10.0.0.2", 5683, bytearray([2]))
    assert not mgr.register("res1", "10.0.0.3", 5683, bytearray([3]))  # per-resource limit
    print("PASS: per-resource observer limit enforced")

    assert mgr.register("res2", "10.0.0.3", 5683, bytearray([3]))
    assert not mgr.register("res2", "10.0.0.4", 5683, bytearray([4]))  # total limit
    print("PASS: total observer limit enforced")


def test_observer_manager_sequence():
    """Sequence numbers should increment and wrap at 24 bits."""
    mgr = ObserveManager(debug=False)
    # Seed the counter just below the largest 24-bit value (0xFFFFFF).
    mgr._sequence["test"] = 0xFFFFFE
    assert mgr.next_sequence("test") == 0xFFFFFE
    assert mgr.next_sequence("test") == 0xFFFFFF
    assert mgr.next_sequence("test") == 0  # Wrap
    print("PASS: sequence number wrap at 24 bits")


def test_get_uri_path():
    """getUriPath() should reconstruct the path from options."""
    pkt = CoapPacket()
    pkt.setUriPath("sensors/soil_moisture")
    assert pkt.getUriPath() == "sensors/soil_moisture"
    print("PASS: getUriPath reconstruction")


def _run_all(tests):
    """Run every callable in *tests* and return a ``(passed, failed)`` tuple.

    A test that raises is reported with a ``FAIL:`` line and a traceback;
    the remaining tests still run.
    """
    passed = 0
    failed = 0
    for test in tests:
        try:
            test()
        except Exception as e:
            print("FAIL: {} - {}".format(test.__name__, e))
            # Imported lazily: only needed when a test actually fails.
            import traceback
            traceback.print_exc()
            failed += 1
        else:
            passed += 1
    return passed, failed


# ── Run all tests ──
if __name__ == "__main__":
    tests = [
        test_observe_option_encode_single_byte,
        test_observe_option_encode_two_bytes,
        test_observe_option_encode_three_bytes,
        test_get_observe_value,
        test_packet_roundtrip_with_observe,
        test_max_age_option,
        test_observer_manager_register,
        test_observer_manager_reregister,
        test_observer_manager_deregister,
        test_observer_manager_deregister_by_token,
        test_observer_manager_limits,
        test_observer_manager_sequence,
        test_get_uri_path,
    ]

    passed, failed = _run_all(tests)

    print("\n" + "=" * 40)
    print("Results: {} passed, {} failed".format(passed, failed))
    print("=" * 40)

    # Report failure through the exit status so shells and CI can detect it.
    if failed:
        sys.exit(1)