"""Web server for displaying transcriptions in a browser (for OBS browser source).""" import asyncio from pathlib import Path from fastapi import FastAPI, WebSocket from fastapi.responses import HTMLResponse, FileResponse from typing import List, Optional import json from datetime import datetime class TranscriptionWebServer: """Web server for displaying transcriptions.""" def __init__(self, host: str = "127.0.0.1", port: int = 8080, show_timestamps: bool = True, fade_after_seconds: int = 10, max_lines: int = 50, font_family: str = "Arial", font_size: int = 16, fonts_dir: Optional[Path] = None, font_source: str = "System Font", websafe_font: str = "Arial", google_font: str = "Roboto", user_color: str = "#4CAF50", text_color: str = "#FFFFFF", background_color: str = "#000000B3"): """ Initialize web server. Args: host: Server host address port: Server port show_timestamps: Whether to show timestamps in transcriptions fade_after_seconds: Time in seconds before transcriptions fade out (0 = never fade) max_lines: Maximum number of lines to display at once font_family: Font family for display (system font) font_size: Font size in pixels fonts_dir: Directory containing custom font files font_source: Font source type ("System Font", "Web-Safe", "Google Font") websafe_font: Web-safe font name google_font: Google Font name user_color: User name color (hex format) text_color: Text color (hex format) background_color: Background color (hex format with optional alpha, e.g., #RRGGBBAA) """ self.host = host self.port = port self.show_timestamps = show_timestamps self.fade_after_seconds = fade_after_seconds self.max_lines = max_lines self.font_family = font_family self.font_size = font_size self.fonts_dir = fonts_dir self.font_source = font_source self.websafe_font = websafe_font self.google_font = google_font self.user_color = user_color self.text_color = text_color self.background_color = background_color self.app = FastAPI() self.active_connections: List[WebSocket] = [] self.transcriptions = [] # Store recent transcriptions # Setup routes self._setup_routes() def _setup_routes(self): """Setup FastAPI routes.""" @self.app.get("/", response_class=HTMLResponse) async def get_display(): """Serve the transcription display page.""" return self._get_html() @self.app.get("/fonts/{font_file}") async def serve_font(font_file: str): """Serve custom font files.""" if self.fonts_dir: font_path = self.fonts_dir / font_file if font_path.exists() and font_path.suffix.lower() in {'.ttf', '.otf', '.woff', '.woff2'}: # Determine MIME type mime_types = { '.ttf': 'font/ttf', '.otf': 'font/otf', '.woff': 'font/woff', '.woff2': 'font/woff2' } media_type = mime_types.get(font_path.suffix.lower(), 'application/octet-stream') return FileResponse(font_path, media_type=media_type) return HTMLResponse(status_code=404, content="Font not found") @self.app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): """WebSocket endpoint for real-time updates.""" await websocket.accept() self.active_connections.append(websocket) try: # Send recent transcriptions for trans in self.transcriptions[-20:]: # Last 20 await websocket.send_json(trans) # Keep connection alive while True: # Wait for ping/pong to keep connection alive await websocket.receive_text() except: self.active_connections.remove(websocket) def _get_font_face_css(self) -> str: """Generate @font-face CSS rules for custom fonts.""" if not self.fonts_dir or not self.fonts_dir.exists(): return "" css_rules = [] font_extensions = {'.ttf', '.otf', '.woff', '.woff2'} format_map = { '.ttf': 'truetype', '.otf': 'opentype', '.woff': 'woff', '.woff2': 'woff2' } for font_file in self.fonts_dir.iterdir(): if font_file.suffix.lower() in font_extensions: font_name = font_file.stem font_format = format_map.get(font_file.suffix.lower(), 'truetype') css_rules.append(f""" @font-face {{ font-family: '{font_name}'; src: url('/fonts/{font_file.name}') format('{font_format}'); font-weight: normal; font-style: normal; }}""") return "\n".join(css_rules) def _get_effective_font(self) -> str: """Get the effective font family based on font_source setting.""" if self.font_source == "Google Font" and self.google_font: return self.google_font elif self.font_source == "Web-Safe" and self.websafe_font: return self.websafe_font else: return self.font_family def _get_google_font_link(self) -> str: """Generate Google Fonts link tag if using Google Font.""" if self.font_source == "Google Font" and self.google_font: font_name = self.google_font.replace(' ', '+') return f'' return "" def _hex_to_rgba(self, hex_color: str) -> str: """Convert hex color (optionally with alpha) to CSS rgba() format.""" # Remove # if present hex_color = hex_color.lstrip('#') if len(hex_color) == 8: # RRGGBBAA r = int(hex_color[0:2], 16) g = int(hex_color[2:4], 16) b = int(hex_color[4:6], 16) a = int(hex_color[6:8], 16) / 255 return f"rgba({r}, {g}, {b}, {a:.2f})" elif len(hex_color) == 6: # RRGGBB r = int(hex_color[0:2], 16) g = int(hex_color[2:4], 16) b = int(hex_color[4:6], 16) return f"rgb({r}, {g}, {b})" else: return hex_color # Return as-is if format is unknown def _get_html(self) -> str: """Generate HTML for transcription display.""" # Generate custom font CSS font_face_css = self._get_font_face_css() google_font_link = self._get_google_font_link() effective_font = self._get_effective_font() # Convert background color to rgba for CSS bg_color_css = self._hex_to_rgba(self.background_color) return f""" Transcription Display {google_font_link}
""" async def broadcast_transcription(self, text: str, user_name: str = "", timestamp: Optional[datetime] = None): """ Broadcast a transcription to all connected clients. Args: text: Transcription text user_name: User/speaker name timestamp: Timestamp of transcription """ if timestamp is None: timestamp = datetime.now() trans_data = { "text": text, "user_name": user_name, } # Only include timestamp if enabled if self.show_timestamps: trans_data["timestamp"] = timestamp.strftime("%H:%M:%S") # Store transcription self.transcriptions.append(trans_data) if len(self.transcriptions) > 100: self.transcriptions.pop(0) # Broadcast to all connected clients disconnected = [] for connection in self.active_connections: try: await connection.send_json(trans_data) except: disconnected.append(connection) # Remove disconnected clients for conn in disconnected: self.active_connections.remove(conn) async def broadcast_preview(self, text: str, user_name: str = "", timestamp: Optional[datetime] = None): """ Broadcast a preview transcription to all connected clients. Preview transcriptions are shown in italics and will be replaced by final. Args: text: Preview transcription text user_name: User/speaker name timestamp: Timestamp of transcription """ if timestamp is None: timestamp = datetime.now() trans_data = { "text": text, "user_name": user_name, "is_preview": True, # Flag to indicate this is a preview } # Only include timestamp if enabled if self.show_timestamps: trans_data["timestamp"] = timestamp.strftime("%H:%M:%S") # Don't store previews in transcriptions list (they're temporary) # Broadcast to all connected clients disconnected = [] for connection in self.active_connections: try: await connection.send_json(trans_data) except: disconnected.append(connection) # Remove disconnected clients for conn in disconnected: self.active_connections.remove(conn) async def start(self): """Start the web server.""" import uvicorn import logging # Configure uvicorn to work without console (for PyInstaller builds) # Suppress uvicorn's default console logging logging.getLogger("uvicorn").setLevel(logging.ERROR) logging.getLogger("uvicorn.access").setLevel(logging.ERROR) logging.getLogger("uvicorn.error").setLevel(logging.ERROR) config = uvicorn.Config( self.app, host=self.host, port=self.port, log_level="error", # Only log errors access_log=False, # Disable access logging log_config=None # Don't use default logging config ) server = uvicorn.Server(config) await server.serve()