Files
voice-to-notes/python/voice_to_notes/ipc/handlers.py

329 lines
12 KiB
Python
Raw Normal View History

Phase 1 foundation: Tauri shell, Python sidecar, SQLite database Tauri v2 + Svelte + TypeScript frontend: - App shell with workspace layout (waveform, transcript, speakers, AI chat) - Placeholder components for all major UI areas - Typed stores (project, transcript, playback, AI) - TypeScript interfaces matching the database schema - Tauri bridge service with typed invoke wrappers - svelte-check passes with 0 errors Rust backend: - Tauri v2 app entry point with command registration - SQLite database layer (rusqlite with bundled SQLite) - Full schema: projects, media_files, speakers, segments, words, ai_outputs, annotations (with indexes) - Model structs with serde serialization - CRUD queries for projects, speakers, segments, words - Segment text editing preserves original text - Schema versioning for future migrations - 6 tests passing - Command stubs for project, transcribe, export, AI, settings, system - App state management Python sidecar: - JSON-line IPC protocol (stdin/stdout) - Message types: IPCMessage, progress, error, ready - Handler registry with routing and error handling - Ping/pong handler for connectivity testing - Service stubs: transcribe, diarize, pipeline, AI, export - Provider stubs: local (llama-server), OpenAI, Anthropic, LiteLLM - Hardware detection stubs - 14 tests passing, ruff clean Also adds: - Testing strategy document (docs/TESTING.md) - Validation script (scripts/validate.sh) - Updated .gitignore for Svelte, Rust, Python artifacts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:16:06 -08:00
"""Message handler registry and routing."""
from __future__ import annotations
import sys
from collections.abc import Callable
from voice_to_notes.ipc.messages import IPCMessage, error_message
# Handler function type: takes the incoming IPCMessage and returns a response
# IPCMessage, or None (the alias permits handlers that produce no response).
HandlerFunc = Callable[[IPCMessage], IPCMessage | None]
class HandlerRegistry:
    """Dispatch table that maps IPC message types to handler callables."""

    def __init__(self) -> None:
        self._handlers: dict[str, HandlerFunc] = {}

    def register(self, message_type: str, handler: HandlerFunc) -> None:
        """Bind ``handler`` to ``message_type``, replacing any earlier binding."""
        self._handlers[message_type] = handler

    def handle(self, msg: IPCMessage) -> IPCMessage | None:
        """Dispatch ``msg`` to the handler registered for ``msg.type``.

        Returns whatever the handler returns. If no handler is registered for
        the type, an ``unknown_type`` error message is returned. If the
        handler raises, the exception is logged to stderr and converted into
        a ``handler_error`` error message instead of propagating.
        """
        target = self._handlers.get(msg.type)
        if target is None:
            print(f"[sidecar] Unknown message type: {msg.type}", file=sys.stderr, flush=True)
            return error_message(msg.id, "unknown_type", f"Unknown message type: {msg.type}")

        try:
            response = target(msg)
        except Exception as exc:
            # Boundary of the dispatch loop: a failing handler must not take
            # the caller down, so report the failure as a message instead.
            print(f"[sidecar] Handler error for {msg.type}: {exc}", file=sys.stderr, flush=True)
            return error_message(msg.id, "handler_error", str(exc))
        else:
            return response
def ping_handler(msg: IPCMessage) -> IPCMessage:
    """Answer a ping with a ``pong`` that echoes the request payload.

    The reply reuses the request id and wraps the original payload under the
    ``echo`` key.
    """
    echoed = {"echo": msg.payload}
    return IPCMessage(id=msg.id, type="pong", payload=echoed)
def make_transcribe_handler() -> HandlerFunc:
    """Build the transcription handler.

    The returned closure creates one TranscribeService on its first call and
    reuses that instance for every later request.
    """
    cached_service = None

    def handler(msg: IPCMessage) -> IPCMessage:
        nonlocal cached_service
        # Imports are deferred into the handler, so the service module is not
        # loaded until a transcription request actually arrives.
        if cached_service is None:
            from voice_to_notes.services.transcribe import TranscribeService

            cached_service = TranscribeService()
        from voice_to_notes.services.transcribe import result_to_payload

        params = msg.payload
        call_kwargs = {
            "request_id": msg.id,
            "file_path": params["file"],  # required; a missing key raises KeyError
            "model_name": params.get("model", "base"),
            "device": params.get("device", "cpu"),
            "compute_type": params.get("compute_type", "int8"),
            "language": params.get("language"),
        }
        transcription = cached_service.transcribe(**call_kwargs)
        reply_payload = result_to_payload(transcription)
        return IPCMessage(id=msg.id, type="transcribe.result", payload=reply_payload)

    return handler
