"""Message handler registry and routing."""

from __future__ import annotations

import sys
from collections.abc import Callable

from voice_to_notes.ipc.messages import IPCMessage, error_message

# Handler function type: takes a message, returns a response message
HandlerFunc = Callable[[IPCMessage], IPCMessage | None]

# Payload keys consumed by the AI chat handler itself; every other key in a
# chat payload is forwarded to ``service.chat`` as a keyword argument.
_CHAT_RESERVED_KEYS = ("action", "messages", "transcript_context")


class HandlerRegistry:
    """Registry mapping message types to handler functions."""

    def __init__(self) -> None:
        self._handlers: dict[str, HandlerFunc] = {}

    def register(self, message_type: str, handler: HandlerFunc) -> None:
        """Register a handler for a message type.

        Registering the same ``message_type`` again replaces the earlier handler.
        """
        self._handlers[message_type] = handler

    def handle(self, msg: IPCMessage) -> IPCMessage | None:
        """Route a message to its handler.

        Returns:
            The handler's own return value, or an error message built with
            ``error_message``: code ``unknown_type`` when no handler is
            registered for ``msg.type``, code ``handler_error`` when the
            handler raises an ``Exception``.
        """
        handler = self._handlers.get(msg.type)
        if handler is None:
            print(f"[sidecar] Unknown message type: {msg.type}", file=sys.stderr, flush=True)
            return error_message(msg.id, "unknown_type", f"Unknown message type: {msg.type}")
        try:
            return handler(msg)
        except Exception as e:
            # Routing boundary: a failing handler is reported to the peer as an
            # error reply instead of propagating to the caller.
            # Exceptions raised without arguments stringify to "", so fall back
            # to the class name to keep the error reply informative.
            detail = str(e) or type(e).__name__
            print(f"[sidecar] Handler error for {msg.type}: {detail}", file=sys.stderr, flush=True)
            return error_message(msg.id, "handler_error", detail)


def ping_handler(msg: IPCMessage) -> IPCMessage:
    """Simple ping handler for testing connectivity.

    Replies with a ``pong`` message whose payload echoes the request payload
    under the ``echo`` key.
    """
    return IPCMessage(id=msg.id, type="pong", payload={"echo": msg.payload})


def make_transcribe_handler() -> HandlerFunc:
    """Create a transcription handler with a persistent TranscribeService.

    The service is created once, when this factory is called, and reused by
    every call of the returned handler.

    The handler reads ``file`` (required) from the payload, plus the optional
    keys ``model`` (default ``"base"``), ``device`` (default ``"cpu"``),
    ``compute_type`` (default ``"int8"``) and ``language`` (default ``None``),
    and replies with a ``transcribe.result`` message.
    """
    from voice_to_notes.services.transcribe import TranscribeService, result_to_payload

    service = TranscribeService()

    def handler(msg: IPCMessage) -> IPCMessage:
        payload = msg.payload
        result = service.transcribe(
            request_id=msg.id,
            file_path=payload["file"],
            model_name=payload.get("model", "base"),
            device=payload.get("device", "cpu"),
            compute_type=payload.get("compute_type", "int8"),
            language=payload.get("language"),
        )
        return IPCMessage(
            id=msg.id,
            type="transcribe.result",
            payload=result_to_payload(result),
        )

    return handler


def make_diarize_handler() -> HandlerFunc:
    """Create a diarization handler with a persistent DiarizeService.

    The service is created once, when this factory is called, and reused by
    every call of the returned handler.

    The handler reads ``file`` (required) from the payload, plus the optional
    keys ``num_speakers``, ``min_speakers`` and ``max_speakers`` (each
    defaulting to ``None``), and replies with a ``diarize.result`` message.
    """
    from voice_to_notes.services.diarize import DiarizeService, diarization_to_payload

    service = DiarizeService()

    def handler(msg: IPCMessage) -> IPCMessage:
        payload = msg.payload
        result = service.diarize(
            request_id=msg.id,
            file_path=payload["file"],
            num_speakers=payload.get("num_speakers"),
            min_speakers=payload.get("min_speakers"),
            max_speakers=payload.get("max_speakers"),
        )
        return IPCMessage(
            id=msg.id,
            type="diarize.result",
            payload=diarization_to_payload(result),
        )

    return handler


def make_pipeline_handler() -> HandlerFunc:
    """Create a full pipeline handler (transcribe + diarize + merge).

    The PipelineService is created once, when this factory is called, and
    reused by every call of the returned handler.

    The handler reads ``file`` (required) from the payload and accepts the
    same optional keys as the transcribe and diarize handlers, plus
    ``skip_diarization`` (default ``False``). It replies with a
    ``pipeline.result`` message.
    """
    from voice_to_notes.services.pipeline import PipelineService, pipeline_result_to_payload

    service = PipelineService()

    def handler(msg: IPCMessage) -> IPCMessage:
        payload = msg.payload
        result = service.run(
            request_id=msg.id,
            file_path=payload["file"],
            model_name=payload.get("model", "base"),
            device=payload.get("device", "cpu"),
            compute_type=payload.get("compute_type", "int8"),
            language=payload.get("language"),
            num_speakers=payload.get("num_speakers"),
            min_speakers=payload.get("min_speakers"),
            max_speakers=payload.get("max_speakers"),
            skip_diarization=payload.get("skip_diarization", False),
        )
        return IPCMessage(
            id=msg.id,
            type="pipeline.result",
            payload=pipeline_result_to_payload(result),
        )

    return handler


