"""Headless application controller for transcription backend.

Extracts orchestration logic from gui/main_window_qt.py into a Qt-free class
that manages engine lifecycle, web server, server sync, and configuration --
all accessible via callbacks instead of Qt signals.
"""

import asyncio
import errno
import sys
import time
from datetime import datetime
from pathlib import Path
from threading import Thread, Lock
from typing import Callable, List, Optional

# Add project root to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

from client.config import Config
from client.device_utils import DeviceManager
from client.transcription_engine_realtime import RealtimeTranscriptionEngine, TranscriptionResult
from client.deepgram_transcription import DeepgramTranscriptionEngine
from client.server_sync import ServerSyncClient
from server.web_display import TranscriptionWebServer
from version import __version__


class AppState:
    """Enum-like class for application states."""

    INITIALIZING = "initializing"
    READY = "ready"
    TRANSCRIBING = "transcribing"
    RELOADING = "reloading"
    ERROR = "error"


class WebServerThread(Thread):
    """Daemon thread that runs the web server on its own asyncio event loop.

    ``loop`` is created inside ``run()`` and is ``None`` until the thread has
    started executing. ``error`` holds the exception raised while running the
    server's ``start()`` coroutine, or ``None``.
    """

    def __init__(self, web_server: TranscriptionWebServer):
        super().__init__(daemon=True)
        self.web_server = web_server
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.error: Optional[Exception] = None
        # Set by stop() so run() can tell a requested stop from a real failure.
        self._stop_requested = False

    def run(self):
        """Create the event loop and run ``web_server.start()`` to completion."""
        try:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self.loop.run_until_complete(self.web_server.start())
        except Exception as e:
            if self._stop_requested and isinstance(e, RuntimeError):
                # Stopping the loop while run_until_complete() is pending
                # raises RuntimeError; that is the expected outcome of stop(),
                # not a startup failure.
                return
            self.error = e
            print(f"ERROR: Web server failed to start: {e}")

    def stop(self):
        """Ask the event loop to stop. Safe to call from another thread.

        Does nothing beyond setting the stop flag if the loop has not been
        created yet.
        """
        self._stop_requested = True
        loop = self.loop
        if loop is not None:
            loop.call_soon_threadsafe(loop.stop)


class EngineInitThread(Thread):
    """Thread for initializing the transcription engine without blocking.

    ``on_complete(success, message)`` is invoked exactly once from this thread
    when ``engine.initialize()`` has returned or raised.
    """

    def __init__(self, engine, on_complete: Callable[[bool, str], None]):
        super().__init__(daemon=True)
        self.engine = engine
        self.on_complete = on_complete

    def run(self):
        """Run ``engine.initialize()`` and report the outcome via ``on_complete``."""
        try:
            success = self.engine.initialize()
        except Exception as e:
            success, message = False, f"Error initializing engine: {e}"
        else:
            if success:
                message = "Engine initialized successfully"
            else:
                message = "Failed to initialize engine"

        # The callback is kept outside the try block above so that an error
        # raised by the callback is not reported as an engine failure and does
        # not cause the callback to be invoked a second time.
        try:
            self.on_complete(bool(success), message)
        except Exception as e:
            print(f"ERROR: Engine init callback failed: {e}")


def _is_address_in_use(error: BaseException) -> bool:
    """Return True if *error* indicates the listening port is already taken."""
    # errno.EADDRINUSE carries the platform's own value, unlike a literal 98.
    if isinstance(error, OSError) and error.errno == errno.EADDRINUSE:
        return True
    text = str(error).lower()
    return "address already in use" in text or "errno 98" in text


