Files
local-transcription/client/transcription_engine_realtime.py
Developer 1c8c6ad7e8
All checks were successful
Tests / Python Backend Tests (push) Successful in 5s
Tests / Frontend Tests (push) Successful in 7s
Tests / Rust Sidecar Tests (push) Successful in 3m12s
Fix display user not updating locally until app restart
Engines now read user.name from the config object at transcription time
instead of caching it at init, so name changes take effect immediately.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-12 10:40:46 -07:00

420 lines
15 KiB
Python

"""RealtimeSTT-based transcription engine with advanced VAD and word-loss prevention."""
import numpy as np
from RealtimeSTT import AudioToTextRecorder
from typing import Optional, Callable
from datetime import datetime
from threading import Lock
import logging
# Re-export TranscriptionResult from the shared models module for backward compatibility
from client.models import TranscriptionResult # noqa: F401
def to_dict(self) -> dict:
    """Convert a transcription-result-like object to a plain dictionary.

    NOTE(review): this is a module-level function that takes ``self``; it
    looks like a leftover method of the ``TranscriptionResult`` class that is
    now imported from ``client.models`` -- confirm whether it is still needed.

    Args:
        self: Any object exposing ``text``, ``is_final``, ``timestamp`` (an
            object with an ``isoformat()`` method, e.g. ``datetime``) and
            ``user_name`` attributes.

    Returns:
        A dict with the keys ``text``, ``is_final``, ``timestamp`` (ISO-8601
        string produced by ``timestamp.isoformat()``) and ``user_name``.
    """
    return {
        'text': self.text,
        'is_final': self.is_final,
        'timestamp': self.timestamp.isoformat(),
        'user_name': self.user_name,
    }