def make_diarize_handler() -> HandlerFunc:
    """Build the diarization handler.

    The returned closure creates one DiarizeService on its first call and
    reuses that instance for every later request.
    """
    cached_service = None

    def handler(msg: IPCMessage) -> IPCMessage:
        nonlocal cached_service
        # Imports are deferred into the handler, so the service module is not
        # loaded until a diarization request actually arrives.
        if cached_service is None:
            from voice_to_notes.services.diarize import DiarizeService

            cached_service = DiarizeService()
        from voice_to_notes.services.diarize import diarization_to_payload

        params = msg.payload
        # Optional speaker-count hints; absent keys are forwarded as None.
        speaker_hints = {
            key: params.get(key)
            for key in ("num_speakers", "min_speakers", "max_speakers")
        }
        diarization = cached_service.diarize(
            request_id=msg.id,
            file_path=params["file"],  # required; a missing key raises KeyError
            **speaker_hints,
        )
        reply_payload = diarization_to_payload(diarization)
        return IPCMessage(id=msg.id, type="diarize.result", payload=reply_payload)

    return handler
def _friendly_download_error(error_msg: str) -> str:
    """Translate a raw model-download error into a user-facing message.

    Messages mentioning ``403`` or ``gated`` are rewritten to point at the
    license page of the pyannote model named in the error, or to a generic
    hint when no model name can be found. Messages mentioning ``401`` become
    an invalid-token hint. Anything else is returned unchanged.
    """
    import re

    if "403" in error_msg or "gated" in error_msg.lower():
        # Model ids contain dots (e.g. "pyannote/segmentation-3.0"), so the
        # character class must allow them; otherwise the id is cut short and
        # the user is sent to a non-existent page.
        model_match = re.search(r"pyannote/[\w.-]+", error_msg)
        if model_match:
            # A trailing "." is sentence punctuation, not part of the id.
            model_name = model_match.group(0).rstrip(".")
            return (
                f"Access denied for {model_name}. "
                f"Please visit huggingface.co/{model_name} "
                f"and accept the license agreement, then try again."
            )
        return (
            "Access denied. Please accept the license agreements for all "
            "required pyannote models on HuggingFace."
        )
    if "401" in error_msg:
        return "Invalid token. Please check your HuggingFace token."
    return error_msg


