Files
stentor/stentor-gateway/test_client.py
Robert Helewka 912593b796 feat: scaffold stentor-gateway with FastAPI voice pipeline
Initialize the stentor-gateway project with WebSocket-based voice
pipeline orchestrating STT → Agent → TTS via OpenAI-compatible APIs.

- Add FastAPI app with WebSocket endpoint for audio streaming
- Add pipeline orchestration (stt_client, tts_client, agent_client)
- Add Pydantic Settings configuration and message models
- Add audio utilities for PCM/WAV conversion and resampling
- Add health check endpoints
- Add Dockerfile and pyproject.toml with dependencies
- Add initial test suite (pipeline, STT, TTS, WebSocket)
- Add comprehensive README covering gateway and ESP32 ear design
- Clean up .gitignore for Python/uv project
2026-03-21 19:11:48 +00:00

227 lines
7.3 KiB
Python

#!/usr/bin/env python3
"""Stentor Gateway test client.
Sends a WAV file over WebSocket to the Stentor Gateway and plays back
or saves the audio response. Useful for testing without ESP32 hardware.
Usage:
# Send a WAV file and save the response
python test_client.py --input recording.wav --output response.pcm
# Send a WAV file to a custom gateway URL
python test_client.py --input recording.wav --gateway ws://10.10.0.5:8600
# Generate silent audio for testing connectivity
python test_client.py --test-silence
"""
import argparse
import asyncio
import base64
import json
import struct
import sys
import wave
from pathlib import Path
async def run_client(
    gateway_url: str,
    audio_data: bytes,
    client_id: str = "test-client",
    output_file: str | None = None,
) -> None:
    """Connect to the gateway, send audio, and receive the response.

    Runs one complete exchange over a single WebSocket connection:
    ``session.start`` -> chunked ``input_audio_buffer.append`` messages ->
    ``input_audio_buffer.commit`` -> response events until ``response.done``
    or ``error`` -> ``session.close``. Progress is reported on stdout.

    Args:
        gateway_url: WebSocket URL of the Stentor Gateway.
        audio_data: Raw PCM audio bytes to send. The session is announced to
            the gateway as 16 kHz, mono, ``pcm_s16le``.
        client_id: Client identifier sent in ``session.start``.
        output_file: Optional path to save the response audio. A ``.wav``
            suffix produces a WAV file; any other suffix gets the raw bytes.

    Raises:
        RuntimeError: If the first message from the gateway is not
            ``session.created``.
        SystemExit: If the ``websockets`` package is not installed.
    """
    # Imported lazily so a missing dependency yields a friendly hint instead
    # of an import traceback.
    try:
        import websockets
    except ImportError:
        print("Error: 'websockets' package required. Install with: pip install websockets")
        sys.exit(1)

    print(f"Connecting to {gateway_url}...")
    async with websockets.connect(gateway_url) as ws:
        # 1. Start session
        await ws.send(json.dumps({
            "type": "session.start",
            "client_id": client_id,
            "audio_config": {
                "sample_rate": 16000,
                "channels": 1,
                "sample_width": 16,
                "encoding": "pcm_s16le",
            },
        }))

        # Wait for session.created. This is an explicit check rather than an
        # ``assert`` so it is still enforced when Python runs with -O.
        msg = json.loads(await ws.recv())
        if msg.get("type") != "session.created":
            raise RuntimeError(f"Expected session.created, got {msg}")
        session_id = msg["session_id"]
        print(f"Session created: {session_id}")

        # Wait for listening status
        msg = json.loads(await ws.recv())
        print(f"Status: {msg.get('state', msg)}")

        # 2. Stream audio in chunks (32ms chunks at 16kHz = 1024 bytes)
        chunk_size = 1024
        total_chunks = 0
        for offset in range(0, len(audio_data), chunk_size):
            chunk = audio_data[offset : offset + chunk_size]
            b64_chunk = base64.b64encode(chunk).decode("ascii")
            await ws.send(json.dumps({
                "type": "input_audio_buffer.append",
                "audio": b64_chunk,
            }))
            total_chunks += 1
        print(f"Sent {total_chunks} audio chunks ({len(audio_data)} bytes)")

        # 3. Commit the audio buffer
        await ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
        print("Audio committed, waiting for response...")

        # 4. Receive response events until the gateway reports completion
        #    (response.done) or failure (error).
        response_audio = bytearray()
        done = False
        while not done:
            raw = await ws.recv()
            msg = json.loads(raw)
            msg_type = msg.get("type", "")
            if msg_type == "status":
                print(f" Status: {msg.get('state', msg)}")
            elif msg_type == "transcript.done":
                print(f" Transcript: {msg['text']}")
            elif msg_type == "response.text.done":
                print(f" Response: {msg['text']}")
            elif msg_type == "response.audio.delta":
                chunk = base64.b64decode(msg["delta"])
                response_audio.extend(chunk)
                # "\r" keeps the per-chunk progress on a single console line.
                print(f" Audio chunk: {len(chunk)} bytes", end="\r")
            elif msg_type == "response.audio.done":
                print(f"\n Audio complete: {len(response_audio)} bytes total")
            elif msg_type == "response.done":
                print(" Response complete!")
                done = True
            elif msg_type == "error":
                # Fall back to the whole event so a gateway error without a
                # "message" field is still shown instead of raising KeyError.
                print(f" ERROR [{msg.get('code', '?')}]: {msg.get('message', msg)}")
                done = True
            else:
                print(f" Unknown event: {msg_type}")

        # 5. Save response audio
        if response_audio:
            if output_file:
                out_path = Path(output_file)
                if out_path.suffix == ".wav":
                    # Write as WAV.
                    # NOTE(review): the header is hard-coded to 16 kHz mono
                    # 16-bit; this assumes the gateway's audio uses that
                    # format — TODO confirm against the TTS output config.
                    with wave.open(str(out_path), "wb") as wf:
                        wf.setnchannels(1)
                        wf.setsampwidth(2)
                        wf.setframerate(16000)
                        wf.writeframes(bytes(response_audio))
                else:
                    # Write raw PCM
                    out_path.write_bytes(bytes(response_audio))
                print(f"Response audio saved to {output_file}")
            else:
                print("(Use --output to save response audio)")

        # 6. Close session. The gateway may already have dropped the
        #    connection (for example after an error event); all useful work
        #    is finished by now, so that case is reported rather than raised.
        try:
            await ws.send(json.dumps({"type": "session.close"}))
        except websockets.ConnectionClosed:
            print("Connection already closed by gateway.")
        else:
            print("Session closed.")
