Files
local-transcription/server/web_display.py

476 lines
17 KiB
Python
Raw Normal View History

"""Web server for displaying transcriptions in a browser (for OBS browser source)."""
import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from urllib.parse import quote, quote_plus

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse, FileResponse
class TranscriptionWebServer:
    """Serve a live transcription overlay page and push updates over WebSocket.

    The page returned at ``/`` opens a WebSocket to ``/ws``; every call to
    :meth:`broadcast_transcription` or :meth:`broadcast_preview` is forwarded
    as JSON to all connected pages. Custom font files found in ``fonts_dir``
    are exposed under ``/fonts/<file>``.
    """

    # Font file suffix -> MIME type used when serving the file.
    _FONT_MIME_TYPES = {
        '.ttf': 'font/ttf',
        '.otf': 'font/otf',
        '.woff': 'font/woff',
        '.woff2': 'font/woff2',
    }
    # Font file suffix -> CSS ``format()`` hint used in @font-face rules.
    _FONT_FORMATS = {
        '.ttf': 'truetype',
        '.otf': 'opentype',
        '.woff': 'woff',
        '.woff2': 'woff2',
    }
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")
    _HISTORY_LIMIT = 100  # final transcriptions kept in memory
    _REPLAY_COUNT = 20    # transcriptions replayed to a newly connected page

    def __init__(self, host: str = "127.0.0.1", port: int = 8080, show_timestamps: bool = True,
                 fade_after_seconds: int = 10, max_lines: int = 50, font_family: str = "Arial",
                 font_size: int = 16, fonts_dir: Optional[Path] = None,
                 font_source: str = "System Font", websafe_font: str = "Arial",
                 google_font: str = "Roboto",
                 user_color: str = "#4CAF50", text_color: str = "#FFFFFF",
                 background_color: str = "#000000B3"):
        """
        Initialize web server.

        Args:
            host: Server host address
            port: Server port
            show_timestamps: Whether to show timestamps in transcriptions
            fade_after_seconds: Time in seconds before transcriptions fade out (0 = never fade)
            max_lines: Maximum number of lines to display at once
            font_family: Font family for display (system font)
            font_size: Font size in pixels
            fonts_dir: Directory containing custom font files
            font_source: Font source type ("System Font", "Web-Safe", "Google Font")
            websafe_font: Web-safe font name
            google_font: Google Font name
            user_color: User name color (hex format)
            text_color: Text color (hex format)
            background_color: Background color (hex format with optional alpha, e.g., #RRGGBBAA)
        """
        self.host = host
        self.port = port
        self.show_timestamps = show_timestamps
        self.fade_after_seconds = fade_after_seconds
        self.max_lines = max_lines
        self.font_family = font_family
        self.font_size = font_size
        self.fonts_dir = fonts_dir
        self.font_source = font_source
        self.websafe_font = websafe_font
        self.google_font = google_font
        self.user_color = user_color
        self.text_color = text_color
        self.background_color = background_color
        self.app = FastAPI()
        self.active_connections: List[WebSocket] = []
        self.transcriptions: List[dict] = []  # Store recent transcriptions
        # Setup routes
        self._setup_routes()

    def _setup_routes(self):
        """Register the page, font and WebSocket routes on ``self.app``."""

        @self.app.get("/", response_class=HTMLResponse)
        async def get_display():
            """Serve the transcription display page."""
            return self._get_html()

        @self.app.get("/fonts/{font_file}")
        async def serve_font(font_file: str):
            """Serve custom font files."""
            font_path = self._resolve_font_path(font_file)
            if font_path is None:
                return HTMLResponse(status_code=404, content="Font not found")
            media_type = self._FONT_MIME_TYPES[font_path.suffix.lower()]
            return FileResponse(font_path, media_type=media_type)

        @self.app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket):
            """WebSocket endpoint for real-time updates."""
            await websocket.accept()
            self.active_connections.append(websocket)
            try:
                # Replay recent history so a freshly loaded page is not empty.
                for trans in self.transcriptions[-self._REPLAY_COUNT:]:
                    await websocket.send_json(trans)
                # The page sends periodic 'ping' text frames; reading them
                # keeps the handler alive until the client disconnects.
                while True:
                    await websocket.receive_text()
            except Exception:
                # A disconnect or a failed send ends the session; this is
                # best-effort, so there is nothing further to report.
                pass
            finally:
                # Always unregister, even on cancellation, and tolerate the
                # socket having already been dropped by a broadcast.
                self._drop_connection(websocket)

    def _resolve_font_path(self, font_file: str) -> Optional[Path]:
        """Return the path of a servable font inside ``fonts_dir``, else None.

        Only bare file names with a known font suffix that refer to an
        existing regular file are accepted, so a request cannot reach
        outside the fonts directory.
        """
        if not self.fonts_dir or not font_file:
            return None
        if "/" in font_file or "\\" in font_file or Path(font_file).name != font_file:
            return None
        candidate = self.fonts_dir / font_file
        if candidate.suffix.lower() not in self._FONT_MIME_TYPES:
            return None
        if not candidate.is_file():
            return None
        return candidate

    def _drop_connection(self, connection) -> None:
        """Remove *connection* from the active list if it is still present."""
        if connection in self.active_connections:
            self.active_connections.remove(connection)

    @staticmethod
    def _css_string(value: str) -> str:
        """Escape *value* for use inside a single-quoted CSS string."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def _get_font_face_css(self) -> str:
        """Generate @font-face CSS rules for custom fonts.

        Returns:
            One rule per font file in ``fonts_dir`` (sorted by path for a
            stable result), or an empty string when no directory is set.
        """
        if not self.fonts_dir or not self.fonts_dir.is_dir():
            return ""
        css_rules = []
        for font_file in sorted(self.fonts_dir.iterdir()):
            font_format = self._FONT_FORMATS.get(font_file.suffix.lower())
            if font_format is None:
                continue
            family = self._css_string(font_file.stem)
            # Percent-encode the name so spaces and quotes survive in url().
            url_name = quote(font_file.name)
            css_rules.append(f"""
