Phase 6: Add Deepgram remote transcription (managed + BYOK modes)
New files: - client/deepgram_transcription.py — DeepgramTranscriptionEngine with managed mode (proxy) and BYOK mode (direct Deepgram). Sends raw binary PCM audio over WebSocket, handles both proxy and Deepgram response formats. Modified files: - config/default_config.yaml — Replace remote_processing with new remote section (mode, server_url, auth_token, byok_api_key, deepgram_model, language) - client/config.py — Add migration from old remote_processing config - gui/settings_dialog_qt.py — Replace Remote Processing group with Transcription Mode section (Local/Managed/BYOK radio buttons, login/register dialogs, balance display, model selector) - gui/main_window_qt.py — Select engine based on remote.mode config, add error and credits_low handlers Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
528
client/deepgram_transcription.py
Normal file
528
client/deepgram_transcription.py
Normal file
@@ -0,0 +1,528 @@
|
||||
"""Deepgram-based transcription engine using WebSocket streaming.
|
||||
|
||||
Supports two modes:
|
||||
- Managed mode: connects to a proxy server that handles Deepgram credentials
|
||||
- BYOK mode: connects directly to the Deepgram API with a user-provided key
|
||||
|
||||
Implements the same duck-type interface as RealtimeTranscriptionEngine so
|
||||
MainWindow can use it as a drop-in replacement.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import numpy as np
|
||||
import threading
|
||||
from datetime import datetime
|
||||
from queue import Queue, Empty
|
||||
from typing import Optional, Callable
|
||||
|
||||
from client.transcription_engine_realtime import TranscriptionResult
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DeepgramTranscriptionEngine:
|
||||
"""
|
||||
Transcription engine that streams audio to Deepgram via WebSocket.
|
||||
|
||||
In managed mode the connection goes through a proxy at
|
||||
``wss://<server>/ws/transcribe`` which handles authentication and
|
||||
Deepgram credentials. In BYOK (bring-your-own-key) mode the
|
||||
connection goes directly to the Deepgram API.
|
||||
"""
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Construction / configuration
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def __init__(self, config, user_name: str = "User", input_device_index: Optional[int] = None):
|
||||
"""
|
||||
Initialise the engine from a :class:`client.config.Config` object.
|
||||
|
||||
Args:
|
||||
config: Application ``Config`` instance.
|
||||
user_name: Display name attached to transcriptions.
|
||||
input_device_index: Index of the audio input device to use
|
||||
(``None`` for the system default).
|
||||
"""
|
||||
self.config = config
|
||||
self.user_name = user_name
|
||||
self.input_device_index = input_device_index
|
||||
|
||||
# Mode: 'managed' (proxy) or 'byok' (direct Deepgram)
|
||||
self.mode: str = config.get("remote.mode", "managed")
|
||||
|
||||
# Managed-mode settings
|
||||
self.server_url: str = config.get("remote.server_url", "")
|
||||
self.auth_token: str = config.get("remote.auth_token", "")
|
||||
|
||||
# BYOK-mode settings
|
||||
self.byok_api_key: str = config.get("remote.byok_api_key", "")
|
||||
|
||||
# Deepgram model / language (used in both modes)
|
||||
self.deepgram_model: str = config.get("remote.deepgram_model", "nova-2")
|
||||
self.language: str = config.get("remote.language", "en-US")
|
||||
|
||||
# Audio parameters
|
||||
self.sample_rate: int = 16000
|
||||
self.channels: int = 1
|
||||
self.blocksize: int = 4096
|
||||
|
||||
# Callbacks
|
||||
self.realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None
|
||||
self.final_callback: Optional[Callable[[TranscriptionResult], None]] = None
|
||||
self._on_error: Optional[Callable[[str], None]] = None
|
||||
self._on_credits_low: Optional[Callable[[int], None]] = None
|
||||
|
||||
# Internal state
|
||||
self._is_initialized: bool = False
|
||||
self._is_recording: bool = False
|
||||
self._stop_event: threading.Event = threading.Event()
|
||||
self._audio_queue: Queue = Queue()
|
||||
|
||||
# Asyncio event loop running in a daemon thread
|
||||
self._loop: Optional[asyncio.AbstractEventLoop] = None
|
||||
self._thread: Optional[threading.Thread] = None
|
||||
|
||||
# WebSocket handle (set inside the async context)
|
||||
self._ws = None
|
||||
|
||||
# sounddevice InputStream
|
||||
self._stream = None
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Callback setters
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def set_callbacks(
|
||||
self,
|
||||
realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None,
|
||||
final_callback: Optional[Callable[[TranscriptionResult], None]] = None,
|
||||
):
|
||||
"""Set transcription result callbacks (matches RealtimeTranscriptionEngine API)."""
|
||||
self.realtime_callback = realtime_callback
|
||||
self.final_callback = final_callback
|
||||
|
||||
def set_error_callback(self, fn: Optional[Callable[[str], None]]):
|
||||
"""Set a callback invoked on errors. ``fn`` receives a string message."""
|
||||
self._on_error = fn
|
||||
|
||||
def set_credits_low_callback(self, fn: Optional[Callable[[int], None]]):
|
||||
"""Set a callback for low-credit warnings. ``fn`` receives seconds remaining."""
|
||||
self._on_credits_low = fn
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Public interface (duck-typed with RealtimeTranscriptionEngine)
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def initialize(self) -> bool:
|
||||
"""Validate configuration and mark the engine as ready.
|
||||
|
||||
Returns ``True`` when the engine is ready to start recording.
|
||||
"""
|
||||
if self._is_initialized:
|
||||
return True
|
||||
|
||||
if self.mode == "managed":
|
||||
if not self.server_url:
|
||||
logger.error("Managed mode requires a server URL (remote.server_url)")
|
||||
return False
|
||||
if not self.auth_token:
|
||||
logger.error("Managed mode requires an auth token (remote.auth_token)")
|
||||
return False
|
||||
elif self.mode == "byok":
|
||||
if not self.byok_api_key:
|
||||
logger.error("BYOK mode requires an API key (remote.byok_api_key)")
|
||||
return False
|
||||
else:
|
||||
logger.error("Unknown remote mode: %s (expected 'managed' or 'byok')", self.mode)
|
||||
return False
|
||||
|
||||
self._is_initialized = True
|
||||
logger.info("DeepgramTranscriptionEngine initialised in %s mode", self.mode)
|
||||
return True
|
||||
|
||||
def start_recording(self) -> bool:
|
||||
"""Open the audio stream and connect the WebSocket.
|
||||
|
||||
Returns ``True`` on success.
|
||||
"""
|
||||
if not self._is_initialized:
|
||||
logger.error("Engine not initialised -- call initialize() first")
|
||||
return False
|
||||
|
||||
if self._is_recording:
|
||||
return True
|
||||
|
||||
self._stop_event.clear()
|
||||
self._is_recording = True
|
||||
|
||||
# Start the asyncio event-loop thread (handles WS send/receive)
|
||||
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
|
||||
self._thread.start()
|
||||
|
||||
# Start the audio capture stream
|
||||
try:
|
||||
self._start_audio_stream()
|
||||
except Exception as exc:
|
||||
logger.error("Failed to open audio stream: %s", exc)
|
||||
self._is_recording = False
|
||||
self._stop_event.set()
|
||||
return False
|
||||
|
||||
logger.info("Recording started")
|
||||
return True
|
||||
|
||||
def stop_recording(self):
|
||||
"""Stop audio capture and close the WebSocket."""
|
||||
if not self._is_recording:
|
||||
return
|
||||
|
||||
self._is_recording = False
|
||||
self._stop_event.set()
|
||||
|
||||
# Stop audio stream
|
||||
self._stop_audio_stream()
|
||||
|
||||
# Close WebSocket from outside the event-loop thread
|
||||
if self._ws is not None and self._loop is not None and not self._loop.is_closed():
|
||||
asyncio.run_coroutine_threadsafe(self._close_ws(), self._loop)
|
||||
|
||||
# Wait for the thread to finish
|
||||
if self._thread is not None:
|
||||
self._thread.join(timeout=5)
|
||||
self._thread = None
|
||||
|
||||
logger.info("Recording stopped")
|
||||
|
||||
def stop(self):
|
||||
"""Full shutdown -- stop recording and release all resources."""
|
||||
self.stop_recording()
|
||||
self._is_initialized = False
|
||||
logger.info("DeepgramTranscriptionEngine shut down")
|
||||
|
||||
def is_ready(self) -> bool:
|
||||
"""Return ``True`` if the engine has been successfully initialised."""
|
||||
return self._is_initialized
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Audio capture (sounddevice)
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _start_audio_stream(self):
|
||||
"""Open a ``sounddevice.InputStream`` that feeds the audio queue."""
|
||||
import sounddevice as sd
|
||||
|
||||
def _audio_callback(indata, frames, time_info, status): # noqa: ARG001
|
||||
if status:
|
||||
logger.warning("Audio stream status: %s", status)
|
||||
if self._is_recording:
|
||||
# float32 -> int16 PCM bytes
|
||||
pcm = (indata * 32767).astype(np.int16).tobytes()
|
||||
self._audio_queue.put(pcm)
|
||||
|
||||
self._stream = sd.InputStream(
|
||||
samplerate=self.sample_rate,
|
||||
blocksize=self.blocksize,
|
||||
channels=self.channels,
|
||||
dtype="float32",
|
||||
device=self.input_device_index,
|
||||
callback=_audio_callback,
|
||||
)
|
||||
self._stream.start()
|
||||
|
||||
def _stop_audio_stream(self):
|
||||
"""Close the audio input stream."""
|
||||
if self._stream is not None:
|
||||
try:
|
||||
self._stream.stop()
|
||||
self._stream.close()
|
||||
except Exception as exc:
|
||||
logger.debug("Error closing audio stream: %s", exc)
|
||||
finally:
|
||||
self._stream = None
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Asyncio event-loop (runs in daemon thread)
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _run_event_loop(self):
|
||||
"""Entry point for the daemon thread -- runs the async event loop."""
|
||||
self._loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self._loop)
|
||||
try:
|
||||
self._loop.run_until_complete(self._ws_lifecycle())
|
||||
except Exception as exc:
|
||||
logger.error("Event-loop error: %s", exc)
|
||||
finally:
|
||||
try:
|
||||
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
|
||||
except Exception:
|
||||
pass
|
||||
self._loop.close()
|
||||
self._loop = None
|
||||
|
||||
async def _ws_lifecycle(self):
|
||||
"""Connect, authenticate (if managed), then run send/receive loops."""
|
||||
import websockets
|
||||
|
||||
try:
|
||||
ws_url, extra_headers = self._build_ws_url_and_headers()
|
||||
|
||||
logger.info("Connecting to %s", ws_url)
|
||||
self._ws = await websockets.connect(
|
||||
ws_url,
|
||||
additional_headers=extra_headers,
|
||||
ping_interval=20,
|
||||
ping_timeout=10,
|
||||
)
|
||||
|
||||
# Managed mode: send auth message and wait for ready
|
||||
if self.mode == "managed":
|
||||
if not await self._managed_handshake():
|
||||
return
|
||||
|
||||
# Run send and receive concurrently
|
||||
await asyncio.gather(
|
||||
self._send_loop(),
|
||||
self._receive_loop(),
|
||||
)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
except Exception as exc:
|
||||
msg = f"WebSocket error: {exc}"
|
||||
logger.error(msg)
|
||||
if self._on_error:
|
||||
self._on_error(msg)
|
||||
finally:
|
||||
await self._close_ws()
|
||||
|
||||
def _build_ws_url_and_headers(self):
|
||||
"""Return ``(url, headers)`` depending on the current mode."""
|
||||
if self.mode == "managed":
|
||||
# Ensure the server URL uses wss:// and append the path
|
||||
url = self.server_url.rstrip("/")
|
||||
if not url.startswith("ws://") and not url.startswith("wss://"):
|
||||
url = f"wss://{url}"
|
||||
url = f"{url}/ws/transcribe"
|
||||
return url, {}
|
||||
|
||||
# BYOK -- connect directly to Deepgram
|
||||
params = (
|
||||
f"model={self.deepgram_model}"
|
||||
f"&language={self.language}"
|
||||
"&interim_results=true"
|
||||
"&encoding=linear16"
|
||||
f"&sample_rate={self.sample_rate}"
|
||||
f"&channels={self.channels}"
|
||||
)
|
||||
url = f"wss://api.deepgram.com/v1/listen?{params}"
|
||||
headers = {"Authorization": f"Token {self.byok_api_key}"}
|
||||
return url, headers
|
||||
|
||||
# -- managed-mode handshake ---------------------------------------- #
|
||||
|
||||
async def _managed_handshake(self) -> bool:
|
||||
"""Send auth message and wait for ``ready`` (managed mode).
|
||||
|
||||
Returns ``True`` on success.
|
||||
"""
|
||||
auth_msg = {
|
||||
"type": "auth",
|
||||
"token": self.auth_token,
|
||||
"config": {
|
||||
"model": self.deepgram_model,
|
||||
"language": self.language,
|
||||
"sample_rate": self.sample_rate,
|
||||
"channels": self.channels,
|
||||
"encoding": "linear16",
|
||||
"interim_results": True,
|
||||
},
|
||||
}
|
||||
await self._ws.send(json.dumps(auth_msg))
|
||||
|
||||
try:
|
||||
raw = await asyncio.wait_for(self._ws.recv(), timeout=15)
|
||||
data = json.loads(raw)
|
||||
if data.get("type") == "ready":
|
||||
logger.info("Managed proxy is ready")
|
||||
return True
|
||||
|
||||
if data.get("type") == "error":
|
||||
err = data.get("message", "unknown error")
|
||||
logger.error("Auth error from proxy: %s", err)
|
||||
if self._on_error:
|
||||
self._on_error(f"Proxy auth error: {err}")
|
||||
return False
|
||||
|
||||
logger.warning("Unexpected handshake message: %s", data)
|
||||
return False
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
logger.error("Timed out waiting for proxy ready message")
|
||||
if self._on_error:
|
||||
self._on_error("Timed out waiting for proxy ready message")
|
||||
return False
|
||||
|
||||
# -- send loop ----------------------------------------------------- #
|
||||
|
||||
async def _send_loop(self):
|
||||
"""Drain the audio queue and push raw PCM bytes over the WebSocket."""
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
pcm_bytes = self._audio_queue.get(timeout=0.1)
|
||||
except Empty:
|
||||
continue
|
||||
|
||||
try:
|
||||
await self._ws.send(pcm_bytes)
|
||||
except Exception as exc:
|
||||
if not self._stop_event.is_set():
|
||||
logger.error("Send error: %s", exc)
|
||||
break
|
||||
|
||||
# -- receive loop -------------------------------------------------- #
|
||||
|
||||
async def _receive_loop(self):
|
||||
"""Listen for messages from the WebSocket and dispatch them."""
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
raw = await asyncio.wait_for(self._ws.recv(), timeout=1.0)
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
except Exception as exc:
|
||||
if not self._stop_event.is_set():
|
||||
logger.error("Receive error: %s", exc)
|
||||
break
|
||||
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
logger.debug("Non-JSON message received, ignoring")
|
||||
continue
|
||||
|
||||
if self.mode == "managed":
|
||||
self._handle_managed_message(data)
|
||||
else:
|
||||
self._handle_byok_message(data)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Message handlers
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
def _handle_managed_message(self, data: dict):
|
||||
"""Process a message from the managed proxy."""
|
||||
msg_type = data.get("type", "")
|
||||
|
||||
if msg_type == "transcript":
|
||||
text = data.get("text", "")
|
||||
is_final = data.get("is_final", False)
|
||||
if text.strip():
|
||||
result = TranscriptionResult(
|
||||
text=text,
|
||||
is_final=is_final,
|
||||
timestamp=datetime.now(),
|
||||
user_name=self.user_name,
|
||||
)
|
||||
if is_final:
|
||||
if self.final_callback:
|
||||
self.final_callback(result)
|
||||
else:
|
||||
if self.realtime_callback:
|
||||
self.realtime_callback(result)
|
||||
|
||||
elif msg_type == "credits_low":
|
||||
seconds_remaining = data.get("seconds_remaining", 0)
|
||||
logger.warning("Credits low -- %d seconds remaining", seconds_remaining)
|
||||
if self._on_credits_low:
|
||||
self._on_credits_low(int(seconds_remaining))
|
||||
|
||||
elif msg_type == "error":
|
||||
code = data.get("code", "")
|
||||
message = data.get("message", "Unknown error")
|
||||
logger.error("Proxy error [%s]: %s", code, message)
|
||||
if self._on_error:
|
||||
self._on_error(f"[{code}] {message}" if code else message)
|
||||
|
||||
elif msg_type == "session_end":
|
||||
seconds_used = data.get("seconds_used", 0)
|
||||
logger.info("Session ended -- %d seconds used", seconds_used)
|
||||
|
||||
elif msg_type == "ready":
|
||||
# May arrive again after reconnects; safe to ignore.
|
||||
logger.debug("Received ready message (already connected)")
|
||||
|
||||
else:
|
||||
logger.debug("Unhandled managed message type: %s", msg_type)
|
||||
|
||||
def _handle_byok_message(self, data: dict):
|
||||
"""Process a message received directly from the Deepgram API."""
|
||||
msg_type = data.get("type", "")
|
||||
|
||||
if msg_type == "Results":
|
||||
channel = data.get("channel", {})
|
||||
alternatives = channel.get("alternatives", [])
|
||||
if not alternatives:
|
||||
return
|
||||
|
||||
transcript = alternatives[0].get("transcript", "")
|
||||
is_final = data.get("is_final", False)
|
||||
|
||||
if transcript.strip():
|
||||
result = TranscriptionResult(
|
||||
text=transcript,
|
||||
is_final=is_final,
|
||||
timestamp=datetime.now(),
|
||||
user_name=self.user_name,
|
||||
)
|
||||
if is_final:
|
||||
if self.final_callback:
|
||||
self.final_callback(result)
|
||||
else:
|
||||
if self.realtime_callback:
|
||||
self.realtime_callback(result)
|
||||
|
||||
elif msg_type == "Metadata":
|
||||
logger.debug("Deepgram metadata: %s", data)
|
||||
|
||||
elif msg_type == "UtteranceEnd":
|
||||
logger.debug("Deepgram utterance end")
|
||||
|
||||
else:
|
||||
logger.debug("Unhandled Deepgram message type: %s", msg_type)
|
||||
|
||||
# ------------------------------------------------------------------ #
|
||||
# Helpers
|
||||
# ------------------------------------------------------------------ #
|
||||
|
||||
async def _close_ws(self):
|
||||
"""Close the WebSocket connection if open."""
|
||||
if self._ws is not None:
|
||||
try:
|
||||
await self._ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._ws = None
|
||||
|
||||
def set_user_name(self, user_name: str):
|
||||
"""Update the user name attached to future transcriptions."""
|
||||
self.user_name = user_name
|
||||
|
||||
def is_recording_active(self) -> bool:
|
||||
"""Return ``True`` if audio is currently being captured."""
|
||||
return self._is_recording
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
f"DeepgramTranscriptionEngine(mode={self.mode}, "
|
||||
f"recording={self._is_recording})"
|
||||
)
|
||||
|
||||
def __del__(self):
|
||||
"""Best-effort cleanup."""
|
||||
try:
|
||||
self.stop()
|
||||
except Exception:
|
||||
pass
|
||||
Reference in New Issue
Block a user