def make_diarize_download_handler() -> HandlerFunc:
    """Create a handler that downloads/validates the diarization model.

    The handler reads an optional ``hf_token`` from the message payload,
    pre-downloads the pyannote sub-models, then loads the diarization
    pipeline. It replies with ``diarize.download.result`` (``{"ok": True}``)
    on success, or with a ``download_error`` error message whose text has
    been made user-friendly for common authentication failures.
    """
    import os

    def handler(msg: IPCMessage) -> IPCMessage:
        payload = msg.payload
        # Treat an empty token like a missing one, so that a blank string is
        # never forwarded to the download calls as a credential.
        hf_token = payload.get("hf_token") or None
        try:
            import huggingface_hub

            # Disable pyannote telemetry (has a bug in v4.0.4)
            os.environ.setdefault("PYANNOTE_METRICS_ENABLED", "false")
            from pyannote.audio import Pipeline

            # Persist token globally so ALL huggingface_hub downloads use auth.
            # Setting env var alone isn't enough — pyannote's internal sub-downloads
            # (e.g. PLDA.from_pretrained) don't forward the token= parameter.
            # login() writes the token to ~/.cache/huggingface/token which
            # huggingface_hub reads automatically for all downloads.
            if hf_token:
                os.environ["HF_TOKEN"] = hf_token
                huggingface_hub.login(token=hf_token, add_to_git_credential=False)

            # Pre-download sub-models that pyannote loads internally.
            # This ensures they're cached before Pipeline.from_pretrained
            # tries to load them (where token forwarding can fail).
            sub_models = [
                "pyannote/segmentation-3.0",
                "pyannote/speaker-diarization-community-1",
            ]
            for model_id in sub_models:
                print(f"[sidecar] Pre-downloading {model_id}...", file=sys.stderr, flush=True)
                huggingface_hub.snapshot_download(model_id, token=hf_token)

            print("[sidecar] Downloading diarization pipeline...", file=sys.stderr, flush=True)
            pipeline = Pipeline.from_pretrained(
                "pyannote/speaker-diarization-3.1",
                token=hf_token,
            )
            # NOTE(review): pyannote's from_pretrained appears to be able to
            # return None (after printing a hint) instead of raising when the
            # repository cannot be fetched — confirm for the pinned version.
            # Without this check such a failure would be reported as success.
            # The wording routes through the gated-model branch of
            # _friendly_download_error.
            if pipeline is None:
                raise RuntimeError(
                    "Could not load pyannote/speaker-diarization-3.1: "
                    "the model is gated or the token lacks access"
                )

            print("[sidecar] Diarization model downloaded successfully", file=sys.stderr, flush=True)
            return IPCMessage(
                id=msg.id,
                type="diarize.download.result",
                payload={"ok": True},
            )
        except Exception as e:
            # Boundary for everything that can go wrong while downloading
            # (missing packages, network, auth): log the raw error, reply
            # with a friendlier one.
            error_msg = str(e)
            print(f"[sidecar] Model download error: {error_msg}", file=sys.stderr, flush=True)
            return error_message(msg.id, "download_error", _friendly_download_error(error_msg))

    return handler
def make_pipeline_handler() -> HandlerFunc:
    """Build the full-pipeline handler (transcribe + diarize + merge).

    The returned closure creates one PipelineService on its first call and
    reuses that instance for every later request.
    """
    pipeline_service = None

    def handler(msg: IPCMessage) -> IPCMessage:
        nonlocal pipeline_service
        # Imports are deferred into the handler, so the pipeline module is
        # not loaded until a pipeline request actually arrives.
        if pipeline_service is None:
            from voice_to_notes.services.pipeline import PipelineService

            pipeline_service = PipelineService()
        from voice_to_notes.services.pipeline import pipeline_result_to_payload

        params = msg.payload
        run_kwargs = {
            "request_id": msg.id,
            "file_path": params["file"],  # required; a missing key raises KeyError
            "model_name": params.get("model", "base"),
            "device": params.get("device", "cpu"),
            "compute_type": params.get("compute_type", "int8"),
            "language": params.get("language"),
            "num_speakers": params.get("num_speakers"),
            "min_speakers": params.get("min_speakers"),
            "max_speakers": params.get("max_speakers"),
            "skip_diarization": params.get("skip_diarization", False),
            "hf_token": params.get("hf_token"),
        }
        outcome = pipeline_service.run(**run_kwargs)
        reply_payload = pipeline_result_to_payload(outcome)
        return IPCMessage(id=msg.id, type="pipeline.result", payload=reply_payload)

    return handler
def make_export_handler() -> HandlerFunc:
    """Build the export handler.

    The returned closure creates one ExportService on its first call and
    reuses that instance for every later request.
    """
    export_service = None

    def handler(msg: IPCMessage) -> IPCMessage:
        nonlocal export_service
        # Imports are deferred into the handler, so the export module is not
        # loaded until an export request actually arrives.
        if export_service is None:
            from voice_to_notes.services.export import ExportService

            export_service = ExportService()
        from voice_to_notes.services.export import make_export_request

        export_request = make_export_request(msg.payload)
        written_to = export_service.export(export_request)
        reply_payload = {"output_path": written_to, "format": export_request.format}
        return IPCMessage(id=msg.id, type="export.result", payload=reply_payload)

    return handler
