From 352615c15c3dfa27b752385e032a4ba71945562f Mon Sep 17 00:00:00 2001 From: Developer Date: Wed, 8 Apr 2026 12:18:47 -0700 Subject: [PATCH] Fix Deepgram broken pipe: wait for WebSocket before starting audio Audio capture started immediately after spawning the WebSocket thread, but the WebSocket hadn't connected yet. Audio chunks sent to the unconnected WebSocket caused a broken pipe error. Fix: added a threading.Event that start_recording() waits on (up to 15s) before opening the audio stream. The event is set in _ws_lifecycle after the WebSocket connects and handshake completes. Co-Authored-By: Claude Opus 4.6 (1M context) --- client/deepgram_transcription.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client/deepgram_transcription.py b/client/deepgram_transcription.py index 68b05a5..0feeba2 100644 --- a/client/deepgram_transcription.py +++ b/client/deepgram_transcription.py @@ -156,12 +156,23 @@ class DeepgramTranscriptionEngine: return True self._stop_event.clear() + self._ws_connected = threading.Event() self._is_recording = True # Start the asyncio event-loop thread (handles WS send/receive) self._thread = threading.Thread(target=self._run_event_loop, daemon=True) self._thread.start() + # Wait for the WebSocket to connect before starting audio capture. + # Without this, audio chunks arrive before the WS is open -> broken pipe. + if not self._ws_connected.wait(timeout=15): + logger.error("Timed out waiting for Deepgram WebSocket connection") + print("ERROR: Timed out waiting for Deepgram WebSocket connection") + self._last_error = "Timed out connecting to Deepgram" + self._is_recording = False + self._stop_event.set() + return False + # Start the audio capture stream try: self._start_audio_stream() @@ -285,6 +296,11 @@ class DeepgramTranscriptionEngine: if not await self._managed_handshake(): return + # Signal that the WebSocket is connected and ready + logger.info("WebSocket connected to Deepgram") + if hasattr(self, '_ws_connected'): + self._ws_connected.set() + # Run send and receive concurrently await asyncio.gather( self._send_loop(),