"""Message handler registry and routing.""" from __future__ import annotations import sys from collections.abc import Callable from voice_to_notes.ipc.messages import IPCMessage, error_message # Handler function type: takes a message, returns a response message HandlerFunc = Callable[[IPCMessage], IPCMessage | None] class HandlerRegistry: """Registry mapping message types to handler functions.""" def __init__(self) -> None: self._handlers: dict[str, HandlerFunc] = {} def register(self, message_type: str, handler: HandlerFunc) -> None: """Register a handler for a message type.""" self._handlers[message_type] = handler def handle(self, msg: IPCMessage) -> IPCMessage | None: """Route a message to its handler. Returns a response or error.""" handler = self._handlers.get(msg.type) if handler is None: print(f"[sidecar] Unknown message type: {msg.type}", file=sys.stderr, flush=True) return error_message(msg.id, "unknown_type", f"Unknown message type: {msg.type}") try: return handler(msg) except Exception as e: print(f"[sidecar] Handler error for {msg.type}: {e}", file=sys.stderr, flush=True) return error_message(msg.id, "handler_error", str(e)) def ping_handler(msg: IPCMessage) -> IPCMessage: """Simple ping handler for testing connectivity.""" return IPCMessage(id=msg.id, type="pong", payload={"echo": msg.payload}) def make_transcribe_handler() -> HandlerFunc: """Create a transcription handler with a persistent TranscribeService.""" from voice_to_notes.services.transcribe import TranscribeService, result_to_payload service = TranscribeService() def handler(msg: IPCMessage) -> IPCMessage: payload = msg.payload result = service.transcribe( request_id=msg.id, file_path=payload["file"], model_name=payload.get("model", "base"), device=payload.get("device", "cpu"), compute_type=payload.get("compute_type", "int8"), language=payload.get("language"), ) return IPCMessage( id=msg.id, type="transcribe.result", payload=result_to_payload(result), ) return handler def make_diarize_handler() -> HandlerFunc: """Create a diarization handler with a persistent DiarizeService.""" from voice_to_notes.services.diarize import DiarizeService, diarization_to_payload service = DiarizeService() def handler(msg: IPCMessage) -> IPCMessage: payload = msg.payload result = service.diarize( request_id=msg.id, file_path=payload["file"], num_speakers=payload.get("num_speakers"), min_speakers=payload.get("min_speakers"), max_speakers=payload.get("max_speakers"), ) return IPCMessage( id=msg.id, type="diarize.result", payload=diarization_to_payload(result), ) return handler def make_diarize_download_handler() -> HandlerFunc: """Create a handler that downloads/validates the diarization model.""" import os def handler(msg: IPCMessage) -> IPCMessage: payload = msg.payload hf_token = payload.get("hf_token") try: import huggingface_hub # Disable pyannote telemetry (has a bug in v4.0.4) os.environ.setdefault("PYANNOTE_METRICS_ENABLED", "false") from pyannote.audio import Pipeline # Persist token globally so ALL huggingface_hub downloads use auth. # Setting env var alone isn't enough — pyannote's internal sub-downloads # (e.g. PLDA.from_pretrained) don't forward the token= parameter. # login() writes the token to ~/.cache/huggingface/token which # huggingface_hub reads automatically for all downloads. if hf_token: os.environ["HF_TOKEN"] = hf_token huggingface_hub.login(token=hf_token, add_to_git_credential=False) # Pre-download sub-models that pyannote loads internally. # This ensures they're cached before Pipeline.from_pretrained # tries to load them (where token forwarding can fail). sub_models = [ "pyannote/segmentation-3.0", "pyannote/speaker-diarization-community-1", ] for model_id in sub_models: print(f"[sidecar] Pre-downloading {model_id}...", file=sys.stderr, flush=True) huggingface_hub.snapshot_download(model_id, token=hf_token) print("[sidecar] Downloading diarization pipeline...", file=sys.stderr, flush=True) pipeline = Pipeline.from_pretrained( "pyannote/speaker-diarization-3.1", token=hf_token, ) print("[sidecar] Diarization model downloaded successfully", file=sys.stderr, flush=True) return IPCMessage( id=msg.id, type="diarize.download.result", payload={"ok": True}, ) except Exception as e: error_msg = str(e) print(f"[sidecar] Model download error: {error_msg}", file=sys.stderr, flush=True) # Make common errors more user-friendly if "403" in error_msg or "gated" in error_msg.lower(): # Try to extract the specific model name from the error import re model_match = re.search(r"pyannote/[\w-]+", error_msg) if model_match: model_name = model_match.group(0) error_msg = ( f"Access denied for {model_name}. " f"Please visit huggingface.co/{model_name} " f"and accept the license agreement, then try again." ) else: error_msg = ( "Access denied. Please accept the license agreements for all " "required pyannote models on HuggingFace." ) elif "401" in error_msg: error_msg = "Invalid token. Please check your HuggingFace token." return error_message(msg.id, "download_error", error_msg) return handler def make_pipeline_handler() -> HandlerFunc: """Create a full pipeline handler (transcribe + diarize + merge).""" from voice_to_notes.services.pipeline import PipelineService, pipeline_result_to_payload service = PipelineService() def handler(msg: IPCMessage) -> IPCMessage: payload = msg.payload result = service.run( request_id=msg.id, file_path=payload["file"], model_name=payload.get("model", "base"), device=payload.get("device", "cpu"), compute_type=payload.get("compute_type", "int8"), language=payload.get("language"), num_speakers=payload.get("num_speakers"), min_speakers=payload.get("min_speakers"), max_speakers=payload.get("max_speakers"), skip_diarization=payload.get("skip_diarization", False), hf_token=payload.get("hf_token"), ) return IPCMessage( id=msg.id, type="pipeline.result", payload=pipeline_result_to_payload(result), ) return handler def make_export_handler() -> HandlerFunc: """Create an export handler.""" from voice_to_notes.services.export import ExportService, make_export_request service = ExportService() def handler(msg: IPCMessage) -> IPCMessage: request = make_export_request(msg.payload) output_path = service.export(request) return IPCMessage( id=msg.id, type="export.result", payload={"output_path": output_path, "format": request.format}, ) return handler def make_ai_chat_handler() -> HandlerFunc: """Create an AI chat handler with persistent AIProviderService.""" from voice_to_notes.services.ai_provider import create_default_service service = create_default_service() def handler(msg: IPCMessage) -> IPCMessage: payload = msg.payload action = payload.get("action", "chat") if action == "list_providers": return IPCMessage( id=msg.id, type="ai.providers", payload={"providers": service.list_providers()}, ) if action == "set_provider": service.set_active(payload["provider"]) return IPCMessage( id=msg.id, type="ai.provider_set", payload={"provider": payload["provider"]}, ) if action == "configure": # Re-create a provider with custom settings provider_name = payload.get("provider", "") config = payload.get("config", {}) if provider_name == "local": from voice_to_notes.providers.local_provider import LocalProvider service.register_provider("local", LocalProvider( base_url=config.get("base_url", "http://localhost:8080"), model=config.get("model", "local"), )) elif provider_name == "openai": from voice_to_notes.providers.openai_provider import OpenAIProvider service.register_provider("openai", OpenAIProvider( api_key=config.get("api_key"), model=config.get("model", "gpt-4o-mini"), )) elif provider_name == "anthropic": from voice_to_notes.providers.anthropic_provider import AnthropicProvider service.register_provider("anthropic", AnthropicProvider( api_key=config.get("api_key"), model=config.get("model", "claude-sonnet-4-6"), )) elif provider_name == "litellm": from voice_to_notes.providers.litellm_provider import LiteLLMProvider service.register_provider("litellm", LiteLLMProvider( model=config.get("model", "gpt-4o-mini"), )) return IPCMessage( id=msg.id, type="ai.configured", payload={"provider": provider_name}, ) # Default: chat response = service.chat( messages=payload.get("messages", []), transcript_context=payload.get("transcript_context", ""), **{k: v for k, v in payload.items() if k not in ("action", "messages", "transcript_context")}, ) return IPCMessage( id=msg.id, type="ai.response", payload={"response": response}, ) return handler def hardware_detect_handler(msg: IPCMessage) -> IPCMessage: """Detect hardware capabilities and return recommendations.""" from voice_to_notes.hardware.detect import detect_hardware info = detect_hardware() return IPCMessage( id=msg.id, type="hardware.info", payload={ "has_cuda": info.has_cuda, "cuda_device_name": info.cuda_device_name, "vram_mb": info.vram_mb, "ram_mb": info.ram_mb, "cpu_cores": info.cpu_cores, "recommended_model": info.recommended_model, "recommended_device": info.recommended_device, "recommended_compute_type": info.recommended_compute_type, }, )