Add unified per-speaker font support and remote transcription service

Font changes:
- Consolidate font settings into single Display Settings section
- Support Web-Safe, Google Fonts, and Custom File uploads for both displays
- Fix Google Fonts URL encoding (use + instead of %2B for spaces)
- Fix per-speaker font inline style quote escaping in Node.js display
- Add font debug logging to help diagnose font issues
- Update web server to sync all font settings on settings change
- Remove deprecated PHP server documentation files

New features:
- Add remote transcription service for GPU offloading
- Add instance lock to prevent multiple app instances
- Add version tracking

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-11 18:56:12 -08:00
parent f035bdb927
commit ff067b3368
23 changed files with 2486 additions and 1160 deletions

View File

@@ -19,6 +19,10 @@ class Config:
self.app_dir = Path.home() / ".local-transcription"
self.app_dir.mkdir(parents=True, exist_ok=True)
# Fonts directory for custom font files
self.fonts_dir = self.app_dir / "fonts"
self.fonts_dir.mkdir(parents=True, exist_ok=True)
if config_path is None:
self.config_path = self.app_dir / "config.yaml"
else:
@@ -34,7 +38,7 @@ class Config:
self.config = yaml.safe_load(f) or {}
else:
# Load default configuration
default_config_path = Path(__file__).parent.parent / "config" / "default_config.yaml"
default_config_path = Path(__file__).resolve().parent.parent / "config" / "default_config.yaml"
if default_config_path.exists():
with open(default_config_path, 'r') as f:
self.config = yaml.safe_load(f) or {}
@@ -137,5 +141,24 @@ class Config:
self.config = self._get_default_config()
self.save()
def get_custom_fonts(self) -> list:
    """
    Get list of custom font files in the fonts directory.

    Only regular files whose extension (compared case-insensitively) is one
    of .ttf, .otf, .woff or .woff2 are reported. Directories are skipped even
    if their name ends in a font extension, matching the filter used when
    fonts are uploaded to the server.

    Returns:
        List of (font_name, font_path) tuples, sorted case-insensitively by
        font name. font_name is the file name without its extension. The
        list is empty when the fonts directory is missing or is not a
        directory.
    """
    font_extensions = {'.ttf', '.otf', '.woff', '.woff2'}

    # is_dir() (rather than exists()) keeps iterdir() from raising when the
    # path exists but is a regular file.
    if not self.fonts_dir.is_dir():
        return []

    fonts = [
        # Use filename without extension as font name
        (font_file.stem, font_file)
        for font_file in self.fonts_dir.iterdir()
        if font_file.is_file() and font_file.suffix.lower() in font_extensions
    ]
    return sorted(fonts, key=lambda item: item[0].lower())
def __repr__(self) -> str:
    """Return a short debug representation that shows the config file path."""
    path = self.config_path
    return f"Config(path={path})"

94
client/instance_lock.py Normal file
View File

