Files
voice-to-notes/python/voice_to_notes/services/transcribe.py
Claude 58faa83cb3 Cross-platform distribution, UI improvements, and performance optimizations
- PyInstaller frozen sidecar: spec file, build script, and ffmpeg path resolver
  for self-contained distribution without Python prerequisites
- Dual-mode sidecar launcher: frozen binary (production) with dev mode fallback
- Parallel transcription + diarization pipeline (~30-40% faster)
- GPU auto-detection for diarization (CUDA when available)
- Async run_pipeline command for real-time progress event delivery
- Web Audio API backend for instant playback and seeking
- OpenAI-compatible provider replacing LiteLLM client-side routing
- Cross-platform RAM detection (Linux/macOS/Windows)
- Settings: speaker count hint, token reveal toggles, dark dropdown styling
- Loading splash screen, flexbox layout fix for viewport overflow
- Gitea Actions CI/CD pipeline (Linux, Windows, macOS ARM)
- Updated README and CLAUDE.md documentation

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 21:33:43 -07:00

314 lines
11 KiB
Python

"""Transcription service — faster-whisper pipeline with word-level timestamps."""
from __future__ import annotations
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from faster_whisper import WhisperModel
from voice_to_notes.ipc.messages import progress_message
from voice_to_notes.ipc.protocol import write_message
from voice_to_notes.utils.ffmpeg import get_ffmpeg_path, get_ffprobe_path
CHUNK_REPORT_SIZE = 10
LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour
@dataclass
class WordResult:
"""A single word with timestamp."""
word: str
start_ms: int
end_ms: int
confidence: float
@dataclass
class SegmentResult:
"""A transcription segment with words."""
text: str
start_ms: int
end_ms: int
words: list[WordResult] = field(default_factory=list)
@dataclass
class TranscriptionResult:
"""Full transcription output."""
segments: list[SegmentResult] = field(default_factory=list)
language: str = ""
language_probability: float = 0.0
duration_ms: int = 0
class TranscribeService:
"""Handles audio transcription via faster-whisper."""
def __init__(self) -> None:
self._model: WhisperModel | None = None
self._current_model_name: str = ""
self._current_device: str = ""
self._current_compute_type: str = ""
def _ensure_model(
self,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
) -> WhisperModel:
"""Load or reuse the Whisper model."""
if (
self._model is not None
and self._current_model_name == model_name
and self._current_device == device
and self._current_compute_type == compute_type
):
return self._model
print(
f"[sidecar] Loading model {model_name} on {device} ({compute_type})",
file=sys.stderr,
flush=True,
)
self._model = WhisperModel(
model_name,
device=device,
compute_type=compute_type,
)
self._current_model_name = model_name
self._current_device = device
self._current_compute_type = compute_type
return self._model
def transcribe(
self,
request_id: str,
file_path: str,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
language: str | None = None,
on_segment: Callable[[SegmentResult, int], None] | None = None,
) -> TranscriptionResult:
"""Transcribe an audio file with word-level timestamps.
Sends progress messages via IPC during processing.
"""
# Stage: loading model
write_message(progress_message(request_id, 0, "loading_model", f"Loading {model_name}..."))
model = self._ensure_model(model_name, device, compute_type)
# Stage: transcribing
write_message(progress_message(request_id, 10, "transcribing", "Starting transcription..."))
start_time = time.time()
segments_iter, info = model.transcribe(
file_path,
language=language,
word_timestamps=True,
vad_filter=True,
)
result = TranscriptionResult(
language=info.language,
language_probability=info.language_probability,
duration_ms=int(info.duration * 1000),
)
# Process segments with progress reporting
total_duration = info.duration if info.duration > 0 else 1.0
segment_count = 0
for segment in segments_iter:
segment_count += 1
progress_pct = min(10 + int((segment.end / total_duration) * 80), 90)
words = []
if segment.words:
for w in segment.words:
words.append(
WordResult(
word=w.word.strip(),
start_ms=int(w.start * 1000),
end_ms=int(w.end * 1000),
confidence=round(w.probability, 4),
)
)
result.segments.append(
SegmentResult(
text=segment.text.strip(),
start_ms=int(segment.start * 1000),
end_ms=int(segment.end * 1000),
words=words,
)
)
if on_segment:
on_segment(result.segments[-1], segment_count - 1)
write_message(
progress_message(
request_id,
progress_pct,
"transcribing",
f"Transcribing segment {segment_count} ({progress_pct}% of audio)...",
)
)
if segment_count % CHUNK_REPORT_SIZE == 0:
write_message(progress_message(
request_id, progress_pct, "transcribing",
f"Completed chunk of {CHUNK_REPORT_SIZE} segments "
f"({segment_count} total, {progress_pct}% of audio)..."))
elapsed = time.time() - start_time
print(
f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s",
file=sys.stderr,
flush=True,
)
write_message(progress_message(request_id, 100, "done", "Transcription complete"))
return result
def transcribe_chunked(
self,
request_id: str,
file_path: str,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
language: str | None = None,
on_segment: Callable[[SegmentResult, int], None] | None = None,
chunk_duration_sec: int = 300,
) -> TranscriptionResult:
"""Transcribe a large audio file by splitting into chunks.
Uses ffmpeg to split the file into chunks, transcribes each chunk,
then merges the results with corrected timestamps.
Falls back to standard transcribe() if ffmpeg is not available.
"""
import subprocess
import tempfile
# Get total duration via ffprobe
try:
probe_result = subprocess.run(
[get_ffprobe_path(), "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", file_path],
capture_output=True, text=True, check=True,
)
total_duration = float(probe_result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
# ffprobe not available or failed — fall back to standard transcription
write_message(progress_message(
request_id, 5, "transcribing",
"ffmpeg not available, using standard transcription..."))
return self.transcribe(request_id, file_path, model_name, device,
compute_type, language, on_segment=on_segment)
num_chunks = max(1, int(total_duration / chunk_duration_sec) + 1)
write_message(progress_message(
request_id, 5, "transcribing",
f"Splitting {total_duration:.0f}s file into {num_chunks} chunks..."))
merged_result = TranscriptionResult()
global_segment_index = 0
for chunk_idx in range(num_chunks):
chunk_start = chunk_idx * chunk_duration_sec
if chunk_start >= total_duration:
break
chunk_start_ms = int(chunk_start * 1000)
# Extract chunk to temp file
tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
tmp.close()
try:
subprocess.run(
[get_ffmpeg_path(), "-y", "-ss", str(chunk_start),
"-t", str(chunk_duration_sec),
"-i", file_path,
"-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
tmp.name],
capture_output=True, check=True,
)
# Wrap on_segment to offset the index
chunk_on_segment = None
if on_segment:
base_index = global_segment_index
def chunk_on_segment(seg: SegmentResult, idx: int, _base=base_index) -> None:
on_segment(seg, _base + idx)
chunk_result = self.transcribe(
request_id, tmp.name, model_name, device,
compute_type, language, on_segment=chunk_on_segment,
)
# Offset timestamps and merge
for seg in chunk_result.segments:
seg.start_ms += chunk_start_ms
seg.end_ms += chunk_start_ms
for word in seg.words:
word.start_ms += chunk_start_ms
word.end_ms += chunk_start_ms
merged_result.segments.append(seg)
global_segment_index += len(chunk_result.segments)
# Take language from first chunk
if chunk_idx == 0:
merged_result.language = chunk_result.language
merged_result.language_probability = chunk_result.language_probability
finally:
import os
os.unlink(tmp.name)
# Chunk progress
chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90)
write_message(progress_message(
request_id, chunk_pct, "transcribing",
f"Completed chunk {chunk_idx + 1}/{num_chunks}..."))
merged_result.duration_ms = int(total_duration * 1000)
write_message(progress_message(request_id, 100, "done", "Transcription complete"))
return merged_result
def result_to_payload(result: TranscriptionResult) -> dict[str, Any]:
"""Convert TranscriptionResult to IPC payload dict."""
return {
"segments": [
{
"text": seg.text,
"start_ms": seg.start_ms,
"end_ms": seg.end_ms,
"words": [
{
"word": w.word,
"start_ms": w.start_ms,
"end_ms": w.end_ms,
"confidence": w.confidence,
}
for w in seg.words
],
}
for seg in result.segments
],
"language": result.language,
"language_probability": result.language_probability,
"duration_ms": result.duration_ms,
}