# Source: local-transcription/backend/app_controller.py
"""Headless application controller for transcription backend.
Extracts orchestration logic from gui/main_window_qt.py into a
Qt-free class that manages engine lifecycle, web server, server sync,
and configuration -- all accessible via callbacks instead of Qt signals.
"""
import asyncio
import time
from datetime import datetime
from pathlib import Path
from threading import Thread, Lock
from typing import Callable, List, Optional
import sys
# Add project root to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from client.config import Config
from client.device_utils import DeviceManager
from client.transcription_engine_realtime import RealtimeTranscriptionEngine, TranscriptionResult
from client.deepgram_transcription import DeepgramTranscriptionEngine
from client.server_sync import ServerSyncClient
from server.web_display import TranscriptionWebServer
from version import __version__
class AppState:
    """Namespace of the string constants used as application states.

    The members are plain ``str`` values rather than ``enum.Enum`` members,
    so they compare equal to their literal text (e.g. ``AppState.READY ==
    "ready"``).
    """

    INITIALIZING, READY, TRANSCRIBING, RELOADING, ERROR = (
        "initializing",
        "ready",
        "transcribing",
        "reloading",
        "error",
    )
class WebServerThread(Thread):
    """Daemon thread that runs the web server on its own asyncio event loop.

    ``loop`` is published as soon as it is created so other threads can
    schedule work on it (``asyncio.run_coroutine_threadsafe``) or stop it
    (``loop.call_soon_threadsafe(loop.stop)``). Any exception raised while
    running ``web_server.start()`` is stored in ``error`` instead of being
    propagated, so the thread that started this one can inspect it.
    """

    def __init__(self, web_server: "TranscriptionWebServer"):
        super().__init__(daemon=True)
        self.web_server = web_server
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.error: Optional[Exception] = None

    def run(self):
        try:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)
            self.loop.run_until_complete(self.web_server.start())
        except Exception as e:
            self.error = e
            print(f"ERROR: Web server failed to start: {e}")
        finally:
            # Always release the loop; previously every failed port attempt
            # (and the final shutdown) left an open event loop behind.
            self._close_loop()

    def _close_loop(self):
        """Cancel leftover tasks and close the loop (the cleanup ``asyncio.run`` does)."""
        loop = self.loop
        if loop is None or loop.is_closed():
            return
        try:
            pending = asyncio.all_tasks(loop)
            for task in pending:
                task.cancel()
            if pending:
                # Let cancelled tasks run their cleanup before closing.
                loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
            loop.run_until_complete(loop.shutdown_asyncgens())
        except Exception as e:
            print(f"Warning: Error cleaning up web server event loop: {e}")
        finally:
            loop.close()
            asyncio.set_event_loop(None)
class EngineInitThread(Thread):
    """Daemon thread that runs ``engine.initialize()`` off the caller's thread.

    When initialization finishes, ``on_complete(success, message)`` is called
    exactly once, from this thread.
    """

    def __init__(self, engine, on_complete: Callable[[bool, str], None]):
        super().__init__(daemon=True)
        self.engine = engine
        self.on_complete = on_complete

    def run(self):
        # Only engine.initialize() is guarded. An exception raised by the
        # on_complete callback itself must not be misreported as an engine
        # initialization failure (which would also invoke the callback twice).
        try:
            success = self.engine.initialize()
        except Exception as e:
            self.on_complete(False, f"Error initializing engine: {e}")
            return
        if success:
            self.on_complete(True, "Engine initialized successfully")
        else:
            self.on_complete(False, "Failed to initialize engine")
