Files

199 lines
6.6 KiB
Python
Raw Permalink Normal View History

"""Server sync client for multi-user transcription."""
import requests
import json
from typing import Optional
from datetime import datetime
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
class ServerSyncClient:
"""Client for syncing transcriptions to a PHP server."""
def __init__(self, url: str, room: str, passphrase: str, user_name: str):
"""
Initialize server sync client.
Args:
url: Server URL (e.g., http://example.com/transcription/server.php)
room: Room name
passphrase: Room passphrase
user_name: User's display name
"""
self.url = url
self.room = room
self.passphrase = passphrase
self.user_name = user_name
# Queue for sending transcriptions asynchronously
self.send_queue = queue.Queue()
self.is_running = False
self.send_thread: Optional[threading.Thread] = None
# Thread pool for parallel HTTP requests (max 3 concurrent)
self.executor = ThreadPoolExecutor(max_workers=3)
# Statistics
self.sent_count = 0
self.error_count = 0
self.last_error: Optional[str] = None
def start(self):
"""Start the sync client."""
if self.is_running:
return
self.is_running = True
self.send_thread = threading.Thread(target=self._send_loop, daemon=True)
self.send_thread.start()
print(f"Server sync started: room={self.room}")
def stop(self):
"""Stop the sync client."""
self.is_running = False
if self.send_thread:
self.send_thread.join(timeout=2.0)
# Shutdown executor and wait for pending requests
self.executor.shutdown(wait=False) # Don't wait - let pending requests finish in background
print("Server sync stopped")
def send_transcription(self, text: str, timestamp: Optional[datetime] = None):
"""
Send a transcription to the server (non-blocking).
Args:
text: Transcription text
timestamp: Timestamp (defaults to now)
"""
if timestamp is None:
timestamp = datetime.now()
# Debug: Log when transcription is queued
import time
queue_time = time.time()
# Add to queue
self.send_queue.put({
'text': text,
'timestamp': timestamp.strftime("%H:%M:%S"),
'queue_time': queue_time # For debugging
})
def _send_loop(self):
"""Background thread for sending transcriptions."""
while self.is_running:
try:
# Get transcription from queue (with shorter timeout for responsiveness)
try:
trans_data = self.send_queue.get(timeout=0.1)
except queue.Empty:
continue
# Send to server in parallel using thread pool
# This allows multiple requests to be in-flight simultaneously
self.executor.submit(self._send_to_server, trans_data)
except Exception as e:
print(f"Error in server sync send loop: {e}")
self.error_count += 1
self.last_error = str(e)
def _send_to_server(self, trans_data: dict):
"""
Send a transcription to the server (PHP or Node.js).
Args:
trans_data: Dictionary with 'text' and 'timestamp'
"""
import time
send_start = time.time()
try:
# Debug: Calculate queue delay
if 'queue_time' in trans_data:
queue_delay = (send_start - trans_data['queue_time']) * 1000
print(f"[Server Sync] Queue delay: {queue_delay:.0f}ms")
# Prepare payload
payload = {
'room': self.room,
'passphrase': self.passphrase,
'user_name': self.user_name,
'text': trans_data['text'],
'timestamp': trans_data['timestamp']
}
# Detect server type and send appropriately
# PHP servers have "server.php" in URL and need ?action=send
# Node.js servers have "/api/send" in URL and don't need it
request_start = time.time()
if 'server.php' in self.url:
# PHP server - add action parameter
response = requests.post(
self.url,
params={'action': 'send'},
json=payload,
timeout=2.0 # Reduced timeout for faster failure detection
)
else:
# Node.js server - no action parameter
response = requests.post(
self.url,
json=payload,
timeout=2.0 # Reduced timeout for faster failure detection
)
request_time = (time.time() - request_start) * 1000
print(f"[Server Sync] HTTP request: {request_time:.0f}ms, Status: {response.status_code}")
# Check response
if response.status_code == 200:
self.sent_count += 1
self.last_error = None
else:
error_msg = f"Server returned {response.status_code}"
try:
error_data = response.json()
if 'error' in error_data:
error_msg = error_data['error']
except:
pass
print(f"Server sync error: {error_msg}")
self.error_count += 1
self.last_error = error_msg
except requests.exceptions.Timeout:
print("Server sync timeout")
self.error_count += 1
self.last_error = "Request timeout"
except requests.exceptions.ConnectionError:
print("Server sync connection error")
self.error_count += 1
self.last_error = "Connection error"
except Exception as e:
print(f"Server sync error: {e}")
self.error_count += 1
self.last_error = str(e)
def get_stats(self) -> dict:
"""Get sync statistics."""
return {
'sent': self.sent_count,
'errors': self.error_count,
'last_error': self.last_error,
'queue_size': self.send_queue.qsize()
}
def is_healthy(self) -> bool:
"""Check if sync is working (no recent errors)."""
# Consider healthy if less than 10% error rate
total = self.sent_count + self.error_count
if total == 0:
return True
return (self.error_count / total) < 0.1