The cloud sidecar excludes the local Whisper engine module, but on first launch the config defaults to remote.mode="local" which tries to import it. Now catches the ImportError gracefully and shows an error message telling the user to switch to Cloud (Deepgram) mode in Settings. The API server still starts so Settings is accessible. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
736 lines
31 KiB
Python
736 lines
31 KiB
Python
"""Headless application controller for transcription backend.
|
|
|
|
Extracts orchestration logic from gui/main_window_qt.py into a
|
|
Qt-free class that manages engine lifecycle, web server, server sync,
|
|
and configuration -- all accessible via callbacks instead of Qt signals.
|
|
"""
|
|
|
|
import asyncio
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from threading import Thread, Lock
|
|
from typing import Callable, List, Optional
|
|
|
|
import sys
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from client.config import Config
|
|
from client.models import TranscriptionResult
|
|
from client.deepgram_transcription import DeepgramTranscriptionEngine
|
|
from client.server_sync import ServerSyncClient
|
|
from server.web_display import TranscriptionWebServer
|
|
from version import __version__
|
|
|
|
# Heavy imports (torch, RealtimeSTT, faster-whisper) are deferred so
|
|
# the cloud-only sidecar build can exclude them entirely.
|
|
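# RealtimeTranscriptionEngine is imported lazily in _initialize_engine() when
# remote.mode is not a cloud mode; DeviceManager is imported lazily in
# AppController.__init__().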
|
|
RealtimeTranscriptionEngine = None
|
|
DeviceManager = None
|
|
|
|
|
|
class AppState:
    """String constants naming the application states.

    This is a plain class used like an enum: every attribute is an ordinary
    ``str``, so values can be compared and handed to callbacks directly.
    """

    INITIALIZING: str = "initializing"
    READY: str = "ready"
    TRANSCRIBING: str = "transcribing"
    RELOADING: str = "reloading"
    ERROR: str = "error"
|
|
|
|
|
|
class WebServerThread(Thread):
    """Daemon thread that runs the web server on its own asyncio event loop.

    Once ``run()`` has begun, ``loop`` holds the event loop created for the
    server. If creating the loop or running ``web_server.start()`` raises,
    the exception is stored in ``error`` and a message is printed.
    """

    def __init__(self, web_server: TranscriptionWebServer):
        super().__init__(daemon=True)
        self.web_server = web_server
        # Set by run(); stays None until the thread body has executed.
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        # Exception captured by run(), if any.
        self.error: Optional[Exception] = None

    def run(self):
        """Create this thread's event loop and run ``web_server.start()`` on it."""
        try:
            event_loop = asyncio.new_event_loop()
            self.loop = event_loop
            asyncio.set_event_loop(event_loop)
            event_loop.run_until_complete(self.web_server.start())
        except Exception as exc:
            self.error = exc
            print(f"ERROR: Web server failed to start: {exc}")
|
|
|
|
|
|
class EngineInitThread(Thread):
    """Daemon thread that calls ``engine.initialize()`` without blocking the caller.

    The outcome is reported through ``on_complete(success, message)``, which
    is invoked from this thread's ``run()``.
    """

    def __init__(self, engine, on_complete: Callable[[bool, str], None]):
        super().__init__(daemon=True)
        self.engine = engine
        self.on_complete = on_complete

    def run(self):
        """Initialize the engine and report the result via ``on_complete``."""
        try:
            if self.engine.initialize():
                outcome = (True, "Engine initialized successfully")
            else:
                outcome = (False, "Failed to initialize engine")
            # Deliberately inside the try: an exception raised by the callback
            # itself is reported through the failure path below as well.
            self.on_complete(*outcome)
        except Exception as exc:
            self.on_complete(False, f"Error initializing engine: {exc}")
|
|
|
|
|
|
class AppController:
|
|
"""Headless controller managing the transcription application lifecycle.
|
|
|
|
This replaces the orchestration logic that previously lived in MainWindow.
|
|
It manages:
|
|
- Transcription engine lifecycle (init, start, stop, reload)
|
|
- Web server for OBS display
|
|
- Server sync for multi-user mode
|
|
- Configuration
|
|
- Update checking
|
|
|
|
All state changes are communicated via callbacks, making it UI-agnostic.
|
|
"""
|
|
|
|
def __init__(self, config: Optional[Config] = None):
    """Set up controller state without starting the web server or engine.

    Args:
        config: Configuration object to use. A fresh ``Config()`` is created
            when the argument is omitted or falsy.
    """
    self.config = config or Config()

    # DeviceManager is only needed for local Whisper mode, so it is imported
    # on first construction; builds that do not ship the module simply run
    # without a device manager.
    global DeviceManager
    if DeviceManager is None:
        try:
            from client.device_utils import DeviceManager as _DM
        except ImportError:
            DeviceManager = None
        else:
            DeviceManager = _DM

    self.device_manager = DeviceManager() if DeviceManager else None

    # --- State ---
    self._state = AppState.INITIALIZING
    self._state_lock = Lock()
    self.is_transcribing = False

    # --- Engine ---
    self.transcription_engine = None
    self._engine_init_thread: Optional[EngineInitThread] = None
    self.current_model_size: Optional[str] = None
    self.current_device_config: Optional[str] = None

    # --- Web server ---
    self.web_server: Optional[TranscriptionWebServer] = None
    self.web_server_thread: Optional[WebServerThread] = None
    self.actual_web_port: Optional[int] = None

    # --- Server sync ---
    self.server_sync_client: Optional[ServerSyncClient] = None

    # --- Stored final transcriptions ---
    self.transcriptions: List[TranscriptionResult] = []

    # --- Notification callbacks (assigned by the frontend / API server) ---
    self.on_state_changed: Optional[Callable[[str, str], None]] = None  # (state, message)
    self.on_transcription: Optional[Callable[[dict], None]] = None  # final transcription
    self.on_preview: Optional[Callable[[dict], None]] = None  # realtime preview
    self.on_error: Optional[Callable[[str], None]] = None
    self.on_credits_low: Optional[Callable[[int], None]] = None
|
|
|
|
@property
def state(self) -> str:
    """Current application state, read while holding the state lock."""
    with self._state_lock:
        current = self._state
    return current
|
|
|
|
def _set_state(self, state: str, message: str = ""):
    """Store the new state and notify ``on_state_changed`` if one is set.

    The state is written under the state lock; the callback is invoked
    after the lock has been released.
    """
    with self._state_lock:
        self._state = state

    listener = self.on_state_changed
    if listener:
        listener(state, message)
|
|
|
|
# ── Lifecycle ──────────────────────────────────────────────────
|
|
|
|
def initialize(self):
    """Start the web server, then kick off transcription engine setup.

    Intended to be called once at startup. The engine's own initialization
    runs on a background thread, so this call does not wait for it.
    """
    startup_steps = (
        ("Starting web server...", self._start_web_server),
        ("Loading transcription engine...", self._initialize_engine),
    )
    for status_message, run_step in startup_steps:
        self._set_state(AppState.INITIALIZING, status_message)
        run_step()
|
|
|
|
def shutdown(self):
    """Shut down transcription, the web server, and the engine, in that order.

    Errors raised while stopping the web server loop or the engine are
    printed as warnings instead of propagating. Finally waits up to five
    seconds for a still-running engine init thread.
    """
    if self.is_transcribing:
        self.stop_transcription()

    # Ask the web server's event loop to stop from its own thread.
    server_thread = self.web_server_thread
    if server_thread and server_thread.is_alive():
        try:
            server_loop = server_thread.loop
            if server_loop:
                server_loop.call_soon_threadsafe(server_loop.stop)
        except Exception as exc:
            print(f"Warning: Error stopping web server: {exc}")

    engine = self.transcription_engine
    if engine:
        try:
            engine.stop()
        except Exception as exc:
            print(f"Warning: Error stopping engine: {exc}")

    init_thread = self._engine_init_thread
    if init_thread and init_thread.is_alive():
        init_thread.join(timeout=5)
|
|
|
|
# ── Web Server ─────────────────────────────────────────────────
|
|
|
|
def _start_web_server(self):
    """Start the web server used for the OBS display.

    Tries the configured port and then the next four ports. A port is
    skipped only when the startup error indicates the address is already in
    use; any other startup error ends the attempts. On success
    ``actual_web_port`` records the port that was used. On failure
    ``web_server`` and ``web_server_thread`` are left as ``None``; nothing
    is raised to the caller.
    """
    # Local import: only needed here to classify bind failures portably.
    import errno

    try:
        host = self.config.get('web_server.host', '127.0.0.1')
        port = self.config.get('web_server.port', 8080)

        # Gather display settings
        ws_kwargs = self._get_web_server_kwargs(host, port)

        # Try up to 5 ports
        for try_port in range(port, port + 5):
            print(f"Attempting to start web server at http://{host}:{try_port}")
            ws_kwargs['port'] = try_port

            self.web_server = TranscriptionWebServer(**ws_kwargs)
            self.web_server_thread = WebServerThread(self.web_server)
            self.web_server_thread.start()

            # Give the server thread a moment to report a startup failure.
            time.sleep(0.5)

            startup_error = self.web_server_thread.error
            if not startup_error:
                self.actual_web_port = try_port
                print(f"Web server started at http://{host}:{try_port}")
                return

            self.web_server = None
            self.web_server_thread = None

            # The errno comparison covers platforms whose message text differs
            # (e.g. Windows); the string checks are kept for errors that were
            # wrapped and lost their errno attribute.
            error_text = str(startup_error).lower()
            port_in_use = (
                getattr(startup_error, 'errno', None) == errno.EADDRINUSE
                or "address already in use" in error_text
                or "errno 98" in error_text
            )
            if not port_in_use:
                print(f"Web server failed to start: {startup_error}")
                break
            print(f"Port {try_port} is in use, trying next port...")

        print("WARNING: Could not start web server on any port")

    except Exception as e:
        print(f"ERROR: Failed to initialize web server: {e}")
        self.web_server = None
        self.web_server_thread = None
|
|
|
|
def _get_web_server_kwargs(self, host: str, port: int) -> dict:
    """Assemble the keyword arguments for ``TranscriptionWebServer``.

    ``host`` and ``port`` are passed through unchanged; the display options
    are read from the ``display.*`` config keys with the defaults shown
    below, and ``fonts_dir`` comes from the config object itself.
    """
    setting = self.config.get
    return {
        "host": host,
        "port": port,
        "show_timestamps": setting('display.show_timestamps', True),
        "fade_after_seconds": setting('display.fade_after_seconds', 10),
        "max_lines": setting('display.max_lines', 50),
        "font_family": setting('display.font_family', 'Arial'),
        "font_size": setting('display.font_size', 16),
        "fonts_dir": self.config.fonts_dir,
        "font_source": setting('display.font_source', 'System Font'),
        "websafe_font": setting('display.websafe_font', 'Arial'),
        "google_font": setting('display.google_font', 'Roboto'),
        "user_color": setting('display.user_color', '#4CAF50'),
        "text_color": setting('display.text_color', '#FFFFFF'),
        "background_color": setting('display.background_color', '#000000B3'),
    }
|
|
|
|
# ── Transcription Engine ───────────────────────────────────────
|
|
|
|
def _initialize_engine(self):
    """Create the configured transcription engine and initialize it in the background.

    ``remote.mode`` values ``'managed'`` and ``'byok'`` select
    ``DeepgramTranscriptionEngine``; any other value selects the local
    ``RealtimeTranscriptionEngine``, whose module is imported on first use.
    If that import fails, any previous engine reference is dropped, the
    controller enters ``AppState.ERROR`` with a message pointing the user at
    Cloud mode, and no init thread is started.

    Otherwise ``engine.initialize()`` runs on an ``EngineInitThread`` and
    the outcome is delivered to ``_on_engine_ready``.
    """
    audio_device_str = self.config.get('audio.input_device', 'default')
    if audio_device_str == 'default':
        audio_device = None
    else:
        try:
            audio_device = int(audio_device_str)
        except (TypeError, ValueError):
            # A malformed setting must not abort startup; use the default device.
            print(
                f"Warning: Invalid audio input device {audio_device_str!r}; "
                "using default device"
            )
            audio_device = None

    model = self.config.get('transcription.model', 'base.en')
    language = self.config.get('transcription.language', 'en')
    device_config = self.config.get('transcription.device', 'auto')
    compute_type = self.config.get('transcription.compute_type', 'default')

    # Remembered so apply_settings() can detect model/device changes.
    self.current_model_size = model
    self.current_device_config = device_config

    user_name = self.config.get('user.name', 'User')
    continuous_mode = self.config.get('transcription.continuous_mode', False)

    if continuous_mode:
        # Continuous mode uses fixed timings instead of the configured ones.
        post_speech_silence = 0.15
        min_gap = 0.0
        min_recording = 0.3
    else:
        post_speech_silence = self.config.get('transcription.post_speech_silence_duration', 0.3)
        min_gap = self.config.get('transcription.min_gap_between_recordings', 0.0)
        min_recording = self.config.get('transcription.min_length_of_recording', 0.5)

    remote_mode = self.config.get('remote.mode', 'local')

    if remote_mode in ('managed', 'byok'):
        self.transcription_engine = DeepgramTranscriptionEngine(
            config=self.config,
            user_name=user_name,
            input_device_index=audio_device,
        )
        self.transcription_engine.set_callbacks(
            realtime_callback=self._on_realtime_transcription,
            final_callback=self._on_final_transcription,
        )
        self.transcription_engine.set_error_callback(self._on_remote_error)
        self.transcription_engine.set_credits_low_callback(self._on_credits_low)
    else:
        # Lazy-import heavy local transcription dependencies
        global RealtimeTranscriptionEngine
        if RealtimeTranscriptionEngine is None:
            try:
                from client.transcription_engine_realtime import RealtimeTranscriptionEngine as _RTE
                RealtimeTranscriptionEngine = _RTE
            except ImportError:
                # Cloud-only sidecar -- local engine not available.
                # Drop any engine left over from before a reload so the
                # controller does not keep a stale engine while in ERROR.
                self.transcription_engine = None
                self._set_state(
                    AppState.ERROR,
                    "Local transcription not available in this build. "
                    "Please switch to Cloud (Deepgram) mode in Settings."
                )
                return

        if self.device_manager:
            self.device_manager.set_device(device_config)
            device = self.device_manager.get_device_for_whisper()
        else:
            device = "cpu"

        self.transcription_engine = RealtimeTranscriptionEngine(
            model=model,
            device=device,
            language=language,
            compute_type=compute_type,
            enable_realtime_transcription=self.config.get('transcription.enable_realtime_transcription', False),
            realtime_model=self.config.get('transcription.realtime_model', 'tiny.en'),
            realtime_processing_pause=self.config.get('transcription.realtime_processing_pause', 0.1),
            silero_sensitivity=self.config.get('transcription.silero_sensitivity', 0.4),
            silero_use_onnx=self.config.get('transcription.silero_use_onnx', True),
            webrtc_sensitivity=self.config.get('transcription.webrtc_sensitivity', 3),
            post_speech_silence_duration=post_speech_silence,
            min_length_of_recording=min_recording,
            min_gap_between_recordings=min_gap,
            pre_recording_buffer_duration=self.config.get('transcription.pre_recording_buffer_duration', 0.2),
            beam_size=self.config.get('transcription.beam_size', 5),
            initial_prompt=self.config.get('transcription.initial_prompt', ''),
            no_log_file=self.config.get('transcription.no_log_file', True),
            input_device_index=audio_device,
            user_name=user_name,
        )
        self.transcription_engine.set_callbacks(
            realtime_callback=self._on_realtime_transcription,
            final_callback=self._on_final_transcription,
        )

    # Start init in background thread
    self._engine_init_thread = EngineInitThread(
        self.transcription_engine,
        self._on_engine_ready,
    )
    self._engine_init_thread.start()
|
|
|
|
def _on_engine_ready(self, success: bool, message: str):
    """Completion callback for ``EngineInitThread``.

    On failure the controller enters ``AppState.ERROR`` with ``message``.
    On success it enters ``AppState.READY`` with a status line naming the
    active device: the Deepgram mode for cloud engines, otherwise the
    engine's ``device`` and ``compute_type``.
    """
    if not success:
        self._set_state(AppState.ERROR, message)
        return

    remote_mode = self.config.get('remote.mode', 'local')
    engine = self.transcription_engine

    if remote_mode in ('managed', 'byok'):
        mode_label = 'Managed' if remote_mode == 'managed' else 'BYOK'
        device_display = f"Deepgram ({mode_label})"
    elif engine:
        engine_device = engine.device
        engine_compute = engine.compute_type
        device_display = f"{engine_device.upper()} ({engine_compute})"
    else:
        device_display = "Unknown"

    self._set_state(AppState.READY, f"Ready | Device: {device_display}")
|
|
|
|
# ── Transcription Control ──────────────────────────────────────
|
|
|
|
def start_transcription(self) -> tuple[bool, str]:
    """Begin recording and transcribing.

    Returns:
        ``(success, message)``. Fails without side effects when already
        transcribing or when no ready engine is available. Any exception
        raised while starting is turned into a failure message.
    """
    if self.is_transcribing:
        return False, "Already transcribing"

    engine = self.transcription_engine
    if not (engine and engine.is_ready()):
        return False, "Transcription engine not ready"

    try:
        if not engine.start_recording():
            return False, "Failed to start recording"

        # Server sync is optional and only started when enabled in config.
        if self.config.get('server_sync.enabled', False):
            self._start_server_sync()

        self.is_transcribing = True
        self._set_state(AppState.TRANSCRIBING, "Transcribing...")
    except Exception as exc:
        return False, f"Failed to start transcription: {exc}"

    return True, "Transcription started"
|
|
|
|
def stop_transcription(self) -> tuple[bool, str]:
    """Stop recording and tear down server sync.

    Returns:
        ``(success, message)``. Fails when not currently transcribing. Any
        exception raised while stopping is turned into a failure message,
        in which case ``is_transcribing`` is left unchanged.
    """
    if not self.is_transcribing:
        return False, "Not transcribing"

    try:
        engine = self.transcription_engine
        if engine:
            engine.stop_recording()

        sync_client = self.server_sync_client
        if sync_client:
            sync_client.stop()
            self.server_sync_client = None

        self.is_transcribing = False
        self._set_state(AppState.READY, "Ready")
    except Exception as exc:
        return False, f"Failed to stop transcription: {exc}"

    return True, "Transcription stopped"
|
|
|
|
def clear_transcriptions(self) -> int:
    """Empty the stored transcription list in place.

    Returns:
        The number of entries that were removed.
    """
    cleared = len(self.transcriptions)
    del self.transcriptions[:]
    return cleared
|
|
|
|
def get_transcriptions_text(self, include_timestamps: bool = True) -> str:
    """Render every stored transcription as one line of text.

    Each line is ``[HH:MM:SS] name: text``. The timestamp is omitted when
    ``include_timestamps`` is false, and the name is omitted when it is
    empty or only whitespace. Lines are joined with newlines.
    """
    def render(entry) -> str:
        pieces = []
        if include_timestamps:
            pieces.append(f"[{entry.timestamp.strftime('%H:%M:%S')}]")
        speaker = entry.user_name
        if speaker and speaker.strip():
            pieces.append(f"{speaker}:")
        pieces.append(entry.text)
        return " ".join(pieces)

    return "\n".join(render(entry) for entry in self.transcriptions)
|
|
|
|
def reload_engine(self) -> tuple[bool, str]:
    """Stop the current engine and build a new one from the current config.

    Stops an active transcription first, waits up to ten seconds for a
    pending engine init thread, stops the existing engine (printing a
    warning if that fails), and then calls ``_initialize_engine()``.

    Returns:
        ``(True, "Engine reload initiated")`` once re-initialization has
        been kicked off, or ``(False, error_text)`` after setting
        ``AppState.ERROR`` if any step raised.
    """
    try:
        if self.is_transcribing:
            self.stop_transcription()

        self._set_state(AppState.RELOADING, "Reloading engine...")

        pending_init = self._engine_init_thread
        if pending_init and pending_init.is_alive():
            pending_init.join(timeout=10)

        previous_engine = self.transcription_engine
        if previous_engine:
            try:
                previous_engine.stop()
            except Exception as exc:
                print(f"Warning: Error stopping engine: {exc}")

        self._initialize_engine()
    except Exception as exc:
        self._set_state(AppState.ERROR, f"Engine reload failed: {exc}")
        return False, str(exc)

    return True, "Engine reload initiated"
|
|
|
|
# ── Transcription Callbacks ────────────────────────────────────
|
|
|
|
def _on_realtime_transcription(self, result: TranscriptionResult):
    """Fan a realtime (preview) result out to the web server, sync client, and frontend.

    Ignored unless transcription is active. Any exception raised while
    dispatching is printed instead of propagating to the engine.
    """
    if not self.is_transcribing:
        return

    try:
        # Web display: schedule the broadcast on the server thread's loop.
        server, server_thread = self.web_server, self.web_server_thread
        if server and server_thread and server_thread.loop:
            asyncio.run_coroutine_threadsafe(
                server.broadcast_preview(
                    result.text, result.user_name, result.timestamp
                ),
                server_thread.loop,
            )

        # Multi-user sync
        sync_client = self.server_sync_client
        if sync_client:
            sync_client.send_preview(result.text, result.timestamp)

        # Frontend callback
        notify = self.on_preview
        if notify:
            notify({
                "text": result.text,
                "user_name": result.user_name,
                "timestamp": result.timestamp.strftime("%H:%M:%S") if result.timestamp else None,
                "is_preview": True,
            })
    except Exception as exc:
        print(f"Error handling realtime transcription: {exc}")
|
|
|
|
def _on_final_transcription(self, result: TranscriptionResult):
    """Store a final result and fan it out to the web server, sync client, and frontend.

    Ignored unless transcription is active. Any exception raised while
    storing or dispatching is printed instead of propagating to the engine.
    """
    if not self.is_transcribing:
        return

    try:
        self.transcriptions.append(result)

        # Web display: schedule the broadcast on the server thread's loop.
        server, server_thread = self.web_server, self.web_server_thread
        if server and server_thread and server_thread.loop:
            asyncio.run_coroutine_threadsafe(
                server.broadcast_transcription(
                    result.text, result.user_name, result.timestamp
                ),
                server_thread.loop,
            )

        # Multi-user sync
        sync_client = self.server_sync_client
        if sync_client:
            sync_client.send_transcription(result.text, result.timestamp)

        # Frontend callback
        notify = self.on_transcription
        if notify:
            notify({
                "text": result.text,
                "user_name": result.user_name,
                "timestamp": result.timestamp.strftime("%H:%M:%S") if result.timestamp else None,
                "is_preview": False,
            })
    except Exception as exc:
        print(f"Error handling final transcription: {exc}")
|
|
|
|
def _on_remote_error(self, error_msg: str):
    """Log an error reported by the remote transcription engine and forward it to ``on_error``."""
    print(f"Remote transcription error: {error_msg}")

    handler = self.on_error
    if handler:
        handler(error_msg)
|
|
|
|
def _on_credits_low(self, seconds_remaining: int):
    """Forward a low-credits warning to ``on_credits_low`` when a handler is set."""
    handler = self.on_credits_low
    if handler:
        handler(seconds_remaining)
|
|
|
|
# ── Server Sync ────────────────────────────────────────────────
|
|
|
|
def _start_server_sync(self):
    """Create and start a ``ServerSyncClient`` from the current configuration.

    Only prints a notice when ``server_sync.url`` is empty. Any exception
    raised while building or starting the client is printed rather than
    propagated.
    """
    try:
        setting = self.config.get

        url = setting('server_sync.url', '')
        if not url:
            print("Server sync enabled but no URL configured")
            return

        room = setting('server_sync.room', 'default')
        passphrase = setting('server_sync.passphrase', '')
        user_name = setting('user.name', 'User')
        fonts_dir = self.config.fonts_dir

        # The sync client receives "None" in place of the local
        # "System Font" choice.
        font_source = setting('display.font_source', 'System Font')
        if font_source == "System Font":
            font_source = "None"

        client = ServerSyncClient(
            url=url,
            room=room,
            passphrase=passphrase,
            user_name=user_name,
            fonts_dir=fonts_dir,
            font_source=font_source,
            # Empty strings are passed on as None.
            websafe_font=setting('display.websafe_font', '') or None,
            google_font=setting('display.google_font', '') or None,
            custom_font_file=setting('display.custom_font_file', '') or None,
            user_color=setting('display.user_color', '#4CAF50'),
            text_color=setting('display.text_color', '#FFFFFF'),
            background_color=setting('display.background_color', '#000000B3'),
        )
        self.server_sync_client = client
        client.start()
    except Exception as exc:
        print(f"Error starting server sync: {exc}")
|
|
|
|
# ── Configuration ──────────────────────────────────────────────
|
|
|
|
def apply_settings(self, new_config: Optional[dict] = None) -> tuple[bool, str]:
    """Apply settings changes. If new_config is provided, merge it first.

    Pushes the display settings to the web server (if one exists), restarts
    a server-sync client that is running during transcription, and reloads
    the engine when the model, device, or remote mode differs from what the
    current engine was created with.

    Args:
        new_config: Optional mapping of config keys to values; each pair is
            written with ``self.config.set`` before anything else happens.

    Returns:
        ``(engine_reload_needed, message)``. When a reload was needed but
        ``reload_engine()`` reported failure, the message carries the
        failure detail instead of claiming the engine is reloading.
    """
    if new_config:
        for key, value in new_config.items():
            self.config.set(key, value)

    # Update web server display settings. Each attribute on the server
    # mirrors the ``display.<name>`` config key of the same name.
    if self.web_server:
        display_defaults = (
            ('show_timestamps', True),
            ('fade_after_seconds', 10),
            ('max_lines', 50),
            ('font_family', 'Arial'),
            ('font_size', 16),
            ('font_source', 'System Font'),
            ('websafe_font', 'Arial'),
            ('google_font', 'Roboto'),
            ('user_color', '#4CAF50'),
            ('text_color', '#FFFFFF'),
            ('background_color', '#000000B3'),
        )
        for attr, default in display_defaults:
            setattr(self.web_server, attr, self.config.get(f'display.{attr}', default))

    # Restart server sync if running
    if self.is_transcribing and self.server_sync_client:
        self.server_sync_client.stop()
        self.server_sync_client = None
        if self.config.get('server_sync.enabled', False):
            self._start_server_sync()

    # Check if model/device/remote mode changed -- any of these require
    # a full engine reload since they change which engine class is used
    new_model = self.config.get('transcription.model', 'base.en')
    new_device = self.config.get('transcription.device', 'auto')
    new_remote_mode = self.config.get('remote.mode', 'local')

    # Engines without a ``mode`` attribute (and no engine at all) count as local.
    current_remote_mode = 'local'
    if self.transcription_engine:
        current_remote_mode = getattr(self.transcription_engine, 'mode', 'local')

    engine_reload_needed = (
        self.current_model_size != new_model
        or self.current_device_config != new_device
        or current_remote_mode != new_remote_mode
    )

    if not engine_reload_needed:
        return False, "Settings applied successfully."

    reloaded, detail = self.reload_engine()
    if not reloaded:
        # Report the failure rather than claiming a reload is under way.
        return True, f"Settings applied, but engine reload failed: {detail}"
    return True, "Settings applied. Engine reloading with new model/device."
|
|
|
|
def get_status(self) -> dict:
    """Get current application status as a dict.

    The result contains the controller state, whether transcription is
    active, the application version, a display string for the engine's
    device, web server host/port/url/running flag, the number of stored
    transcriptions, the configured remote mode, and whether server sync is
    enabled in config.
    """
    host = self.config.get('web_server.host', '127.0.0.1')
    # Prefer the port recorded when the server started over the configured one.
    port = self.actual_web_port or self.config.get('web_server.port', 8080)

    remote_mode = self.config.get('remote.mode', 'local')
    engine = self.transcription_engine
    if remote_mode in ('managed', 'byok') and engine:
        mode_label = 'Managed' if remote_mode == 'managed' else 'BYOK'
        engine_device = f"Deepgram ({mode_label})"
    elif engine and hasattr(engine, 'device'):
        engine_device = f"{engine.device.upper()} ({engine.compute_type})"
    else:
        engine_device = "Not initialized"

    return {
        "state": self.state,
        "is_transcribing": self.is_transcribing,
        "version": __version__,
        "engine_device": engine_device,
        "web_server": {
            "host": host,
            "port": port,
            "url": f"http://{host}:{port}",
            "running": self.web_server_thread is not None and self.web_server_thread.is_alive(),
        },
        "transcription_count": len(self.transcriptions),
        "remote_mode": remote_mode,
        "server_sync_enabled": self.config.get('server_sync.enabled', False),
    }
|
|
|
|
def get_audio_devices(self) -> list[dict]:
    """List available audio input devices.

    Returns:
        One ``{"index", "name"}`` dict per device that reports at least one
        input channel. If ``sounddevice`` cannot be imported, the query
        fails, or no input device is found, a single placeholder entry
        ``{"index": 0, "name": "Default"}`` is returned instead.
    """
    devices = []
    try:
        # Imported inside the try so a missing module or audio backend
        # falls back to the placeholder instead of raising.
        import sounddevice as sd

        for index, device in enumerate(sd.query_devices()):
            if device['max_input_channels'] > 0:
                devices.append({"index": index, "name": device['name']})
    except Exception as exc:
        # Best effort: keep whatever was collected, but say why listing failed.
        print(f"Warning: Could not list audio devices: {exc}")

    if not devices:
        devices = [{"index": 0, "name": "Default"}]
    return devices
|
|
|
|
def get_compute_devices(self) -> list[dict]:
    """List the compute device choices as ``{"id", "name"}`` dicts.

    The list always starts with the "auto" entry. With a device manager,
    one entry follows per ``(id, name)`` pair from ``get_device_info()``;
    without one, a single "cloud" entry follows.
    """
    choices = [{"id": "auto", "name": "Auto-detect"}]

    manager = self.device_manager
    if not manager:
        choices.append({"id": "cloud", "name": "Cloud (Deepgram)"})
        return choices

    choices.extend(
        {"id": dev_id, "name": dev_name}
        for dev_id, dev_name in manager.get_device_info()
    )
    return choices
|
|
|
|
# ── Update Checking ────────────────────────────────────────────
|
|
|
|
def check_for_updates(self) -> dict:
    """Check for updates synchronously.

    Returns:
        A dict that always has an ``"available"`` key. When an update is
        found it also carries ``version``, ``download_url``,
        ``release_notes`` and ``skipped``; when none is found it carries
        ``current_version``; when checking is not configured or the check
        raises, it carries ``error``.
    """
    from client.update_checker import UpdateChecker

    setting = self.config.get
    gitea_url = setting('updates.gitea_url', 'https://repo.anhonesthost.net')
    owner = setting('updates.owner', 'streamer-tools')
    repo = setting('updates.repo', 'local-transcription')

    if not (gitea_url and owner and repo):
        return {"available": False, "error": "Update checking not configured"}

    checker = UpdateChecker(
        current_version=__version__,
        gitea_url=gitea_url,
        owner=owner,
        repo=repo,
    )

    try:
        release_info = checker.check_for_update()
        # Record the time of the check once the checker has returned.
        self.config.set('updates.last_check', datetime.now().isoformat())

        if not release_info:
            return {"available": False, "current_version": __version__}

        skipped = self.config.get('updates.skipped_versions', [])
        return {
            "available": True,
            "version": release_info.version,
            "download_url": release_info.download_url,
            "release_notes": release_info.release_notes,
            "skipped": release_info.version in skipped,
        }
    except Exception as exc:
        return {"available": False, "error": str(exc)}
|
|
|
|
def skip_version(self, version: str):
    """Add ``version`` to the skipped-versions list in config.

    Does nothing when the version is already listed; otherwise the updated
    list is written back under ``updates.skipped_versions``.
    """
    skipped = self.config.get('updates.skipped_versions', [])
    if version in skipped:
        return

    skipped.append(version)
    self.config.set('updates.skipped_versions', skipped)
|