2025-12-28 18:48:29 -08:00
|
|
|
"""RealtimeSTT-based transcription engine with advanced VAD and word-loss prevention."""
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
from RealtimeSTT import AudioToTextRecorder
|
|
|
|
|
from typing import Optional, Callable
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
from threading import Lock
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TranscriptionResult:
    """A single transcription result (either a live preview or a final text)."""

    def __init__(self, text: str, is_final: bool, timestamp: datetime, user_name: str = ""):
        """
        Initialize transcription result.

        Args:
            text: Transcribed text (surrounding whitespace is stripped)
            is_final: Whether this is a final transcription or realtime preview
            timestamp: Timestamp of transcription
            user_name: Name of the user/speaker
        """
        self.text = text.strip()
        self.is_final = is_final
        self.timestamp = timestamp
        self.user_name = user_name

    def __repr__(self) -> str:
        # Render as "[FINAL|PREVIEW] [HH:MM:SS] <speaker: >text"; the speaker
        # prefix is included only when a non-blank user name is set.
        tag = "[FINAL]" if self.is_final else "[PREVIEW]"
        clock = self.timestamp.strftime("%H:%M:%S")
        speaker = f"{self.user_name}: " if self.user_name and self.user_name.strip() else ""
        return f"{tag} [{clock}] {speaker}{self.text}"

    def to_dict(self) -> dict:
        """Convert to dictionary (timestamp serialized in ISO-8601 form)."""
        return dict(
            text=self.text,
            is_final=self.is_final,
            timestamp=self.timestamp.isoformat(),
            user_name=self.user_name,
        )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RealtimeTranscriptionEngine:
    """
    Transcription engine using RealtimeSTT for advanced VAD-based speech detection.

    This engine eliminates word loss by:
    - Using dual-layer VAD (WebRTC + Silero) to detect speech boundaries
    - Pre-recording buffer to capture word starts
    - Post-speech silence detection to avoid cutting off endings
    - Optional realtime preview with faster model + final transcription with better model
    """

    def __init__(
        self,
        model: str = "base.en",
        device: str = "auto",
        language: str = "en",
        compute_type: str = "default",
        # Realtime preview settings
        enable_realtime_transcription: bool = False,
        realtime_model: str = "tiny.en",
        realtime_processing_pause: float = 0.1,  # How often to update preview (lower = more frequent)
        # VAD settings
        silero_sensitivity: float = 0.4,
        silero_use_onnx: bool = True,
        webrtc_sensitivity: int = 3,
        # Post-processing settings
        post_speech_silence_duration: float = 0.3,
        min_length_of_recording: float = 0.5,
        min_gap_between_recordings: float = 0.0,
        pre_recording_buffer_duration: float = 0.2,
        # Quality settings
        beam_size: int = 5,
        initial_prompt: str = "",
        # Performance
        no_log_file: bool = True,
        # Audio device
        input_device_index: Optional[int] = None,
        # User name
        user_name: str = ""
    ):
        """
        Initialize RealtimeSTT transcription engine.

        Args:
            model: Whisper model for final transcription
            device: Device to use ('auto', 'cuda', 'cpu')
            language: Language code for transcription
            compute_type: Compute type ('default', 'int8', 'float16', 'float32')
            enable_realtime_transcription: Enable live preview with faster model
            realtime_model: Model for realtime preview (should be tiny/base)
            realtime_processing_pause: Preview update interval in seconds
            silero_sensitivity: Silero VAD sensitivity (0.0-1.0, lower = more sensitive)
            silero_use_onnx: Use ONNX for faster VAD
            webrtc_sensitivity: WebRTC VAD sensitivity (0-3, lower = more sensitive)
            post_speech_silence_duration: Silence duration before finalizing
            min_length_of_recording: Minimum recording length
            min_gap_between_recordings: Minimum gap between recordings
            pre_recording_buffer_duration: Pre-recording buffer to capture word starts
            beam_size: Beam size for decoding (higher = better quality)
            initial_prompt: Optional prompt to guide transcription
            no_log_file: Disable RealtimeSTT logging
            input_device_index: Audio input device index
            user_name: User name for transcriptions
        """
        self.model = model
        self.language = language
        self.compute_type = compute_type

        # Resolve device - 'auto' means use CUDA if available, else CPU
        self.device = self._resolve_device(device)

        self.enable_realtime = enable_realtime_transcription
        self.realtime_model = realtime_model
        self.realtime_processing_pause = realtime_processing_pause
        self.user_name = user_name

        # Callbacks (set via set_callbacks())
        self.realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None
        self.final_callback: Optional[Callable[[TranscriptionResult], None]] = None

        # RealtimeSTT recorder state; the recorder itself is created lazily in initialize()
        self.recorder: Optional[AudioToTextRecorder] = None
        self.is_initialized = False
        self.is_recording = False
        self.transcription_thread = None
        self.lock = Lock()

        # Disable RealtimeSTT logging if requested
        if no_log_file:
            logging.getLogger('RealtimeSTT').setLevel(logging.ERROR)

        # Store configuration for recorder initialization
        self.config = {
            'model': model,
            'device': self.device,  # Use resolved device (auto -> cuda/cpu)
            'language': language if language != 'auto' else None,
            'compute_type': compute_type,  # 'default' is already the library default
            'input_device_index': input_device_index,
            'silero_sensitivity': silero_sensitivity,
            'silero_use_onnx': silero_use_onnx,
            'webrtc_sensitivity': webrtc_sensitivity,
            'post_speech_silence_duration': post_speech_silence_duration,
            'min_length_of_recording': min_length_of_recording,
            'min_gap_between_recordings': min_gap_between_recordings,
            'pre_recording_buffer_duration': pre_recording_buffer_duration,
            'beam_size': beam_size,
            'initial_prompt': initial_prompt if initial_prompt else None,
            'enable_realtime_transcription': enable_realtime_transcription,
            'realtime_model_type': realtime_model if enable_realtime_transcription else None,
            'realtime_processing_pause': realtime_processing_pause if enable_realtime_transcription else 0.2,
            # The realtime callback is added during initialize() after set_callbacks is called
        }

    def _resolve_device(self, device: str) -> str:
        """Resolve 'auto' to 'cuda' when CUDA is available, otherwise 'cpu'.

        Explicit device names are returned unchanged.
        """
        if device == 'auto':
            return 'cuda' if self._is_cuda_available() else 'cpu'
        return device

    def _is_cuda_available(self) -> bool:
        """Check if CUDA is available (False when torch is missing or errors)."""
        try:
            import torch
            return torch.cuda.is_available()
        except Exception:
            return False

    def set_callbacks(
        self,
        realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None,
        final_callback: Optional[Callable[[TranscriptionResult], None]] = None
    ):
        """
        Set callbacks for realtime and final transcriptions.

        Args:
            realtime_callback: Called for realtime preview transcriptions
            final_callback: Called for final transcriptions
        """
        self.realtime_callback = realtime_callback
        self.final_callback = final_callback

    def _on_realtime_transcription(self, text: str):
        """Internal callback for realtime transcriptions (ignores blank text)."""
        if self.realtime_callback and text.strip():
            result = TranscriptionResult(
                text=text,
                is_final=False,
                timestamp=datetime.now(),
                user_name=self.user_name
            )
            self.realtime_callback(result)

    def _on_final_transcription(self, text: str):
        """Internal callback for final transcriptions (ignores blank text)."""
        if self.final_callback and text.strip():
            result = TranscriptionResult(
                text=text,
                is_final=True,
                timestamp=datetime.now(),
                user_name=self.user_name
            )
            self.final_callback(result)

    def initialize(self) -> bool:
        """
        Initialize the transcription engine (load models, setup VAD).
        Does NOT start recording yet.

        Returns:
            True if initialized successfully, False otherwise
        """
        with self.lock:
            if self.is_initialized:
                return True

            try:
                print(f"Initializing RealtimeSTT with model: {self.model}")
                print(f"  Device: {self.device}, Compute type: {self.compute_type}")
                if self.enable_realtime:
                    print(f"  Realtime preview enabled with model: {self.realtime_model}")
                    print(f"  Realtime processing pause: {self.realtime_processing_pause}s")

                # Add realtime transcription callback if enabled.
                # This provides word-by-word updates as speech is being processed.
                if self.enable_realtime:
                    self.config['on_realtime_transcription_update'] = self._on_realtime_transcription

                # Create recorder with configuration
                self.recorder = AudioToTextRecorder(**self.config)

                self.is_initialized = True
                print("RealtimeSTT initialized successfully")
                return True

            except Exception as e:
                print(f"Error initializing RealtimeSTT: {e}")
                self.is_initialized = False
                return False

    def start_recording(self) -> bool:
        """
        Start recording and transcription.
        Must call initialize() first.

        Returns:
            True if started successfully, False otherwise
        """
        with self.lock:
            if not self.is_initialized:
                print("Error: Engine not initialized. Call initialize() first.")
                return False

            if self.is_recording:
                return True

            try:
                import threading

                def transcription_loop():
                    """Run transcription loop in background thread."""
                    while self.is_recording:
                        try:
                            # Get transcription (this blocks until speech is detected and processed)
                            # Will raise exception when recorder is stopped
                            text = self.recorder.text()
                            if text and text.strip() and self.is_recording:
                                # This is always a final transcription
                                self._on_final_transcription(text)
                        except Exception as e:
                            # Expected when stopping - recorder.stop() will cause text() to raise exception
                            if self.is_recording:  # Only print if we're still supposed to be recording
                                print(f"Error in transcription loop: {e}")
                            break

                # Start the recorder
                self.recorder.start()

                # Start transcription loop in background thread
                self.is_recording = True
                self.transcription_thread = threading.Thread(target=transcription_loop, daemon=True)
                self.transcription_thread.start()

                print("Recording started")
                return True

            except Exception as e:
                print(f"Error starting recording: {e}")
                self.is_recording = False
                return False

    def stop_recording(self):
        """Stop recording and transcription (keeps the engine initialized)."""
        import time

        # Check if already stopped
        with self.lock:
            if not self.is_recording:
                return

            # Set flag first so transcription loop can exit
            self.is_recording = False

        # Stop the recorder outside the lock (it may block)
        try:
            if self.recorder:
                # Stop the recorder - this should unblock the text() call
                self.recorder.stop()

            # Give the transcription thread a moment to exit cleanly
            time.sleep(0.1)

            print("Recording stopped")

        except Exception as e:
            print(f"Error stopping recording: {e}")

    def stop(self):
        """Stop recording and shutdown the engine completely."""
        self.stop_recording()

        with self.lock:
            try:
                if self.recorder:
                    self.recorder.shutdown()
                    self.recorder = None

                self.is_initialized = False
                print("RealtimeSTT shutdown")

            except Exception as e:
                print(f"Error shutting down RealtimeSTT: {e}")

    def is_recording_active(self) -> bool:
        """Check if recording is currently active."""
        return self.is_recording

    def is_ready(self) -> bool:
        """Check if engine is initialized and ready."""
        return self.is_initialized

    def change_model(self, model: str, realtime_model: Optional[str] = None) -> bool:
        """
        Change the transcription model.

        Args:
            model: New model for final transcription
            realtime_model: Optional new model for realtime preview

        Returns:
            True if model changed successfully
        """
        was_running = self.is_recording

        # Stop current recording (also tears down the recorder)
        self.stop()

        # Update configuration
        self.model = model
        self.config['model'] = model

        if realtime_model:
            self.realtime_model = realtime_model
            self.config['realtime_model_type'] = realtime_model

        # Restart if it was running. stop() de-initialized the engine, so the
        # recorder must be re-created before recording can resume.
        # (The previous code called the nonexistent self.start().)
        if was_running:
            return self.initialize() and self.start_recording()

        return True

    def change_device(self, device: str, compute_type: Optional[str] = None) -> bool:
        """
        Change compute device.

        Args:
            device: New device ('auto', 'cuda', 'cpu')
            compute_type: Optional new compute type

        Returns:
            True if device changed successfully
        """
        was_running = self.is_recording

        # Stop current recording (also tears down the recorder)
        self.stop()

        # Update configuration; resolve 'auto' the same way __init__ does so the
        # recorder never receives the literal string 'auto'.
        self.device = self._resolve_device(device)
        self.config['device'] = self.device

        if compute_type:
            self.compute_type = compute_type
            self.config['compute_type'] = compute_type

        # Restart if it was running (re-initialize since stop() shut us down)
        if was_running:
            return self.initialize() and self.start_recording()

        return True

    def change_language(self, language: str):
        """
        Change transcription language.

        Args:
            language: Language code or 'auto'
        """
        self.language = language
        # RealtimeSTT expects None for auto-detection, not the string 'auto'
        self.config['language'] = language if language != 'auto' else None

    def update_vad_sensitivity(self, silero_sensitivity: float, webrtc_sensitivity: int):
        """
        Update VAD sensitivity settings.

        Args:
            silero_sensitivity: Silero VAD sensitivity (0.0-1.0)
            webrtc_sensitivity: WebRTC VAD sensitivity (0-3)
        """
        self.config['silero_sensitivity'] = silero_sensitivity
        self.config['webrtc_sensitivity'] = webrtc_sensitivity

        # If running, need to restart to apply changes
        if self.is_recording:
            print("VAD settings updated. Restart transcription to apply changes.")

    def set_user_name(self, user_name: str):
        """Set the user name for transcriptions."""
        self.user_name = user_name

    def __repr__(self) -> str:
        return f"RealtimeTranscriptionEngine(model={self.model}, device={self.device}, running={self.is_recording})"

    def __del__(self):
        """Cleanup when object is destroyed."""
        # Guard against partially-constructed instances and interpreter
        # teardown, where attributes or modules may already be gone.
        try:
            self.stop()
        except Exception:
            pass
|