def load_wav_as_pcm(wav_path: str) -> bytes:
    """Load a WAV file and return raw PCM data.

    Prints a summary of the file's format and a warning for every property
    that differs from the expected 16 kHz / mono / 16-bit layout. Mismatched
    audio is still returned as-is; nothing is converted or resampled.

    Args:
        wav_path: Path to the WAV file to read.

    Returns:
        All audio frames of the file as raw bytes.
    """
    with wave.open(wav_path, "rb") as wf:
        # Read each header field once and reuse it below.
        channels = wf.getnchannels()
        frame_rate = wf.getframerate()
        sample_bits = wf.getsampwidth() * 8
        frame_count = wf.getnframes()

        print(f"Input: {wav_path}")
        print(f" Channels: {channels}")
        print(f" Sample rate: {frame_rate} Hz")
        print(f" Sample width: {sample_bits}-bit")
        print(f" Frames: {frame_count}")
        # A malformed header can carry a sample rate of 0; report that
        # instead of failing with ZeroDivisionError before returning data.
        if frame_rate > 0:
            print(f" Duration: {frame_count / frame_rate:.2f}s")
        else:
            print(" Duration: unknown (sample rate is 0)")

        if frame_rate != 16000:
            print(f" WARNING: Expected 16kHz, got {frame_rate} Hz")
        if channels != 1:
            print(f" WARNING: Expected mono, got {channels} channels")
        if sample_bits != 16:
            print(f" WARNING: Expected 16-bit, got {sample_bits}-bit")

        return wf.readframes(frame_count)
def generate_silence(duration_ms: int = 2000, sample_rate: int = 16000) -> bytes:
    """Generate silent PCM audio for testing.

    Args:
        duration_ms: Length of the silence in milliseconds.
        sample_rate: Samples per second of the generated audio.

    Returns:
        Two zero bytes per sample, i.e. mono signed 16-bit samples that are
        all zero.

    Raises:
        ValueError: If ``duration_ms`` or ``sample_rate`` is negative.
    """
    if duration_ms < 0 or sample_rate < 0:
        raise ValueError("duration_ms and sample_rate must be non-negative")
    num_samples = int(sample_rate * duration_ms / 1000)
    print(f"Generated {duration_ms}ms of silence ({num_samples} samples)")
    # A 16-bit zero sample is two zero bytes, so a zero-filled buffer is
    # identical to packing N zeros with struct, without building an N-item list.
    return bytes(num_samples * 2)
def main():
    """Command-line entry point.

    Parses the options, obtains the PCM payload (generated silence when
    --test-silence is given, otherwise the WAV file named by --input), and
    runs a single gateway session with it.
    """
    cli = argparse.ArgumentParser(
        epilog=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Stentor Gateway test client",
    )

    # (flags, keyword arguments) pairs, registered in order so the --help
    # listing keeps the same sequence.
    option_table = (
        (
            ("--gateway",),
            {
                "default": "ws://localhost:8600/api/v1/realtime",
                "help": "Gateway WebSocket URL (default: ws://localhost:8600/api/v1/realtime)",
            },
        ),
        (
            ("--input", "-i"),
            {"help": "Path to input WAV file (16kHz, mono, 16-bit)"},
        ),
        (
            ("--output", "-o"),
            {"help": "Path to save response audio (.wav or .pcm)"},
        ),
        (
            ("--client-id",),
            {
                "default": "test-client",
                "help": "Client identifier (default: test-client)",
            },
        ),
        (
            ("--test-silence",),
            {
                "action": "store_true",
                "help": "Send 2 seconds of silence (for connectivity testing)",
            },
        ),
    )
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    opts = cli.parse_args()

    # Guard clause: without an audio source there is nothing to send.
    # ArgumentParser.error() prints usage and exits.
    if not (opts.test_silence or opts.input):
        cli.error("Specify --input WAV_FILE or --test-silence")

    # --test-silence takes precedence when both sources are given.
    pcm_payload = generate_silence() if opts.test_silence else load_wav_as_pcm(opts.input)

    session = run_client(
        gateway_url=opts.gateway,
        audio_data=pcm_payload,
        client_id=opts.client_id,
        output_file=opts.output,
    )
    asyncio.run(session)
# Run the CLI only when this file is executed as a script; importing the
# module does not start a gateway session.
if __name__ == "__main__":
    main()