class AppController:
    """Headless controller managing the transcription application lifecycle.

    This replaces the orchestration logic that previously lived in MainWindow.
    It manages:
    - Transcription engine lifecycle (init, start, stop, reload)
    - Web server for OBS display
    - Server sync for multi-user mode
    - Configuration
    - Update checking

    All state changes are communicated via callbacks, making it UI-agnostic.
    Some callbacks fire on background threads: for example
    ``on_state_changed`` is invoked from the engine-init thread when the
    engine finishes loading.
    """

    # ``remote.mode`` values that select the Deepgram engine; any other value
    # selects the local RealtimeTranscriptionEngine.
    _REMOTE_MODES = ('managed', 'byok')

    # Number of consecutive ports, starting at ``web_server.port``, that
    # _start_web_server tries before giving up.
    WEB_SERVER_PORT_ATTEMPTS = 5

    def __init__(self, config: Optional["Config"] = None):
        # ``is not None`` so a caller-supplied config object is never replaced
        # just because it happens to evaluate as falsy.
        self.config = config if config is not None else Config()
        self.device_manager = DeviceManager()

        # State
        self._state = AppState.INITIALIZING
        self._state_lock = Lock()
        self.is_transcribing = False

        # Engine
        self.transcription_engine = None
        self._engine_init_thread: Optional[EngineInitThread] = None
        self.current_model_size: Optional[str] = None
        self.current_device_config: Optional[str] = None

        # Web server
        self.web_server: Optional[TranscriptionWebServer] = None
        self.web_server_thread: Optional[WebServerThread] = None
        self.actual_web_port: Optional[int] = None

        # Server sync
        self.server_sync_client: Optional[ServerSyncClient] = None

        # Final transcription results, appended in the order they arrive.
        self.transcriptions: List[TranscriptionResult] = []

        # Callbacks for state notifications (set by the frontend / API server)
        self.on_state_changed: Optional[Callable[[str, str], None]] = None  # (state, message)
        self.on_transcription: Optional[Callable[[dict], None]] = None  # final transcription
        self.on_preview: Optional[Callable[[dict], None]] = None  # realtime preview
        self.on_error: Optional[Callable[[str], None]] = None  # remote engine error message
        self.on_credits_low: Optional[Callable[[int], None]] = None  # seconds of credit left

    # ── Helpers ────────────────────────────────────────────────────

    @classmethod
    def _is_remote_mode(cls, mode: str) -> bool:
        """Return True if *mode* selects the remote (Deepgram) engine."""
        return mode in cls._REMOTE_MODES

    @staticmethod
    def _remote_device_label(mode: str) -> str:
        """Return the device label shown for a remote engine *mode*."""
        mode_label = 'Managed' if mode == 'managed' else 'BYOK'
        return f"Deepgram ({mode_label})"

    @property
    def state(self) -> str:
        """Current application state (one of the ``AppState`` constants)."""
        with self._state_lock:
            return self._state

    def _set_state(self, state: str, message: str = ""):
        """Record *state* and notify ``on_state_changed(state, message)``."""
        with self._state_lock:
            self._state = state
        # Notify outside the (non-reentrant) lock so a callback that reads
        # ``state`` cannot deadlock.
        if self.on_state_changed:
            self.on_state_changed(state, message)

    # ── Lifecycle ──────────────────────────────────────────────────

    def initialize(self):
        """Initialize the web server and transcription engine.

        Call this once at startup. Non-blocking -- engine init happens
        in a background thread.

        Raises:
            Exception: whatever engine construction raises. The state is set
            to ERROR first, so it does not stay stuck at INITIALIZING.
        """
        self._set_state(AppState.INITIALIZING, "Starting web server...")
        self._start_web_server()
        self._set_state(AppState.INITIALIZING, "Loading transcription engine...")
        try:
            self._initialize_engine()
        except Exception as e:
            self._set_state(AppState.ERROR, f"Failed to initialize engine: {e}")
            raise

    def shutdown(self):
        """Gracefully shut down all components.

        Each step is best-effort: failures are printed and shutdown continues.
        """
        # Stop transcription
        if self.is_transcribing:
            self.stop_transcription()

        # stop_transcription() normally tears down server sync. Make sure it
        # is gone even if that call failed part-way.
        if self.server_sync_client:
            try:
                self.server_sync_client.stop()
            except Exception as e:
                print(f"Warning: Error stopping server sync: {e}")
            self.server_sync_client = None

        # Stop web server
        thread = self.web_server_thread
        if thread and thread.is_alive():
            try:
                if thread.loop:
                    thread.loop.call_soon_threadsafe(thread.loop.stop)
            except Exception as e:
                print(f"Warning: Error stopping web server: {e}")

        # Stop transcription engine
        if self.transcription_engine:
            try:
                self.transcription_engine.stop()
            except Exception as e:
                print(f"Warning: Error stopping engine: {e}")

        # Wait for engine init thread
        if self._engine_init_thread and self._engine_init_thread.is_alive():
            self._engine_init_thread.join(timeout=5)

    # ── Web Server ─────────────────────────────────────────────────

    def _start_web_server(self):
        """Start the FastAPI web server for OBS display.

        Tries ``web_server.port`` and the following ports in turn, and moves
        on only when a failure looks like "port already in use". On success,
        ``actual_web_port`` records the port used. On failure,
        ``web_server`` and ``web_server_thread`` are left as None.
        """
        try:
            host = self.config.get('web_server.host', '127.0.0.1')
            port = int(self.config.get('web_server.port', 8080))

            # Gather display settings
            ws_kwargs = self._get_web_server_kwargs(host, port)

            for try_port in range(port, port + self.WEB_SERVER_PORT_ATTEMPTS):
                print(f"Attempting to start web server at http://{host}:{try_port}")
                ws_kwargs['port'] = try_port
                self.web_server = TranscriptionWebServer(**ws_kwargs)
                self.web_server_thread = WebServerThread(self.web_server)
                self.web_server_thread.start()
                # Assumes startup failures (e.g. bind errors) are recorded on
                # the thread within this window.
                time.sleep(0.5)

                error = self.web_server_thread.error
                if error is None:
                    self.actual_web_port = try_port
                    print(f"Web server started at http://{host}:{try_port}")
                    return

                self.web_server = None
                self.web_server_thread = None
                if self._is_address_in_use(error):
                    print(f"Port {try_port} is in use, trying next port...")
                    continue
                print(f"Web server failed to start: {error}")
                break

            print("WARNING: Could not start web server on any port")
        except Exception as e:
            print(f"ERROR: Failed to initialize web server: {e}")
            self.web_server = None
            self.web_server_thread = None

    @staticmethod
    def _is_address_in_use(error: BaseException) -> bool:
        """Return True if *error* looks like a "port already in use" bind failure.

        Covers the POSIX EADDRINUSE errno as well as Windows' WSAEADDRINUSE
        (10048), whose message text differs from Linux/macOS.
        """
        import errno

        if isinstance(error, OSError) and error.errno in (errno.EADDRINUSE, 10048):
            return True
        text = str(error).lower()
        return any(
            marker in text
            for marker in (
                "address already in use",  # Linux / macOS message text
                "errno 98",  # Linux EADDRINUSE
                "errno 10048",  # Windows WSAEADDRINUSE
                "only one usage of each socket address",  # Windows message text
            )
        )

    def _get_web_server_kwargs(self, host: str, port: int) -> dict:
        """Build kwargs dict for TranscriptionWebServer from config."""
        return dict(
            host=host,
            port=port,
            show_timestamps=self.config.get('display.show_timestamps', True),
            fade_after_seconds=self.config.get('display.fade_after_seconds', 10),
            max_lines=self.config.get('display.max_lines', 50),
            font_family=self.config.get('display.font_family', 'Arial'),
            font_size=self.config.get('display.font_size', 16),
            fonts_dir=self.config.fonts_dir,
            font_source=self.config.get('display.font_source', 'System Font'),
            websafe_font=self.config.get('display.websafe_font', 'Arial'),
            google_font=self.config.get('display.google_font', 'Roboto'),
            user_color=self.config.get('display.user_color', '#4CAF50'),
            text_color=self.config.get('display.text_color', '#FFFFFF'),
            background_color=self.config.get('display.background_color', '#000000B3'),
        )

    # ── Transcription Engine ───────────────────────────────────────

    def _initialize_engine(self):
        """Create the transcription engine from config and start initializing it.

        ``remote.mode`` picks the engine: Deepgram for remote modes, otherwise
        the local RealtimeTranscriptionEngine. The engine is constructed on the
        calling thread. ``engine.initialize()`` then runs on an
        EngineInitThread, which reports back through ``_on_engine_ready``.
        """
        device_config = self.config.get('transcription.device', 'auto')
        self.device_manager.set_device(device_config)

        audio_device_str = self.config.get('audio.input_device', 'default')
        # 'default' (or an unset/blank value) means the system default input;
        # anything else is treated as a device index.
        if audio_device_str in (None, '', 'default'):
            audio_device = None
        else:
            audio_device = int(audio_device_str)

        model = self.config.get('transcription.model', 'base.en')
        language = self.config.get('transcription.language', 'en')
        device = self.device_manager.get_device_for_whisper()
        compute_type = self.config.get('transcription.compute_type', 'default')

        self.current_model_size = model
        self.current_device_config = device_config

        user_name = self.config.get('user.name', 'User')

        # Continuous mode replaces the configurable recording-gap settings
        # with fixed values.
        if self.config.get('transcription.continuous_mode', False):
            post_speech_silence = 0.15
            min_gap = 0.0
            min_recording = 0.3
        else:
            post_speech_silence = self.config.get('transcription.post_speech_silence_duration', 0.3)
            min_gap = self.config.get('transcription.min_gap_between_recordings', 0.0)
            min_recording = self.config.get('transcription.min_length_of_recording', 0.5)

        is_remote = self._is_remote_mode(self.config.get('remote.mode', 'local'))
        if is_remote:
            engine = DeepgramTranscriptionEngine(
                config=self.config,
                user_name=user_name,
                input_device_index=audio_device,
            )
        else:
            engine = RealtimeTranscriptionEngine(
                model=model,
                device=device,
                language=language,
                compute_type=compute_type,
                enable_realtime_transcription=self.config.get('transcription.enable_realtime_transcription', False),
                realtime_model=self.config.get('transcription.realtime_model', 'tiny.en'),
                realtime_processing_pause=self.config.get('transcription.realtime_processing_pause', 0.1),
                silero_sensitivity=self.config.get('transcription.silero_sensitivity', 0.4),
                silero_use_onnx=self.config.get('transcription.silero_use_onnx', True),
                webrtc_sensitivity=self.config.get('transcription.webrtc_sensitivity', 3),
                post_speech_silence_duration=post_speech_silence,
                min_length_of_recording=min_recording,
                min_gap_between_recordings=min_gap,
                pre_recording_buffer_duration=self.config.get('transcription.pre_recording_buffer_duration', 0.2),
                beam_size=self.config.get('transcription.beam_size', 5),
                initial_prompt=self.config.get('transcription.initial_prompt', ''),
                no_log_file=self.config.get('transcription.no_log_file', True),
                input_device_index=audio_device,
                user_name=user_name,
            )

        self.transcription_engine = engine
        engine.set_callbacks(
            realtime_callback=self._on_realtime_transcription,
            final_callback=self._on_final_transcription,
        )
        if is_remote:
            # Only the remote engine reports service errors and credit levels.
            engine.set_error_callback(self._on_remote_error)
            engine.set_credits_low_callback(self._on_credits_low)

        # Start init in background thread
        self._engine_init_thread = EngineInitThread(engine, self._on_engine_ready)
        self._engine_init_thread.start()

    def _on_engine_ready(self, success: bool, message: str):
        """Called from EngineInitThread when engine init completes."""
        if not success:
            self._set_state(AppState.ERROR, message)
            return

        remote_mode = self.config.get('remote.mode', 'local')
        engine = self.transcription_engine
        if self._is_remote_mode(remote_mode):
            device_display = self._remote_device_label(remote_mode)
        elif engine:
            device_display = f"{engine.device.upper()} ({engine.compute_type})"
        else:
            device_display = "Unknown"
        self._set_state(AppState.READY, f"Ready | Device: {device_display}")

    # ── Transcription Control ──────────────────────────────────────

    def start_transcription(self) -> tuple[bool, str]:
        """Start transcription. Returns (success, message).

        Also starts server sync when ``server_sync.enabled`` is set.
        """
        if self.is_transcribing:
            return False, "Already transcribing"

        if not self.transcription_engine or not self.transcription_engine.is_ready():
            return False, "Transcription engine not ready"

        try:
            if not self.transcription_engine.start_recording():
                return False, "Failed to start recording"

            # Start server sync if enabled
            if self.config.get('server_sync.enabled', False):
                self._start_server_sync()

            self.is_transcribing = True
            self._set_state(AppState.TRANSCRIBING, "Transcribing...")
            return True, "Transcription started"
        except Exception as e:
            return False, f"Failed to start transcription: {e}"

    def stop_transcription(self) -> tuple[bool, str]:
        """Stop transcription and server sync. Returns (success, message)."""
        if not self.is_transcribing:
            return False, "Not transcribing"

        try:
            if self.transcription_engine:
                self.transcription_engine.stop_recording()

            if self.server_sync_client:
                self.server_sync_client.stop()
                self.server_sync_client = None

            self.is_transcribing = False
            self._set_state(AppState.READY, "Ready")
            return True, "Transcription stopped"
        except Exception as e:
            return False, f"Failed to stop transcription: {e}"

    def clear_transcriptions(self) -> int:
        """Clear stored transcriptions. Returns count of cleared items."""
        count = len(self.transcriptions)
        self.transcriptions.clear()
        return count

    def get_transcriptions_text(self, include_timestamps: bool = True) -> str:
        """Get all transcriptions as text, one ``[HH:MM:SS] user: text`` line each.

        The timestamp prefix is omitted when *include_timestamps* is False or
        the result has no timestamp. The user prefix is omitted for blank names.
        """
        lines = []
        for result in self.transcriptions:
            parts = []
            if include_timestamps and result.timestamp:
                parts.append(f"[{result.timestamp.strftime('%H:%M:%S')}]")
            if result.user_name and result.user_name.strip():
                parts.append(f"{result.user_name}:")
            parts.append(result.text)
            lines.append(" ".join(parts))
        return "\n".join(lines)

    def reload_engine(self) -> tuple[bool, str]:
        """Reload the transcription engine with current config settings.

        Stops any active transcription (it is not restarted afterwards). Waits
        up to 10 s for a previous init thread, stops the old engine, and starts
        initializing a new one in the background. Returns (success, message);
        success only means the reload was initiated.
        """
        try:
            if self.is_transcribing:
                self.stop_transcription()

            self._set_state(AppState.RELOADING, "Reloading engine...")

            # Wait for any existing init thread
            if self._engine_init_thread and self._engine_init_thread.is_alive():
                self._engine_init_thread.join(timeout=10)

            # Stop current engine
            if self.transcription_engine:
                try:
                    self.transcription_engine.stop()
                except Exception as e:
                    print(f"Warning: Error stopping engine: {e}")

            # Re-initialize
            self._initialize_engine()
            return True, "Engine reload initiated"
        except Exception as e:
            self._set_state(AppState.ERROR, f"Engine reload failed: {e}")
            return False, str(e)

    # ── Transcription Callbacks ────────────────────────────────────

    def _web_loop(self) -> Optional[asyncio.AbstractEventLoop]:
        """Return the web server's event loop, or None if broadcasts are not possible."""
        thread = self.web_server_thread
        if self.web_server and thread and thread.loop:
            return thread.loop
        return None

    @staticmethod
    def _result_to_dict(result: "TranscriptionResult", is_preview: bool) -> dict:
        """Serialize *result* into the dict passed to frontend callbacks."""
        return {
            "text": result.text,
            "user_name": result.user_name,
            "timestamp": result.timestamp.strftime("%H:%M:%S") if result.timestamp else None,
            "is_preview": is_preview,
        }

    def _on_realtime_transcription(self, result: "TranscriptionResult"):
        """Handle realtime (preview) transcription. Ignored unless transcribing."""
        if not self.is_transcribing:
            return

        try:
            # Broadcast to web server
            loop = self._web_loop()
            if loop is not None:
                asyncio.run_coroutine_threadsafe(
                    self.web_server.broadcast_preview(
                        result.text, result.user_name, result.timestamp
                    ),
                    loop,
                )

            # Send to server sync. Read the attribute once, because
            # stop_transcription may clear it from another thread.
            sync_client = self.server_sync_client
            if sync_client:
                sync_client.send_preview(result.text, result.timestamp)

            # Notify frontend
            if self.on_preview:
                self.on_preview(self._result_to_dict(result, is_preview=True))
        except Exception as e:
            print(f"Error handling realtime transcription: {e}")

    def _on_final_transcription(self, result: "TranscriptionResult"):
        """Store and fan out a final transcription. Ignored unless transcribing."""
        if not self.is_transcribing:
            return

        try:
            self.transcriptions.append(result)

            # Broadcast to web server
            loop = self._web_loop()
            if loop is not None:
                asyncio.run_coroutine_threadsafe(
                    self.web_server.broadcast_transcription(
                        result.text, result.user_name, result.timestamp
                    ),
                    loop,
                )

            # Send to server sync (read once; see _on_realtime_transcription)
            sync_client = self.server_sync_client
            if sync_client:
                sync_client.send_transcription(result.text, result.timestamp)

            # Notify frontend
            if self.on_transcription:
                self.on_transcription(self._result_to_dict(result, is_preview=False))
        except Exception as e:
            print(f"Error handling final transcription: {e}")

    def _on_remote_error(self, error_msg: str):
        """Handle error from remote transcription service."""
        print(f"Remote transcription error: {error_msg}")
        if self.on_error:
            self.on_error(error_msg)

    def _on_credits_low(self, seconds_remaining: int):
        """Handle low credits warning from proxy."""
        if self.on_credits_low:
            self.on_credits_low(seconds_remaining)

    # ── Server Sync ────────────────────────────────────────────────

    def _start_server_sync(self):
        """Create and start the server sync client from config (best-effort).

        Does nothing, apart from printing a message, when no URL is configured.
        Errors are printed, not raised.
        """
        try:
            url = self.config.get('server_sync.url', '')
            if not url:
                print("Server sync enabled but no URL configured")
                return

            font_source = self.config.get('display.font_source', 'System Font')
            # The sync client expects the literal "None" for system fonts.
            if font_source == "System Font":
                font_source = "None"

            self.server_sync_client = ServerSyncClient(
                url=url,
                room=self.config.get('server_sync.room', 'default'),
                passphrase=self.config.get('server_sync.passphrase', ''),
                user_name=self.config.get('user.name', 'User'),
                fonts_dir=self.config.fonts_dir,
                font_source=font_source,
                websafe_font=self.config.get('display.websafe_font', '') or None,
                google_font=self.config.get('display.google_font', '') or None,
                custom_font_file=self.config.get('display.custom_font_file', '') or None,
                user_color=self.config.get('display.user_color', '#4CAF50'),
                text_color=self.config.get('display.text_color', '#FFFFFF'),
                background_color=self.config.get('display.background_color', '#000000B3'),
            )
            self.server_sync_client.start()
        except Exception as e:
            print(f"Error starting server sync: {e}")

    # ── Configuration ──────────────────────────────────────────────

    def apply_settings(self, new_config: Optional[dict] = None) -> tuple[bool, str]:
        """Apply settings changes. If new_config is provided, merge it first.

        Pushes display settings to the running web server. While transcribing,
        it re-creates the server sync client so that it runs exactly when
        ``server_sync.enabled`` is set. It also reloads the engine if the
        model, device, or remote mode changed.

        Returns (engine_reload_needed, message).
        """
        if new_config:
            for key, value in new_config.items():
                self.config.set(key, value)

        # Update web server display settings
        if self.web_server:
            self.web_server.show_timestamps = self.config.get('display.show_timestamps', True)
            self.web_server.fade_after_seconds = self.config.get('display.fade_after_seconds', 10)
            self.web_server.max_lines = self.config.get('display.max_lines', 50)
            self.web_server.font_family = self.config.get('display.font_family', 'Arial')
            self.web_server.font_size = self.config.get('display.font_size', 16)
            self.web_server.font_source = self.config.get('display.font_source', 'System Font')
            self.web_server.websafe_font = self.config.get('display.websafe_font', 'Arial')
            self.web_server.google_font = self.config.get('display.google_font', 'Roboto')
            self.web_server.user_color = self.config.get('display.user_color', '#4CAF50')
            self.web_server.text_color = self.config.get('display.text_color', '#FFFFFF')
            self.web_server.background_color = self.config.get('display.background_color', '#000000B3')

        # Re-create server sync while transcribing. This also covers sync
        # being switched on mid-session, not only restarting an existing client.
        if self.is_transcribing:
            if self.server_sync_client:
                self.server_sync_client.stop()
                self.server_sync_client = None
            if self.config.get('server_sync.enabled', False):
                self._start_server_sync()

        # Check if model/device/remote mode changed -- any of these require
        # a full engine reload since they change which engine class is used
        new_model = self.config.get('transcription.model', 'base.en')
        new_device = self.config.get('transcription.device', 'auto')
        new_remote_mode = self.config.get('remote.mode', 'local')
        current_remote_mode = 'local'
        if self.transcription_engine:
            current_remote_mode = getattr(self.transcription_engine, 'mode', 'local')

        engine_reload_needed = (
            self.current_model_size != new_model
            or self.current_device_config != new_device
            or current_remote_mode != new_remote_mode
        )

        if not engine_reload_needed:
            return False, "Settings applied successfully."

        reload_ok, reload_msg = self.reload_engine()
        if not reload_ok:
            return True, f"Settings applied, but engine reload failed: {reload_msg}"
        return True, "Settings applied. Engine reloading with new model/device."

    def get_status(self) -> dict:
        """Get current application status as a dict."""
        host = self.config.get('web_server.host', '127.0.0.1')
        port = self.actual_web_port or self.config.get('web_server.port', 8080)

        remote_mode = self.config.get('remote.mode', 'local')
        engine = self.transcription_engine
        if self._is_remote_mode(remote_mode) and engine:
            engine_device = self._remote_device_label(remote_mode)
        elif engine and hasattr(engine, 'device'):
            engine_device = f"{engine.device.upper()} ({engine.compute_type})"
        else:
            engine_device = "Not initialized"

        thread = self.web_server_thread
        return {
            "state": self.state,
            "is_transcribing": self.is_transcribing,
            "version": __version__,
            "engine_device": engine_device,
            "web_server": {
                "host": host,
                "port": port,
                "url": f"http://{host}:{port}",
                "running": thread is not None and thread.is_alive(),
            },
            "transcription_count": len(self.transcriptions),
            "remote_mode": remote_mode,
            "server_sync_enabled": self.config.get('server_sync.enabled', False),
        }

    def get_audio_devices(self) -> list[dict]:
        """List audio input devices as ``{"index": int, "name": str}`` dicts.

        Best-effort: if sounddevice cannot be imported or queried, a warning
        is printed and a single placeholder "Default" entry is returned.
        """
        devices = []
        try:
            # Imported inside the try block, because importing sounddevice can
            # itself fail (e.g. when its native audio library is unavailable).
            import sounddevice as sd

            for i, device in enumerate(sd.query_devices()):
                if device['max_input_channels'] > 0:
                    devices.append({"index": i, "name": device['name']})
        except Exception as e:
            print(f"Warning: Could not query audio devices: {e}")

        if not devices:
            devices = [{"index": 0, "name": "Default"}]
        return devices

    def get_compute_devices(self) -> list[dict]:
        """List compute devices, with an "auto" entry first.

        Assumes ``get_device_info()`` yields ``(id, name)`` pairs.
        """
        devices = [{"id": "auto", "name": "Auto-detect"}]
        devices.extend(
            {"id": dev_id, "name": dev_name}
            for dev_id, dev_name in self.device_manager.get_device_info()
        )
        return devices

    # ── Update Checking ────────────────────────────────────────────

    def check_for_updates(self) -> dict:
        """Check for updates synchronously.

        Always returns a dict with an ``"available"`` key. When an update
        exists it also has ``version``, ``download_url``, ``release_notes`` and
        ``skipped``. When there is none it has ``current_version``. On failure
        or missing configuration it has ``error``.
        """
        from client.update_checker import UpdateChecker

        gitea_url = self.config.get('updates.gitea_url', 'https://repo.anhonesthost.net')
        owner = self.config.get('updates.owner', 'streamer-tools')
        repo = self.config.get('updates.repo', 'local-transcription')

        if not gitea_url or not owner or not repo:
            return {"available": False, "error": "Update checking not configured"}

        checker = UpdateChecker(
            current_version=__version__,
            gitea_url=gitea_url,
            owner=owner,
            repo=repo,
        )

        try:
            release_info = checker.check_for_update()
            self.config.set('updates.last_check', datetime.now().isoformat())

            if not release_info:
                return {"available": False, "current_version": __version__}

            # A stored null is treated as "nothing skipped".
            skipped = self.config.get('updates.skipped_versions', []) or []
            return {
                "available": True,
                "version": release_info.version,
                "download_url": release_info.download_url,
                "release_notes": release_info.release_notes,
                "skipped": release_info.version in skipped,
            }
        except Exception as e:
            return {"available": False, "error": str(e)}

    def skip_version(self, version: str):
        """Mark a version as skipped for update notifications (idempotent)."""
        # Copy, so the config's own list is never mutated behind its back, and
        # treat a stored null as an empty list.
        skipped = list(self.config.get('updates.skipped_versions', []) or [])
        if version not in skipped:
            skipped.append(version)
            self.config.set('updates.skipped_versions', skipped)