@font-face {{
    font-family: '{family}';
    src: url('/fonts/{url_name}') format('{font_format}');
    font-weight: normal;
    font-style: normal;
}}""")
        return "\n".join(css_rules)

    def _get_effective_font(self) -> str:
        """Get the effective font family based on font_source setting."""
        if self.font_source == "Google Font" and self.google_font:
            return self.google_font
        if self.font_source == "Web-Safe" and self.websafe_font:
            return self.websafe_font
        return self.font_family

    def _get_google_font_link(self) -> str:
        """Generate Google Fonts link tag if using Google Font."""
        if self.font_source == "Google Font" and self.google_font:
            # Spaces become '+'; the css2 axis separators are left intact.
            font_name = quote_plus(self.google_font, safe=":@;,")
            return f'<link rel="stylesheet" href="https://fonts.googleapis.com/css2?family={font_name}&display=swap">'
        return ""

    def _hex_to_rgba(self, hex_color: str) -> str:
        """Convert a hex color (optionally with alpha) to CSS rgb()/rgba().

        Accepts ``RRGGBB``, ``RRGGBBAA`` and the ``RGB`` / ``RGBA`` shorthand
        forms, with or without a leading ``#``. Anything else is returned
        unchanged so it can still be used verbatim as a CSS value.
        """
        digits = hex_color.lstrip('#')
        if any(ch not in self._HEX_DIGITS for ch in digits):
            return hex_color
        if len(digits) in (3, 4):
            # Shorthand: each digit is doubled (#abc -> #aabbcc).
            digits = "".join(ch * 2 for ch in digits)
        if len(digits) not in (6, 8):
            return hex_color
        r, g, b = (int(digits[i:i + 2], 16) for i in (0, 2, 4))
        if len(digits) == 8:
            a = int(digits[6:8], 16) / 255
            return f"rgba({r}, {g}, {b}, {a:.2f})"
        return f"rgb({r}, {g}, {b})"

    def _get_html(self) -> str:
        """Generate HTML for transcription display.

        The page script inserts all received text through ``textContent``,
        so transcription text and user names are never parsed as markup.
        """
        font_face_css = self._get_font_face_css()
        google_font_link = self._get_google_font_link()
        font_css = self._css_string(self._get_effective_font())
        # Convert background color to rgba for CSS
        bg_color_css = self._hex_to_rgba(self.background_color)
        # JSON-encode numeric settings so they are valid JavaScript literals.
        fade_after_js = json.dumps(self.fade_after_seconds)
        max_lines_js = json.dumps(self.max_lines)
        return f"""
