""" Demeter ESP Sensor Node - Sensor Hardware Abstraction Reads analog/digital sensors and returns normalized values. Designed for ESP32 but with ESP8266 fallback. Usage: sensors = SensorManager(config) sensors.init() reading = sensors.read_soil_moisture() # returns 0.0 - 100.0 """ try: from machine import Pin, ADC except ImportError: # Running on CPython for testing class Pin: IN = 0 PULL_UP = 1 IRQ_FALLING = 2 IRQ_RISING = 1 IRQ_BOTH = 3 def __init__(self, *a, **kw): pass def value(self): return 0 def irq(self, **kw): pass class ADC: ATTN_11DB = 3 WIDTH_12BIT = 3 def __init__(self, pin): pass def atten(self, a): pass def width(self, w): pass def read(self): return 2048 try: import json except ImportError: import ujson as json def _clamp(value, lo, hi): return max(lo, min(hi, value)) def _map_range(value, in_min, in_max, out_min, out_max): """Map a value from one range to another, clamped.""" if in_max == in_min: return out_min mapped = (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min return _clamp(mapped, min(out_min, out_max), max(out_min, out_max)) class SensorManager: """Manages all sensors for a Demeter node.""" def __init__(self, config): self.config = config self._soil_adc = None self._water_adc = None self._trigger_pin = None self._temp_sensor = None self._trigger_fired = False self._trigger_value = 0 def init(self): """Initialize all configured sensor hardware.""" cfg = self.config # Soil moisture (analog) if cfg.SOIL_MOISTURE_PIN is not None: pin = Pin(cfg.SOIL_MOISTURE_PIN) self._soil_adc = ADC(pin) try: self._soil_adc.atten(ADC.ATTN_11DB) # Full 3.3V range self._soil_adc.width(ADC.WIDTH_12BIT) # 0-4095 except Exception: pass # ESP8266 doesn't support these # Water level (analog) if cfg.WATER_LEVEL_PIN is not None: pin = Pin(cfg.WATER_LEVEL_PIN) self._water_adc = ADC(pin) try: self._water_adc.atten(ADC.ATTN_11DB) self._water_adc.width(ADC.WIDTH_12BIT) except Exception: pass # Temperature sensor if cfg.TEMP_SENSOR_PIN is not None: self._init_temp_sensor() # Digital trigger with interrupt if cfg.TRIGGER_PIN is not None: self._trigger_pin = Pin(cfg.TRIGGER_PIN, Pin.IN, Pin.PULL_UP) self._trigger_value = self._trigger_pin.value() edge = Pin.IRQ_FALLING if cfg.TRIGGER_EDGE == "rising": edge = Pin.IRQ_RISING elif cfg.TRIGGER_EDGE == "both": edge = Pin.IRQ_FALLING | Pin.IRQ_RISING self._trigger_pin.irq(trigger=edge, handler=self._trigger_isr) def _init_temp_sensor(self): """Initialize temperature sensor based on config type.""" cfg = self.config try: if cfg.TEMP_SENSOR_TYPE == "dht22": import dht self._temp_sensor = dht.DHT22(Pin(cfg.TEMP_SENSOR_PIN)) elif cfg.TEMP_SENSOR_TYPE == "ds18b20": import onewire import ds18x20 ow = onewire.OneWire(Pin(cfg.TEMP_SENSOR_PIN)) self._temp_sensor = ds18x20.DS18X20(ow) roms = self._temp_sensor.scan() if roms: self._ds18b20_rom = roms[0] else: print("[sensors] No DS18B20 found on pin", cfg.TEMP_SENSOR_PIN) self._temp_sensor = None except ImportError: print("[sensors] Temperature sensor driver not available") self._temp_sensor = None def _trigger_isr(self, pin): """ISR for digital trigger - just sets a flag (no CoAP inside ISR).""" self._trigger_fired = True self._trigger_value = pin.value() # ── Reading Methods ── def read_soil_moisture(self): """Read soil moisture as a percentage (0.0 = dry, 100.0 = wet). Returns None if sensor is not configured. """ if self._soil_adc is None: return None raw = self._soil_adc.read() cfg = self.config return round(_map_range(raw, cfg.SOIL_MOISTURE_DRY, cfg.SOIL_MOISTURE_WET, 0.0, 100.0), 1) def read_temperature(self): """Read temperature in Celsius. Returns None if sensor is not configured or read fails. """ if self._temp_sensor is None: return None cfg = self.config try: if cfg.TEMP_SENSOR_TYPE == "dht22": self._temp_sensor.measure() return round(self._temp_sensor.temperature(), 1) elif cfg.TEMP_SENSOR_TYPE == "ds18b20": self._temp_sensor.convert_temp() # DS18B20 needs ~750ms conversion time import time time.sleep_ms(750) return round(self._temp_sensor.read_temp(self._ds18b20_rom), 1) except Exception as e: print("[sensors] Temperature read error:", e) return None def read_water_level(self): """Read water level as a percentage (0.0 = empty, 100.0 = full). Returns None if sensor is not configured. """ if self._water_adc is None: return None raw = self._water_adc.read() cfg = self.config return round(_map_range(raw, cfg.WATER_LEVEL_MIN, cfg.WATER_LEVEL_MAX, 0.0, 100.0), 1) def read_trigger(self): """Read the current digital trigger pin state. Returns (value, fired) where: value: current pin state (0 or 1) fired: True if the ISR has fired since last call (clears flag) """ if self._trigger_pin is None: return (0, False) fired = self._trigger_fired self._trigger_fired = False return (self._trigger_value, fired) def read_all_json(self): """Read all sensors and return a JSON string. Example output: { "device": "esp32-plant-01", "soil_moisture": 45.2, "temperature": 24.1, "water_level": 78.5, "trigger": 0 } """ data = {"device": self.config.DEVICE_ID} val = self.read_soil_moisture() if val is not None: data["soil_moisture"] = val val = self.read_temperature() if val is not None: data["temperature"] = val val = self.read_water_level() if val is not None: data["water_level"] = val trigger_val, _ = self.read_trigger() data["trigger"] = trigger_val return json.dumps(data)