"""Server sync client for multi-user transcription.""" import requests import json from typing import Optional from datetime import datetime import threading import queue from concurrent.futures import ThreadPoolExecutor class ServerSyncClient: """Client for syncing transcriptions to a PHP server.""" def __init__(self, url: str, room: str, passphrase: str, user_name: str): """ Initialize server sync client. Args: url: Server URL (e.g., http://example.com/transcription/server.php) room: Room name passphrase: Room passphrase user_name: User's display name """ self.url = url self.room = room self.passphrase = passphrase self.user_name = user_name # Queue for sending transcriptions asynchronously self.send_queue = queue.Queue() self.is_running = False self.send_thread: Optional[threading.Thread] = None # Thread pool for parallel HTTP requests (max 3 concurrent) self.executor = ThreadPoolExecutor(max_workers=3) # Statistics self.sent_count = 0 self.error_count = 0 self.last_error: Optional[str] = None def start(self): """Start the sync client.""" if self.is_running: return self.is_running = True self.send_thread = threading.Thread(target=self._send_loop, daemon=True) self.send_thread.start() print(f"Server sync started: room={self.room}") def stop(self): """Stop the sync client.""" self.is_running = False if self.send_thread: self.send_thread.join(timeout=2.0) # Shutdown executor and wait for pending requests self.executor.shutdown(wait=False) # Don't wait - let pending requests finish in background print("Server sync stopped") def send_transcription(self, text: str, timestamp: Optional[datetime] = None): """ Send a transcription to the server (non-blocking). Args: text: Transcription text timestamp: Timestamp (defaults to now) """ if timestamp is None: timestamp = datetime.now() # Debug: Log when transcription is queued import time queue_time = time.time() # Add to queue self.send_queue.put({ 'text': text, 'timestamp': timestamp.strftime("%H:%M:%S"), 'queue_time': queue_time # For debugging }) def _send_loop(self): """Background thread for sending transcriptions.""" while self.is_running: try: # Get transcription from queue (with shorter timeout for responsiveness) try: trans_data = self.send_queue.get(timeout=0.1) except queue.Empty: continue # Send to server in parallel using thread pool # This allows multiple requests to be in-flight simultaneously self.executor.submit(self._send_to_server, trans_data) except Exception as e: print(f"Error in server sync send loop: {e}") self.error_count += 1 self.last_error = str(e) def _send_to_server(self, trans_data: dict): """ Send a transcription to the server (PHP or Node.js). Args: trans_data: Dictionary with 'text' and 'timestamp' """ import time send_start = time.time() try: # Debug: Calculate queue delay if 'queue_time' in trans_data: queue_delay = (send_start - trans_data['queue_time']) * 1000 print(f"[Server Sync] Queue delay: {queue_delay:.0f}ms") # Prepare payload payload = { 'room': self.room, 'passphrase': self.passphrase, 'user_name': self.user_name, 'text': trans_data['text'], 'timestamp': trans_data['timestamp'] } # Detect server type and send appropriately # PHP servers have "server.php" in URL and need ?action=send # Node.js servers have "/api/send" in URL and don't need it request_start = time.time() if 'server.php' in self.url: # PHP server - add action parameter response = requests.post( self.url, params={'action': 'send'}, json=payload, timeout=2.0 # Reduced timeout for faster failure detection ) else: # Node.js server - no action parameter response = requests.post( self.url, json=payload, timeout=2.0 # Reduced timeout for faster failure detection ) request_time = (time.time() - request_start) * 1000 print(f"[Server Sync] HTTP request: {request_time:.0f}ms, Status: {response.status_code}") # Check response if response.status_code == 200: self.sent_count += 1 self.last_error = None else: error_msg = f"Server returned {response.status_code}" try: error_data = response.json() if 'error' in error_data: error_msg = error_data['error'] except: pass print(f"Server sync error: {error_msg}") self.error_count += 1 self.last_error = error_msg except requests.exceptions.Timeout: print("Server sync timeout") self.error_count += 1 self.last_error = "Request timeout" except requests.exceptions.ConnectionError: print("Server sync connection error") self.error_count += 1 self.last_error = "Connection error" except Exception as e: print(f"Server sync error: {e}") self.error_count += 1 self.last_error = str(e) def get_stats(self) -> dict: """Get sync statistics.""" return { 'sent': self.sent_count, 'errors': self.error_count, 'last_error': self.last_error, 'queue_size': self.send_queue.qsize() } def is_healthy(self) -> bool: """Check if sync is working (no recent errors).""" # Consider healthy if less than 10% error rate total = self.sent_count + self.error_count if total == 0: return True return (self.error_count / total) < 0.1