"""Transcription service — faster-whisper pipeline with word-level timestamps.""" from __future__ import annotations import sys import time from collections.abc import Callable from dataclasses import dataclass, field from typing import Any from faster_whisper import WhisperModel from voice_to_notes.ipc.messages import progress_message from voice_to_notes.ipc.protocol import write_message CHUNK_REPORT_SIZE = 10 LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour @dataclass class WordResult: """A single word with timestamp.""" word: str start_ms: int end_ms: int confidence: float @dataclass class SegmentResult: """A transcription segment with words.""" text: str start_ms: int end_ms: int words: list[WordResult] = field(default_factory=list) @dataclass class TranscriptionResult: """Full transcription output.""" segments: list[SegmentResult] = field(default_factory=list) language: str = "" language_probability: float = 0.0 duration_ms: int = 0 class TranscribeService: """Handles audio transcription via faster-whisper.""" def __init__(self) -> None: self._model: WhisperModel | None = None self._current_model_name: str = "" self._current_device: str = "" self._current_compute_type: str = "" def _ensure_model( self, model_name: str = "base", device: str = "cpu", compute_type: str = "int8", ) -> WhisperModel: """Load or reuse the Whisper model.""" if ( self._model is not None and self._current_model_name == model_name and self._current_device == device and self._current_compute_type == compute_type ): return self._model print( f"[sidecar] Loading model {model_name} on {device} ({compute_type})", file=sys.stderr, flush=True, ) self._model = WhisperModel( model_name, device=device, compute_type=compute_type, ) self._current_model_name = model_name self._current_device = device self._current_compute_type = compute_type return self._model def transcribe( self, request_id: str, file_path: str, model_name: str = "base", device: str = "cpu", compute_type: str = "int8", language: str | None = None, on_segment: Callable[[SegmentResult, int], None] | None = None, ) -> TranscriptionResult: """Transcribe an audio file with word-level timestamps. Sends progress messages via IPC during processing. """ # Stage: loading model write_message(progress_message(request_id, 0, "loading_model", f"Loading {model_name}...")) model = self._ensure_model(model_name, device, compute_type) # Stage: transcribing write_message(progress_message(request_id, 10, "transcribing", "Starting transcription...")) start_time = time.time() segments_iter, info = model.transcribe( file_path, language=language, word_timestamps=True, vad_filter=True, ) result = TranscriptionResult( language=info.language, language_probability=info.language_probability, duration_ms=int(info.duration * 1000), ) # Process segments with progress reporting total_duration = info.duration if info.duration > 0 else 1.0 segment_count = 0 for segment in segments_iter: segment_count += 1 progress_pct = min(10 + int((segment.end / total_duration) * 80), 90) words = [] if segment.words: for w in segment.words: words.append( WordResult( word=w.word.strip(), start_ms=int(w.start * 1000), end_ms=int(w.end * 1000), confidence=round(w.probability, 4), ) ) result.segments.append( SegmentResult( text=segment.text.strip(), start_ms=int(segment.start * 1000), end_ms=int(segment.end * 1000), words=words, ) ) if on_segment: on_segment(result.segments[-1], segment_count - 1) write_message( progress_message( request_id, progress_pct, "transcribing", f"Transcribing segment {segment_count} ({progress_pct}% of audio)...", ) ) if segment_count % CHUNK_REPORT_SIZE == 0: write_message(progress_message( request_id, progress_pct, "transcribing", f"Completed chunk of {CHUNK_REPORT_SIZE} segments " f"({segment_count} total, {progress_pct}% of audio)...")) elapsed = time.time() - start_time print( f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s", file=sys.stderr, flush=True, ) write_message(progress_message(request_id, 100, "done", "Transcription complete")) return result def transcribe_chunked( self, request_id: str, file_path: str, model_name: str = "base", device: str = "cpu", compute_type: str = "int8", language: str | None = None, on_segment: Callable[[SegmentResult, int], None] | None = None, chunk_duration_sec: int = 300, ) -> TranscriptionResult: """Transcribe a large audio file by splitting into chunks. Uses ffmpeg to split the file into chunks, transcribes each chunk, then merges the results with corrected timestamps. Falls back to standard transcribe() if ffmpeg is not available. """ import subprocess import tempfile # Get total duration via ffprobe try: probe_result = subprocess.run( ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", file_path], capture_output=True, text=True, check=True, ) total_duration = float(probe_result.stdout.strip()) except (subprocess.CalledProcessError, FileNotFoundError, ValueError): # ffprobe not available or failed — fall back to standard transcription write_message(progress_message( request_id, 5, "transcribing", "ffmpeg not available, using standard transcription...")) return self.transcribe(request_id, file_path, model_name, device, compute_type, language, on_segment=on_segment) num_chunks = max(1, int(total_duration / chunk_duration_sec) + 1) write_message(progress_message( request_id, 5, "transcribing", f"Splitting {total_duration:.0f}s file into {num_chunks} chunks...")) merged_result = TranscriptionResult() global_segment_index = 0 for chunk_idx in range(num_chunks): chunk_start = chunk_idx * chunk_duration_sec if chunk_start >= total_duration: break chunk_start_ms = int(chunk_start * 1000) # Extract chunk to temp file tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) tmp.close() try: subprocess.run( ["ffmpeg", "-y", "-ss", str(chunk_start), "-t", str(chunk_duration_sec), "-i", file_path, "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le", tmp.name], capture_output=True, check=True, ) # Wrap on_segment to offset the index chunk_on_segment = None if on_segment: base_index = global_segment_index def chunk_on_segment(seg: SegmentResult, idx: int, _base=base_index) -> None: on_segment(seg, _base + idx) chunk_result = self.transcribe( request_id, tmp.name, model_name, device, compute_type, language, on_segment=chunk_on_segment, ) # Offset timestamps and merge for seg in chunk_result.segments: seg.start_ms += chunk_start_ms seg.end_ms += chunk_start_ms for word in seg.words: word.start_ms += chunk_start_ms word.end_ms += chunk_start_ms merged_result.segments.append(seg) global_segment_index += len(chunk_result.segments) # Take language from first chunk if chunk_idx == 0: merged_result.language = chunk_result.language merged_result.language_probability = chunk_result.language_probability finally: import os os.unlink(tmp.name) # Chunk progress chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90) write_message(progress_message( request_id, chunk_pct, "transcribing", f"Completed chunk {chunk_idx + 1}/{num_chunks}...")) merged_result.duration_ms = int(total_duration * 1000) write_message(progress_message(request_id, 100, "done", "Transcription complete")) return merged_result def result_to_payload(result: TranscriptionResult) -> dict[str, Any]: """Convert TranscriptionResult to IPC payload dict.""" return { "segments": [ { "text": seg.text, "start_ms": seg.start_ms, "end_ms": seg.end_ms, "words": [ { "word": w.word, "start_ms": w.start_ms, "end_ms": w.end_ms, "confidence": w.confidence, } for w in seg.words ], } for seg in result.segments ], "language": result.language, "language_probability": result.language_probability, "duration_ms": result.duration_ms, }