"""Deepgram-based transcription engine using WebSocket streaming.

Supports two modes:

- Managed mode: connects to a proxy server that handles Deepgram credentials
- BYOK mode: connects directly to the Deepgram API with a user-provided key

Implements the same duck-type interface as RealtimeTranscriptionEngine so
MainWindow can use it as a drop-in replacement.
"""

import asyncio
import json
import logging
import threading
from datetime import datetime
from queue import Queue, Empty
from typing import Optional, Callable

import numpy as np

from client.models import TranscriptionResult

logger = logging.getLogger(__name__)


class DeepgramTranscriptionEngine:
    """
    Transcription engine that streams audio to Deepgram via WebSocket.

    In managed mode the connection goes through a proxy at
    ``wss://<server>/ws/transcribe`` which handles authentication and
    Deepgram credentials. In BYOK (bring-your-own-key) mode the
    connection goes directly to the Deepgram API.

    Threading model: audio capture runs on the sounddevice callback thread
    and feeds ``_audio_queue``; a daemon thread runs an asyncio event loop
    that drains the queue into the WebSocket and dispatches responses.
    """

    # ------------------------------------------------------------------ #
    # Construction / configuration
    # ------------------------------------------------------------------ #

    def __init__(self, config, user_name: str = "User", input_device_index: Optional[int] = None):
        """
        Initialise the engine from a :class:`client.config.Config` object.

        Args:
            config: Application ``Config`` instance.
            user_name: Display name attached to transcriptions.
            input_device_index: Index of the audio input device to use
                (``None`` for the system default).
        """
        self.config = config
        self.user_name = user_name
        self.input_device_index = input_device_index

        # Mode: 'managed' (proxy) or 'byok' (direct Deepgram)
        self.mode: str = config.get("remote.mode", "managed")

        # Managed-mode settings
        self.server_url: str = config.get("remote.server_url", "")
        self.auth_token: str = config.get("remote.auth_token", "")

        # BYOK-mode settings
        self.byok_api_key: str = config.get("remote.byok_api_key", "")

        # Deepgram model / language (used in both modes)
        self.deepgram_model: str = config.get("remote.deepgram_model", "nova-2")
        self.language: str = config.get("remote.language", "en-US")

        # Audio parameters (16 kHz mono linear16, matching the WS query params)
        self.sample_rate: int = 16000
        self.channels: int = 1
        self.blocksize: int = 1024  # ~64ms chunks for lower latency streaming

        # Callbacks (annotations quoted: TranscriptionResult is a project type)
        self.realtime_callback: "Optional[Callable[[TranscriptionResult], None]]" = None
        self.final_callback: "Optional[Callable[[TranscriptionResult], None]]" = None
        self._on_error: Optional[Callable[[str], None]] = None
        self._on_credits_low: Optional[Callable[[int], None]] = None

        # Internal state
        self._is_initialized: bool = False
        self._is_recording: bool = False
        self._stop_event: threading.Event = threading.Event()
        self._audio_queue: Queue = Queue()

        # Connection signalling. Previously these were created lazily in
        # start_recording()/error paths, which risked AttributeError when
        # read before a session ever started; initialise them up front.
        self._ws_connected: threading.Event = threading.Event()
        # True only once the WS (and managed handshake, if any) succeeded.
        self._ws_ready: bool = False
        # Human-readable description of the last connection/stream failure.
        self._last_error: str = ""

        # Asyncio event loop running in a daemon thread
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._thread: Optional[threading.Thread] = None

        # WebSocket handle (set inside the async context)
        self._ws = None

        # sounddevice InputStream
        self._stream = None

    # ------------------------------------------------------------------ #
    # Callback setters
    # ------------------------------------------------------------------ #

    def set_callbacks(
        self,
        realtime_callback: "Optional[Callable[[TranscriptionResult], None]]" = None,
        final_callback: "Optional[Callable[[TranscriptionResult], None]]" = None,
    ):
        """Set transcription result callbacks (matches RealtimeTranscriptionEngine API)."""
        self.realtime_callback = realtime_callback
        self.final_callback = final_callback

    def set_error_callback(self, fn: Optional[Callable[[str], None]]):
        """Set a callback invoked on errors. ``fn`` receives a string message."""
        self._on_error = fn

    def set_credits_low_callback(self, fn: Optional[Callable[[int], None]]):
        """Set a callback for low-credit warnings. ``fn`` receives seconds remaining."""
        self._on_credits_low = fn

    # ------------------------------------------------------------------ #
    # Public interface (duck-typed with RealtimeTranscriptionEngine)
    # ------------------------------------------------------------------ #

    def initialize(self) -> bool:
        """Validate configuration and mark the engine as ready.

        Returns ``True`` when the engine is ready to start recording.
        """
        if self._is_initialized:
            return True

        if self.mode == "managed":
            if not self.server_url:
                logger.error("Managed mode requires a server URL (remote.server_url)")
                return False
            if not self.auth_token:
                logger.error("Managed mode requires an auth token (remote.auth_token)")
                return False
        elif self.mode == "byok":
            if not self.byok_api_key:
                logger.error("BYOK mode requires an API key (remote.byok_api_key)")
                return False
        else:
            logger.error("Unknown remote mode: %s (expected 'managed' or 'byok')", self.mode)
            return False

        self._is_initialized = True
        logger.info("DeepgramTranscriptionEngine initialised in %s mode", self.mode)
        return True

    def start_recording(self) -> bool:
        """Open the audio stream and connect the WebSocket.

        Returns ``True`` on success; on failure ``_last_error`` holds a
        human-readable description.
        """
        if not self._is_initialized:
            logger.error("Engine not initialised -- call initialize() first")
            return False

        if self._is_recording:
            return True

        self._stop_event.clear()
        self._ws_connected.clear()
        self._ws_ready = False
        self._last_error = ""

        # Drain stale audio left over from a previous session so it is not
        # streamed at the start of the new one.
        while True:
            try:
                self._audio_queue.get_nowait()
            except Empty:
                break

        self._is_recording = True

        # Start the asyncio event-loop thread (handles WS send/receive)
        self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
        self._thread.start()

        # Wait for the WebSocket to connect before starting audio capture.
        # Without this, audio chunks arrive before the WS is open -> broken pipe.
        # _ws_lifecycle sets the event on failure too (with _ws_ready False),
        # so a refused connection fails fast instead of burning the timeout.
        if not self._ws_connected.wait(timeout=15) or not self._ws_ready:
            if not self._last_error:
                self._last_error = "Timed out connecting to Deepgram"
            logger.error("Could not connect to Deepgram: %s", self._last_error)
            self._is_recording = False
            self._stop_event.set()
            return False

        # Start the audio capture stream
        try:
            self._start_audio_stream()
        except Exception as exc:
            logger.error("Failed to open audio stream: %s", exc)
            self._last_error = f"Audio stream error: {exc}"
            self._is_recording = False
            self._stop_event.set()
            return False

        logger.info("Recording started")
        return True

    def stop_recording(self):
        """Stop audio capture and close the WebSocket."""
        if not self._is_recording:
            return

        self._is_recording = False
        self._stop_event.set()

        # Stop audio stream
        self._stop_audio_stream()

        # Close WebSocket from outside the event-loop thread
        if self._ws is not None and self._loop is not None and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(self._close_ws(), self._loop)

        # Wait for the thread to finish
        if self._thread is not None:
            self._thread.join(timeout=5)
            self._thread = None

        logger.info("Recording stopped")

    def stop(self):
        """Full shutdown -- stop recording and release all resources."""
        self.stop_recording()
        self._is_initialized = False
        logger.info("DeepgramTranscriptionEngine shut down")

    def is_ready(self) -> bool:
        """Return ``True`` if the engine has been successfully initialised."""
        return self._is_initialized

    # ------------------------------------------------------------------ #
    # Audio capture (sounddevice)
    # ------------------------------------------------------------------ #

    def _start_audio_stream(self):
        """Open a ``sounddevice.InputStream`` that feeds the audio queue."""
        import sounddevice as sd

        def _audio_callback(indata, frames, time_info, status):  # noqa: ARG001
            if status:
                logger.warning("Audio stream status: %s", status)
            if self._is_recording:
                # float32 [-1, 1] -> int16 PCM bytes. Clip first: some
                # drivers deliver samples slightly outside [-1, 1], and
                # unclipped conversion wraps them into loud artifacts.
                pcm = (np.clip(indata, -1.0, 1.0) * 32767).astype(np.int16).tobytes()
                self._audio_queue.put(pcm)

        self._stream = sd.InputStream(
            samplerate=self.sample_rate,
            blocksize=self.blocksize,
            channels=self.channels,
            dtype="float32",
            device=self.input_device_index,
            callback=_audio_callback,
        )
        self._stream.start()

    def _stop_audio_stream(self):
        """Close the audio input stream."""
        if self._stream is not None:
            try:
                self._stream.stop()
                self._stream.close()
            except Exception as exc:
                logger.debug("Error closing audio stream: %s", exc)
            finally:
                self._stream = None

    # ------------------------------------------------------------------ #
    # Asyncio event-loop (runs in daemon thread)
    # ------------------------------------------------------------------ #

    def _run_event_loop(self):
        """Entry point for the daemon thread -- runs the async event loop."""
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
        try:
            self._loop.run_until_complete(self._ws_lifecycle())
        except Exception as exc:
            logger.error("Event-loop error: %s", exc)
        finally:
            try:
                self._loop.run_until_complete(self._loop.shutdown_asyncgens())
            except Exception:
                pass
            self._loop.close()
            self._loop = None

    async def _ws_lifecycle(self):
        """Connect, authenticate (if managed), then run send/receive loops."""
        import websockets

        try:
            ws_url, extra_headers = self._build_ws_url_and_headers()

            logger.info("Connecting to %s", ws_url)
            self._ws = await websockets.connect(
                ws_url,
                additional_headers=extra_headers,
                ping_interval=20,
                ping_timeout=10,
            )

            # Managed mode: send auth message and wait for ready
            if self.mode == "managed":
                if not await self._managed_handshake():
                    return

            # Signal that the WebSocket is connected and ready
            logger.info("WebSocket connected to Deepgram")
            self._ws_ready = True
            self._ws_connected.set()

            # Run send and receive concurrently
            await asyncio.gather(
                self._send_loop(),
                self._receive_loop(),
            )

        except asyncio.CancelledError:
            pass
        except Exception as exc:
            msg = f"WebSocket error: {exc}"
            logger.error(msg)
            self._last_error = msg
            if self._on_error:
                self._on_error(msg)
        finally:
            # Always wake start_recording(); with _ws_ready still False it
            # reports failure immediately instead of waiting out its timeout.
            self._ws_connected.set()
            await self._close_ws()

    def _build_ws_url_and_headers(self):
        """Return ``(url, headers)`` depending on the current mode."""
        if self.mode == "managed":
            # Ensure the server URL uses wss:// and append the path
            url = self.server_url.rstrip("/")
            if not url.startswith("ws://") and not url.startswith("wss://"):
                url = f"wss://{url}"
            url = f"{url}/ws/transcribe"
            return url, {}

        # BYOK -- connect directly to Deepgram
        params = (
            f"model={self.deepgram_model}"
            f"&language={self.language}"
            "&interim_results=true"
            "&punctuate=true"
            "&smart_format=true"
            "&encoding=linear16"
            f"&sample_rate={self.sample_rate}"
            f"&channels={self.channels}"
        )
        url = f"wss://api.deepgram.com/v1/listen?{params}"
        headers = {"Authorization": f"Token {self.byok_api_key}"}
        return url, headers

    # -- managed-mode handshake ---------------------------------------- #

    async def _managed_handshake(self) -> bool:
        """Send auth message and wait for ``ready`` (managed mode).

        Returns ``True`` on success.
        """
        auth_msg = {
            "type": "auth",
            "token": self.auth_token,
            "config": {
                "model": self.deepgram_model,
                "language": self.language,
                "sample_rate": self.sample_rate,
                "channels": self.channels,
                "encoding": "linear16",
                "interim_results": True,
            },
        }
        await self._ws.send(json.dumps(auth_msg))

        try:
            raw = await asyncio.wait_for(self._ws.recv(), timeout=15)
            data = json.loads(raw)
            if data.get("type") == "ready":
                logger.info("Managed proxy is ready")
                return True

            if data.get("type") == "error":
                err = data.get("message", "unknown error")
                logger.error("Auth error from proxy: %s", err)
                self._last_error = f"Proxy auth error: {err}"
                if self._on_error:
                    self._on_error(f"Proxy auth error: {err}")
                return False

            logger.warning("Unexpected handshake message: %s", data)
            self._last_error = "Unexpected handshake message from proxy"
            return False

        except asyncio.TimeoutError:
            logger.error("Timed out waiting for proxy ready message")
            self._last_error = "Timed out waiting for proxy ready message"
            if self._on_error:
                self._on_error("Timed out waiting for proxy ready message")
            return False

    # -- send loop ----------------------------------------------------- #

    async def _send_loop(self):
        """Drain the audio queue and push raw PCM bytes over the WebSocket."""
        # get_event_loop() inside a coroutine is deprecated (3.10+);
        # get_running_loop() is the correct call here.
        loop = asyncio.get_running_loop()
        while not self._stop_event.is_set():
            try:
                # Use run_in_executor to avoid blocking the async event loop
                # (which would stall the receive loop and delay transcriptions)
                pcm_bytes = await asyncio.wait_for(
                    loop.run_in_executor(None, lambda: self._audio_queue.get(timeout=0.5)),
                    timeout=1.0,
                )
            except (Empty, asyncio.TimeoutError):
                continue

            try:
                await self._ws.send(pcm_bytes)
            except Exception as exc:
                if not self._stop_event.is_set():
                    logger.error("Send error: %s", exc)
                break

    # -- receive loop -------------------------------------------------- #

    async def _receive_loop(self):
        """Listen for messages from the WebSocket and dispatch them."""
        while not self._stop_event.is_set():
            try:
                raw = await asyncio.wait_for(self._ws.recv(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            except Exception as exc:
                if not self._stop_event.is_set():
                    logger.error("Receive error: %s", exc)
                break

            try:
                data = json.loads(raw)
            except (json.JSONDecodeError, TypeError):
                logger.debug("Non-JSON message received, ignoring")
                continue

            if self.mode == "managed":
                self._handle_managed_message(data)
            else:
                self._handle_byok_message(data)

    # ------------------------------------------------------------------ #
    # Message handlers
    # ------------------------------------------------------------------ #

    def _handle_managed_message(self, data: dict):
        """Process a message from the managed proxy."""
        msg_type = data.get("type", "")

        if msg_type == "transcript":
            text = data.get("text", "")
            is_final = data.get("is_final", False)
            if text.strip():
                result = TranscriptionResult(
                    text=text,
                    is_final=is_final,
                    timestamp=datetime.now(),
                    user_name=self.user_name,
                )
                self._dispatch_result(result, is_final)

        elif msg_type == "credits_low":
            seconds_remaining = data.get("seconds_remaining", 0)
            logger.warning("Credits low -- %d seconds remaining", seconds_remaining)
            if self._on_credits_low:
                self._on_credits_low(int(seconds_remaining))

        elif msg_type == "error":
            code = data.get("code", "")
            message = data.get("message", "Unknown error")
            logger.error("Proxy error [%s]: %s", code, message)
            if self._on_error:
                self._on_error(f"[{code}] {message}" if code else message)

        elif msg_type == "session_end":
            seconds_used = data.get("seconds_used", 0)
            logger.info("Session ended -- %d seconds used", seconds_used)

        elif msg_type == "ready":
            # May arrive again after reconnects; safe to ignore.
            logger.debug("Received ready message (already connected)")

        else:
            logger.debug("Unhandled managed message type: %s", msg_type)

    def _handle_byok_message(self, data: dict):
        """Process a message received directly from the Deepgram API."""
        msg_type = data.get("type", "")

        if msg_type == "Results":
            channel = data.get("channel", {})
            alternatives = channel.get("alternatives", [])
            if not alternatives:
                return

            transcript = alternatives[0].get("transcript", "")
            is_final = data.get("is_final", False)

            if transcript.strip():
                result = TranscriptionResult(
                    text=transcript,
                    is_final=is_final,
                    timestamp=datetime.now(),
                    user_name=self.user_name,
                )
                self._dispatch_result(result, is_final)

        elif msg_type == "Metadata":
            logger.debug("Deepgram metadata: %s", data)

        elif msg_type == "UtteranceEnd":
            logger.debug("Deepgram utterance end")

        else:
            logger.debug("Unhandled Deepgram message type: %s", msg_type)

    def _dispatch_result(self, result: "TranscriptionResult", is_final: bool):
        """Route a result to the final or realtime callback (shared by both modes)."""
        if is_final:
            if self.final_callback:
                self.final_callback(result)
        else:
            if self.realtime_callback:
                self.realtime_callback(result)

    # ------------------------------------------------------------------ #
    # Helpers
    # ------------------------------------------------------------------ #

    async def _close_ws(self):
        """Close the WebSocket connection if open."""
        if self._ws is not None:
            try:
                await self._ws.close()
            except Exception:
                pass
            self._ws = None

    def set_user_name(self, user_name: str):
        """Update the user name attached to future transcriptions."""
        self.user_name = user_name

    def is_recording_active(self) -> bool:
        """Return ``True`` if audio is currently being captured."""
        return self._is_recording

    def __repr__(self) -> str:
        return (
            f"DeepgramTranscriptionEngine(mode={self.mode}, "
            f"recording={self._is_recording})"
        )

    def __del__(self):
        """Best-effort cleanup."""
        try:
            self.stop()
        except Exception:
            pass