def _build_ai_provider(provider_name: str, config: dict) -> object:
    """Instantiate the AI provider called ``provider_name`` from ``config``.

    Supported names are ``local``, ``openai``, ``anthropic`` and ``litellm``.
    Settings missing from ``config`` fall back to the defaults written below.
    Provider modules are imported only for the branch that is taken.

    Raises:
        ValueError: if ``provider_name`` is not one of the supported names.
    """
    if provider_name == "local":
        from voice_to_notes.providers.local_provider import LocalProvider

        return LocalProvider(
            base_url=config.get("base_url", "http://localhost:8080"),
            model=config.get("model", "local"),
        )
    if provider_name == "openai":
        from voice_to_notes.providers.openai_provider import OpenAIProvider

        return OpenAIProvider(
            api_key=config.get("api_key"),
            model=config.get("model", "gpt-4o-mini"),
        )
    if provider_name == "anthropic":
        from voice_to_notes.providers.anthropic_provider import AnthropicProvider

        return AnthropicProvider(
            api_key=config.get("api_key"),
            model=config.get("model", "claude-sonnet-4-6"),
        )
    if provider_name == "litellm":
        from voice_to_notes.providers.litellm_provider import OpenAICompatibleProvider

        return OpenAICompatibleProvider(
            model=config.get("model", "gpt-4o-mini"),
            api_key=config.get("api_key"),
            api_base=config.get("api_base"),
        )
    raise ValueError(f"Unknown AI provider: {provider_name!r}")


def make_ai_chat_handler() -> HandlerFunc:
    """Create an AI chat handler with persistent AIProviderService.

    The service is created on the first request and reused afterwards. The
    payload's ``action`` key (default ``"chat"``) selects the operation:

    - ``list_providers``: reply ``ai.providers`` with the service's providers.
    - ``set_provider``: activate ``payload["provider"]``; reply ``ai.provider_set``.
    - ``configure``: rebuild the named provider from ``payload["config"]`` and
      register it; reply ``ai.configured``. An unsupported provider name
      raises ValueError rather than reporting success for a provider that was
      never registered.
    - anything else: run a chat and reply ``ai.response``.
    """
    service = None

    def handler(msg: IPCMessage) -> IPCMessage:
        nonlocal service
        if service is None:
            from voice_to_notes.services.ai_provider import create_default_service

            service = create_default_service()

        payload = msg.payload
        action = payload.get("action", "chat")

        if action == "list_providers":
            return IPCMessage(
                id=msg.id,
                type="ai.providers",
                payload={"providers": service.list_providers()},
            )

        if action == "set_provider":
            service.set_active(payload["provider"])
            return IPCMessage(
                id=msg.id,
                type="ai.provider_set",
                payload={"provider": payload["provider"]},
            )

        if action == "configure":
            # Re-create a provider with custom settings
            provider_name = payload.get("provider", "")
            # "config" may be absent or an explicit null; both mean "use defaults".
            config = payload.get("config") or {}
            service.register_provider(provider_name, _build_ai_provider(provider_name, config))
            return IPCMessage(
                id=msg.id,
                type="ai.configured",
                payload={"provider": provider_name},
            )

        # Default: chat. Every payload key other than the ones consumed here
        # is forwarded to the service as a keyword argument.
        reserved_keys = ("action", "messages", "transcript_context")
        extra_options = {k: v for k, v in payload.items() if k not in reserved_keys}
        response = service.chat(
            messages=payload.get("messages", []),
            transcript_context=payload.get("transcript_context", ""),
            **extra_options,
        )
        return IPCMessage(
            id=msg.id,
            type="ai.response",
            payload={"response": response},
        )

    return handler
def hardware_detect_handler(msg: IPCMessage) -> IPCMessage:
    """Run hardware detection and report the result as ``hardware.info``.

    The reply payload copies the fields listed below from the object returned
    by ``detect_hardware()``, keyed by the same names.
    """
    from voice_to_notes.hardware.detect import detect_hardware

    reported_fields = (
        "has_cuda",
        "cuda_device_name",
        "vram_mb",
        "ram_mb",
        "cpu_cores",
        "recommended_model",
        "recommended_device",
        "recommended_compute_type",
    )
    detected = detect_hardware()
    summary = {field: getattr(detected, field) for field in reported_fields}
    return IPCMessage(id=msg.id, type="hardware.info", payload=summary)