Files
voice-to-notes/python/voice_to_notes/services/diarize.py

168 lines
5.1 KiB
Python
Raw Normal View History

Phase 1 foundation: Tauri shell, Python sidecar, SQLite database Tauri v2 + Svelte + TypeScript frontend: - App shell with workspace layout (waveform, transcript, speakers, AI chat) - Placeholder components for all major UI areas - Typed stores (project, transcript, playback, AI) - TypeScript interfaces matching the database schema - Tauri bridge service with typed invoke wrappers - svelte-check passes with 0 errors Rust backend: - Tauri v2 app entry point with command registration - SQLite database layer (rusqlite with bundled SQLite) - Full schema: projects, media_files, speakers, segments, words, ai_outputs, annotations (with indexes) - Model structs with serde serialization - CRUD queries for projects, speakers, segments, words - Segment text editing preserves original text - Schema versioning for future migrations - 6 tests passing - Command stubs for project, transcribe, export, AI, settings, system - App state management Python sidecar: - JSON-line IPC protocol (stdin/stdout) - Message types: IPCMessage, progress, error, ready - Handler registry with routing and error handling - Ping/pong handler for connectivity testing - Service stubs: transcribe, diarize, pipeline, AI, export - Provider stubs: local (llama-server), OpenAI, Anthropic, LiteLLM - Hardware detection stubs - 14 tests passing, ruff clean Also adds: - Testing strategy document (docs/TESTING.md) - Validation script (scripts/validate.sh) - Updated .gitignore for Svelte, Rust, Python artifacts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:16:06 -08:00
"""Diarization service — pyannote.audio speaker identification."""
from __future__ import annotations

import os
import sys
import time
from dataclasses import dataclass, field
from typing import Any

from voice_to_notes.ipc.messages import progress_message
from voice_to_notes.ipc.protocol import write_message
@dataclass
class SpeakerSegment:
    """A time span assigned to a speaker."""

    # Speaker label as emitted by the diarizer (opaque string ID).
    speaker: str
    # Span start, in milliseconds from the beginning of the audio.
    start_ms: int
    # Span end, in milliseconds from the beginning of the audio.
    end_ms: int
@dataclass
class DiarizationResult:
    """Full diarization output."""

    # All diarized time spans, in the order produced by the pipeline.
    speaker_segments: list[SpeakerSegment] = field(default_factory=list)
    # Count of distinct speaker labels found.
    num_speakers: int = 0
    # Sorted list of distinct speaker labels.
    speakers: list[str] = field(default_factory=list)