@@ -0,0 +1,94 @@
"""Single instance lock management for Local Transcription application."""
import os
import sys
from pathlib import Path
class InstanceLock:
    """Manages single instance lock using a PID file.

    The lock is a text file containing the PID of the process that holds it.
    A lock file whose PID does not belong to a live process (or that cannot
    be parsed) is considered stale and is overwritten.

    NOTE: the check-then-write sequence in acquire() is not atomic, so two
    processes starting at exactly the same moment can both acquire the lock.
    """

    def __init__(self, lock_dir=None):
        """Initialize the instance lock.

        Args:
            lock_dir: Directory that holds the lock file. Defaults to
                ``~/.local-transcription`` when omitted.
        """
        if lock_dir is None:
            self.lock_dir = Path.home() / '.local-transcription'
        else:
            self.lock_dir = Path(lock_dir)
        self.lock_file = self.lock_dir / 'app.lock'

    def acquire(self) -> bool:
        """
        Try to acquire the instance lock.

        Returns:
            True if lock acquired (no other instance running),
            False if another instance is already running or the lock file
            could not be written.
        """
        # Ensure lock directory exists
        self.lock_dir.mkdir(parents=True, exist_ok=True)

        if self.lock_file.exists():
            try:
                pid_str = self.lock_file.read_text().strip()
                if pid_str and self._is_process_running(int(pid_str)):
                    return False
            except (ValueError, OSError):
                # Invalid or unreadable PID file, we can overwrite it
                pass

        # Write our PID to the lock file
        try:
            self.lock_file.write_text(str(os.getpid()))
        except OSError:
            return False
        return True

    def release(self):
        """Release the instance lock.

        The lock file is removed only when it contains this process's PID,
        so a lock owned by another instance is never deleted. Errors are
        ignored: release is best-effort.
        """
        try:
            if self.lock_file.exists():
                # Only remove if it contains our PID
                pid_str = self.lock_file.read_text().strip()
                if pid_str and int(pid_str) == os.getpid():
                    self.lock_file.unlink()
        except (ValueError, OSError):
            pass

    def _is_process_running(self, pid: int) -> bool:
        """
        Check if a process with the given PID is running.

        Args:
            pid: Process ID to check

        Returns:
            True if process is running, False otherwise
        """
        # PIDs <= 0 are not real processes; on POSIX os.kill() would treat
        # them as "process group" / "every process" and report success.
        if pid <= 0:
            return False

        if sys.platform == 'win32':
            # Windows: the process exists if a handle to it can be opened.
            try:
                import ctypes
                kernel32 = ctypes.windll.kernel32
                SYNCHRONIZE = 0x00100000
                process = kernel32.OpenProcess(SYNCHRONIZE, False, pid)
                if process:
                    kernel32.CloseHandle(process)
                    return True
                return False
            except Exception:
                return False

        # Unix/Linux/macOS: signal 0 performs the existence/permission check
        # without delivering a signal.
        try:
            os.kill(pid, 0)
        except ProcessLookupError:
            return False
        except PermissionError:
            # The process exists but belongs to another user.
            return True
        except (OSError, OverflowError):
            # OverflowError: the PID in a corrupt lock file does not fit the
            # platform's pid type, so it cannot be a live process.
            return False
        return True

    def __enter__(self):
        """Context manager entry.

        Returns:
            The boolean result of acquire() (not the lock object), so callers
            can write ``with InstanceLock() as acquired:``.
        """
        return self.acquire()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit: release the lock and never suppress exceptions."""
        self.release()
        return False

View File

@@ -0,0 +1,346 @@
"""
Remote Transcription Client
Handles streaming audio to a remote transcription service and receiving transcriptions.
Provides fallback to local transcription if the remote service is unavailable.
"""
import asyncio
import base64
import json
import logging
import numpy as np
from datetime import datetime
from threading import Thread, Lock
from typing import Optional, Callable
from queue import Queue, Empty
logger = logging.getLogger(__name__)
class RemoteTranscriptionClient:
    """
    Client for remote transcription service.

    Streams audio to a remote server over a WebSocket and receives
    transcriptions. Network I/O runs on a private asyncio event loop hosted
    by a daemon thread created in start(); send_audio() hands audio to that
    thread through a thread-safe queue.
    """

    # Seconds to yield to the event loop when the audio queue is empty.
    _QUEUE_POLL_INTERVAL = 0.01

    def __init__(
        self,
        server_url: str,
        api_key: str,
        on_transcription: Optional[Callable[[str, bool], None]] = None,
        on_error: Optional[Callable[[str], None]] = None,
        on_connection_change: Optional[Callable[[bool], None]] = None,
        sample_rate: int = 16000
    ):
        """
        Initialize remote transcription client.

        Args:
            server_url: WebSocket URL of the transcription service
            api_key: API key for authentication
            on_transcription: Callback for transcriptions (text, is_preview)
            on_error: Callback for errors
            on_connection_change: Callback for connection status changes
            sample_rate: Audio sample rate
        """
        self.server_url = server_url
        self.api_key = api_key
        self.sample_rate = sample_rate
        self.on_transcription = on_transcription
        self.on_error = on_error
        self.on_connection_change = on_connection_change

        self.websocket = None
        self.is_connected = False
        self.is_authenticated = False
        self.is_running = False

        self.audio_queue: Queue = Queue()
        self.send_thread: Optional[Thread] = None
        self.receive_thread: Optional[Thread] = None
        self.loop: Optional[asyncio.AbstractEventLoop] = None
        self._lock = Lock()

    async def _connect(self):
        """Establish WebSocket connection and authenticate.

        Returns:
            True when the server accepted the API key, False on any
            connection or authentication failure (reported via on_error).
        """
        try:
            # Imported lazily so the module can be loaded without the
            # optional websockets dependency.
            import websockets

            logger.info("Connecting to %s", self.server_url)
            self.websocket = await websockets.connect(
                self.server_url,
                ping_interval=30,
                ping_timeout=10
            )

            # Authenticate
            auth_message = {
                "type": "auth",
                "api_key": self.api_key
            }
            await self.websocket.send(json.dumps(auth_message))

            # Wait for auth response
            response = await asyncio.wait_for(
                self.websocket.recv(),
                timeout=10.0
            )
            auth_result = json.loads(response)

            if auth_result.get("type") == "auth_result" and auth_result.get("success"):
                self.is_connected = True
                self.is_authenticated = True
                logger.info("Connected and authenticated to remote transcription service")
                if self.on_connection_change:
                    self.on_connection_change(True)
                return True

            error_msg = auth_result.get("message", "Authentication failed")
            logger.error("Authentication failed: %s", error_msg)
            if self.on_error:
                self.on_error(f"Authentication failed: {error_msg}")
            return False
        except Exception as e:
            logger.error("Connection failed: %s", e)
            if self.on_error:
                self.on_error(f"Connection failed: {e}")
            return False

    async def _send_loop(self):
        """Send audio chunks from the queue.

        Runs until is_running is cleared, the websocket is unset, or a send
        fails. Each chunk is converted to float32, base64-encoded and sent as
        a JSON "audio" message.
        """
        while self.is_running and self.websocket:
            try:
                # Poll without blocking: a blocking Queue.get() here would
                # stall the event loop that _receive_loop also runs on.
                try:
                    audio_data = self.audio_queue.get_nowait()
                except Empty:
                    await asyncio.sleep(self._QUEUE_POLL_INTERVAL)
                    continue

                if audio_data is None:
                    continue

                # Encode audio as base64
                audio_bytes = audio_data.astype(np.float32).tobytes()
                audio_b64 = base64.b64encode(audio_bytes).decode('utf-8')

                # Send to server
                message = {
                    "type": "audio",
                    "data": audio_b64,
                    "sample_rate": self.sample_rate
                }
                await self.websocket.send(json.dumps(message))
            except Exception as e:
                if self.is_running:
                    logger.error("Send error: %s", e)
                break

    async def _receive_loop(self):
        """Receive transcriptions from the server.

        Dispatches "transcription" messages to on_transcription and "error"
        messages to on_error. When the loop ends the connection flags are
        cleared and on_connection_change(False) is invoked.
        """
        while self.is_running and self.websocket:
            try:
                # Short timeout so the is_running flag is re-checked regularly.
                message = await asyncio.wait_for(
                    self.websocket.recv(),
                    timeout=1.0
                )
                data = json.loads(message)
                msg_type = data.get("type", "")

                if msg_type == "transcription":
                    text = data.get("text", "")
                    is_preview = data.get("is_preview", False)
                    if text and self.on_transcription:
                        self.on_transcription(text, is_preview)
                elif msg_type == "error":
                    error_msg = data.get("message", "Unknown error")
                    logger.error("Server error: %s", error_msg)
                    if self.on_error:
                        self.on_error(error_msg)
                elif msg_type == "pong":
                    pass  # Keep-alive response
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                if self.is_running:
                    logger.error("Receive error: %s", e)
                break

        # Connection lost
        self.is_connected = False
        self.is_authenticated = False
        if self.on_connection_change:
            self.on_connection_change(False)

    def _run_async(self):
        """Run the async event loop in a thread.

        Creates a new event loop, connects, then runs the send and receive
        loops until both finish. The websocket and the loop are always
        closed on exit.
        """
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        try:
            # Connect
            connected = self.loop.run_until_complete(self._connect())
            if not connected:
                return

            # Run send and receive loops
            tasks = [
                self._send_loop(),
                self._receive_loop()
            ]
            self.loop.run_until_complete(asyncio.gather(*tasks))
        except Exception as e:
            logger.error("Async loop error: %s", e)
        finally:
            if self.websocket:
                try:
                    self.loop.run_until_complete(self.websocket.close())
                except Exception as e:
                    # Best-effort close; the connection may already be gone.
                    logger.debug("Error closing websocket: %s", e)
            self.loop.close()

    def start(self):
        """Start the remote transcription client.

        Does nothing if the client is already marked as running.
        """
        with self._lock:
            if self.is_running:
                return
            self.is_running = True

            # Start async loop in background thread
            self.send_thread = Thread(target=self._run_async, daemon=True)
            self.send_thread.start()

    def stop(self):
        """Stop the remote transcription client.

        Clears the running flag so the background loops exit and makes a
        best-effort attempt to send an "end" message to the server.
        """
        with self._lock:
            self.is_running = False

            # Signal end to server
            if self.websocket and self.loop:
                try:
                    asyncio.run_coroutine_threadsafe(
                        self.websocket.send(json.dumps({"type": "end"})),
                        self.loop
                    )
                except Exception as e:
                    # Best-effort: the loop may already be closed.
                    logger.debug("Could not send end message: %s", e)

            self.is_connected = False
            self.is_authenticated = False

    def send_audio(self, audio_data: np.ndarray):
        """
        Send audio data for transcription.

        The audio is queued only while connected and authenticated;
        otherwise it is silently dropped.

        Args:
            audio_data: Audio data as numpy array (float32, mono, sample_rate)
        """
        if self.is_connected and self.is_authenticated:
            self.audio_queue.put(audio_data)

    @property
    def connected(self) -> bool:
        """Check if connected and authenticated."""
        return self.is_connected and self.is_authenticated
class RemoteTranscriptionManager:
    """
    Owns a RemoteTranscriptionClient and routes its results to callbacks.

    A local engine can be supplied for fallback, but this class only stores
    it: the fallback hooks below are placeholders and do not switch engines.
    """

    def __init__(
        self,
        server_url: str,
        api_key: str,
        local_engine=None,
        on_transcription: Optional[Callable] = None,
        on_preview: Optional[Callable] = None
    ):
        """
        Store the connection settings and callbacks; nothing is started yet.

        Args:
            server_url: Remote transcription service URL
            api_key: API key for authentication
            local_engine: Local transcription engine kept for fallback
            on_transcription: Called with the text of each final transcription
            on_preview: Called with the text of each preview transcription
        """
        self.server_url = server_url
        self.api_key = api_key
        self.local_engine = local_engine
        self.on_transcription = on_transcription
        self.on_preview = on_preview

        self.client: Optional[RemoteTranscriptionClient] = None
        self.use_remote = True
        self.is_running = False

    def _handle_transcription(self, text: str, is_preview: bool):
        """Forward text to on_preview or on_transcription, whichever applies."""
        callback = self.on_preview if is_preview else self.on_transcription
        if callback:
            callback(text)

    def _handle_error(self, error: str):
        """Log an error reported by the remote client."""
        logger.error(f"Remote transcription error: {error}")
        # Could switch to local fallback here

    def _handle_connection_change(self, connected: bool):
        """Log a change in the remote connection state."""
        if not connected:
            logger.warning("Remote transcription disconnected")
            # Could switch to local fallback here
            return
        logger.info("Remote transcription connected")

    def start(self):
        """
        Mark the manager as running and start a remote client if configured.

        A client is created only when remote use is enabled and both the
        server URL and API key are non-empty. Calling start() while already
        running does nothing.
        """
        if self.is_running:
            return
        self.is_running = True

        remote_configured = self.use_remote and self.server_url and self.api_key
        if not remote_configured:
            return

        self.client = RemoteTranscriptionClient(
            server_url=self.server_url,
            api_key=self.api_key,
            on_transcription=self._handle_transcription,
            on_error=self._handle_error,
            on_connection_change=self._handle_connection_change
        )
        self.client.start()

    def stop(self):
        """Clear the running flag, then stop and discard the client if any."""
        self.is_running = False
        active = self.client
        if active:
            active.stop()
            self.client = None

    def send_audio(self, audio_data: np.ndarray):
        """Pass audio to the remote client when it is connected."""
        if self.client and self.client.connected:
            self.client.send_audio(audio_data)
        elif self.local_engine:
            # Fallback to local processing:
            # local engine handles its own audio capture
            pass

    @property
    def is_connected(self) -> bool:
        """Whether a client exists and reports itself connected."""
        if self.client is None:
            return False
        return self.client.connected

View File

@@ -2,7 +2,9 @@
import requests
import json
from typing import Optional
import base64
from pathlib import Path
from typing import Optional, List
from datetime import datetime
import threading
import queue
@@ -10,22 +12,41 @@ from concurrent.futures import ThreadPoolExecutor
class ServerSyncClient:
"""Client for syncing transcriptions to a PHP server."""
"""Client for syncing transcriptions to a multi-user server."""
def __init__(self, url: str, room: str, passphrase: str, user_name: str):
def __init__(self, url: str, room: str, passphrase: str, user_name: str,
fonts_dir: Optional[Path] = None,
font_source: str = "None",
websafe_font: Optional[str] = None,
google_font: Optional[str] = None,
custom_font_file: Optional[str] = None):
"""
Initialize server sync client.
Args:
url: Server URL (e.g., http://example.com/transcription/server.php)
url: Server URL (e.g., http://example.com/api/send)
room: Room name
passphrase: Room passphrase
user_name: User's display name
fonts_dir: Optional directory containing custom fonts to upload
font_source: Font source type ("None", "Web-Safe", "Google Font", "Custom File")
websafe_font: Web-safe font name (e.g., "Arial", "Times New Roman")
google_font: Google Font name (e.g., "Roboto", "Open Sans")
custom_font_file: Path to a custom font file for this speaker
"""
self.url = url
self.room = room
self.passphrase = passphrase
self.user_name = user_name
self.fonts_dir = fonts_dir
self.font_source = font_source
self.websafe_font = websafe_font
self.google_font = google_font
self.custom_font_file = custom_font_file
# Font info to send with transcriptions
self.font_family: Optional[str] = None
self.font_type: Optional[str] = None # "websafe", "google", "custom"
# Queue for sending transcriptions asynchronously
self.send_queue = queue.Queue()
@@ -50,6 +71,153 @@ class ServerSyncClient:
self.send_thread.start()
print(f"Server sync started: room={self.room}")
# Set up font based on source type
if self.font_source == "Web-Safe" and self.websafe_font:
self.font_family = self.websafe_font
self.font_type = "websafe"
print(f"Using web-safe font: {self.font_family}")
elif self.font_source == "Google Font" and self.google_font:
self.font_family = self.google_font
self.font_type = "google"
print(f"Using Google Font: {self.font_family}")
elif self.font_source == "Custom File" and self.custom_font_file:
self._upload_custom_font()
# Legacy fallback: upload all fonts from fonts_dir if available
elif self.fonts_dir:
self._upload_fonts()
def _upload_custom_font(self):
    """Upload the user's custom font file to the server for per-speaker fonts.

    The file must exist and have a .ttf, .otf, .woff or .woff2 extension.
    Its content is base64-encoded and POSTed as JSON to ``/api/fonts`` on the
    host of ``self.url``.

    On success (HTTP 200) ``self.font_family`` is set to the file name
    without extension and ``self.font_type`` to "custom". On any upload
    failure both are reset to None. The two attributes are only assigned
    once the outcome is known, so code reading them concurrently never sees
    a font family without its type.
    """
    if not self.custom_font_file:
        return

    font_path = Path(self.custom_font_file)
    if not font_path.is_file():
        print(f"Custom font file not found: {self.custom_font_file}")
        return

    # Validate extension (the MIME table doubles as the allow-list)
    mime_types = {
        '.ttf': 'font/ttf',
        '.otf': 'font/otf',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2'
    }
    suffix = font_path.suffix.lower()
    if suffix not in mime_types:
        print(f"Invalid font file type: {font_path.suffix}")
        return

    # Font family name is filename without extension
    font_family = font_path.stem
    font_filename = font_path.name

    try:
        # Read and encode font data
        with open(font_path, 'rb') as f:
            font_data = base64.b64encode(f.read()).decode('utf-8')

        print(f"Uploading custom font: {font_filename} (family: {font_family})")

        # Upload to server: the fonts endpoint lives at the root of the
        # same host as the transcription endpoint.
        from urllib.parse import urlparse
        parsed = urlparse(self.url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        fonts_url = f"{base_url}/api/fonts"

        response = requests.post(
            fonts_url,
            json={
                'room': self.room,
                'passphrase': self.passphrase,
                'fonts': [{
                    'name': font_filename,
                    'data': font_data,
                    'mime': mime_types[suffix]
                }]
            },
            timeout=30.0
        )
    except Exception as e:
        # Best-effort: a failed upload must not prevent the client starting.
        print(f"Error uploading custom font: {e}")
        self.font_family = None
        self.font_type = None
        return

    if response.status_code == 200:
        self.font_family = font_family
        self.font_type = "custom"
        print(f"Custom font uploaded: {self.font_family}")
    else:
        print(f"Custom font upload failed: {response.status_code}")
        self.font_family = None
        self.font_type = None
def _upload_fonts(self):
    """Send every font file found in ``self.fonts_dir`` to the server.

    Regular files ending in .ttf, .otf, .woff or .woff2 are base64-encoded
    and POSTed together as JSON to ``/api/fonts`` on the host of
    ``self.url``. Files that cannot be read are reported and skipped.
    Nothing is sent when the directory is unset, missing, or yields no
    readable fonts. Upload errors are printed, never raised.
    """
    directory = self.fonts_dir
    if not directory or not directory.exists():
        return

    # Maps each accepted extension to the MIME type sent with the font.
    mime_by_suffix = {
        '.ttf': 'font/ttf',
        '.otf': 'font/otf',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2'
    }

    # Find font files
    candidates = [
        entry for entry in directory.iterdir()
        if entry.is_file() and entry.suffix.lower() in mime_by_suffix
    ]
    if not candidates:
        return

    # Prepare font data
    encoded_fonts = []
    for entry in candidates:
        try:
            with open(entry, 'rb') as handle:
                encoded = base64.b64encode(handle.read()).decode('utf-8')
            encoded_fonts.append({
                'name': entry.name,
                'data': encoded,
                'mime': mime_by_suffix.get(entry.suffix.lower(), 'font/ttf')
            })
            print(f"Prepared font for upload: {entry.name}")
        except Exception as e:
            print(f"Error reading font file {entry}: {e}")

    if not encoded_fonts:
        return

    # Upload to server
    try:
        # Extract base URL for fonts endpoint
        from urllib.parse import urlparse
        target = urlparse(self.url)
        fonts_url = f"{target.scheme}://{target.netloc}/api/fonts"

        body = {
            'room': self.room,
            'passphrase': self.passphrase,
            'fonts': encoded_fonts
        }
        # Longer timeout for font uploads
        response = requests.post(fonts_url, json=body, timeout=30.0)

        if response.status_code != 200:
            print(f"Font upload failed: {response.status_code}")
        else:
            result = response.json()
            print(f"Fonts uploaded successfully: {result.get('message', '')}")
    except Exception as e:
        print(f"Error uploading fonts: {e}")
def stop(self):
"""Stop the sync client."""
self.is_running = False
@@ -59,13 +227,14 @@ class ServerSyncClient:
self.executor.shutdown(wait=False) # Don't wait - let pending requests finish in background
print("Server sync stopped")
def send_transcription(self, text: str, timestamp: Optional[datetime] = None):
def send_transcription(self, text: str, timestamp: Optional[datetime] = None, is_preview: bool = False):
"""
Send a transcription to the server (non-blocking).
Args:
text: Transcription text
timestamp: Timestamp (defaults to now)
is_preview: Whether this is a preview transcription
"""
if timestamp is None:
timestamp = datetime.now()
@@ -78,9 +247,20 @@ class ServerSyncClient:
self.send_queue.put({
'text': text,
'timestamp': timestamp.strftime("%H:%M:%S"),
'is_preview': is_preview,
'queue_time': queue_time # For debugging
})
def send_preview(self, text: str, timestamp: Optional[datetime] = None):
    """
    Queue a preview transcription for delivery to the server (non-blocking).

    Thin wrapper that forwards to send_transcription() with the preview
    flag set.

    Args:
        text: Preview transcription text
        timestamp: Timestamp, forwarded unchanged (defaults to now)
    """
    self.send_transcription(text, timestamp, is_preview=True)
def _send_loop(self):
"""Background thread for sending transcriptions."""
while self.is_running:
@@ -122,28 +302,25 @@ class ServerSyncClient:
'passphrase': self.passphrase,
'user_name': self.user_name,
'text': trans_data['text'],
'timestamp': trans_data['timestamp']
'timestamp': trans_data['timestamp'],
'is_preview': trans_data.get('is_preview', False)
}
# Detect server type and send appropriately
# PHP servers have "server.php" in URL and need ?action=send
# Node.js servers have "/api/send" in URL and don't need it
request_start = time.time()
if 'server.php' in self.url:
# PHP server - add action parameter
response = requests.post(
self.url,
params={'action': 'send'},
json=payload,
timeout=2.0 # Reduced timeout for faster failure detection
)
# Add font info if user has a custom font configured
if self.font_family:
payload['font_family'] = self.font_family
payload['font_type'] = self.font_type # "websafe", "google", or "custom"
print(f"[Server Sync] Sending with font: {self.font_family} ({self.font_type})")
else:
# Node.js server - no action parameter
response = requests.post(
self.url,
json=payload,
timeout=2.0 # Reduced timeout for faster failure detection
)
print(f"[Server Sync] No font configured (font_source={self.font_source})")
# Send to Node.js server
request_start = time.time()
response = requests.post(
self.url,
json=payload,
timeout=2.0 # Reduced timeout for faster failure detection
)
request_time = (time.time() - request_start) * 1000
print(f"[Server Sync] HTTP request: {request_time:.0f}ms, Status: {response.status_code}")

View File

@@ -29,7 +29,7 @@ class TranscriptionResult:
def __repr__(self) -> str:
time_str = self.timestamp.strftime("%H:%M:%S")
prefix = "[FINAL]" if self.is_final else "[PREVIEW]"
if self.user_name:
if self.user_name and self.user_name.strip():
return f"{prefix} [{time_str}] {self.user_name}: {self.text}"
return f"{prefix} [{time_str}] {self.text}"
@@ -63,6 +63,7 @@ class RealtimeTranscriptionEngine:
# Realtime preview settings
enable_realtime_transcription: bool = False,
realtime_model: str = "tiny.en",
realtime_processing_pause: float = 0.1, # How often to update preview (lower = more frequent)
# VAD settings
silero_sensitivity: float = 0.4,
silero_use_onnx: bool = True,
@@ -106,11 +107,21 @@ class RealtimeTranscriptionEngine:
user_name: User name for transcriptions
"""
self.model = model
self.device = device
self.language = language
self.compute_type = compute_type
# Resolve device - 'auto' means use CUDA if available, else CPU
if device == 'auto':
try:
import torch
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
except:
self.device = 'cpu'
else:
self.device = device
self.enable_realtime = enable_realtime_transcription
self.realtime_model = realtime_model
self.realtime_processing_pause = realtime_processing_pause
self.user_name = user_name
# Callbacks
@@ -131,6 +142,7 @@ class RealtimeTranscriptionEngine:
# Store configuration for recorder initialization
self.config = {
'model': model,
'device': self.device, # Use resolved device (auto -> cuda/cpu)
'language': language if language != 'auto' else None,
'compute_type': compute_type if compute_type != 'default' else 'default',
'input_device_index': input_device_index,
@@ -145,8 +157,18 @@ class RealtimeTranscriptionEngine:
'initial_prompt': initial_prompt if initial_prompt else None,
'enable_realtime_transcription': enable_realtime_transcription,
'realtime_model_type': realtime_model if enable_realtime_transcription else None,
'realtime_processing_pause': realtime_processing_pause if enable_realtime_transcription else 0.2,
# The realtime callback is added during initialize() after set_callbacks is called
}
def _is_cuda_available(self) -> bool:
    """Check if CUDA is available.

    Returns:
        The result of ``torch.cuda.is_available()``, or False when torch
        cannot be imported or the check itself fails.
    """
    try:
        # Imported lazily so torch stays an optional dependency.
        import torch
        return torch.cuda.is_available()
    except Exception:
        # Deliberately broad (import and driver failures vary), but not a
        # bare except: KeyboardInterrupt and SystemExit must propagate.
        return False
def set_callbacks(
self,
realtime_callback: Optional[Callable[[TranscriptionResult], None]] = None,
@@ -198,8 +220,15 @@ class RealtimeTranscriptionEngine:
try:
print(f"Initializing RealtimeSTT with model: {self.model}")
print(f" Device: {self.device}, Compute type: {self.compute_type}")
if self.enable_realtime:
print(f" Realtime preview enabled with model: {self.realtime_model}")
print(f" Realtime processing pause: {self.realtime_processing_pause}s")
# Add realtime transcription callback if enabled
# This provides word-by-word updates as speech is being processed
if self.enable_realtime:
self.config['on_realtime_transcription_update'] = self._on_realtime_transcription
# Create recorder with configuration
self.recorder = AudioToTextRecorder(**self.config)
@@ -325,7 +354,7 @@ class RealtimeTranscriptionEngine:
Returns:
True if model changed successfully
"""
was_running = self.is_running
was_running = self.is_recording
# Stop current recording
self.stop()
@@ -355,7 +384,7 @@ class RealtimeTranscriptionEngine:
Returns:
True if device changed successfully
"""
was_running = self.is_running
was_running = self.is_recording
# Stop current recording
self.stop()
@@ -396,7 +425,7 @@ class RealtimeTranscriptionEngine:
self.config['webrtc_sensitivity'] = webrtc_sensitivity
# If running, need to restart to apply changes
if self.is_running:
if self.is_recording:
print("VAD settings updated. Restart transcription to apply changes.")
def set_user_name(self, user_name: str):
@@ -404,7 +433,7 @@ class RealtimeTranscriptionEngine:
self.user_name = user_name
def __repr__(self) -> str:
return f"RealtimeTranscriptionEngine(model={self.model}, device={self.device}, running={self.is_running})"
return f"RealtimeTranscriptionEngine(model={self.model}, device={self.device}, running={self.is_recording})"
def __del__(self):
"""Cleanup when object is destroyed."""