class RealtimeTranscriptionEngine:
    """
    Transcription engine using RealtimeSTT for advanced VAD-based speech detection.

    This engine eliminates word loss by:
    - Using dual-layer VAD (WebRTC + Silero) to detect speech boundaries
    - Pre-recording buffer to capture word starts
    - Post-speech silence detection to avoid cutting off endings
    - Optional realtime preview with faster model + final transcription with better model

    Lifecycle:
    - ``initialize()`` builds the recorder from ``self.config``.
    - ``start_recording()`` / ``stop_recording()`` start and stop the
      background loop that pulls final transcriptions from the recorder.
    - ``start()`` is ``initialize()`` followed by ``start_recording()``.
    - ``stop()`` stops recording and shuts the recorder down; the engine must
      be initialized again before it can record.
    """

    # Upper bound (seconds) that stop_recording() waits for the worker thread.
    _STOP_JOIN_TIMEOUT = 0.5

    def __init__(
        self,
        model: str = "base.en",
        device: str = "auto",
        language: str = "en",
        compute_type: str = "default",
        # Realtime preview settings
        enable_realtime_transcription: bool = False,
        realtime_model: str = "tiny.en",
        realtime_processing_pause: float = 0.1,  # How often to update preview (lower = more frequent)
        # VAD settings
        silero_sensitivity: float = 0.4,
        silero_use_onnx: bool = True,
        webrtc_sensitivity: int = 3,
        # Post-processing settings
        post_speech_silence_duration: float = 0.3,
        min_length_of_recording: float = 0.5,
        min_gap_between_recordings: float = 0.0,
        pre_recording_buffer_duration: float = 0.2,
        # Quality settings
        beam_size: int = 5,
        initial_prompt: str = "",
        # Performance
        no_log_file: bool = True,
        # Audio device
        input_device_index: Optional[int] = None,
        # App config (for reading user.name at transcription time)
        app_config=None
    ):
        """
        Initialize RealtimeSTT transcription engine.

        Only stores configuration; no recorder is created until initialize().

        Args:
            model: Whisper model for final transcription
            device: Device to use ('auto', 'cuda', 'cpu'); 'auto' is resolved
                to 'cuda' when torch reports CUDA, otherwise 'cpu'
            language: Language code for transcription ('auto' is passed to the
                recorder as None)
            compute_type: Compute type ('default', 'int8', 'float16', 'float32')
            enable_realtime_transcription: Enable live preview with faster model
            realtime_model: Model for realtime preview (should be tiny/base)
            realtime_processing_pause: Pause between preview updates in seconds;
                0.2 is passed to the recorder when the preview is disabled
            silero_sensitivity: Silero VAD sensitivity (0.0-1.0, higher = more sensitive)
            silero_use_onnx: Use ONNX for faster VAD
            webrtc_sensitivity: WebRTC VAD sensitivity (0-3, lower = more sensitive)
            post_speech_silence_duration: Silence duration before finalizing
            min_length_of_recording: Minimum recording length
            min_gap_between_recordings: Minimum gap between recordings
            pre_recording_buffer_duration: Pre-recording buffer to capture word starts
            beam_size: Beam size for decoding (higher = better quality)
            initial_prompt: Optional prompt to guide transcription (empty
                string is passed to the recorder as None)
            no_log_file: When true, raise the 'RealtimeSTT' logger level to ERROR
            input_device_index: Audio input device index
            app_config: App Config object for reading user.name dynamically
        """
        self.model = model
        self.language = language
        self.compute_type = compute_type
        # Resolve device - 'auto' means use CUDA if available, else CPU
        self.device = self._resolve_device(device)
        self.enable_realtime = enable_realtime_transcription
        self.realtime_model = realtime_model
        self.realtime_processing_pause = realtime_processing_pause
        self.app_config = app_config

        # Callbacks
        self.realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None
        self.final_callback: Optional[Callable[[TranscriptionResult], None]] = None

        # RealtimeSTT recorder and run state.  The lock is created last so
        # that __del__ can use its presence as "state attributes exist".
        self.recorder: Optional[AudioToTextRecorder] = None
        self.is_initialized = False
        self.is_recording = False
        self.transcription_thread = None
        self.lock = Lock()

        # Disable RealtimeSTT logging if requested
        if no_log_file:
            logging.getLogger('RealtimeSTT').setLevel(logging.ERROR)

        # Keyword arguments handed to AudioToTextRecorder in initialize()
        self.config = {
            'model': model,
            'device': self.device,  # Use resolved device (auto -> cuda/cpu)
            'language': language if language != 'auto' else None,
            'compute_type': compute_type,
            'input_device_index': input_device_index,
            'silero_sensitivity': silero_sensitivity,
            'silero_use_onnx': silero_use_onnx,
            'webrtc_sensitivity': webrtc_sensitivity,
            'post_speech_silence_duration': post_speech_silence_duration,
            'min_length_of_recording': min_length_of_recording,
            'min_gap_between_recordings': min_gap_between_recordings,
            'pre_recording_buffer_duration': pre_recording_buffer_duration,
            'beam_size': beam_size,
            'initial_prompt': initial_prompt if initial_prompt else None,
            'enable_realtime_transcription': enable_realtime_transcription,
            'realtime_model_type': realtime_model if enable_realtime_transcription else None,
            'realtime_processing_pause': realtime_processing_pause if enable_realtime_transcription else 0.2,
            # The realtime callback is added during initialize() after set_callbacks is called
        }

    def _is_cuda_available(self) -> bool:
        """Check if CUDA is available (False when torch cannot be imported)."""
        try:
            import torch
            return torch.cuda.is_available()
        except Exception:
            # A missing or broken torch install simply means "no CUDA".
            return False

    def _resolve_device(self, device: str) -> str:
        """Map 'auto' to 'cuda' or 'cpu'; return any other value unchanged."""
        if device != 'auto':
            return device
        return 'cuda' if self._is_cuda_available() else 'cpu'

    def set_callbacks(
        self,
        realtime_callback: Optional[Callable[["TranscriptionResult"], None]] = None,
        final_callback: Optional[Callable[["TranscriptionResult"], None]] = None
    ):
        """
        Set callbacks for realtime and final transcriptions.

        Both callbacks are replaced on every call; omitting one clears it.

        Args:
            realtime_callback: Called for realtime preview transcriptions
            final_callback: Called for final transcriptions
        """
        self.realtime_callback = realtime_callback
        self.final_callback = final_callback

    def _get_user_name(self) -> str:
        """Read 'user.name' from the app config at call time ('' if unavailable)."""
        if self.app_config:
            return self.app_config.get('user.name', '')
        return ''

    def _emit(self, callback, text: str, is_final: bool):
        """Wrap non-blank text in a TranscriptionResult and pass it to callback."""
        if not callback or not text or not text.strip():
            return
        callback(
            TranscriptionResult(
                text=text,
                is_final=is_final,
                timestamp=datetime.now(),
                # Looked up per result so a renamed user shows up immediately.
                user_name=self._get_user_name(),
            )
        )

    def _on_realtime_transcription(self, text: str):
        """Internal callback for realtime transcriptions."""
        self._emit(self.realtime_callback, text, False)

    def _on_final_transcription(self, text: str):
        """Internal callback for final transcriptions."""
        self._emit(self.final_callback, text, True)

    def initialize(self) -> bool:
        """
        Initialize the transcription engine (load models, setup VAD).
        Does NOT start recording yet.

        Returns:
            True if initialized successfully (or already initialized),
            False otherwise
        """
        with self.lock:
            if self.is_initialized:
                return True
            try:
                print(f"Initializing RealtimeSTT with model: {self.model}")
                print(f"  Device: {self.device}, Compute type: {self.compute_type}")
                if self.enable_realtime:
                    print(f"  Realtime preview enabled with model: {self.realtime_model}")
                    print(f"  Realtime processing pause: {self.realtime_processing_pause}s")
                    # Add realtime transcription callback if enabled
                    # This provides word-by-word updates as speech is being processed
                    self.config['on_realtime_transcription_update'] = self._on_realtime_transcription

                # Create recorder with configuration
                self.recorder = AudioToTextRecorder(**self.config)
                self.is_initialized = True
                print("RealtimeSTT initialized successfully")
                return True
            except Exception as e:
                print(f"Error initializing RealtimeSTT: {e}")
                self.is_initialized = False
                return False

    def start(self) -> bool:
        """
        Initialize the engine if necessary and start recording.

        Counterpart of stop(); used to bring the engine back up after a
        model or device change.

        Returns:
            True if the engine is initialized and recording, False otherwise
        """
        return self.initialize() and self.start_recording()

    def _transcription_loop(self):
        """Run transcription loop in background thread."""
        while self.is_recording:
            try:
                # Get transcription (this blocks until speech is detected and processed)
                text = self.recorder.text()
                if text and text.strip() and self.is_recording:
                    # This is always a final transcription
                    self._on_final_transcription(text)
            except Exception as e:
                # Any exception ends the loop.  Errors raised while shutting
                # down are expected, so only report them if we are still
                # supposed to be recording.
                if self.is_recording:
                    print(f"Error in transcription loop: {e}")
                break

    def start_recording(self) -> bool:
        """
        Start recording and transcription.
        Must call initialize() first.

        Returns:
            True if started successfully (or already recording),
            False otherwise
        """
        import threading

        with self.lock:
            if not self.is_initialized:
                print("Error: Engine not initialized. Call initialize() first.")
                return False
            if self.is_recording:
                return True
            try:
                # Start the recorder
                self.recorder.start()
                # Start transcription loop in background thread
                self.is_recording = True
                self.transcription_thread = threading.Thread(
                    target=self._transcription_loop, daemon=True
                )
                self.transcription_thread.start()
                print("Recording started")
                return True
            except Exception as e:
                print(f"Error starting recording: {e}")
                self.is_recording = False
                return False

    def stop_recording(self):
        """Stop recording and transcription (no-op when not recording)."""
        import threading

        # Check if already stopped
        with self.lock:
            if not self.is_recording:
                return
            # Set flag first so transcription loop can exit
            self.is_recording = False
            worker = self.transcription_thread

        # Stop the recorder outside the lock (it may block)
        try:
            if self.recorder:
                # Stop the recorder - this should unblock the text() call
                self.recorder.stop()
            # Wait (bounded) for the worker to finish.  A thread cannot join
            # itself, so skip this when called from the worker, e.g. from a
            # final_callback.
            if worker is not None and worker is not threading.current_thread():
                worker.join(timeout=self._STOP_JOIN_TIMEOUT)
            print("Recording stopped")
        except Exception as e:
            print(f"Error stopping recording: {e}")

    def stop(self):
        """Stop recording and shutdown the engine completely."""
        self.stop_recording()
        with self.lock:
            try:
                if self.recorder:
                    self.recorder.shutdown()
                    self.recorder = None
                    print("RealtimeSTT shutdown")
                self.is_initialized = False
            except Exception as e:
                print(f"Error shutting down RealtimeSTT: {e}")

    def is_recording_active(self) -> bool:
        """Check if recording is currently active."""
        return self.is_recording

    def is_ready(self) -> bool:
        """Check if engine is initialized and ready."""
        return self.is_initialized

    def change_model(self, model: str, realtime_model: Optional[str] = None) -> bool:
        """
        Change the transcription model.

        The engine is shut down so the new model is loaded on the next
        initialize(); if it was recording, it is restarted immediately.

        Args:
            model: New model for final transcription
            realtime_model: Optional new model for realtime preview

        Returns:
            True if model changed successfully (and, when it was recording,
            the restart succeeded)
        """
        was_running = self.is_recording
        # Stop current recording
        self.stop()
        # Update configuration
        self.model = model
        self.config['model'] = model
        if realtime_model:
            self.realtime_model = realtime_model
            self.config['realtime_model_type'] = realtime_model
        # Restart if it was running
        if was_running:
            return self.start()
        return True

    def change_device(self, device: str, compute_type: Optional[str] = None) -> bool:
        """
        Change compute device.

        The engine is shut down so the new device is used on the next
        initialize(); if it was recording, it is restarted immediately.

        Args:
            device: New device ('auto', 'cuda', 'cpu'); 'auto' is resolved
                the same way as in the constructor
            compute_type: Optional new compute type

        Returns:
            True if device changed successfully (and, when it was recording,
            the restart succeeded)
        """
        was_running = self.is_recording
        # Stop current recording
        self.stop()
        # Update configuration
        self.device = self._resolve_device(device)
        self.config['device'] = self.device
        if compute_type:
            self.compute_type = compute_type
            self.config['compute_type'] = compute_type
        # Restart if it was running
        if was_running:
            return self.start()
        return True

    def change_language(self, language: str):
        """
        Change transcription language.

        Only the stored configuration is updated; it is read when
        initialize() next builds a recorder.

        Args:
            language: Language code or 'auto'
        """
        self.language = language
        self.config['language'] = language if language != 'auto' else None

    def update_vad_sensitivity(self, silero_sensitivity: float, webrtc_sensitivity: int):
        """
        Update VAD sensitivity settings.

        Only the stored configuration is updated; it is read when
        initialize() next builds a recorder.

        Args:
            silero_sensitivity: Silero VAD sensitivity (0.0-1.0)
            webrtc_sensitivity: WebRTC VAD sensitivity (0-3)
        """
        self.config['silero_sensitivity'] = silero_sensitivity
        self.config['webrtc_sensitivity'] = webrtc_sensitivity
        # If running, need to restart to apply changes
        if self.is_recording:
            print("VAD settings updated. Restart transcription to apply changes.")

    def __repr__(self) -> str:
        return f"RealtimeTranscriptionEngine(model={self.model}, device={self.device}, running={self.is_recording})"

    def __del__(self):
        """Cleanup when object is destroyed."""
        # The lock is the last piece of run state set in __init__; without it
        # the instance was never fully constructed and there is nothing to stop.
        if getattr(self, 'lock', None) is None:
            return
        try:
            self.stop()
        except Exception:
            # Best-effort cleanup: nothing useful can be done with an error
            # raised during garbage collection or interpreter shutdown.
            pass