<!DOCTYPE html>
<html>
<head>
    <title>Transcription Display</title>
    {google_font_link}
    <style>
        {font_face_css}
        body {{
            margin: 0;
            padding: 20px;
            background: transparent;
            font-family: '{font_css}', sans-serif;
            font-size: {self.font_size}px;
            color: white;
            overflow: hidden;
        }}
        #transcriptions {{
            overflow: hidden;
        }}
        .transcription {{
            margin: 10px 0;
            padding: 10px;
            background: {bg_color_css};
            border-radius: 5px;
            animation: slideIn 0.3s ease-out;
            transition: opacity 1s ease-out;
        }}
        .transcription.fading {{
            opacity: 0;
        }}
        .timestamp {{
            color: #888;
            font-size: 0.9em;
            margin-right: 10px;
        }}
        .user {{
            color: {self.user_color};
            font-weight: bold;
            margin-right: 10px;
        }}
        .text {{
            color: {self.text_color};
        }}
        .transcription.preview {{
            font-style: italic;
        }}
        .preview-indicator {{
            color: #888;
            font-size: 0.85em;
            margin-right: 5px;
        }}
        @keyframes slideIn {{
            from {{
                opacity: 0;
                transform: translateY(-10px);
            }}
            to {{
                opacity: 1;
                transform: translateY(0);
            }}
        }}
    </style>
</head>
<body>
    <div id="transcriptions"></div>
    <script>
        const container = document.getElementById('transcriptions');
        // Follow the page scheme so the socket also works behind HTTPS
        const wsScheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
        const ws = new WebSocket(`${{wsScheme}}://${{window.location.host}}/ws`);
        const fadeAfterSeconds = {fade_after_js};
        const maxLines = {max_lines_js};
        let currentPreviewElement = null;

        ws.onmessage = (event) => {{
            const data = JSON.parse(event.data);
            if (data.is_preview) {{
                handlePreview(data);
            }} else {{
                addTranscription(data);
            }}
        }};

        ws.onclose = () => {{
            console.log('WebSocket closed. Attempting to reconnect...');
            setTimeout(() => location.reload(), 3000);
        }};

        // Send keepalive pings
        setInterval(() => {{
            if (ws.readyState === WebSocket.OPEN) {{
                ws.send('ping');
            }}
        }}, 30000);

        // Append a span whose content is plain text, never parsed as markup
        function appendSpan(parent, className, text) {{
            const span = document.createElement('span');
            span.className = className;
            span.textContent = text;
            parent.appendChild(span);
        }}

        // Rebuild the timestamp / user / text spans of one transcription line
        function renderContent(element, data, isPreview) {{
            element.textContent = '';
            if (data.timestamp) {{
                appendSpan(element, 'timestamp', `[${{data.timestamp}}]`);
            }}
            if (data.user_name && data.user_name.trim()) {{
                appendSpan(element, 'user', `${{data.user_name}}:`);
            }}
            if (isPreview) {{
                appendSpan(element, 'preview-indicator', '[...]');
            }}
            appendSpan(element, 'text', data.text);
        }}

        function handlePreview(data) {{
            // If there's already a preview, update it
            if (currentPreviewElement) {{
                updatePreviewContent(currentPreviewElement, data);
            }} else {{
                // Create new preview element
                currentPreviewElement = createTranscriptionElement(data, true);
                container.appendChild(currentPreviewElement);
            }}
            // Enforce max lines limit
            while (container.children.length > maxLines) {{
                const first = container.firstChild;
                if (first === currentPreviewElement) break; // Don't remove current preview
                container.removeChild(first);
            }}
        }}

        function updatePreviewContent(element, data) {{
            renderContent(element, data, true);
        }}

        function createTranscriptionElement(data, isPreview) {{
            const div = document.createElement('div');
            div.className = isPreview ? 'transcription preview' : 'transcription';
            renderContent(div, data, isPreview);
            return div;
        }}

        function addTranscription(data) {{
            // If there's a preview, replace it with final transcription
            if (currentPreviewElement) {{
                currentPreviewElement.className = 'transcription';
                renderContent(currentPreviewElement, data, false);
                // Set up fade-out for the final transcription
                if (fadeAfterSeconds > 0) {{
                    setupFadeOut(currentPreviewElement);
                }}
                currentPreviewElement = null;
            }} else {{
                // No preview to replace, add new element
                const div = createTranscriptionElement(data, false);
                container.appendChild(div);
                // Set up fade-out if enabled
                if (fadeAfterSeconds > 0) {{
                    setupFadeOut(div);
                }}
            }}
            // Enforce max lines limit
            while (container.children.length > maxLines) {{
                container.removeChild(container.firstChild);
            }}
        }}

        function setupFadeOut(element) {{
            setTimeout(() => {{
                // Start fade animation
                element.classList.add('fading');
                // Remove element after fade completes
                setTimeout(() => {{
                    if (element.parentNode === container) {{
                        container.removeChild(element);
                    }}
                }}, 1000); // Match the CSS transition duration
            }}, fadeAfterSeconds * 1000);
        }}
    </script>