class AppController:
    """Headless controller managing the transcription application lifecycle.

    This replaces the orchestration logic that previously lived in MainWindow.
    It manages:
    - Transcription engine lifecycle (init, start, stop, reload)
    - Web server for OBS display
    - Server sync for multi-user mode
    - Configuration
    - Update checking

    All state changes are communicated via callbacks, making it UI-agnostic.
    """

    # Config key suffix (under ``display.``) -> default value. Shared by the
    # web server constructor kwargs and by apply_settings() so that both use
    # the same defaults.
    _DISPLAY_DEFAULTS = {
        'show_timestamps': True,
        'fade_after_seconds': 10,
        'max_lines': 50,
        'font_family': 'Arial',
        'font_size': 16,
        'font_source': 'System Font',
        'websafe_font': 'Arial',
        'google_font': 'Roboto',
        'user_color': '#4CAF50',
        'text_color': '#FFFFFF',
        'background_color': '#000000B3',
    }

    def __init__(self, config: Optional[Config] = None):
        self.config = config or Config()
        self.device_manager = DeviceManager()

        # State
        self._state = AppState.INITIALIZING
        self._state_lock = Lock()
        self.is_transcribing = False

        # Engine
        self.transcription_engine = None
        self._engine_init_thread: Optional[EngineInitThread] = None
        self.current_model_size: Optional[str] = None
        self.current_device_config: Optional[str] = None

        # Web server
        self.web_server: Optional[TranscriptionWebServer] = None
        self.web_server_thread: Optional[WebServerThread] = None
        self.actual_web_port: Optional[int] = None

        # Server sync
        self.server_sync_client: Optional[ServerSyncClient] = None

        # Transcription storage
        self.transcriptions: List[TranscriptionResult] = []

        # Callbacks for state notifications (set by the frontend / API server)
        self.on_state_changed: Optional[Callable[[str, str], None]] = None  # (state, message)
        self.on_transcription: Optional[Callable[[dict], None]] = None  # final transcription
        self.on_preview: Optional[Callable[[dict], None]] = None  # realtime preview
        self.on_error: Optional[Callable[[str], None]] = None
        self.on_credits_low: Optional[Callable[[int], None]] = None

    @property
    def state(self) -> str:
        """Current application state (one of the ``AppState`` values)."""
        with self._state_lock:
            return self._state

    def _set_state(self, state: str, message: str = ""):
        """Store the new state and notify ``on_state_changed`` if set."""
        with self._state_lock:
            self._state = state
        # The callback runs outside the lock.
        if self.on_state_changed:
            self.on_state_changed(state, message)

    # ── Lifecycle ──────────────────────────────────────────────────

    def initialize(self):
        """Initialize the web server and transcription engine.

        Call this once at startup. Non-blocking -- engine init happens
        in a background thread.
        """
        self._set_state(AppState.INITIALIZING, "Starting web server...")
        self._start_web_server()

        self._set_state(AppState.INITIALIZING, "Loading transcription engine...")
        self._initialize_engine()

    def shutdown(self):
        """Gracefully shut down all components."""
        # Stop transcription
        if self.is_transcribing:
            self.stop_transcription()

        # Stop web server
        if self.web_server_thread and self.web_server_thread.is_alive():
            try:
                # stop() flags the stop as intentional so the thread does not
                # report it as a startup failure.
                self.web_server_thread.stop()
            except Exception as e:
                print(f"Warning: Error stopping web server: {e}")

        # Stop transcription engine
        if self.transcription_engine:
            try:
                self.transcription_engine.stop()
            except Exception as e:
                print(f"Warning: Error stopping engine: {e}")

        # Wait for engine init thread
        if self._engine_init_thread and self._engine_init_thread.is_alive():
            self._engine_init_thread.join(timeout=5)

    # ── Web Server ─────────────────────────────────────────────────

    def _start_web_server(self):
        """Start the FastAPI web server for OBS display.

        Tries the configured port and then the next four ports. On success
        ``actual_web_port`` holds the port in use; on failure ``web_server``
        and ``web_server_thread`` are left as ``None``.
        """
        try:
            host = self.config.get('web_server.host', '127.0.0.1')
            port = self.config.get('web_server.port', 8080)

            # Gather display settings
            ws_kwargs = self._get_web_server_kwargs(host, port)

            # Try up to 5 ports
            for try_port in range(port, port + 5):
                print(f"Attempting to start web server at http://{host}:{try_port}")
                ws_kwargs['port'] = try_port

                self.web_server = TranscriptionWebServer(**ws_kwargs)
                self.web_server_thread = WebServerThread(self.web_server)
                self.web_server_thread.start()

                # Give the thread a moment to report a startup error.
                time.sleep(0.5)

                startup_error = self.web_server_thread.error
                if startup_error is None:
                    self.actual_web_port = try_port
                    print(f"Web server started at http://{host}:{try_port}")
                    return

                self.web_server = None
                self.web_server_thread = None
                if _is_address_in_use(startup_error):
                    print(f"Port {try_port} is in use, trying next port...")
                    continue

                # Any other failure will not be fixed by changing the port.
                print(f"Web server failed to start: {startup_error}")
                break

            print("WARNING: Could not start web server on any port")

        except Exception as e:
            print(f"ERROR: Failed to initialize web server: {e}")
            self.web_server = None
            self.web_server_thread = None

    def _display_settings(self) -> dict:
        """Read every ``display.*`` setting listed in ``_DISPLAY_DEFAULTS``."""
        return {
            name: self.config.get(f'display.{name}', default)
            for name, default in self._DISPLAY_DEFAULTS.items()
        }

    def _get_web_server_kwargs(self, host: str, port: int) -> dict:
        """Build kwargs dict for TranscriptionWebServer from config."""
        return dict(
            host=host,
            port=port,
            fonts_dir=self.config.fonts_dir,
            **self._display_settings(),
        )

    # ── Transcription Engine ───────────────────────────────────────

    def _initialize_engine(self):
        """Initialize the transcription engine in a background thread.

        Builds a Deepgram engine when ``remote.mode`` is ``managed`` or
        ``byok``, otherwise a local realtime engine, then starts an
        ``EngineInitThread`` that reports back through ``_on_engine_ready``.
        """
        device_config = self.config.get('transcription.device', 'auto')
        self.device_manager.set_device(device_config)

        audio_device_str = self.config.get('audio.input_device', 'default')
        audio_device = None if audio_device_str == 'default' else int(audio_device_str)

        model = self.config.get('transcription.model', 'base.en')
        language = self.config.get('transcription.language', 'en')
        device = self.device_manager.get_device_for_whisper()
        compute_type = self.config.get('transcription.compute_type', 'default')

        # Remembered so apply_settings() can detect a model/device change.
        self.current_model_size = model
        self.current_device_config = device_config

        user_name = self.config.get('user.name', 'User')

        continuous_mode = self.config.get('transcription.continuous_mode', False)
        if continuous_mode:
            # Continuous mode uses fixed timings instead of the configured ones.
            post_speech_silence = 0.15
            min_gap = 0.0
            min_recording = 0.3
        else:
            post_speech_silence = self.config.get('transcription.post_speech_silence_duration', 0.3)
            min_gap = self.config.get('transcription.min_gap_between_recordings', 0.0)
            min_recording = self.config.get('transcription.min_length_of_recording', 0.5)

        remote_mode = self.config.get('remote.mode', 'local')

        if remote_mode in ('managed', 'byok'):
            self.transcription_engine = DeepgramTranscriptionEngine(
                config=self.config,
                user_name=user_name,
                input_device_index=audio_device,
            )
            self.transcription_engine.set_callbacks(
                realtime_callback=self._on_realtime_transcription,
                final_callback=self._on_final_transcription,
            )
            self.transcription_engine.set_error_callback(self._on_remote_error)
            self.transcription_engine.set_credits_low_callback(self._on_credits_low)
        else:
            self.transcription_engine = RealtimeTranscriptionEngine(
                model=model,
                device=device,
                language=language,
                compute_type=compute_type,
                enable_realtime_transcription=self.config.get('transcription.enable_realtime_transcription', False),
                realtime_model=self.config.get('transcription.realtime_model', 'tiny.en'),
                realtime_processing_pause=self.config.get('transcription.realtime_processing_pause', 0.1),
                silero_sensitivity=self.config.get('transcription.silero_sensitivity', 0.4),
                silero_use_onnx=self.config.get('transcription.silero_use_onnx', True),
                webrtc_sensitivity=self.config.get('transcription.webrtc_sensitivity', 3),
                post_speech_silence_duration=post_speech_silence,
                min_length_of_recording=min_recording,
                min_gap_between_recordings=min_gap,
                pre_recording_buffer_duration=self.config.get('transcription.pre_recording_buffer_duration', 0.2),
                beam_size=self.config.get('transcription.beam_size', 5),
                initial_prompt=self.config.get('transcription.initial_prompt', ''),
                no_log_file=self.config.get('transcription.no_log_file', True),
                input_device_index=audio_device,
                user_name=user_name,
            )
            self.transcription_engine.set_callbacks(
                realtime_callback=self._on_realtime_transcription,
                final_callback=self._on_final_transcription,
            )

        # Start init in background thread
        self._engine_init_thread = EngineInitThread(
            self.transcription_engine,
            self._on_engine_ready,
        )
        self._engine_init_thread.start()

    def _on_engine_ready(self, success: bool, message: str):
        """Called from EngineInitThread when engine init completes."""
        if not success:
            self._set_state(AppState.ERROR, message)
            return

        remote_mode = self.config.get('remote.mode', 'local')
        engine = self.transcription_engine
        if remote_mode in ('managed', 'byok'):
            mode_label = 'Managed' if remote_mode == 'managed' else 'BYOK'
            device_display = f"Deepgram ({mode_label})"
        elif engine and hasattr(engine, 'device'):
            # Same guard as get_status(): only read device info if present.
            device_display = f"{engine.device.upper()} ({engine.compute_type})"
        else:
            device_display = "Unknown"

        self._set_state(AppState.READY, f"Ready | Device: {device_display}")

    # ── Transcription Control ──────────────────────────────────────

    def start_transcription(self) -> tuple[bool, str]:
        """Start transcription. Returns (success, message)."""
        if self.is_transcribing:
            return False, "Already transcribing"

        if not self.transcription_engine or not self.transcription_engine.is_ready():
            return False, "Transcription engine not ready"

        try:
            success = self.transcription_engine.start_recording()
            if not success:
                return False, "Failed to start recording"

            # Start server sync if enabled
            if self.config.get('server_sync.enabled', False):
                self._start_server_sync()

            self.is_transcribing = True
            self._set_state(AppState.TRANSCRIBING, "Transcribing...")
            return True, "Transcription started"

        except Exception as e:
            return False, f"Failed to start transcription: {e}"

    def stop_transcription(self) -> tuple[bool, str]:
        """Stop transcription. Returns (success, message)."""
        if not self.is_transcribing:
            return False, "Not transcribing"

        try:
            if self.transcription_engine:
                self.transcription_engine.stop_recording()

            if self.server_sync_client:
                self.server_sync_client.stop()
                self.server_sync_client = None

            self.is_transcribing = False
            self._set_state(AppState.READY, "Ready")
            return True, "Transcription stopped"

        except Exception as e:
            return False, f"Failed to stop transcription: {e}"

    def clear_transcriptions(self) -> int:
        """Clear stored transcriptions. Returns count of cleared items."""
        count = len(self.transcriptions)
        self.transcriptions.clear()
        return count

    def get_transcriptions_text(self, include_timestamps: bool = True) -> str:
        """Get all transcriptions as formatted text.

        Each line is ``[HH:MM:SS] user: text``. The timestamp is omitted when
        *include_timestamps* is false or the result has no timestamp, and the
        user prefix is omitted when the user name is empty or blank.
        """
        lines = []
        for result in self.transcriptions:
            parts = []
            # The transcription callbacks treat timestamp as optional, so do
            # the same here.
            if include_timestamps and result.timestamp:
                parts.append(f"[{result.timestamp.strftime('%H:%M:%S')}]")
            if result.user_name and result.user_name.strip():
                parts.append(f"{result.user_name}:")
            parts.append(result.text)
            lines.append(" ".join(parts))
        return "\n".join(lines)

    def reload_engine(self) -> tuple[bool, str]:
        """Reload the transcription engine with current config settings.

        Stops transcription if it is running; it is not restarted afterwards.
        Returns (success, message); success means the reload was started, the
        final outcome arrives through the state callback.
        """
        try:
            if self.is_transcribing:
                self.stop_transcription()

            self._set_state(AppState.RELOADING, "Reloading engine...")

            # Wait for any existing init thread
            if self._engine_init_thread and self._engine_init_thread.is_alive():
                self._engine_init_thread.join(timeout=10)

            # Stop current engine
            if self.transcription_engine:
                try:
                    self.transcription_engine.stop()
                except Exception as e:
                    print(f"Warning: Error stopping engine: {e}")

            # Re-initialize
            self._initialize_engine()
            return True, "Engine reload initiated"

        except Exception as e:
            self._set_state(AppState.ERROR, f"Engine reload failed: {e}")
            return False, str(e)

    # ── Transcription Callbacks ────────────────────────────────────

    @staticmethod
    def _result_payload(result: TranscriptionResult, is_preview: bool) -> dict:
        """Build the dict handed to the frontend callbacks for *result*."""
        return {
            "text": result.text,
            "user_name": result.user_name,
            "timestamp": result.timestamp.strftime("%H:%M:%S") if result.timestamp else None,
            "is_preview": is_preview,
        }

    def _on_realtime_transcription(self, result: TranscriptionResult):
        """Handle realtime (preview) transcription."""
        if not self.is_transcribing:
            return

        try:
            # Broadcast to web server
            if self.web_server and self.web_server_thread and self.web_server_thread.loop:
                asyncio.run_coroutine_threadsafe(
                    self.web_server.broadcast_preview(
                        result.text, result.user_name, result.timestamp
                    ),
                    self.web_server_thread.loop,
                )

            # Send to server sync
            if self.server_sync_client:
                self.server_sync_client.send_preview(result.text, result.timestamp)

            # Notify frontend
            if self.on_preview:
                self.on_preview(self._result_payload(result, is_preview=True))

        except Exception as e:
            print(f"Error handling realtime transcription: {e}")

    def _on_final_transcription(self, result: TranscriptionResult):
        """Handle final transcription."""
        if not self.is_transcribing:
            return

        try:
            self.transcriptions.append(result)

            # Broadcast to web server
            if self.web_server and self.web_server_thread and self.web_server_thread.loop:
                asyncio.run_coroutine_threadsafe(
                    self.web_server.broadcast_transcription(
                        result.text, result.user_name, result.timestamp
                    ),
                    self.web_server_thread.loop,
                )

            # Send to server sync
            if self.server_sync_client:
                self.server_sync_client.send_transcription(
                    result.text, result.timestamp
                )

            # Notify frontend
            if self.on_transcription:
                self.on_transcription(self._result_payload(result, is_preview=False))

        except Exception as e:
            print(f"Error handling final transcription: {e}")

    def _on_remote_error(self, error_msg: str):
        """Handle error from remote transcription service."""
        print(f"Remote transcription error: {error_msg}")
        if self.on_error:
            self.on_error(error_msg)

    def _on_credits_low(self, seconds_remaining: int):
        """Handle low credits warning from proxy."""
        if self.on_credits_low:
            self.on_credits_low(seconds_remaining)

    # ── Server Sync ────────────────────────────────────────────────

    def _start_server_sync(self):
        """Start server sync client.

        Does nothing (apart from a log line) when no URL is configured.
        Errors are logged and not raised.
        """
        try:
            url = self.config.get('server_sync.url', '')
            if not url:
                print("Server sync enabled but no URL configured")
                return

            room = self.config.get('server_sync.room', 'default')
            passphrase = self.config.get('server_sync.passphrase', '')
            user_name = self.config.get('user.name', 'User')
            fonts_dir = self.config.fonts_dir

            font_source = self.config.get('display.font_source', 'System Font')
            # "System Font" is passed to the sync client as the string "None".
            if font_source == "System Font":
                font_source = "None"

            self.server_sync_client = ServerSyncClient(
                url=url,
                room=room,
                passphrase=passphrase,
                user_name=user_name,
                fonts_dir=fonts_dir,
                font_source=font_source,
                websafe_font=self.config.get('display.websafe_font', '') or None,
                google_font=self.config.get('display.google_font', '') or None,
                custom_font_file=self.config.get('display.custom_font_file', '') or None,
                user_color=self.config.get('display.user_color', '#4CAF50'),
                text_color=self.config.get('display.text_color', '#FFFFFF'),
                background_color=self.config.get('display.background_color', '#000000B3'),
            )
            self.server_sync_client.start()

        except Exception as e:
            print(f"Error starting server sync: {e}")

    # ── Configuration ──────────────────────────────────────────────

    def apply_settings(self, new_config: Optional[dict] = None) -> tuple[bool, str]:
        """Apply settings changes. If new_config is provided, merge it first.

        Returns (engine_reload_needed, message). When a reload is needed it is
        started here; if starting it fails, the message says so.
        """
        if new_config:
            for key, value in new_config.items():
                self.config.set(key, value)

        # Update web server display settings
        if self.web_server:
            for name, value in self._display_settings().items():
                setattr(self.web_server, name, value)

        # Restart server sync if running
        if self.is_transcribing and self.server_sync_client:
            self.server_sync_client.stop()
            self.server_sync_client = None
            if self.config.get('server_sync.enabled', False):
                self._start_server_sync()

        # Check if model/device changed
        # NOTE(review): only model and device are compared; a change to
        # remote.mode alone does not trigger a reload here -- confirm intended.
        new_model = self.config.get('transcription.model', 'base.en')
        new_device = self.config.get('transcription.device', 'auto')
        engine_reload_needed = (
            self.current_model_size != new_model
            or self.current_device_config != new_device
        )

        if not engine_reload_needed:
            return False, "Settings applied successfully."

        reloaded, reload_message = self.reload_engine()
        if not reloaded:
            return True, f"Settings applied, but engine reload failed: {reload_message}"
        return True, "Settings applied. Engine reloading with new model/device."

    def get_status(self) -> dict:
        """Get current application status as a dict."""
        host = self.config.get('web_server.host', '127.0.0.1')
        # Prefer the port the server actually bound to over the configured one.
        port = self.actual_web_port or self.config.get('web_server.port', 8080)

        remote_mode = self.config.get('remote.mode', 'local')
        engine = self.transcription_engine
        if remote_mode in ('managed', 'byok') and engine:
            mode_label = 'Managed' if remote_mode == 'managed' else 'BYOK'
            engine_device = f"Deepgram ({mode_label})"
        elif engine and hasattr(engine, 'device'):
            engine_device = f"{engine.device.upper()} ({engine.compute_type})"
        else:
            engine_device = "Not initialized"

        return {
            "state": self.state,
            "is_transcribing": self.is_transcribing,
            "version": __version__,
            "engine_device": engine_device,
            "web_server": {
                "host": host,
                "port": port,
                "url": f"http://{host}:{port}",
                "running": self.web_server_thread is not None and self.web_server_thread.is_alive(),
            },
            "transcription_count": len(self.transcriptions),
            "remote_mode": remote_mode,
            "server_sync_enabled": self.config.get('server_sync.enabled', False),
        }

    def get_audio_devices(self) -> list[dict]:
        """List available audio input devices.

        Falls back to a single ``Default`` entry when no input device is found
        or the device query fails.
        """
        devices = []
        try:
            # Imported inside the try so a missing or broken audio backend
            # falls back to the default entry instead of raising.
            import sounddevice as sd

            for i, device in enumerate(sd.query_devices()):
                if device['max_input_channels'] > 0:
                    devices.append({"index": i, "name": device['name']})
        except Exception as e:
            print(f"Warning: Could not query audio devices: {e}")

        if not devices:
            devices = [{"index": 0, "name": "Default"}]
        return devices

    def get_compute_devices(self) -> list[dict]:
        """List available compute devices, with ``auto`` always first."""
        devices = [{"id": "auto", "name": "Auto-detect"}]
        devices.extend(
            {"id": dev_id, "name": dev_name}
            for dev_id, dev_name in self.device_manager.get_device_info()
        )
        return devices

    # ── Update Checking ────────────────────────────────────────────

    def check_for_updates(self) -> dict:
        """Check for updates synchronously.

        Always returns a dict with an ``available`` flag. When an update is
        available it also carries ``version``, ``download_url``,
        ``release_notes`` and ``skipped``; otherwise it carries either
        ``current_version`` or an ``error`` message.
        """
        from client.update_checker import UpdateChecker

        gitea_url = self.config.get('updates.gitea_url', 'https://repo.anhonesthost.net')
        owner = self.config.get('updates.owner', 'streamer-tools')
        repo = self.config.get('updates.repo', 'local-transcription')

        if not gitea_url or not owner or not repo:
            return {"available": False, "error": "Update checking not configured"}

        checker = UpdateChecker(
            current_version=__version__,
            gitea_url=gitea_url,
            owner=owner,
            repo=repo,
        )

        try:
            release_info = checker.check_for_update()
            self.config.set('updates.last_check', datetime.now().isoformat())

            if not release_info:
                return {"available": False, "current_version": __version__}

            skipped = self.config.get('updates.skipped_versions', [])
            return {
                "available": True,
                "version": release_info.version,
                "download_url": release_info.download_url,
                "release_notes": release_info.release_notes,
                "skipped": release_info.version in skipped,
            }
        except Exception as e:
            return {"available": False, "error": str(e)}

    def skip_version(self, version: str):
        """Mark a version as skipped for update notifications."""
        # Work on a copy so the list returned by the config is not mutated
        # before config.set() is called.
        skipped = list(self.config.get('updates.skipped_versions', []))
        if version not in skipped:
            skipped.append(version)
            self.config.set('updates.skipped_versions', skipped)