Files
local-transcription/client/deepgram_transcription.py
Developer 352615c15c
All checks were successful
Tests / Python Backend Tests (push) Successful in 5s
Tests / Frontend Tests (push) Successful in 8s
Tests / Rust Sidecar Tests (push) Successful in 2m0s
Fix Deepgram broken pipe: wait for WebSocket before starting audio
Audio capture started immediately after spawning the WebSocket thread,
but the WebSocket hadn't connected yet. Audio chunks sent to the
unconnected WebSocket caused a broken pipe error.

Fix: added a threading.Event that start_recording() waits on (up to
15s) before opening the audio stream. The event is set in _ws_lifecycle
after the WebSocket connects and handshake completes.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 12:18:47 -07:00

555 lines
20 KiB
Python

"""Deepgram-based transcription engine using WebSocket streaming.
Supports two modes:
- Managed mode: connects to a proxy server that handles Deepgram credentials
- BYOK mode: connects directly to the Deepgram API with a user-provided key
Implements the same duck-type interface as RealtimeTranscriptionEngine so
MainWindow can use it as a drop-in replacement.
"""
import asyncio
import json
import logging
import numpy as np
import threading
from datetime import datetime
from queue import Queue, Empty
from typing import Optional, Callable
from client.models import TranscriptionResult
logger = logging.getLogger(__name__)
class DeepgramTranscriptionEngine:
"""
Transcription engine that streams audio to Deepgram via WebSocket.
In managed mode the connection goes through a proxy at
``wss://<server>/ws/transcribe`` which handles authentication and
Deepgram credentials. In BYOK (bring-your-own-key) mode the
connection goes directly to the Deepgram API.
"""
# ------------------------------------------------------------------ #
# Construction / configuration
# ------------------------------------------------------------------ #
def __init__(self, config, user_name: str = "User", input_device_index: Optional[int] = None):
"""
Initialise the engine from a :class:`client.config.Config` object.
Args:
config: Application ``Config`` instance.
user_name: Display name attached to transcriptions.
input_device_index: Index of the audio input device to use
(``None`` for the system default).
"""
self.config = config
self.user_name = user_name
self.input_device_index = input_device_index
# Mode: 'managed' (proxy) or 'byok' (direct Deepgram)
self.mode: str = config.get("remote.mode", "managed")
# Managed-mode settings
self.server_url: str = config.get("remote.server_url", "")
self.auth_token: str = config.get("remote.auth_token", "")
# BYOK-mode settings
self.byok_api_key: str = config.get("remote.byok_api_key", "")
# Deepgram model / language (used in both modes)
self.deepgram_model: str = config.get("remote.deepgram_model", "nova-2")
self.language: str = config.get("remote.language", "en-US")
# Audio parameters
self.sample_rate: int = 16000
self.channels: int = 1
self.blocksize: int = 1024 # ~64ms chunks for lower latency streaming
# Callbacks
self.realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None
self.final_callback: Optional[Callable[[TranscriptionResult], None]] = None
self._on_error: Optional[Callable[[str], None]] = None
self._on_credits_low: Optional[Callable[[int], None]] = None
# Internal state
self._is_initialized: bool = False
self._is_recording: bool = False
self._stop_event: threading.Event = threading.Event()
self._audio_queue: Queue = Queue()
# Asyncio event loop running in a daemon thread
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._thread: Optional[threading.Thread] = None
# WebSocket handle (set inside the async context)
self._ws = None
# sounddevice InputStream
self._stream = None
# ------------------------------------------------------------------ #
# Callback setters
# ------------------------------------------------------------------ #
def set_callbacks(
self,
realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None,
final_callback: Optional[Callable[[TranscriptionResult], None]] = None,
):
"""Set transcription result callbacks (matches RealtimeTranscriptionEngine API)."""
self.realtime_callback = realtime_callback
self.final_callback = final_callback
def set_error_callback(self, fn: Optional[Callable[[str], None]]):
"""Set a callback invoked on errors. ``fn`` receives a string message."""
self._on_error = fn
def set_credits_low_callback(self, fn: Optional[Callable[[int], None]]):
"""Set a callback for low-credit warnings. ``fn`` receives seconds remaining."""
self._on_credits_low = fn
# ------------------------------------------------------------------ #
# Public interface (duck-typed with RealtimeTranscriptionEngine)
# ------------------------------------------------------------------ #
def initialize(self) -> bool:
"""Validate configuration and mark the engine as ready.
Returns ``True`` when the engine is ready to start recording.
"""
if self._is_initialized:
return True
if self.mode == "managed":
if not self.server_url:
logger.error("Managed mode requires a server URL (remote.server_url)")
return False
if not self.auth_token:
logger.error("Managed mode requires an auth token (remote.auth_token)")
return False
elif self.mode == "byok":
if not self.byok_api_key:
logger.error("BYOK mode requires an API key (remote.byok_api_key)")
return False
else:
logger.error("Unknown remote mode: %s (expected 'managed' or 'byok')", self.mode)
return False
self._is_initialized = True
logger.info("DeepgramTranscriptionEngine initialised in %s mode", self.mode)
return True
def start_recording(self) -> bool:
"""Open the audio stream and connect the WebSocket.
Returns ``True`` on success.
"""
if not self._is_initialized:
logger.error("Engine not initialised -- call initialize() first")
return False
if self._is_recording:
return True
self._stop_event.clear()
self._ws_connected = threading.Event()
self._is_recording = True
# Start the asyncio event-loop thread (handles WS send/receive)
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._thread.start()
# Wait for the WebSocket to connect before starting audio capture.
# Without this, audio chunks arrive before the WS is open -> broken pipe.
if not self._ws_connected.wait(timeout=15):
logger.error("Timed out waiting for Deepgram WebSocket connection")
print("ERROR: Timed out waiting for Deepgram WebSocket connection")
self._last_error = "Timed out connecting to Deepgram"
self._is_recording = False
self._stop_event.set()
return False
# Start the audio capture stream
try:
self._start_audio_stream()
except Exception as exc:
logger.error("Failed to open audio stream: %s", exc)
print(f"ERROR: Failed to open audio stream: {exc}")
self._last_error = f"Audio stream error: {exc}"
self._is_recording = False
self._stop_event.set()
return False
logger.info("Recording started")
return True
def stop_recording(self):
"""Stop audio capture and close the WebSocket."""
if not self._is_recording:
return
self._is_recording = False
self._stop_event.set()
# Stop audio stream
self._stop_audio_stream()
# Close WebSocket from outside the event-loop thread
if self._ws is not None and self._loop is not None and not self._loop.is_closed():
asyncio.run_coroutine_threadsafe(self._close_ws(), self._loop)
# Wait for the thread to finish
if self._thread is not None:
self._thread.join(timeout=5)
self._thread = None
logger.info("Recording stopped")
def stop(self):
"""Full shutdown -- stop recording and release all resources."""
self.stop_recording()
self._is_initialized = False
logger.info("DeepgramTranscriptionEngine shut down")
def is_ready(self) -> bool:
"""Return ``True`` if the engine has been successfully initialised."""
return self._is_initialized
# ------------------------------------------------------------------ #
# Audio capture (sounddevice)
# ------------------------------------------------------------------ #
def _start_audio_stream(self):
"""Open a ``sounddevice.InputStream`` that feeds the audio queue."""
import sounddevice as sd
def _audio_callback(indata, frames, time_info, status): # noqa: ARG001
if status:
logger.warning("Audio stream status: %s", status)
if self._is_recording:
# float32 -> int16 PCM bytes
pcm = (indata * 32767).astype(np.int16).tobytes()
self._audio_queue.put(pcm)
self._stream = sd.InputStream(
samplerate=self.sample_rate,
blocksize=self.blocksize,
channels=self.channels,
dtype="float32",
device=self.input_device_index,
callback=_audio_callback,
)
self._stream.start()
def _stop_audio_stream(self):
"""Close the audio input stream."""
if self._stream is not None:
try:
self._stream.stop()
self._stream.close()
except Exception as exc:
logger.debug("Error closing audio stream: %s", exc)
finally:
self._stream = None
# ------------------------------------------------------------------ #
# Asyncio event-loop (runs in daemon thread)
# ------------------------------------------------------------------ #
def _run_event_loop(self):
"""Entry point for the daemon thread -- runs the async event loop."""
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._ws_lifecycle())
except Exception as exc:
logger.error("Event-loop error: %s", exc)
finally:
try:
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
except Exception:
pass
self._loop.close()
self._loop = None
async def _ws_lifecycle(self):
"""Connect, authenticate (if managed), then run send/receive loops."""
import websockets
try:
ws_url, extra_headers = self._build_ws_url_and_headers()
logger.info("Connecting to %s", ws_url)
self._ws = await websockets.connect(
ws_url,
additional_headers=extra_headers,
ping_interval=20,
ping_timeout=10,
)
# Managed mode: send auth message and wait for ready
if self.mode == "managed":
if not await self._managed_handshake():
return
# Signal that the WebSocket is connected and ready
logger.info("WebSocket connected to Deepgram")
if hasattr(self, '_ws_connected'):
self._ws_connected.set()
# Run send and receive concurrently
await asyncio.gather(
self._send_loop(),
self._receive_loop(),
)
except asyncio.CancelledError:
pass
except Exception as exc:
msg = f"WebSocket error: {exc}"
logger.error(msg)
if self._on_error:
self._on_error(msg)
finally:
await self._close_ws()
def _build_ws_url_and_headers(self):
"""Return ``(url, headers)`` depending on the current mode."""
if self.mode == "managed":
# Ensure the server URL uses wss:// and append the path
url = self.server_url.rstrip("/")
if not url.startswith("ws://") and not url.startswith("wss://"):
url = f"wss://{url}"
url = f"{url}/ws/transcribe"
return url, {}
# BYOK -- connect directly to Deepgram
params = (
f"model={self.deepgram_model}"
f"&language={self.language}"
"&interim_results=true"
"&punctuate=true"
"&smart_format=true"
"&encoding=linear16"
f"&sample_rate={self.sample_rate}"
f"&channels={self.channels}"
)
url = f"wss://api.deepgram.com/v1/listen?{params}"
headers = {"Authorization": f"Token {self.byok_api_key}"}
return url, headers
# -- managed-mode handshake ---------------------------------------- #
async def _managed_handshake(self) -> bool:
"""Send auth message and wait for ``ready`` (managed mode).
Returns ``True`` on success.
"""
auth_msg = {
"type": "auth",
"token": self.auth_token,
"config": {
"model": self.deepgram_model,
"language": self.language,
"sample_rate": self.sample_rate,
"channels": self.channels,
"encoding": "linear16",
"interim_results": True,
},
}
await self._ws.send(json.dumps(auth_msg))
try:
raw = await asyncio.wait_for(self._ws.recv(), timeout=15)
data = json.loads(raw)
if data.get("type") == "ready":
logger.info("Managed proxy is ready")
return True
if data.get("type") == "error":
err = data.get("message", "unknown error")
logger.error("Auth error from proxy: %s", err)
if self._on_error:
self._on_error(f"Proxy auth error: {err}")
return False
logger.warning("Unexpected handshake message: %s", data)
return False
except asyncio.TimeoutError:
logger.error("Timed out waiting for proxy ready message")
if self._on_error:
self._on_error("Timed out waiting for proxy ready message")
return False
# -- send loop ----------------------------------------------------- #
async def _send_loop(self):
"""Drain the audio queue and push raw PCM bytes over the WebSocket."""
loop = asyncio.get_event_loop()
while not self._stop_event.is_set():
try:
# Use run_in_executor to avoid blocking the async event loop
# (which would stall the receive loop and delay transcriptions)
pcm_bytes = await asyncio.wait_for(
loop.run_in_executor(None, lambda: self._audio_queue.get(timeout=0.5)),
timeout=1.0,
)
except (Empty, asyncio.TimeoutError):
continue
try:
await self._ws.send(pcm_bytes)
except Exception as exc:
if not self._stop_event.is_set():
logger.error("Send error: %s", exc)
break
# -- receive loop -------------------------------------------------- #
async def _receive_loop(self):
"""Listen for messages from the WebSocket and dispatch them."""
while not self._stop_event.is_set():
try:
raw = await asyncio.wait_for(self._ws.recv(), timeout=1.0)
except asyncio.TimeoutError:
continue
except Exception as exc:
if not self._stop_event.is_set():
logger.error("Receive error: %s", exc)
break
try:
data = json.loads(raw)
except (json.JSONDecodeError, TypeError):
logger.debug("Non-JSON message received, ignoring")
continue
if self.mode == "managed":
self._handle_managed_message(data)
else:
self._handle_byok_message(data)
# ------------------------------------------------------------------ #
# Message handlers
# ------------------------------------------------------------------ #
def _handle_managed_message(self, data: dict):
"""Process a message from the managed proxy."""
msg_type = data.get("type", "")
if msg_type == "transcript":
text = data.get("text", "")
is_final = data.get("is_final", False)
if text.strip():
result = TranscriptionResult(
text=text,
is_final=is_final,
timestamp=datetime.now(),
user_name=self.user_name,
)
if is_final:
if self.final_callback:
self.final_callback(result)
else:
if self.realtime_callback:
self.realtime_callback(result)
elif msg_type == "credits_low":
seconds_remaining = data.get("seconds_remaining", 0)
logger.warning("Credits low -- %d seconds remaining", seconds_remaining)
if self._on_credits_low:
self._on_credits_low(int(seconds_remaining))
elif msg_type == "error":
code = data.get("code", "")
message = data.get("message", "Unknown error")
logger.error("Proxy error [%s]: %s", code, message)
if self._on_error:
self._on_error(f"[{code}] {message}" if code else message)
elif msg_type == "session_end":
seconds_used = data.get("seconds_used", 0)
logger.info("Session ended -- %d seconds used", seconds_used)
elif msg_type == "ready":
# May arrive again after reconnects; safe to ignore.
logger.debug("Received ready message (already connected)")
else:
logger.debug("Unhandled managed message type: %s", msg_type)
def _handle_byok_message(self, data: dict):
"""Process a message received directly from the Deepgram API."""
msg_type = data.get("type", "")
if msg_type == "Results":
channel = data.get("channel", {})
alternatives = channel.get("alternatives", [])
if not alternatives:
return
transcript = alternatives[0].get("transcript", "")
is_final = data.get("is_final", False)
if transcript.strip():
result = TranscriptionResult(
text=transcript,
is_final=is_final,
timestamp=datetime.now(),
user_name=self.user_name,
)
if is_final:
if self.final_callback:
self.final_callback(result)
else:
if self.realtime_callback:
self.realtime_callback(result)
elif msg_type == "Metadata":
logger.debug("Deepgram metadata: %s", data)
elif msg_type == "UtteranceEnd":
logger.debug("Deepgram utterance end")
else:
logger.debug("Unhandled Deepgram message type: %s", msg_type)
# ------------------------------------------------------------------ #
# Helpers
# ------------------------------------------------------------------ #
async def _close_ws(self):
"""Close the WebSocket connection if open."""
if self._ws is not None:
try:
await self._ws.close()
except Exception:
pass
self._ws = None
def set_user_name(self, user_name: str):
"""Update the user name attached to future transcriptions."""
self.user_name = user_name
def is_recording_active(self) -> bool:
"""Return ``True`` if audio is currently being captured."""
return self._is_recording
def __repr__(self) -> str:
return (
f"DeepgramTranscriptionEngine(mode={self.mode}, "
f"recording={self._is_recording})"
)
def __del__(self):
"""Best-effort cleanup."""
try:
self.stop()
except Exception:
pass