</body>
</html>
"""

    def _build_payload(self, text: str, user_name: str,
                       timestamp: Optional[datetime], is_preview: bool) -> dict:
        """Build the JSON message sent to the page for one transcription."""
        if timestamp is None:
            timestamp = datetime.now()
        payload = {
            "text": text,
            "user_name": user_name,
        }
        if is_preview:
            payload["is_preview"] = True  # Flag to indicate this is a preview
        # Only include timestamp if enabled
        if self.show_timestamps:
            payload["timestamp"] = timestamp.strftime("%H:%M:%S")
        return payload

    async def _broadcast(self, payload: dict) -> None:
        """Send *payload* to every connected client, dropping dead sockets."""
        # Iterate over a snapshot: clients may connect or disconnect while
        # this coroutine is suspended inside send_json().
        for connection in list(self.active_connections):
            try:
                await connection.send_json(payload)
            except Exception:
                # A failed send means the client is gone; forget it.
                self._drop_connection(connection)

    async def broadcast_transcription(self, text: str, user_name: str = "", timestamp: Optional[datetime] = None):
        """
        Broadcast a transcription to all connected clients.

        The message is also appended to the in-memory history that is
        replayed to pages connecting later.

        Args:
            text: Transcription text
            user_name: User/speaker name
            timestamp: Timestamp of transcription (defaults to now)
        """
        trans_data = self._build_payload(text, user_name, timestamp, is_preview=False)
        # Store transcription, keeping only the most recent entries.
        self.transcriptions.append(trans_data)
        del self.transcriptions[:-self._HISTORY_LIMIT]
        await self._broadcast(trans_data)

    async def broadcast_preview(self, text: str, user_name: str = "", timestamp: Optional[datetime] = None):
        """
        Broadcast a preview transcription to all connected clients.

        Preview transcriptions are shown in italics and will be replaced by
        final. They are temporary, so they are not stored in the history.

        Args:
            text: Preview transcription text
            user_name: User/speaker name
            timestamp: Timestamp of transcription (defaults to now)
        """
        trans_data = self._build_payload(text, user_name, timestamp, is_preview=True)
        await self._broadcast(trans_data)

    async def start(self):
        """Start the web server and run it until it is shut down."""
        import uvicorn
        import logging
        # Configure uvicorn to work without console (for PyInstaller builds)
        # Suppress uvicorn's default console logging
        for logger_name in ("uvicorn", "uvicorn.access", "uvicorn.error"):
            logging.getLogger(logger_name).setLevel(logging.ERROR)
        config = uvicorn.Config(
            self.app,
            host=self.host,
            port=self.port,
            log_level="error",  # Only log errors
            access_log=False,   # Disable access logging
            log_config=None     # Don't use default logging config
        )
        server = uvicorn.Server(config)
        await server.serve()