Files
demeter/esp/sensors.py

224 lines
6.9 KiB
Python

"""
Demeter ESP Sensor Node - Sensor Hardware Abstraction
Reads analog/digital sensors and returns normalized values.
Designed for ESP32 but with ESP8266 fallback.
Usage:
sensors = SensorManager(config)
sensors.init()
reading = sensors.read_soil_moisture() # returns 0.0 - 100.0
"""
try:
from machine import Pin, ADC
except ImportError:
# Running on CPython for testing
class Pin:
IN = 0
PULL_UP = 1
IRQ_FALLING = 2
IRQ_RISING = 1
IRQ_BOTH = 3
def __init__(self, *a, **kw): pass
def value(self): return 0
def irq(self, **kw): pass
class ADC:
ATTN_11DB = 3
WIDTH_12BIT = 3
def __init__(self, pin): pass
def atten(self, a): pass
def width(self, w): pass
def read(self): return 2048
try:
import json
except ImportError:
import ujson as json
def _clamp(value, lo, hi):
return max(lo, min(hi, value))
def _map_range(value, in_min, in_max, out_min, out_max):
"""Map a value from one range to another, clamped."""
if in_max == in_min:
return out_min
mapped = (value - in_min) * (out_max - out_min) / (in_max - in_min) + out_min
return _clamp(mapped, min(out_min, out_max), max(out_min, out_max))
class SensorManager:
"""Manages all sensors for a Demeter node."""
def __init__(self, config):
self.config = config
self._soil_adc = None
self._water_adc = None
self._trigger_pin = None
self._temp_sensor = None
self._trigger_fired = False
self._trigger_value = 0
def init(self):
"""Initialize all configured sensor hardware."""
cfg = self.config
# Soil moisture (analog)
if cfg.SOIL_MOISTURE_PIN is not None:
pin = Pin(cfg.SOIL_MOISTURE_PIN)
self._soil_adc = ADC(pin)
try:
self._soil_adc.atten(ADC.ATTN_11DB) # Full 3.3V range
self._soil_adc.width(ADC.WIDTH_12BIT) # 0-4095
except Exception:
pass # ESP8266 doesn't support these
# Water level (analog)
if cfg.WATER_LEVEL_PIN is not None:
pin = Pin(cfg.WATER_LEVEL_PIN)
self._water_adc = ADC(pin)
try:
self._water_adc.atten(ADC.ATTN_11DB)
self._water_adc.width(ADC.WIDTH_12BIT)
except Exception:
pass
# Temperature sensor
if cfg.TEMP_SENSOR_PIN is not None:
self._init_temp_sensor()
# Digital trigger with interrupt
if cfg.TRIGGER_PIN is not None:
self._trigger_pin = Pin(cfg.TRIGGER_PIN, Pin.IN, Pin.PULL_UP)
self._trigger_value = self._trigger_pin.value()
edge = Pin.IRQ_FALLING
if cfg.TRIGGER_EDGE == "rising":
edge = Pin.IRQ_RISING
elif cfg.TRIGGER_EDGE == "both":
edge = Pin.IRQ_FALLING | Pin.IRQ_RISING
self._trigger_pin.irq(trigger=edge, handler=self._trigger_isr)
def _init_temp_sensor(self):
"""Initialize temperature sensor based on config type."""
cfg = self.config
try:
if cfg.TEMP_SENSOR_TYPE == "dht22":
import dht
self._temp_sensor = dht.DHT22(Pin(cfg.TEMP_SENSOR_PIN))
elif cfg.TEMP_SENSOR_TYPE == "ds18b20":
import onewire
import ds18x20
ow = onewire.OneWire(Pin(cfg.TEMP_SENSOR_PIN))
self._temp_sensor = ds18x20.DS18X20(ow)
roms = self._temp_sensor.scan()
if roms:
self._ds18b20_rom = roms[0]
else:
print("[sensors] No DS18B20 found on pin", cfg.TEMP_SENSOR_PIN)
self._temp_sensor = None
except ImportError:
print("[sensors] Temperature sensor driver not available")
self._temp_sensor = None
def _trigger_isr(self, pin):
"""ISR for digital trigger - just sets a flag (no CoAP inside ISR)."""
self._trigger_fired = True
self._trigger_value = pin.value()
# ── Reading Methods ──
def read_soil_moisture(self):
"""Read soil moisture as a percentage (0.0 = dry, 100.0 = wet).
Returns None if sensor is not configured.
"""
if self._soil_adc is None:
return None
raw = self._soil_adc.read()
cfg = self.config
return round(_map_range(raw, cfg.SOIL_MOISTURE_DRY, cfg.SOIL_MOISTURE_WET, 0.0, 100.0), 1)
def read_temperature(self):
"""Read temperature in Celsius.
Returns None if sensor is not configured or read fails.
"""
if self._temp_sensor is None:
return None
cfg = self.config
try:
if cfg.TEMP_SENSOR_TYPE == "dht22":
self._temp_sensor.measure()
return round(self._temp_sensor.temperature(), 1)
elif cfg.TEMP_SENSOR_TYPE == "ds18b20":
self._temp_sensor.convert_temp()
# DS18B20 needs ~750ms conversion time
import time
time.sleep_ms(750)
return round(self._temp_sensor.read_temp(self._ds18b20_rom), 1)
except Exception as e:
print("[sensors] Temperature read error:", e)
return None
def read_water_level(self):
"""Read water level as a percentage (0.0 = empty, 100.0 = full).
Returns None if sensor is not configured.
"""
if self._water_adc is None:
return None
raw = self._water_adc.read()
cfg = self.config
return round(_map_range(raw, cfg.WATER_LEVEL_MIN, cfg.WATER_LEVEL_MAX, 0.0, 100.0), 1)
def read_trigger(self):
"""Read the current digital trigger pin state.
Returns (value, fired) where:
value: current pin state (0 or 1)
fired: True if the ISR has fired since last call (clears flag)
"""
if self._trigger_pin is None:
return (0, False)
fired = self._trigger_fired
self._trigger_fired = False
return (self._trigger_value, fired)
def read_all_json(self):
"""Read all sensors and return a JSON string.
Example output:
{
"device": "esp32-plant-01",
"soil_moisture": 45.2,
"temperature": 24.1,
"water_level": 78.5,
"trigger": 0
}
"""
data = {"device": self.config.DEVICE_ID}
val = self.read_soil_moisture()
if val is not None:
data["soil_moisture"] = val
val = self.read_temperature()
if val is not None:
data["temperature"] = val
val = self.read_water_level()
if val is not None:
data["water_level"] = val
trigger_val, _ = self.read_trigger()
data["trigger"] = trigger_val
return json.dumps(data)