""" Dial Plan — Pattern matching and digit normalisation. Matches a dialled string to a route type and normalises the destination to a canonical form the rest of the gateway can act on. Route types: "extension" — internal 2XX endpoint "service" — internal 5XX system service "pstn" — outbound call via SIP trunk (normalised E.164) "invalid" — no match """ import re from dataclasses import dataclass from typing import Optional # ================================================================ # Emergency numbers — always route to PSTN, highest priority # ================================================================ EMERGENCY_NUMBERS: dict[str, str] = { "911": "+1911", # North American emergency "9911": "+1911", # Mis-dial with phantom '9' prefix "112": "+112", # International GSM emergency } # ================================================================ # Extension ranges # ================================================================ EXTENSION_FIRST = 221 EXTENSION_LAST = 299 SERVICE_FIRST = 500 SERVICE_LAST = 599 # ================================================================ # Known system services # ================================================================ SERVICES: dict[int, str] = { 500: "auto_attendant", 510: "gateway_status", 511: "echo_test", 520: "hold_slayer_launch", 599: "operator_fallback", } # ================================================================ # Route result # ================================================================ @dataclass class RouteResult: """Result of a dial plan lookup.""" route_type: str # "extension" | "service" | "pstn" | "invalid" destination: str # normalised — extension number, service name, or E.164 original: str # what was dialled description: str = "" @property def is_internal(self) -> bool: return self.route_type in ("extension", "service") @property def is_outbound(self) -> bool: return self.route_type == "pstn" @property def is_valid(self) -> bool: return self.route_type != "invalid" # ================================================================ # Core matcher # ================================================================ def match(digits: str) -> RouteResult: """ Match dialled digits against the dial plan. Returns a RouteResult with the normalised destination. Examples: match("221") → RouteResult(route_type="extension", destination="221") match("511") → RouteResult(route_type="service", destination="echo_test") match("6135550100") → RouteResult(route_type="pstn", destination="+16135550100") match("16135550100") → RouteResult(route_type="pstn", destination="+16135550100") match("+16135550100") → RouteResult(route_type="pstn", destination="+16135550100") match("01144201234") → RouteResult(route_type="pstn", destination="+44201234") """ digits = digits.strip() # ---- Emergency numbers — checked first, no interception ---- if digits in EMERGENCY_NUMBERS: e164 = EMERGENCY_NUMBERS[digits] return RouteResult( route_type="pstn", destination=e164, original=digits, description=f"EMERGENCY {digits} → {e164}", ) # ---- 2XX extensions ---- if re.fullmatch(r"2\d{2}", digits): ext = int(digits) if EXTENSION_FIRST <= ext <= EXTENSION_LAST: return RouteResult( route_type="extension", destination=digits, original=digits, description=f"Extension {digits}", ) # ---- 5XX system services ---- if re.fullmatch(r"5\d{2}", digits): svc = int(digits) if SERVICE_FIRST <= svc <= SERVICE_LAST: name = SERVICES.get(svc, f"service_{svc}") return RouteResult( route_type="service", destination=name, original=digits, description=f"System service: {name}", ) # ---- PSTN outbound ---- e164 = _normalise_e164(digits) if e164: return RouteResult( route_type="pstn", destination=e164, original=digits, description=f"PSTN outbound → {e164}", ) return RouteResult( route_type="invalid", destination=digits, original=digits, description=f"No route for '{digits}'", ) # ================================================================ # E.164 normalisation # ================================================================ def _normalise_e164(digits: str) -> Optional[str]: """ Normalise a dialled string to E.164 (+CC…). Handles: +CCNNN… → unchanged (already E.164) 1NPANXXXXXX → +1NPANXXXXXX (NANP with country code, 11 digits) NPANXXXXXX → +1NPANXXXXXX (NANP 10-digit) 011CCNNN… → +CCNNN… (IDD 011 prefix) 00CCNNN… → +CCNNN… (IDD 00 prefix) """ # Strip spaces/dashes/dots/parens for matching only clean = re.sub(r"[\s\-\.\(\)]", "", digits) # Already E.164 if re.fullmatch(r"\+\d{7,15}", clean): return clean # NANP: 1 + 10 digits (NPA must be 2-9, NXX must be 2-9) if re.fullmatch(r"1[2-9]\d{2}[2-9]\d{6}", clean): return f"+{clean}" # NANP: 10 digits only if re.fullmatch(r"[2-9]\d{2}[2-9]\d{6}", clean): return f"+1{clean}" # IDD 011 (North American international dialling prefix) m = re.fullmatch(r"011(\d{7,13})", clean) if m: return f"+{m.group(1)}" # IDD 00 (international dialling prefix used in many countries) m = re.fullmatch(r"00(\d{7,13})", clean) if m: return f"+{m.group(1)}" return None # ================================================================ # Extension helpers # ================================================================ def next_extension(used: set[int]) -> Optional[int]: """ Return the lowest available extension in the 2XX range. Args: used: Set of already-assigned extension numbers. Returns: Next free extension, or None if the range is exhausted. """ for ext in range(EXTENSION_FIRST, EXTENSION_LAST + 1): if ext not in used: return ext return None def is_extension(digits: str) -> bool: """True if the string is a valid 2XX extension.""" return bool(re.fullmatch(r"2\d{2}", digits)) and ( EXTENSION_FIRST <= int(digits) <= EXTENSION_LAST ) def is_service(digits: str) -> bool: """True if the string is a valid 5XX service code.""" return bool(re.fullmatch(r"5\d{2}", digits)) and ( SERVICE_FIRST <= int(digits) <= SERVICE_LAST )