Phase 1 foundation: Tauri shell, Python sidecar, SQLite database Tauri v2 + Svelte + TypeScript frontend: - App shell with workspace layout (waveform, transcript, speakers, AI chat) - Placeholder components for all major UI areas - Typed stores (project, transcript, playback, AI) - TypeScript interfaces matching the database schema - Tauri bridge service with typed invoke wrappers - svelte-check passes with 0 errors Rust backend: - Tauri v2 app entry point with command registration - SQLite database layer (rusqlite with bundled SQLite) - Full schema: projects, media_files, speakers, segments, words, ai_outputs, annotations (with indexes) - Model structs with serde serialization - CRUD queries for projects, speakers, segments, words - Segment text editing preserves original text - Schema versioning for future migrations - 6 tests passing - Command stubs for project, transcribe, export, AI, settings, system - App state management Python sidecar: - JSON-line IPC protocol (stdin/stdout) - Message types: IPCMessage, progress, error, ready - Handler registry with routing and error handling - Ping/pong handler for connectivity testing - Service stubs: transcribe, diarize, pipeline, AI, export - Provider stubs: local (llama-server), OpenAI, Anthropic, LiteLLM - Hardware detection stubs - 14 tests passing, ruff clean Also adds: - Testing strategy document (docs/TESTING.md) - Validation script (scripts/validate.sh) - Updated .gitignore for Svelte, Rust, Python artifacts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:16:06 -08:00
class DiarizeService:
    """Handles speaker diarization via pyannote.audio.

    The pyannote pipeline is loaded lazily on first use and cached on the
    instance for subsequent ``diarize()`` calls.
    """

    # Model IDs to try, in order of preference. The 3.1 model is gated and
    # generally requires a HuggingFace token; the older community model is
    # kept as a fallback.
    _MODEL_IDS = (
        "pyannote/speaker-diarization-3.1",
        "pyannote/speaker-diarization",
    )

    def __init__(self) -> None:
        # Cached pyannote Pipeline; None until _ensure_pipeline() succeeds.
        self._pipeline: Any = None

    def _ensure_pipeline(self) -> Any:
        """Load the pyannote diarization pipeline (lazy).

        Returns:
            The cached pyannote Pipeline instance.

        Raises:
            RuntimeError: If no candidate model could be loaded (pyannote
                missing, model license not accepted, or token absent).
        """
        if self._pipeline is not None:
            return self._pipeline
        print("[sidecar] Loading pyannote diarization pipeline...", file=sys.stderr, flush=True)
        # Bug fix: the error message below tells users to set HF_TOKEN, but
        # the token was never read — use_auth_token was hard-coded to False,
        # so the gated 3.1 model could never load. Pass the token when set.
        token = os.environ.get("HF_TOKEN")
        last_error: Exception | None = None
        for model_id in self._MODEL_IDS:
            try:
                # Import inside the try so a missing pyannote install is
                # reported via the RuntimeError path rather than crashing.
                from pyannote.audio import Pipeline

                self._pipeline = Pipeline.from_pretrained(
                    model_id,
                    use_auth_token=token if token else False,
                )
                return self._pipeline
            except Exception as e:  # pyannote raises assorted exception types
                last_error = e
        print(
            f"[sidecar] Warning: Could not load pyannote pipeline: {last_error}",
            file=sys.stderr,
            flush=True,
        )
        raise RuntimeError(
            "pyannote.audio pipeline not available. "
            "You may need to accept the model license at "
            "https://huggingface.co/pyannote/speaker-diarization-3.1 "
            "and set a HF_TOKEN environment variable."
        ) from last_error

    def diarize(
        self,
        request_id: str,
        file_path: str,
        num_speakers: int | None = None,
        min_speakers: int | None = None,
        max_speakers: int | None = None,
    ) -> DiarizationResult:
        """Run speaker diarization on an audio file.

        Emits IPC progress messages at 0%, 20%, and 100%.

        Args:
            request_id: IPC request ID for progress messages.
            file_path: Path to audio file.
            num_speakers: Exact number of speakers (if known).
            min_speakers: Minimum expected speakers.
            max_speakers: Maximum expected speakers.

        Returns:
            DiarizationResult with speaker segments.

        Raises:
            RuntimeError: If the diarization pipeline cannot be loaded.
        """
        write_message(
            progress_message(request_id, 0, "loading_diarization", "Loading diarization model...")
        )
        pipeline = self._ensure_pipeline()
        write_message(
            progress_message(request_id, 20, "diarizing", "Running speaker diarization...")
        )
        start_time = time.time()
        # Only forward speaker-count constraints the caller actually supplied;
        # pyannote treats absent kwargs as "estimate automatically".
        kwargs: dict[str, Any] = {}
        if num_speakers is not None:
            kwargs["num_speakers"] = num_speakers
        if min_speakers is not None:
            kwargs["min_speakers"] = min_speakers
        if max_speakers is not None:
            kwargs["max_speakers"] = max_speakers
        # Run diarization.
        diarization = pipeline(file_path, **kwargs)
        # Convert pyannote's annotation (seconds) to our ms-based format.
        result = DiarizationResult()
        seen_speakers: set[str] = set()
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            result.speaker_segments.append(
                SpeakerSegment(
                    speaker=speaker,
                    start_ms=int(turn.start * 1000),
                    end_ms=int(turn.end * 1000),
                )
            )
            seen_speakers.add(speaker)
        result.speakers = sorted(seen_speakers)
        result.num_speakers = len(seen_speakers)
        elapsed = time.time() - start_time
        print(
            f"[sidecar] Diarization complete: {result.num_speakers} speakers, "
            f"{len(result.speaker_segments)} segments in {elapsed:.1f}s",
            file=sys.stderr,
            flush=True,
        )
        write_message(
            progress_message(request_id, 100, "done", "Diarization complete")
        )
        return result
def diarization_to_payload(result: DiarizationResult) -> dict[str, Any]:
    """Serialize a DiarizationResult into a JSON-friendly IPC payload dict."""
    segment_dicts: list[dict[str, Any]] = []
    for segment in result.speaker_segments:
        segment_dicts.append(
            {
                "speaker": segment.speaker,
                "start_ms": segment.start_ms,
                "end_ms": segment.end_ms,
            }
        )
    return {
        "speaker_segments": segment_dicts,
        "num_speakers": result.num_speakers,
        "speakers": result.speakers,
    }