Add Tauri v2 + Svelte 5 frontend and headless Python backend

Scaffold the cross-platform rewrite from PySide6/Qt to Tauri + Svelte,
following the same architecture as voice-to-notes. The Python backend
runs headless as a sidecar, with a FastAPI control API that the Svelte
frontend connects to via REST and WebSocket.

New files:
- backend/app_controller.py: Headless orchestration (extracted from MainWindow)
- backend/api_server.py: FastAPI control endpoints + /ws/control WebSocket
- backend/main_headless.py: Headless entry point for sidecar mode
- src-tauri/: Tauri v2 Rust shell with sidecar and dialog plugins
- src/: Svelte 5 frontend (App, Settings, Controls, TranscriptionDisplay)
- src/lib/stores/: Reactive stores for backend connection, config, transcriptions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Developer
2026-04-06 10:20:25 -07:00
parent 9ff883e2e3
commit af534bf768
29 changed files with 14008 additions and 0 deletions

1
backend/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Backend package for headless transcription service."""

323
backend/api_server.py Normal file
View File

@@ -0,0 +1,323 @@
"""FastAPI control API server for the headless transcription backend.
Extends the existing OBS display server with REST endpoints and a
control WebSocket channel so that a Tauri (or any other) frontend
can drive the application.
"""
import asyncio
import json
from datetime import datetime
from typing import List, Optional
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from backend.app_controller import AppController
# ── Request / Response Models ──────────────────────────────────────
class ConfigUpdate(BaseModel):
    """Batch config update payload. Keys use dot-notation.

    Used as the request body of ``PUT /api/config``; the mapping is handed
    to the controller's ``apply_settings`` unchanged.
    """
    # Mapping of dot-notation config key -> new value.
    settings: dict  # e.g. {"user.name": "Alice", "transcription.model": "small.en"}
class LoginRequest(BaseModel):
    """Request body of ``POST /api/login``."""
    # Credentials forwarded to ``{server_url}/api/auth/login``.
    email: str
    password: str
    # Base URL of the transcription proxy server the login is sent to.
    server_url: str
class RegisterRequest(BaseModel):
    """Request body of ``POST /api/register``."""
    # Credentials forwarded to ``{server_url}/api/auth/register``.
    email: str
    password: str
    # Base URL of the transcription proxy server the registration is sent to.
    server_url: str
class SkipVersionRequest(BaseModel):
    """Request body of ``POST /api/skip-version``."""
    # Release version string to record as skipped for update notifications.
    version: str
class SaveFileRequest(BaseModel):
    """Request body of ``POST /api/save-file``."""
    # Destination path; the endpoint writes to it as given.
    path: str
    # Content written to ``path`` encoded as UTF-8.
    text: str
# ── API Server ─────────────────────────────────────────────────────
class APIServer:
    """Wrap an AppController with a FastAPI app exposing control endpoints.

    REST endpoints live under ``/api/*``. ``/ws/control`` pushes state
    changes, transcriptions, previews, errors and low-credit warnings to
    every connected client as JSON text frames.

    Controller callbacks may fire on any thread (engine threads, the
    signal/main thread, or the server loop itself), so every push is
    marshalled onto the event loop that owns the WebSocket connections.
    """

    # Browser origins allowed to call the API: the Tauri webview
    # (tauri://localhost, http(s)://tauri.localhost) and a local dev server
    # on any port. A wildcard would let any web page the user visits drive
    # this API, including the file-writing /api/save-file endpoint.
    _ALLOWED_ORIGIN_REGEX = (
        r"(tauri://localhost"
        r"|https?://tauri\.localhost"
        r"|https?://(localhost|127\.0\.0\.1)(:\d+)?)"
    )

    def __init__(self, controller: AppController):
        """Build the FastAPI app, register routes and hook controller callbacks.

        Args:
            controller: The headless controller every endpoint delegates to.
        """
        self.controller = controller
        self.control_connections: List[WebSocket] = []
        # Event loop serving /ws/control; captured when the first client connects.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self.app = FastAPI(title="Local Transcription API", version="1.0.0")
        self.app.add_middleware(
            CORSMiddleware,
            allow_origin_regex=self._ALLOWED_ORIGIN_REGEX,
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
        self._setup_routes()
        self._wire_controller_callbacks()

    def _wire_controller_callbacks(self):
        """Wire AppController callbacks to broadcast over /ws/control.

        A state callback installed before this call keeps being invoked;
        the other callbacks are replaced outright.
        """
        previous_state_cb = self.controller.on_state_changed

        def on_state_changed(state: str, message: str):
            if previous_state_cb:
                previous_state_cb(state, message)
            self._broadcast_control({"type": "state_changed", "state": state, "message": message})

        def on_transcription(data: dict):
            self._broadcast_control({"type": "transcription", **data})

        def on_preview(data: dict):
            self._broadcast_control({"type": "preview", **data})

        def on_error(msg: str):
            self._broadcast_control({"type": "error", "message": msg})

        def on_credits_low(seconds: int):
            self._broadcast_control({"type": "credits_low", "seconds_remaining": seconds})

        self.controller.on_state_changed = on_state_changed
        self.controller.on_transcription = on_transcription
        self.controller.on_preview = on_preview
        self.controller.on_error = on_error
        self.controller.on_credits_low = on_credits_low

    def _broadcast_control(self, data: dict):
        """Send a message to all connected /ws/control clients.

        Safe to call from any thread. The send itself is scheduled on the
        server's event loop; this method never blocks on it.
        """
        loop = self._loop
        # No loop captured means no client has ever connected.
        if not self.control_connections or loop is None or loop.is_closed():
            return
        message = json.dumps(data)
        asyncio.run_coroutine_threadsafe(self._send_to_all(message), loop)

    async def _send_to_all(self, message: str):
        """Deliver ``message`` to every client, dropping those whose send fails."""
        # Iterate over a copy: clients may connect or disconnect while we await.
        for ws in list(self.control_connections):
            try:
                await ws.send_text(message)
            except Exception:
                self._drop_connection(ws)

    def _drop_connection(self, websocket: WebSocket):
        """Forget a control connection; a no-op if it was already removed."""
        if websocket in self.control_connections:
            self.control_connections.remove(websocket)

    def _setup_routes(self):
        """Register all API routes."""
        app = self.app
        ctrl = self.controller

        # ── Status ─────────────────────────────────────────────
        @app.get("/api/status")
        async def get_status():
            return ctrl.get_status()

        @app.get("/api/version")
        async def get_version():
            from version import __version__
            return {"version": __version__}

        # ── Transcription Control ──────────────────────────────
        @app.post("/api/start")
        async def start_transcription():
            success, message = ctrl.start_transcription()
            if not success:
                raise HTTPException(status_code=400, detail=message)
            return {"status": "ok", "message": message}

        @app.post("/api/stop")
        async def stop_transcription():
            success, message = ctrl.stop_transcription()
            if not success:
                raise HTTPException(status_code=400, detail=message)
            return {"status": "ok", "message": message}

        @app.post("/api/clear")
        async def clear_transcriptions():
            count = ctrl.clear_transcriptions()
            return {"status": "ok", "cleared": count}

        @app.get("/api/transcriptions")
        async def get_transcriptions():
            show_timestamps = ctrl.config.get('display.show_timestamps', True)
            return {
                "count": len(ctrl.transcriptions),
                "text": ctrl.get_transcriptions_text(include_timestamps=show_timestamps),
                "items": [
                    {
                        "text": r.text,
                        "user_name": r.user_name,
                        "timestamp": r.timestamp.strftime("%H:%M:%S") if r.timestamp else None,
                    }
                    for r in ctrl.transcriptions
                ],
            }

        @app.post("/api/save-file")
        async def save_file(req: SaveFileRequest):
            """Save text to a file (used by Tauri frontend after dialog)."""
            from pathlib import Path
            # NOTE(review): the path is written exactly as supplied by the client.
            try:
                Path(req.path).write_text(req.text, encoding="utf-8")
                return {"status": "ok", "path": req.path}
            except Exception as e:
                raise HTTPException(status_code=500, detail=str(e))

        # ── Configuration ──────────────────────────────────────
        @app.get("/api/config")
        async def get_config():
            return ctrl.config.config

        @app.put("/api/config")
        async def update_config(update: ConfigUpdate):
            engine_reloaded, message = ctrl.apply_settings(update.settings)
            return {
                "status": "ok",
                "message": message,
                "engine_reloaded": engine_reloaded,
            }

        # ── Devices ────────────────────────────────────────────
        @app.get("/api/audio-devices")
        async def get_audio_devices():
            return {"devices": ctrl.get_audio_devices()}

        @app.get("/api/compute-devices")
        async def get_compute_devices():
            return {"devices": ctrl.get_compute_devices()}

        # ── Engine ─────────────────────────────────────────────
        @app.post("/api/reload-engine")
        async def reload_engine():
            success, message = ctrl.reload_engine()
            if not success:
                raise HTTPException(status_code=500, detail=message)
            return {"status": "ok", "message": message}

        # ── Updates ────────────────────────────────────────────
        @app.get("/api/check-update")
        async def check_update():
            # The check is synchronous; run it off the event loop so
            # WebSocket pushes keep flowing while it is in progress.
            return await asyncio.to_thread(ctrl.check_for_updates)

        @app.post("/api/skip-version")
        async def skip_version(req: SkipVersionRequest):
            ctrl.skip_version(req.version)
            return {"status": "ok"}

        # ── Managed Mode Auth Proxy ────────────────────────────
        # `requests` is blocking, so each upstream call runs in a worker
        # thread instead of stalling the event loop for up to the timeout.
        @app.post("/api/login")
        async def login(req: LoginRequest):
            """Proxy login to the transcription proxy server."""
            import requests as http_requests
            try:
                resp = await asyncio.to_thread(
                    http_requests.post,
                    f"{req.server_url}/api/auth/login",
                    json={"email": req.email, "password": req.password},
                    timeout=10,
                )
            except http_requests.RequestException as e:
                raise HTTPException(status_code=502, detail=str(e)) from e
            if resp.status_code != 200:
                raise HTTPException(status_code=resp.status_code, detail=resp.text)
            try:
                data = resp.json()
            except ValueError as e:
                # A 200 with a non-JSON body is an upstream fault, not ours.
                raise HTTPException(status_code=502, detail=str(e)) from e
            token = data.get('token', '')
            ctrl.config.set('remote.auth_token', token)
            ctrl.config.set('remote.server_url', req.server_url)
            return {"status": "ok", "token": token}

        @app.post("/api/register")
        async def register(req: RegisterRequest):
            """Proxy registration to the transcription proxy server."""
            import requests as http_requests
            try:
                resp = await asyncio.to_thread(
                    http_requests.post,
                    f"{req.server_url}/api/auth/register",
                    json={"email": req.email, "password": req.password},
                    timeout=10,
                )
            except http_requests.RequestException as e:
                raise HTTPException(status_code=502, detail=str(e)) from e
            if resp.status_code not in (200, 201):
                raise HTTPException(status_code=resp.status_code, detail=resp.text)
            try:
                return {"status": "ok", "data": resp.json()}
            except ValueError as e:
                raise HTTPException(status_code=502, detail=str(e)) from e

        @app.get("/api/balance")
        async def get_balance():
            """Proxy balance check to the transcription proxy server."""
            import requests as http_requests
            server_url = ctrl.config.get('remote.server_url', '')
            token = ctrl.config.get('remote.auth_token', '')
            if not server_url or not token:
                raise HTTPException(status_code=400, detail="Not logged in to managed service")
            try:
                resp = await asyncio.to_thread(
                    http_requests.get,
                    f"{server_url}/api/billing/balance",
                    headers={"Authorization": f"Bearer {token}"},
                    timeout=10,
                )
            except http_requests.RequestException as e:
                raise HTTPException(status_code=502, detail=str(e)) from e
            if resp.status_code != 200:
                raise HTTPException(status_code=resp.status_code, detail=resp.text)
            try:
                return resp.json()
            except ValueError as e:
                raise HTTPException(status_code=502, detail=str(e)) from e

        # ── Control WebSocket ──────────────────────────────────
        @app.websocket("/ws/control")
        async def websocket_control(websocket: WebSocket):
            """WebSocket channel for real-time state and transcription push."""
            await websocket.accept()
            # Remember the loop that owns the sockets so broadcasts issued
            # from other threads can be scheduled onto it.
            self._loop = asyncio.get_running_loop()
            self.control_connections.append(websocket)
            try:
                # Send current status on connect
                await websocket.send_json({
                    "type": "state_changed",
                    "state": ctrl.state,
                    "message": "Connected",
                })
                while True:
                    # Keep alive -- client sends pings
                    await websocket.receive_text()
            except Exception:
                # Disconnect or transport error: either way the client is gone.
                pass
            finally:
                self._drop_connection(websocket)

        # ── Mount the existing OBS display routes ──────────────
        # Expose the OBS display app under /obs on this server as well, so
        # it is reachable through the API port.
        # NOTE(review): the controller also runs this app on its own
        # thread and port; confirm its WebSocket broadcasts behave for
        # clients that connect through this mount.
        if ctrl.web_server:
            app.mount("/obs", ctrl.web_server.app)

692
backend/app_controller.py Normal file
View File

@@ -0,0 +1,692 @@
"""Headless application controller for transcription backend.
Extracts orchestration logic from gui/main_window_qt.py into a
Qt-free class that manages engine lifecycle, web server, server sync,
and configuration -- all accessible via callbacks instead of Qt signals.
"""
import asyncio
import time
from datetime import datetime
from pathlib import Path
from threading import Thread, Lock
from typing import Callable, List, Optional
import sys
# Add project root to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from client.config import Config
from client.device_utils import DeviceManager
from client.transcription_engine_realtime import RealtimeTranscriptionEngine, TranscriptionResult
from client.deepgram_transcription import DeepgramTranscriptionEngine
from client.server_sync import ServerSyncClient
from server.web_display import TranscriptionWebServer
from version import __version__
class AppState:
    """Namespace of the string values used for the application state.

    Deliberately plain ``str`` constants rather than an ``enum.Enum``: the
    values are handed to callbacks and status dicts as ordinary strings.
    """

    # Startup: web server and engine are still coming up.
    INITIALIZING = "initializing"
    # Engine loaded and idle.
    READY = "ready"
    # Recording / transcribing is in progress.
    TRANSCRIBING = "transcribing"
    # Engine is being torn down and re-created.
    RELOADING = "reloading"
    # Engine initialization or reload failed.
    ERROR = "error"
class WebServerThread(Thread):
    """Daemon thread that runs the web server on an event loop of its own.

    Attributes:
        web_server: The server whose ``start()`` coroutine is run.
        loop: The thread's event loop once ``run`` has created it, else None.
        error: The exception that ended ``run``, if any.
    """

    def __init__(self, web_server: TranscriptionWebServer):
        super().__init__(daemon=True)
        self.web_server = web_server
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self.error: Optional[Exception] = None

    def run(self):
        """Create the loop, then run ``web_server.start()`` until it finishes."""
        try:
            event_loop = asyncio.new_event_loop()
            self.loop = event_loop
            asyncio.set_event_loop(event_loop)
            event_loop.run_until_complete(self.web_server.start())
        except Exception as exc:
            # Recorded so the starter can inspect why the server died.
            self.error = exc
            print(f"ERROR: Web server failed to start: {exc}")
class EngineInitThread(Thread):
    """Daemon thread that calls ``engine.initialize()`` off the caller's thread.

    The outcome is reported through ``on_complete(success, message)``, which
    is invoked on this thread.
    """

    def __init__(self, engine, on_complete: Callable[[bool, str], None]):
        super().__init__(daemon=True)
        self.engine = engine
        self.on_complete = on_complete

    def run(self):
        """Initialize the engine and report the result via ``on_complete``."""
        try:
            if self.engine.initialize():
                outcome = (True, "Engine initialized successfully")
            else:
                outcome = (False, "Failed to initialize engine")
            # Kept inside the try: an error raised by the callback is also
            # reported as an initialization failure.
            self.on_complete(*outcome)
        except Exception as exc:
            self.on_complete(False, f"Error initializing engine: {exc}")
class AppController:
    """UI-agnostic controller for the transcription application lifecycle.

    Takes over the orchestration that used to live in MainWindow:

    - transcription engine lifecycle (init, start, stop, reload)
    - the web server used for the OBS display
    - server sync for multi-user mode
    - configuration
    - update checking

    Nothing here depends on a GUI toolkit; state changes and results are
    reported through the ``on_*`` callback attributes.
    """

    def __init__(self, config: Optional[Config] = None):
        """Create an idle controller; nothing is started until ``initialize``.

        Args:
            config: Configuration to use. A fresh ``Config()`` is created
                when omitted.
        """
        self.config = config or Config()
        self.device_manager = DeviceManager()

        # Application state; ``_state`` is only touched under ``_state_lock``.
        self._state_lock = Lock()
        self._state = AppState.INITIALIZING
        self.is_transcribing = False

        # Transcription engine and the settings it was last built with.
        self.transcription_engine = None
        self._engine_init_thread: Optional[EngineInitThread] = None
        self.current_model_size: Optional[str] = None
        self.current_device_config: Optional[str] = None

        # OBS display web server, its thread, and the port it ended up on.
        self.web_server: Optional[TranscriptionWebServer] = None
        self.web_server_thread: Optional[WebServerThread] = None
        self.actual_web_port: Optional[int] = None

        # Multi-user server sync client (only while transcribing with sync on).
        self.server_sync_client: Optional[ServerSyncClient] = None

        # Final transcription results accumulated so far.
        self.transcriptions: List[TranscriptionResult] = []

        # Notification hooks, assigned by the frontend / API server.
        self.on_state_changed: Optional[Callable[[str, str], None]] = None  # (state, message)
        self.on_transcription: Optional[Callable[[dict], None]] = None  # final result
        self.on_preview: Optional[Callable[[dict], None]] = None  # realtime preview
        self.on_error: Optional[Callable[[str], None]] = None
        self.on_credits_low: Optional[Callable[[int], None]] = None
@property
def state(self) -> str:
with self._state_lock:
return self._state
def _set_state(self, state: str, message: str = ""):
with self._state_lock:
self._state = state
if self.on_state_changed:
self.on_state_changed(state, message)
# ── Lifecycle ──────────────────────────────────────────────────
def initialize(self):
    """Bring up the web server, then kick off engine initialization.

    Meant to be called once at startup. Engine loading continues on a
    background thread after this method returns.
    """
    startup_steps = (
        ("Starting web server...", self._start_web_server),
        ("Loading transcription engine...", self._initialize_engine),
    )
    for status_message, run_step in startup_steps:
        self._set_state(AppState.INITIALIZING, status_message)
        run_step()
def shutdown(self):
"""Gracefully shut down all components."""
# Stop transcription
if self.is_transcribing:
self.stop_transcription()
# Stop web server
if self.web_server_thread and self.web_server_thread.is_alive():
try:
if self.web_server_thread.loop:
self.web_server_thread.loop.call_soon_threadsafe(
self.web_server_thread.loop.stop
)
except Exception as e:
print(f"Warning: Error stopping web server: {e}")
# Stop transcription engine
if self.transcription_engine:
try:
self.transcription_engine.stop()
except Exception as e:
print(f"Warning: Error stopping engine: {e}")
# Wait for engine init thread
if self._engine_init_thread and self._engine_init_thread.is_alive():
self._engine_init_thread.join(timeout=5)
# ── Web Server ─────────────────────────────────────────────────
def _start_web_server(self):
    """Start the FastAPI web server for OBS display.

    Tries the configured port and then the next four. A port is skipped
    only when the failure is an "address in use" error; any other startup
    error ends the search. On success ``actual_web_port`` holds the port
    in use; on failure ``web_server`` and ``web_server_thread`` are None.
    """
    try:
        host = self.config.get('web_server.host', '127.0.0.1')
        port = self.config.get('web_server.port', 8080)
        # Gather display settings
        ws_kwargs = self._get_web_server_kwargs(host, port)
        # Try up to 5 ports
        for try_port in range(port, port + 5):
            print(f"Attempting to start web server at http://{host}:{try_port}")
            ws_kwargs['port'] = try_port
            self.web_server = TranscriptionWebServer(**ws_kwargs)
            self.web_server_thread = WebServerThread(self.web_server)
            self.web_server_thread.start()
            # Give the thread a moment to either bind or record an error.
            time.sleep(0.5)
            error = self.web_server_thread.error
            if not error:
                self.actual_web_port = try_port
                print(f"Web server started at http://{host}:{try_port}")
                return
            self.web_server = None
            self.web_server_thread = None
            if not self._is_address_in_use(error):
                print(f"Web server failed to start: {error}")
                break
            print(f"Port {try_port} is in use, trying next port...")
        print("WARNING: Could not start web server on any port")
    except Exception as e:
        print(f"ERROR: Failed to initialize web server: {e}")
        self.web_server = None
        self.web_server_thread = None

@staticmethod
def _is_address_in_use(error: Exception) -> bool:
    """Return True when ``error`` means the port is already bound.

    Checks the OS error number first so detection does not depend on the
    platform's error text, then falls back to message matching for
    errors that were wrapped and lost their errno.
    """
    import errno
    # 10048 is WSAEADDRINUSE, the Windows socket error for a bound address.
    if isinstance(error, OSError) and error.errno in (errno.EADDRINUSE, 10048):
        return True
    text = str(error).lower()
    return (
        "address already in use" in text
        or "errno 98" in text
        or "only one usage of each socket address" in text
    )
def _get_web_server_kwargs(self, host: str, port: int) -> dict:
"""Build kwargs dict for TranscriptionWebServer from config."""
return dict(
host=host,
port=port,
show_timestamps=self.config.get('display.show_timestamps', True),
fade_after_seconds=self.config.get('display.fade_after_seconds', 10),
max_lines=self.config.get('display.max_lines', 50),
font_family=self.config.get('display.font_family', 'Arial'),
font_size=self.config.get('display.font_size', 16),
fonts_dir=self.config.fonts_dir,
font_source=self.config.get('display.font_source', 'System Font'),
websafe_font=self.config.get('display.websafe_font', 'Arial'),
google_font=self.config.get('display.google_font', 'Roboto'),
user_color=self.config.get('display.user_color', '#4CAF50'),
text_color=self.config.get('display.text_color', '#FFFFFF'),
background_color=self.config.get('display.background_color', '#000000B3'),
)
# ── Transcription Engine ───────────────────────────────────────
def _initialize_engine(self):
    """Build the engine selected by config and initialize it in the background.

    ``remote.mode`` of ``managed`` or ``byok`` selects the Deepgram engine;
    anything else selects the local realtime engine. The engine's
    ``initialize()`` runs on an ``EngineInitThread`` that reports back to
    ``_on_engine_ready``.
    """
    setting = self.config.get

    device_config = setting('transcription.device', 'auto')
    self.device_manager.set_device(device_config)

    # 'default' means "let the engine pick"; anything else is a device index.
    raw_audio_device = setting('audio.input_device', 'default')
    audio_device = int(raw_audio_device) if raw_audio_device != 'default' else None

    model = setting('transcription.model', 'base.en')
    language = setting('transcription.language', 'en')
    device = self.device_manager.get_device_for_whisper()
    compute_type = setting('transcription.compute_type', 'default')

    # Remembered so apply_settings can tell when a reload is required.
    self.current_model_size = model
    self.current_device_config = device_config

    user_name = setting('user.name', 'User')

    if setting('transcription.continuous_mode', False):
        # Continuous mode uses fixed timings instead of the configured ones.
        post_speech_silence, min_gap, min_recording = 0.15, 0.0, 0.3
    else:
        post_speech_silence = setting('transcription.post_speech_silence_duration', 0.3)
        min_gap = setting('transcription.min_gap_between_recordings', 0.0)
        min_recording = setting('transcription.min_length_of_recording', 0.5)

    if setting('remote.mode', 'local') in ('managed', 'byok'):
        engine = DeepgramTranscriptionEngine(
            config=self.config,
            user_name=user_name,
            input_device_index=audio_device,
        )
        self.transcription_engine = engine
        engine.set_callbacks(
            realtime_callback=self._on_realtime_transcription,
            final_callback=self._on_final_transcription,
        )
        engine.set_error_callback(self._on_remote_error)
        engine.set_credits_low_callback(self._on_credits_low)
    else:
        engine = RealtimeTranscriptionEngine(
            model=model,
            device=device,
            language=language,
            compute_type=compute_type,
            enable_realtime_transcription=setting('transcription.enable_realtime_transcription', False),
            realtime_model=setting('transcription.realtime_model', 'tiny.en'),
            realtime_processing_pause=setting('transcription.realtime_processing_pause', 0.1),
            silero_sensitivity=setting('transcription.silero_sensitivity', 0.4),
            silero_use_onnx=setting('transcription.silero_use_onnx', True),
            webrtc_sensitivity=setting('transcription.webrtc_sensitivity', 3),
            post_speech_silence_duration=post_speech_silence,
            min_length_of_recording=min_recording,
            min_gap_between_recordings=min_gap,
            pre_recording_buffer_duration=setting('transcription.pre_recording_buffer_duration', 0.2),
            beam_size=setting('transcription.beam_size', 5),
            initial_prompt=setting('transcription.initial_prompt', ''),
            no_log_file=setting('transcription.no_log_file', True),
            input_device_index=audio_device,
            user_name=user_name,
        )
        self.transcription_engine = engine
        engine.set_callbacks(
            realtime_callback=self._on_realtime_transcription,
            final_callback=self._on_final_transcription,
        )

    # Initialization runs on its own thread so the caller is not blocked.
    init_thread = EngineInitThread(self.transcription_engine, self._on_engine_ready)
    self._engine_init_thread = init_thread
    init_thread.start()
def _on_engine_ready(self, success: bool, message: str):
    """Completion hook for EngineInitThread: move to READY or ERROR.

    On success the state message names the device in use; on failure the
    thread's own message is passed through.
    """
    if not success:
        self._set_state(AppState.ERROR, message)
        return

    remote_mode = self.config.get('remote.mode', 'local')
    engine = self.transcription_engine
    if remote_mode in ('managed', 'byok'):
        mode_label = 'Managed' if remote_mode == 'managed' else 'BYOK'
        device_display = f"Deepgram ({mode_label})"
    elif engine:
        device_display = f"{engine.device.upper()} ({engine.compute_type})"
    else:
        device_display = "Unknown"
    self._set_state(AppState.READY, f"Ready | Device: {device_display}")
# ── Transcription Control ──────────────────────────────────────
def start_transcription(self) -> tuple[bool, str]:
"""Start transcription. Returns (success, message)."""
if self.is_transcribing:
return False, "Already transcribing"
if not self.transcription_engine or not self.transcription_engine.is_ready():
return False, "Transcription engine not ready"
try:
success = self.transcription_engine.start_recording()
if not success:
return False, "Failed to start recording"
# Start server sync if enabled
if self.config.get('server_sync.enabled', False):
self._start_server_sync()
self.is_transcribing = True
self._set_state(AppState.TRANSCRIBING, "Transcribing...")
return True, "Transcription started"
except Exception as e:
return False, f"Failed to start transcription: {e}"
def stop_transcription(self) -> tuple[bool, str]:
"""Stop transcription. Returns (success, message)."""
if not self.is_transcribing:
return False, "Not transcribing"
try:
if self.transcription_engine:
self.transcription_engine.stop_recording()
if self.server_sync_client:
self.server_sync_client.stop()
self.server_sync_client = None
self.is_transcribing = False
self._set_state(AppState.READY, "Ready")
return True, "Transcription stopped"
except Exception as e:
return False, f"Failed to stop transcription: {e}"
def clear_transcriptions(self) -> int:
"""Clear stored transcriptions. Returns count of cleared items."""
count = len(self.transcriptions)
self.transcriptions.clear()
return count
def get_transcriptions_text(self, include_timestamps: bool = True) -> str:
"""Get all transcriptions as formatted text."""
lines = []
for result in self.transcriptions:
parts = []
if include_timestamps:
parts.append(f"[{result.timestamp.strftime('%H:%M:%S')}]")
if result.user_name and result.user_name.strip():
parts.append(f"{result.user_name}:")
parts.append(result.text)
lines.append(" ".join(parts))
return "\n".join(lines)
def reload_engine(self) -> tuple[bool, str]:
    """Tear down the current engine and start building a new one from config.

    An active transcription is stopped first. The new engine initializes
    in the background, so a True result only means the reload has begun.

    Returns:
        ``(success, message)``; on failure the state is set to ERROR.
    """
    try:
        if self.is_transcribing:
            self.stop_transcription()
        self._set_state(AppState.RELOADING, "Reloading engine...")

        # Let an initialization that is still running finish (bounded wait).
        pending_init = self._engine_init_thread
        if pending_init and pending_init.is_alive():
            pending_init.join(timeout=10)

        old_engine = self.transcription_engine
        if old_engine:
            try:
                old_engine.stop()
            except Exception as exc:
                print(f"Warning: Error stopping engine: {exc}")

        self._initialize_engine()
    except Exception as exc:
        self._set_state(AppState.ERROR, f"Engine reload failed: {exc}")
        return False, str(exc)
    return True, "Engine reload initiated"
# ── Transcription Callbacks ────────────────────────────────────
def _on_realtime_transcription(self, result: TranscriptionResult):
"""Handle realtime (preview) transcription."""
if not self.is_transcribing:
return
try:
# Broadcast to web server
if self.web_server and self.web_server_thread and self.web_server_thread.loop:
asyncio.run_coroutine_threadsafe(
self.web_server.broadcast_preview(
result.text, result.user_name, result.timestamp
),
self.web_server_thread.loop,
)
# Send to server sync
if self.server_sync_client:
self.server_sync_client.send_preview(result.text, result.timestamp)
# Notify frontend
if self.on_preview:
self.on_preview({
"text": result.text,
"user_name": result.user_name,
"timestamp": result.timestamp.strftime("%H:%M:%S") if result.timestamp else None,
"is_preview": True,
})
except Exception as e:
print(f"Error handling realtime transcription: {e}")
def _on_final_transcription(self, result: TranscriptionResult):
"""Handle final transcription."""
if not self.is_transcribing:
return
try:
self.transcriptions.append(result)
# Broadcast to web server
if self.web_server and self.web_server_thread and self.web_server_thread.loop:
asyncio.run_coroutine_threadsafe(
self.web_server.broadcast_transcription(
result.text, result.user_name, result.timestamp
),
self.web_server_thread.loop,
)
# Send to server sync
if self.server_sync_client:
self.server_sync_client.send_transcription(
result.text, result.timestamp
)
# Notify frontend
if self.on_transcription:
self.on_transcription({
"text": result.text,
"user_name": result.user_name,
"timestamp": result.timestamp.strftime("%H:%M:%S") if result.timestamp else None,
"is_preview": False,
})
except Exception as e:
print(f"Error handling final transcription: {e}")
def _on_remote_error(self, error_msg: str):
"""Handle error from remote transcription service."""
print(f"Remote transcription error: {error_msg}")
if self.on_error:
self.on_error(error_msg)
def _on_credits_low(self, seconds_remaining: int):
"""Handle low credits warning from proxy."""
if self.on_credits_low:
self.on_credits_low(seconds_remaining)
# ── Server Sync ────────────────────────────────────────────────
def _start_server_sync(self):
"""Start server sync client."""
try:
url = self.config.get('server_sync.url', '')
if not url:
print("Server sync enabled but no URL configured")
return
room = self.config.get('server_sync.room', 'default')
passphrase = self.config.get('server_sync.passphrase', '')
user_name = self.config.get('user.name', 'User')
fonts_dir = self.config.fonts_dir
font_source = self.config.get('display.font_source', 'System Font')
if font_source == "System Font":
font_source = "None"
self.server_sync_client = ServerSyncClient(
url=url,
room=room,
passphrase=passphrase,
user_name=user_name,
fonts_dir=fonts_dir,
font_source=font_source,
websafe_font=self.config.get('display.websafe_font', '') or None,
google_font=self.config.get('display.google_font', '') or None,
custom_font_file=self.config.get('display.custom_font_file', '') or None,
user_color=self.config.get('display.user_color', '#4CAF50'),
text_color=self.config.get('display.text_color', '#FFFFFF'),
background_color=self.config.get('display.background_color', '#000000B3'),
)
self.server_sync_client.start()
except Exception as e:
print(f"Error starting server sync: {e}")
# ── Configuration ──────────────────────────────────────────────
def apply_settings(self, new_config: Optional[dict] = None) -> tuple[bool, str]:
"""Apply settings changes. If new_config is provided, merge it first.
Returns (engine_reload_needed, message).
"""
if new_config:
for key, value in new_config.items():
self.config.set(key, value)
# Update web server display settings
if self.web_server:
self.web_server.show_timestamps = self.config.get('display.show_timestamps', True)
self.web_server.fade_after_seconds = self.config.get('display.fade_after_seconds', 10)
self.web_server.max_lines = self.config.get('display.max_lines', 50)
self.web_server.font_family = self.config.get('display.font_family', 'Arial')
self.web_server.font_size = self.config.get('display.font_size', 16)
self.web_server.font_source = self.config.get('display.font_source', 'System Font')
self.web_server.websafe_font = self.config.get('display.websafe_font', 'Arial')
self.web_server.google_font = self.config.get('display.google_font', 'Roboto')
self.web_server.user_color = self.config.get('display.user_color', '#4CAF50')
self.web_server.text_color = self.config.get('display.text_color', '#FFFFFF')
self.web_server.background_color = self.config.get('display.background_color', '#000000B3')
# Restart server sync if running
if self.is_transcribing and self.server_sync_client:
self.server_sync_client.stop()
self.server_sync_client = None
if self.config.get('server_sync.enabled', False):
self._start_server_sync()
# Check if model/device changed
new_model = self.config.get('transcription.model', 'base.en')
new_device = self.config.get('transcription.device', 'auto')
engine_reload_needed = (
self.current_model_size != new_model
or self.current_device_config != new_device
)
if engine_reload_needed:
self.reload_engine()
return True, "Settings applied. Engine reloading with new model/device."
else:
return False, "Settings applied successfully."
def get_status(self) -> dict:
    """Snapshot the controller's state as a plain dict of built-in values."""
    setting = self.config.get
    host = setting('web_server.host', '127.0.0.1')
    # Prefer the port the server actually bound over the configured one.
    port = self.actual_web_port or setting('web_server.port', 8080)
    # NOTE(review): the result of this query is not used below.
    device_info = self.device_manager.get_device_info()

    remote_mode = setting('remote.mode', 'local')
    engine = self.transcription_engine
    if engine and remote_mode in ('managed', 'byok'):
        mode_label = 'Managed' if remote_mode == 'managed' else 'BYOK'
        engine_device = f"Deepgram ({mode_label})"
    elif engine and hasattr(engine, 'device'):
        engine_device = f"{engine.device.upper()} ({engine.compute_type})"
    else:
        engine_device = "Not initialized"

    server_thread = self.web_server_thread
    web_server_status = {
        "host": host,
        "port": port,
        "url": f"http://{host}:{port}",
        "running": server_thread is not None and server_thread.is_alive(),
    }
    return {
        "state": self.state,
        "is_transcribing": self.is_transcribing,
        "version": __version__,
        "engine_device": engine_device,
        "web_server": web_server_status,
        "transcription_count": len(self.transcriptions),
        "remote_mode": remote_mode,
        "server_sync_enabled": setting('server_sync.enabled', False),
    }
def get_audio_devices(self) -> list[dict]:
    """List available audio input devices.

    Returns:
        One ``{"index", "name"}`` dict per device that has input channels.
        When none can be found, or the audio backend cannot be loaded or
        queried, a single ``{"index": 0, "name": "Default"}`` placeholder
        is returned instead.
    """
    devices = []
    try:
        # Imported inside the try: a missing audio backend should lead to
        # the placeholder below, not to an error for the caller.
        import sounddevice as sd
        for i, device in enumerate(sd.query_devices()):
            if device['max_input_channels'] > 0:
                devices.append({"index": i, "name": device['name']})
    except Exception as e:
        print(f"Warning: Could not enumerate audio devices: {e}")
    if not devices:
        devices = [{"index": 0, "name": "Default"}]
    return devices
def get_compute_devices(self) -> list[dict]:
"""List available compute devices."""
device_info = self.device_manager.get_device_info()
devices = [{"id": "auto", "name": "Auto-detect"}]
for dev_id, dev_name in device_info:
devices.append({"id": dev_id, "name": dev_name})
return devices
# ── Update Checking ────────────────────────────────────────────
def check_for_updates(self) -> dict:
    """Synchronously ask the release server whether a newer version exists.

    Returns:
        Always a dict with an ``available`` flag. When an update exists it
        also carries ``version``, ``download_url``, ``release_notes`` and
        ``skipped``; otherwise ``current_version``, or ``error`` when the
        check is not configured or failed.
    """
    from client.update_checker import UpdateChecker

    setting = self.config.get
    gitea_url = setting('updates.gitea_url', 'https://repo.anhonesthost.net')
    owner = setting('updates.owner', 'streamer-tools')
    repo = setting('updates.repo', 'local-transcription')
    if not (gitea_url and owner and repo):
        return {"available": False, "error": "Update checking not configured"}

    checker = UpdateChecker(
        current_version=__version__,
        gitea_url=gitea_url,
        owner=owner,
        repo=repo,
    )
    try:
        release_info = checker.check_for_update()
        # Record when the check completed, whatever its result.
        self.config.set('updates.last_check', datetime.now().isoformat())
        if not release_info:
            return {"available": False, "current_version": __version__}
        skipped_versions = self.config.get('updates.skipped_versions', [])
        return {
            "available": True,
            "version": release_info.version,
            "download_url": release_info.download_url,
            "release_notes": release_info.release_notes,
            "skipped": release_info.version in skipped_versions,
        }
    except Exception as exc:
        return {"available": False, "error": str(exc)}
def skip_version(self, version: str):
"""Mark a version as skipped for update notifications."""
skipped = self.config.get('updates.skipped_versions', [])
if version not in skipped:
skipped.append(version)
self.config.set('updates.skipped_versions', skipped)

126
backend/main_headless.py Normal file
View File

@@ -0,0 +1,126 @@
#!/usr/bin/env python3
"""Headless entry point for the Local Transcription backend.
Runs the transcription engine + API server without any GUI (no PySide6).
Designed to be launched as a Tauri sidecar or run standalone for development.
Usage:
python -m backend.main_headless [--port PORT] [--host HOST]
The backend prints the OBS display port to stdout as JSON on startup:
{"event": "ready", "port": 8080}
The control API (REST and /ws/control) listens on that port + 1, so the
Tauri shell can derive both endpoints from this event.
"""
import argparse
import json
import multiprocessing
import os
import signal
import sys
from pathlib import Path
# Must be called before anything else for PyInstaller compatibility
multiprocessing.freeze_support()
# Only the process started as a script picks the start method; a
# RuntimeError from set_start_method is ignored.
if __name__ == "__main__":
    try:
        multiprocessing.set_start_method('spawn', force=True)
    except RuntimeError:
        pass
# Add project root to path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
# Make the project root the working directory as well.
os.chdir(project_root)
from client.instance_lock import InstanceLock
def main():
    """Run the headless backend until it is interrupted or terminated.

    Acquires the single-instance lock, starts the controller (OBS display
    server plus transcription engine), then serves the control API with
    uvicorn. Progress is reported as one JSON object per line on stdout:
    ``state`` events, a ``ready`` event carrying the OBS display ``port``
    and the control API ``api_port``, and ``shutdown`` / ``error`` events.

    Exits with status 1 when another instance holds the lock or startup fails.
    """
    parser = argparse.ArgumentParser(description="Local Transcription headless backend")
    parser.add_argument("--host", default="127.0.0.1", help="API server host (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=8080, help="API server port (default: 8080)")
    args = parser.parse_args()

    instance_lock = InstanceLock()
    if not instance_lock.acquire():
        print(json.dumps({"event": "error", "message": "Another instance is already running"}),
              flush=True)
        sys.exit(1)

    # Bound before the try so the finally block can always test it.
    controller = None

    def handle_shutdown(signum, frame):
        # Only announce and exit here. SystemExit unwinds through the
        # finally block below, which is the single place that shuts the
        # controller down and releases the lock.
        print(json.dumps({"event": "shutdown"}), flush=True)
        sys.exit(0)

    try:
        # Registered inside the try so a signal at any later point still
        # reaches the cleanup in finally.
        signal.signal(signal.SIGTERM, handle_shutdown)
        signal.signal(signal.SIGINT, handle_shutdown)

        from backend.app_controller import AppController
        from backend.api_server import APIServer
        # Override web server port from CLI arg
        from client.config import Config
        config = Config()
        config.set('web_server.host', args.host)
        config.set('web_server.port', args.port)

        # Create controller and initialize
        controller = AppController(config=config)

        # Report every state change as a JSON line on stdout
        def on_state_changed(state, message):
            event = {"event": "state", "state": state, "message": message}
            print(json.dumps(event), flush=True)

        controller.on_state_changed = on_state_changed

        # Initialize engine + web server
        controller.initialize()

        # Create API server wrapping the controller
        api_server = APIServer(controller)

        # Determine actual port (web server may have shifted if port was in use)
        actual_port = controller.actual_web_port or args.port
        # API on port+1, OBS display on the main port
        api_port = actual_port + 1

        # Print ready event so Tauri can discover both ports. "port" keeps
        # its original meaning; "api_port" is where the control API binds.
        print(json.dumps({"event": "ready", "port": actual_port, "api_port": api_port}),
              flush=True)

        # Run the API server (blocks)
        import uvicorn
        import logging
        logging.getLogger("uvicorn").setLevel(logging.ERROR)
        logging.getLogger("uvicorn.access").setLevel(logging.ERROR)
        uvicorn.run(
            api_server.app,
            host=args.host,
            port=api_port,
            log_level="error",
            access_log=False,
        )
    except KeyboardInterrupt:
        print(json.dumps({"event": "shutdown", "reason": "keyboard_interrupt"}), flush=True)
    except Exception as e:
        print(json.dumps({"event": "error", "message": str(e)}), flush=True)
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        if controller:
            controller.shutdown()
        instance_lock.release()
if __name__ == "__main__":
main()