#!/usr/bin/env python3 """Stentor Gateway test client. Sends a WAV file over WebSocket to the Stentor Gateway and plays back or saves the audio response. Useful for testing without ESP32 hardware. Usage: # Send a WAV file and save the response python test_client.py --input recording.wav --output response.pcm # Send a WAV file to a custom gateway URL python test_client.py --input recording.wav --gateway ws://10.10.0.5:8600 # Generate silent audio for testing connectivity python test_client.py --test-silence """ import argparse import asyncio import base64 import json import struct import sys import wave from pathlib import Path async def run_client( gateway_url: str, audio_data: bytes, client_id: str = "test-client", output_file: str | None = None, ) -> None: """Connect to the gateway, send audio, and receive the response. Args: gateway_url: WebSocket URL of the Stentor Gateway. audio_data: Raw PCM audio bytes to send. client_id: Client identifier. output_file: Optional path to save response PCM audio. """ try: import websockets except ImportError: print("Error: 'websockets' package required. Install with: pip install websockets") sys.exit(1) print(f"Connecting to {gateway_url}...") async with websockets.connect(gateway_url) as ws: # 1. Start session await ws.send(json.dumps({ "type": "session.start", "client_id": client_id, "audio_config": { "sample_rate": 16000, "channels": 1, "sample_width": 16, "encoding": "pcm_s16le", }, })) # Wait for session.created msg = json.loads(await ws.recv()) assert msg["type"] == "session.created", f"Expected session.created, got {msg}" session_id = msg["session_id"] print(f"Session created: {session_id}") # Wait for listening status msg = json.loads(await ws.recv()) print(f"Status: {msg.get('state', msg)}") # 2. Stream audio in chunks (32ms chunks at 16kHz = 1024 bytes) chunk_size = 1024 total_chunks = 0 for offset in range(0, len(audio_data), chunk_size): chunk = audio_data[offset : offset + chunk_size] b64_chunk = base64.b64encode(chunk).decode("ascii") await ws.send(json.dumps({ "type": "input_audio_buffer.append", "audio": b64_chunk, })) total_chunks += 1 print(f"Sent {total_chunks} audio chunks ({len(audio_data)} bytes)") # 3. Commit the audio buffer await ws.send(json.dumps({"type": "input_audio_buffer.commit"})) print("Audio committed, waiting for response...") # 4. Receive response events response_audio = bytearray() done = False while not done: raw = await ws.recv() msg = json.loads(raw) msg_type = msg.get("type", "") if msg_type == "status": print(f" Status: {msg['state']}") elif msg_type == "transcript.done": print(f" Transcript: {msg['text']}") elif msg_type == "response.text.done": print(f" Response: {msg['text']}") elif msg_type == "response.audio.delta": chunk = base64.b64decode(msg["delta"]) response_audio.extend(chunk) print(f" Audio chunk: {len(chunk)} bytes", end="\r") elif msg_type == "response.audio.done": print(f"\n Audio complete: {len(response_audio)} bytes total") elif msg_type == "response.done": print(" Response complete!") done = True elif msg_type == "error": print(f" ERROR [{msg.get('code', '?')}]: {msg['message']}") done = True else: print(f" Unknown event: {msg_type}") # 5. Save response audio if response_audio: if output_file: out_path = Path(output_file) if out_path.suffix == ".wav": # Write as WAV with wave.open(str(out_path), "wb") as wf: wf.setnchannels(1) wf.setsampwidth(2) wf.setframerate(16000) wf.writeframes(bytes(response_audio)) else: # Write raw PCM out_path.write_bytes(bytes(response_audio)) print(f"Response audio saved to {output_file}") else: print("(Use --output to save response audio)") # 6. Close session await ws.send(json.dumps({"type": "session.close"})) print("Session closed.") def load_wav_as_pcm(wav_path: str) -> bytes: """Load a WAV file and return raw PCM data.""" with wave.open(wav_path, "rb") as wf: print(f"Input: {wav_path}") print(f" Channels: {wf.getnchannels()}") print(f" Sample rate: {wf.getframerate()} Hz") print(f" Sample width: {wf.getsampwidth() * 8}-bit") print(f" Frames: {wf.getnframes()}") print(f" Duration: {wf.getnframes() / wf.getframerate():.2f}s") if wf.getframerate() != 16000: print(f" WARNING: Expected 16kHz, got {wf.getframerate()} Hz") if wf.getnchannels() != 1: print(f" WARNING: Expected mono, got {wf.getnchannels()} channels") if wf.getsampwidth() != 2: print(f" WARNING: Expected 16-bit, got {wf.getsampwidth() * 8}-bit") return wf.readframes(wf.getnframes()) def generate_silence(duration_ms: int = 2000) -> bytes: """Generate silent PCM audio for testing.""" num_samples = int(16000 * duration_ms / 1000) print(f"Generated {duration_ms}ms of silence ({num_samples} samples)") return struct.pack(f"<{num_samples}h", *([0] * num_samples)) def main(): parser = argparse.ArgumentParser( description="Stentor Gateway test client", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__, ) parser.add_argument( "--gateway", default="ws://localhost:8600/api/v1/realtime", help="Gateway WebSocket URL (default: ws://localhost:8600/api/v1/realtime)", ) parser.add_argument( "--input", "-i", help="Path to input WAV file (16kHz, mono, 16-bit)", ) parser.add_argument( "--output", "-o", help="Path to save response audio (.wav or .pcm)", ) parser.add_argument( "--client-id", default="test-client", help="Client identifier (default: test-client)", ) parser.add_argument( "--test-silence", action="store_true", help="Send 2 seconds of silence (for connectivity testing)", ) args = parser.parse_args() if args.test_silence: audio_data = generate_silence() elif args.input: audio_data = load_wav_as_pcm(args.input) else: parser.error("Specify --input WAV_FILE or --test-silence") asyncio.run(run_client( gateway_url=args.gateway, audio_data=audio_data, client_id=args.client_id, output_file=args.output, )) if __name__ == "__main__": main()