"""Deepgram-based transcription engine using WebSocket streaming. Supports two modes: - Managed mode: connects to a proxy server that handles Deepgram credentials - BYOK mode: connects directly to the Deepgram API with a user-provided key Implements the same duck-type interface as RealtimeTranscriptionEngine so MainWindow can use it as a drop-in replacement. """ import asyncio import json import logging import numpy as np import threading from datetime import datetime from queue import Queue, Empty from typing import Optional, Callable from client.models import TranscriptionResult logger = logging.getLogger(__name__) class DeepgramTranscriptionEngine: """ Transcription engine that streams audio to Deepgram via WebSocket. In managed mode the connection goes through a proxy at ``wss:///ws/transcribe`` which handles authentication and Deepgram credentials. In BYOK (bring-your-own-key) mode the connection goes directly to the Deepgram API. """ # ------------------------------------------------------------------ # # Construction / configuration # ------------------------------------------------------------------ # def __init__(self, config, input_device_index: Optional[int] = None): """ Initialise the engine from a :class:`client.config.Config` object. Args: config: Application ``Config`` instance. input_device_index: Index of the audio input device to use (``None`` for the system default). """ self.config = config self.input_device_index = input_device_index # Mode: 'managed' (proxy) or 'byok' (direct Deepgram) self.mode: str = config.get("remote.mode", "managed") # Managed-mode settings self.server_url: str = config.get("remote.server_url", "") self.auth_token: str = config.get("remote.auth_token", "") # BYOK-mode settings self.byok_api_key: str = config.get("remote.byok_api_key", "") # Deepgram model / language (used in both modes) self.deepgram_model: str = config.get("remote.deepgram_model", "nova-2") self.language: str = config.get("remote.language", "en-US") # Audio parameters self.sample_rate: int = 16000 self.channels: int = 1 self.blocksize: int = 1024 # ~64ms chunks for lower latency streaming # Callbacks self.realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None self.final_callback: Optional[Callable[[TranscriptionResult], None]] = None self._on_error: Optional[Callable[[str], None]] = None self._on_credits_low: Optional[Callable[[int], None]] = None # Internal state self._is_initialized: bool = False self._is_recording: bool = False self._stop_event: threading.Event = threading.Event() self._audio_queue: Queue = Queue() # Asyncio event loop running in a daemon thread self._loop: Optional[asyncio.AbstractEventLoop] = None self._thread: Optional[threading.Thread] = None # WebSocket handle (set inside the async context) self._ws = None # sounddevice InputStream self._stream = None # ------------------------------------------------------------------ # # Callback setters # ------------------------------------------------------------------ # def set_callbacks( self, realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None, final_callback: Optional[Callable[[TranscriptionResult], None]] = None, ): """Set transcription result callbacks (matches RealtimeTranscriptionEngine API).""" self.realtime_callback = realtime_callback self.final_callback = final_callback def set_error_callback(self, fn: Optional[Callable[[str], None]]): """Set a callback invoked on errors. ``fn`` receives a string message.""" self._on_error = fn def set_credits_low_callback(self, fn: Optional[Callable[[int], None]]): """Set a callback for low-credit warnings. ``fn`` receives seconds remaining.""" self._on_credits_low = fn # ------------------------------------------------------------------ # # Public interface (duck-typed with RealtimeTranscriptionEngine) # ------------------------------------------------------------------ # def initialize(self) -> bool: """Validate configuration and mark the engine as ready. Returns ``True`` when the engine is ready to start recording. """ if self._is_initialized: return True if self.mode == "managed": if not self.server_url: logger.error("Managed mode requires a server URL (remote.server_url)") return False if not self.auth_token: logger.error("Managed mode requires an auth token (remote.auth_token)") return False elif self.mode == "byok": if not self.byok_api_key: logger.error("BYOK mode requires an API key (remote.byok_api_key)") return False else: logger.error("Unknown remote mode: %s (expected 'managed' or 'byok')", self.mode) return False self._is_initialized = True logger.info("DeepgramTranscriptionEngine initialised in %s mode", self.mode) return True def start_recording(self) -> bool: """Open the audio stream and connect the WebSocket. Returns ``True`` on success. """ if not self._is_initialized: logger.error("Engine not initialised -- call initialize() first") return False if self._is_recording: return True self._stop_event.clear() self._ws_connected = threading.Event() self._is_recording = True # Start the asyncio event-loop thread (handles WS send/receive) self._thread = threading.Thread(target=self._run_event_loop, daemon=True) self._thread.start() # Wait for the WebSocket to connect before starting audio capture. # Without this, audio chunks arrive before the WS is open -> broken pipe. if not self._ws_connected.wait(timeout=15): logger.error("Timed out waiting for Deepgram WebSocket connection") print("ERROR: Timed out waiting for Deepgram WebSocket connection") self._last_error = "Timed out connecting to Deepgram" self._is_recording = False self._stop_event.set() return False # Start the audio capture stream try: self._start_audio_stream() except Exception as exc: logger.error("Failed to open audio stream: %s", exc) print(f"ERROR: Failed to open audio stream: {exc}") self._last_error = f"Audio stream error: {exc}" self._is_recording = False self._stop_event.set() return False logger.info("Recording started") return True def stop_recording(self): """Stop audio capture and close the WebSocket.""" if not self._is_recording: return self._is_recording = False self._stop_event.set() # Stop audio stream self._stop_audio_stream() # Close WebSocket from outside the event-loop thread if self._ws is not None and self._loop is not None and not self._loop.is_closed(): asyncio.run_coroutine_threadsafe(self._close_ws(), self._loop) # Wait for the thread to finish if self._thread is not None: self._thread.join(timeout=5) self._thread = None logger.info("Recording stopped") def stop(self): """Full shutdown -- stop recording and release all resources.""" self.stop_recording() self._is_initialized = False logger.info("DeepgramTranscriptionEngine shut down") def is_ready(self) -> bool: """Return ``True`` if the engine has been successfully initialised.""" return self._is_initialized # ------------------------------------------------------------------ # # Audio capture (sounddevice) # ------------------------------------------------------------------ # def _start_audio_stream(self): """Open a ``sounddevice.InputStream`` that feeds the audio queue.""" import sounddevice as sd def _audio_callback(indata, frames, time_info, status): # noqa: ARG001 if status: logger.warning("Audio stream status: %s", status) if self._is_recording: # float32 -> int16 PCM bytes pcm = (indata * 32767).astype(np.int16).tobytes() self._audio_queue.put(pcm) self._stream = sd.InputStream( samplerate=self.sample_rate, blocksize=self.blocksize, channels=self.channels, dtype="float32", device=self.input_device_index, callback=_audio_callback, ) self._stream.start() def _stop_audio_stream(self): """Close the audio input stream.""" if self._stream is not None: try: self._stream.stop() self._stream.close() except Exception as exc: logger.debug("Error closing audio stream: %s", exc) finally: self._stream = None # ------------------------------------------------------------------ # # Asyncio event-loop (runs in daemon thread) # ------------------------------------------------------------------ # def _run_event_loop(self): """Entry point for the daemon thread -- runs the async event loop.""" self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) try: self._loop.run_until_complete(self._ws_lifecycle()) except Exception as exc: logger.error("Event-loop error: %s", exc) finally: try: self._loop.run_until_complete(self._loop.shutdown_asyncgens()) except Exception: pass self._loop.close() self._loop = None async def _ws_lifecycle(self): """Connect, authenticate (if managed), then run send/receive loops.""" import websockets try: ws_url, extra_headers = self._build_ws_url_and_headers() logger.info("Connecting to %s", ws_url) self._ws = await websockets.connect( ws_url, additional_headers=extra_headers, ping_interval=20, ping_timeout=10, ) # Managed mode: send auth message and wait for ready if self.mode == "managed": if not await self._managed_handshake(): return # Signal that the WebSocket is connected and ready logger.info("WebSocket connected to Deepgram") if hasattr(self, '_ws_connected'): self._ws_connected.set() # Run send and receive concurrently await asyncio.gather( self._send_loop(), self._receive_loop(), ) except asyncio.CancelledError: pass except Exception as exc: msg = f"WebSocket error: {exc}" logger.error(msg) if self._on_error: self._on_error(msg) finally: await self._close_ws() def _build_ws_url_and_headers(self): """Return ``(url, headers)`` depending on the current mode.""" if self.mode == "managed": # Convert HTTP(S) URLs to WS(S) for WebSocket connection url = self.server_url.rstrip("/") if url.startswith("https://"): url = "wss://" + url[len("https://"):] elif url.startswith("http://"): url = "ws://" + url[len("http://"):] elif not url.startswith("ws://") and not url.startswith("wss://"): url = f"wss://{url}" url = f"{url}/ws/transcribe" return url, {} # BYOK -- connect directly to Deepgram params = ( f"model={self.deepgram_model}" f"&language={self.language}" "&interim_results=true" "&punctuate=true" "&smart_format=true" "&encoding=linear16" f"&sample_rate={self.sample_rate}" f"&channels={self.channels}" ) url = f"wss://api.deepgram.com/v1/listen?{params}" headers = {"Authorization": f"Token {self.byok_api_key}"} return url, headers # -- managed-mode handshake ---------------------------------------- # async def _managed_handshake(self) -> bool: """Send auth message and wait for ``ready`` (managed mode). Returns ``True`` on success. """ auth_msg = { "type": "auth", "token": self.auth_token, "config": { "model": self.deepgram_model, "language": self.language, "sample_rate": self.sample_rate, "channels": self.channels, "encoding": "linear16", "interim_results": True, }, } await self._ws.send(json.dumps(auth_msg)) try: raw = await asyncio.wait_for(self._ws.recv(), timeout=15) data = json.loads(raw) if data.get("type") == "ready": logger.info("Managed proxy is ready") return True if data.get("type") == "error": err = data.get("message", "unknown error") logger.error("Auth error from proxy: %s", err) if self._on_error: self._on_error(f"Proxy auth error: {err}") return False logger.warning("Unexpected handshake message: %s", data) return False except asyncio.TimeoutError: logger.error("Timed out waiting for proxy ready message") if self._on_error: self._on_error("Timed out waiting for proxy ready message") return False # -- send loop ----------------------------------------------------- # async def _send_loop(self): """Drain the audio queue and push raw PCM bytes over the WebSocket.""" loop = asyncio.get_event_loop() while not self._stop_event.is_set(): try: # Use run_in_executor to avoid blocking the async event loop # (which would stall the receive loop and delay transcriptions) pcm_bytes = await asyncio.wait_for( loop.run_in_executor(None, lambda: self._audio_queue.get(timeout=0.5)), timeout=1.0, ) except (Empty, asyncio.TimeoutError): continue try: await self._ws.send(pcm_bytes) except Exception as exc: if not self._stop_event.is_set(): logger.error("Send error: %s", exc) break # -- receive loop -------------------------------------------------- # async def _receive_loop(self): """Listen for messages from the WebSocket and dispatch them.""" while not self._stop_event.is_set(): try: raw = await asyncio.wait_for(self._ws.recv(), timeout=1.0) except asyncio.TimeoutError: continue except Exception as exc: if not self._stop_event.is_set(): logger.error("Receive error: %s", exc) break try: data = json.loads(raw) except (json.JSONDecodeError, TypeError): logger.debug("Non-JSON message received, ignoring") continue if self.mode == "managed": self._handle_managed_message(data) else: self._handle_byok_message(data) # ------------------------------------------------------------------ # # Message handlers # ------------------------------------------------------------------ # def _handle_managed_message(self, data: dict): """Process a message from the managed proxy.""" msg_type = data.get("type", "") if msg_type == "transcript": text = data.get("text", "") is_final = data.get("is_final", False) if text.strip(): result = TranscriptionResult( text=text, is_final=is_final, timestamp=datetime.now(), user_name=self.config.get('user.name', 'User'), ) if is_final: if self.final_callback: self.final_callback(result) else: if self.realtime_callback: self.realtime_callback(result) elif msg_type == "credits_low": seconds_remaining = data.get("seconds_remaining", 0) logger.warning("Credits low -- %d seconds remaining", seconds_remaining) if self._on_credits_low: self._on_credits_low(int(seconds_remaining)) elif msg_type == "error": code = data.get("code", "") message = data.get("message", "Unknown error") logger.error("Proxy error [%s]: %s", code, message) if self._on_error: self._on_error(f"[{code}] {message}" if code else message) elif msg_type == "session_end": seconds_used = data.get("seconds_used", 0) logger.info("Session ended -- %d seconds used", seconds_used) elif msg_type == "ready": # May arrive again after reconnects; safe to ignore. logger.debug("Received ready message (already connected)") else: logger.debug("Unhandled managed message type: %s", msg_type) def _handle_byok_message(self, data: dict): """Process a message received directly from the Deepgram API.""" msg_type = data.get("type", "") if msg_type == "Results": channel = data.get("channel", {}) alternatives = channel.get("alternatives", []) if not alternatives: return transcript = alternatives[0].get("transcript", "") is_final = data.get("is_final", False) if transcript.strip(): result = TranscriptionResult( text=transcript, is_final=is_final, timestamp=datetime.now(), user_name=self.config.get('user.name', 'User'), ) if is_final: if self.final_callback: self.final_callback(result) else: if self.realtime_callback: self.realtime_callback(result) elif msg_type == "Metadata": logger.debug("Deepgram metadata: %s", data) elif msg_type == "UtteranceEnd": logger.debug("Deepgram utterance end") else: logger.debug("Unhandled Deepgram message type: %s", msg_type) # ------------------------------------------------------------------ # # Helpers # ------------------------------------------------------------------ # async def _close_ws(self): """Close the WebSocket connection if open.""" if self._ws is not None: try: await self._ws.close() except Exception: pass self._ws = None def is_recording_active(self) -> bool: """Return ``True`` if audio is currently being captured.""" return self._is_recording def __repr__(self) -> str: return ( f"DeepgramTranscriptionEngine(mode={self.mode}, " f"recording={self._is_recording})" ) def __del__(self): """Best-effort cleanup.""" try: self.stop() except Exception: pass