def make_export_handler() -> HandlerFunc:
    """Create an export handler.

    The ExportService is created once, when this factory is called. The
    returned handler builds a request from the message payload with
    ``make_export_request``, passes it to ``service.export`` and replies with
    an ``export.result`` message carrying the returned ``output_path`` and the
    request's ``format``.
    """
    from voice_to_notes.services.export import ExportService, make_export_request

    service = ExportService()

    def handler(msg: IPCMessage) -> IPCMessage:
        request = make_export_request(msg.payload)
        output_path = service.export(request)
        return IPCMessage(
            id=msg.id,
            type="export.result",
            payload={"output_path": output_path, "format": request.format},
        )

    return handler


def _build_ai_provider(provider_name: str, config: dict[str, object]) -> object:
    """Instantiate the AI provider called *provider_name* from *config*.

    Only the module of the selected provider is imported.

    Raises:
        ValueError: If *provider_name* is not one of ``local``, ``openai``,
            ``anthropic`` or ``litellm``.
    """
    if provider_name == "local":
        from voice_to_notes.providers.local_provider import LocalProvider

        return LocalProvider(
            base_url=config.get("base_url", "http://localhost:8080"),
            model=config.get("model", "local"),
        )
    if provider_name == "openai":
        from voice_to_notes.providers.openai_provider import OpenAIProvider

        return OpenAIProvider(
            api_key=config.get("api_key"),
            model=config.get("model", "gpt-4o-mini"),
        )
    if provider_name == "anthropic":
        from voice_to_notes.providers.anthropic_provider import AnthropicProvider

        return AnthropicProvider(
            api_key=config.get("api_key"),
            model=config.get("model", "claude-sonnet-4-6"),
        )
    if provider_name == "litellm":
        from voice_to_notes.providers.litellm_provider import LiteLLMProvider

        return LiteLLMProvider(
            model=config.get("model", "gpt-4o-mini"),
        )
    raise ValueError(f"Unknown AI provider: {provider_name!r}")


def make_ai_chat_handler() -> HandlerFunc:
    """Create an AI chat handler with persistent AIProviderService.

    The service comes from ``create_default_service`` and is created once,
    when this factory is called. The returned handler dispatches on the
    payload's ``action`` key (default ``"chat"``):

    * ``list_providers`` -> ``ai.providers`` reply with ``service.list_providers()``.
    * ``set_provider`` -> activates ``payload["provider"]`` (required) and
      replies ``ai.provider_set``.
    * ``configure`` -> re-creates the provider named by ``provider`` using the
      optional ``config`` mapping, registers it and replies ``ai.configured``.
      An unrecognised provider name raises ``ValueError`` instead of
      reporting success.
    * anything else -> chat: ``messages`` and ``transcript_context`` are passed
      to ``service.chat`` together with every remaining payload key as a
      keyword argument; the reply is ``ai.response``.
    """
    from voice_to_notes.services.ai_provider import create_default_service

    service = create_default_service()

    def handler(msg: IPCMessage) -> IPCMessage:
        payload = msg.payload
        action = payload.get("action", "chat")

        if action == "list_providers":
            return IPCMessage(
                id=msg.id,
                type="ai.providers",
                payload={"providers": service.list_providers()},
            )

        if action == "set_provider":
            service.set_active(payload["provider"])
            return IPCMessage(
                id=msg.id,
                type="ai.provider_set",
                payload={"provider": payload["provider"]},
            )

        if action == "configure":
            # Re-create a provider with custom settings
            provider_name = payload.get("provider", "")
            # ``or {}`` also covers an explicit null ``config`` value, which
            # ``payload.get("config", {})`` would pass through as None.
            config = payload.get("config") or {}
            service.register_provider(
                provider_name,
                _build_ai_provider(provider_name, config),
            )
            return IPCMessage(
                id=msg.id,
                type="ai.configured",
                payload={"provider": provider_name},
            )

        # Default: chat
        extra_options = {
            key: value
            for key, value in payload.items()
            if key not in _CHAT_RESERVED_KEYS
        }
        response = service.chat(
            messages=payload.get("messages", []),
            transcript_context=payload.get("transcript_context", ""),
            **extra_options,
        )
        return IPCMessage(
            id=msg.id,
            type="ai.response",
            payload={"response": response},
        )

    return handler


def hardware_detect_handler(msg: IPCMessage) -> IPCMessage:
    """Detect hardware capabilities and return recommendations.

    Calls ``detect_hardware`` and copies the fields of its result into the
    payload of a ``hardware.info`` reply.
    """
    from voice_to_notes.hardware.detect import detect_hardware

    info = detect_hardware()
    return IPCMessage(
        id=msg.id,
        type="hardware.info",
        payload={
            "has_cuda": info.has_cuda,
            "cuda_device_name": info.cuda_device_name,
            "vram_mb": info.vram_mb,
            "ram_mb": info.ram_mb,
            "cpu_cores": info.cpu_cores,
            "recommended_model": info.recommended_model,
            "recommended_device": info.recommended_device,
            "recommended_compute_type": info.recommended_compute_type,
        },
    )