Fix Deepgram broken pipe: wait for WebSocket before starting audio
Audio capture started immediately after spawning the WebSocket thread, but the WebSocket hadn't connected yet. Audio chunks sent to the unconnected WebSocket caused a broken pipe error. Fix: added a threading.Event that start_recording() waits on (up to 15s) before opening the audio stream. The event is set in _ws_lifecycle after the WebSocket connects and handshake completes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -156,12 +156,23 @@ class DeepgramTranscriptionEngine:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
self._stop_event.clear()
|
self._stop_event.clear()
|
||||||
|
self._ws_connected = threading.Event()
|
||||||
self._is_recording = True
|
self._is_recording = True
|
||||||
|
|
||||||
# Start the asyncio event-loop thread (handles WS send/receive)
|
# Start the asyncio event-loop thread (handles WS send/receive)
|
||||||
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
|
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
|
||||||
self._thread.start()
|
self._thread.start()
|
||||||
|
|
||||||
|
# Wait for the WebSocket to connect before starting audio capture.
|
||||||
|
# Without this, audio chunks arrive before the WS is open -> broken pipe.
|
||||||
|
if not self._ws_connected.wait(timeout=15):
|
||||||
|
logger.error("Timed out waiting for Deepgram WebSocket connection")
|
||||||
|
print("ERROR: Timed out waiting for Deepgram WebSocket connection")
|
||||||
|
self._last_error = "Timed out connecting to Deepgram"
|
||||||
|
self._is_recording = False
|
||||||
|
self._stop_event.set()
|
||||||
|
return False
|
||||||
|
|
||||||
# Start the audio capture stream
|
# Start the audio capture stream
|
||||||
try:
|
try:
|
||||||
self._start_audio_stream()
|
self._start_audio_stream()
|
||||||
@@ -285,6 +296,11 @@ class DeepgramTranscriptionEngine:
|
|||||||
if not await self._managed_handshake():
|
if not await self._managed_handshake():
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Signal that the WebSocket is connected and ready
|
||||||
|
logger.info("WebSocket connected to Deepgram")
|
||||||
|
if hasattr(self, '_ws_connected'):
|
||||||
|
self._ws_connected.set()
|
||||||
|
|
||||||
# Run send and receive concurrently
|
# Run send and receive concurrently
|
||||||
await asyncio.gather(
|
await asyncio.gather(
|
||||||
self._send_loop(),
|
self._send_loop(),
|
||||||
|
|||||||
Reference in New Issue
Block a user