Files
local-transcription/client/server_sync.py

389 lines
14 KiB
Python
Raw Permalink Normal View History

"""Server sync client for multi-user transcription."""
import requests
import json
import base64
from pathlib import Path
from typing import Optional, List
from datetime import datetime
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
class ServerSyncClient:
"""Client for syncing transcriptions to a multi-user server."""
def __init__(self, url: str, room: str, passphrase: str, user_name: str,
fonts_dir: Optional[Path] = None,
font_source: str = "None",
websafe_font: Optional[str] = None,
google_font: Optional[str] = None,
custom_font_file: Optional[str] = None,
user_color: str = "#4CAF50",
text_color: str = "#FFFFFF",
background_color: str = "#000000B3"):
"""
Initialize server sync client.
Args:
url: Server URL (e.g., http://example.com/api/send)
room: Room name
passphrase: Room passphrase
user_name: User's display name
fonts_dir: Optional directory containing custom fonts to upload
font_source: Font source type ("None", "Web-Safe", "Google Font", "Custom File")
websafe_font: Web-safe font name (e.g., "Arial", "Times New Roman")
google_font: Google Font name (e.g., "Roboto", "Open Sans")
custom_font_file: Path to a custom font file for this speaker
user_color: User name color (hex format)
text_color: Text color (hex format)
background_color: Background color (hex format with optional alpha)
"""
self.url = url
self.room = room
self.passphrase = passphrase
self.user_name = user_name
self.fonts_dir = fonts_dir
self.font_source = font_source
self.websafe_font = websafe_font
self.google_font = google_font
self.custom_font_file = custom_font_file
self.user_color = user_color
self.text_color = text_color
self.background_color = background_color
# Font info to send with transcriptions
self.font_family: Optional[str] = None
self.font_type: Optional[str] = None # "websafe", "google", "custom"
# Queue for sending transcriptions asynchronously
self.send_queue = queue.Queue()
self.is_running = False
self.send_thread: Optional[threading.Thread] = None
# Thread pool for parallel HTTP requests (max 3 concurrent)
self.executor = ThreadPoolExecutor(max_workers=3)
# Statistics
self.sent_count = 0
self.error_count = 0
self.last_error: Optional[str] = None
def start(self):
"""Start the sync client."""
if self.is_running:
return
self.is_running = True
self.send_thread = threading.Thread(target=self._send_loop, daemon=True)
self.send_thread.start()
print(f"Server sync started: room={self.room}")
# Set up font based on source type
if self.font_source == "Web-Safe" and self.websafe_font:
self.font_family = self.websafe_font
self.font_type = "websafe"
print(f"Using web-safe font: {self.font_family}")
elif self.font_source == "Google Font" and self.google_font:
self.font_family = self.google_font
self.font_type = "google"
print(f"Using Google Font: {self.font_family}")
elif self.font_source == "Custom File" and self.custom_font_file:
self._upload_custom_font()
# Legacy fallback: upload all fonts from fonts_dir if available
elif self.fonts_dir:
self._upload_fonts()
def _upload_custom_font(self):
"""Upload the user's custom font file to the server for per-speaker fonts."""
if not self.custom_font_file:
return
font_path = Path(self.custom_font_file)
if not font_path.exists():
print(f"Custom font file not found: {self.custom_font_file}")
return
# Validate extension
font_extensions = {'.ttf', '.otf', '.woff', '.woff2'}
if font_path.suffix.lower() not in font_extensions:
print(f"Invalid font file type: {font_path.suffix}")
return
mime_types = {
'.ttf': 'font/ttf',
'.otf': 'font/otf',
'.woff': 'font/woff',
'.woff2': 'font/woff2'
}
try:
# Read and encode font data
with open(font_path, 'rb') as f:
font_data = base64.b64encode(f.read()).decode('utf-8')
# Font family name is filename without extension
self.font_family = font_path.stem
font_filename = font_path.name
print(f"Uploading custom font: {font_filename} (family: {self.font_family})")
# Upload to server
from urllib.parse import urlparse
parsed = urlparse(self.url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
fonts_url = f"{base_url}/api/fonts"
response = requests.post(
fonts_url,
json={
'room': self.room,
'passphrase': self.passphrase,
'fonts': [{
'name': font_filename,
'data': font_data,
'mime': mime_types.get(font_path.suffix.lower(), 'font/ttf')
}]
},
timeout=30.0
)
if response.status_code == 200:
result = response.json()
self.font_type = "custom"
print(f"Custom font uploaded: {self.font_family}")
else:
print(f"Custom font upload failed: {response.status_code}")
self.font_family = None
self.font_type = None
except Exception as e:
print(f"Error uploading custom font: {e}")
self.font_family = None
self.font_type = None
def _upload_fonts(self):
"""Upload custom fonts to the server."""
if not self.fonts_dir or not self.fonts_dir.exists():
return
# Find font files
font_extensions = {'.ttf', '.otf', '.woff', '.woff2'}
font_files = [f for f in self.fonts_dir.iterdir()
if f.is_file() and f.suffix.lower() in font_extensions]
if not font_files:
return
# Prepare font data
fonts = []
mime_types = {
'.ttf': 'font/ttf',
'.otf': 'font/otf',
'.woff': 'font/woff',
'.woff2': 'font/woff2'
}
for font_file in font_files:
try:
with open(font_file, 'rb') as f:
font_data = base64.b64encode(f.read()).decode('utf-8')
fonts.append({
'name': font_file.name,
'data': font_data,
'mime': mime_types.get(font_file.suffix.lower(), 'font/ttf')
})
print(f"Prepared font for upload: {font_file.name}")
except Exception as e:
print(f"Error reading font file {font_file}: {e}")
if not fonts:
return
# Upload to server
try:
# Extract base URL for fonts endpoint
from urllib.parse import urlparse
parsed = urlparse(self.url)
base_url = f"{parsed.scheme}://{parsed.netloc}"
fonts_url = f"{base_url}/api/fonts"
response = requests.post(
fonts_url,
json={
'room': self.room,
'passphrase': self.passphrase,
'fonts': fonts
},
timeout=30.0 # Longer timeout for font uploads
)
if response.status_code == 200:
result = response.json()
print(f"Fonts uploaded successfully: {result.get('message', '')}")
else:
print(f"Font upload failed: {response.status_code}")
except Exception as e:
print(f"Error uploading fonts: {e}")
def stop(self):
"""Stop the sync client."""
self.is_running = False
if self.send_thread:
self.send_thread.join(timeout=2.0)
# Shutdown executor and wait for pending requests
self.executor.shutdown(wait=False) # Don't wait - let pending requests finish in background
print("Server sync stopped")
def send_transcription(self, text: str, timestamp: Optional[datetime] = None, is_preview: bool = False):
"""
Send a transcription to the server (non-blocking).
Args:
text: Transcription text
timestamp: Timestamp (defaults to now)
is_preview: Whether this is a preview transcription
"""
if timestamp is None:
timestamp = datetime.now()
# Debug: Log when transcription is queued
import time
queue_time = time.time()
# Add to queue
self.send_queue.put({
'text': text,
'timestamp': timestamp.strftime("%H:%M:%S"),
'is_preview': is_preview,
'queue_time': queue_time # For debugging
})
def send_preview(self, text: str, timestamp: Optional[datetime] = None):
"""
Send a preview transcription to the server (non-blocking).
Args:
text: Preview transcription text
timestamp: Timestamp (defaults to now)
"""
self.send_transcription(text, timestamp, is_preview=True)
def _send_loop(self):
"""Background thread for sending transcriptions."""
while self.is_running:
try:
# Get transcription from queue (with shorter timeout for responsiveness)
try:
trans_data = self.send_queue.get(timeout=0.1)
except queue.Empty:
continue
# Send to server in parallel using thread pool
# This allows multiple requests to be in-flight simultaneously
self.executor.submit(self._send_to_server, trans_data)
except Exception as e:
print(f"Error in server sync send loop: {e}")
self.error_count += 1
self.last_error = str(e)
def _send_to_server(self, trans_data: dict):
"""
Send a transcription to the server (PHP or Node.js).
Args:
trans_data: Dictionary with 'text' and 'timestamp'
"""
import time
send_start = time.time()
try:
# Debug: Calculate queue delay
if 'queue_time' in trans_data:
queue_delay = (send_start - trans_data['queue_time']) * 1000
print(f"[Server Sync] Queue delay: {queue_delay:.0f}ms")
# Prepare payload
payload = {
'room': self.room,
'passphrase': self.passphrase,
'user_name': self.user_name,
'text': trans_data['text'],
'timestamp': trans_data['timestamp'],
'is_preview': trans_data.get('is_preview', False),
# Always include user's color settings
'user_color': self.user_color,
'text_color': self.text_color,
'background_color': self.background_color
}
# Add font info if user has a custom font configured
if self.font_family:
payload['font_family'] = self.font_family
payload['font_type'] = self.font_type # "websafe", "google", or "custom"
print(f"[Server Sync] Sending with font: {self.font_family} ({self.font_type})")
else:
print(f"[Server Sync] No font configured (font_source={self.font_source})")
# Send to Node.js server
request_start = time.time()
response = requests.post(
self.url,
json=payload,
timeout=2.0 # Reduced timeout for faster failure detection
)
request_time = (time.time() - request_start) * 1000
print(f"[Server Sync] HTTP request: {request_time:.0f}ms, Status: {response.status_code}")
# Check response
if response.status_code == 200:
self.sent_count += 1
self.last_error = None
else:
error_msg = f"Server returned {response.status_code}"
try:
error_data = response.json()
if 'error' in error_data:
error_msg = error_data['error']
except:
pass
print(f"Server sync error: {error_msg}")
self.error_count += 1
self.last_error = error_msg
except requests.exceptions.Timeout:
print("Server sync timeout")
self.error_count += 1
self.last_error = "Request timeout"
except requests.exceptions.ConnectionError:
print("Server sync connection error")
self.error_count += 1
self.last_error = "Connection error"
except Exception as e:
print(f"Server sync error: {e}")
self.error_count += 1
self.last_error = str(e)
def get_stats(self) -> dict:
"""Get sync statistics."""
return {
'sent': self.sent_count,
'errors': self.error_count,
'last_error': self.last_error,
'queue_size': self.send_queue.qsize()
}
def is_healthy(self) -> bool:
"""Check if sync is working (no recent errors)."""
# Consider healthy if less than 10% error rate
total = self.sent_count + self.error_count
if total == 0:
return True
return (self.error_count / total) < 0.1