Files
voice-to-notes/python/voice_to_notes/services/transcribe.py
Claude 7efa3bb116
Some checks failed
Release / Bump version and tag (push) Successful in 18s
Release / Build (macOS) (push) Successful in 5m27s
Release / Build (Linux) (push) Successful in 11m38s
Release / Build (Windows) (push) Has been cancelled
Fix CUDA fallback: gracefully fall back to CPU when CUDA libs missing
- transcribe: catch model load failures on CUDA and retry with CPU
- hardware detect: test CUDA runtime actually works (torch.zeros on cuda)
  before recommending GPU, since CPU-only builds detect CUDA via driver
  but lack cublas/cuDNN libraries

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 05:36:40 -07:00

331 lines
11 KiB
Python

"""Transcription service — faster-whisper pipeline with word-level timestamps."""
from __future__ import annotations
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from faster_whisper import WhisperModel
from voice_to_notes.ipc.messages import progress_message
from voice_to_notes.ipc.protocol import write_message
from voice_to_notes.utils.ffmpeg import get_ffmpeg_path, get_ffprobe_path
CHUNK_REPORT_SIZE = 10
LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour
@dataclass
class WordResult:
"""A single word with timestamp."""
word: str
start_ms: int
end_ms: int
confidence: float
@dataclass
class SegmentResult:
"""A transcription segment with words."""
text: str
start_ms: int
end_ms: int
words: list[WordResult] = field(default_factory=list)
@dataclass
class TranscriptionResult:
"""Full transcription output."""
segments: list[SegmentResult] = field(default_factory=list)
language: str = ""
language_probability: float = 0.0
duration_ms: int = 0
class TranscribeService:
"""Handles audio transcription via faster-whisper."""
def __init__(self) -> None:
self._model: WhisperModel | None = None
self._current_model_name: str = ""
self._current_device: str = ""
self._current_compute_type: str = ""
def _ensure_model(
self,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
) -> WhisperModel:
"""Load or reuse the Whisper model."""
if (
self._model is not None
and self._current_model_name == model_name
and self._current_device == device
and self._current_compute_type == compute_type
):
return self._model
print(
f"[sidecar] Loading model {model_name} on {device} ({compute_type})",
file=sys.stderr,
flush=True,
)
try:
self._model = WhisperModel(
model_name,
device=device,
compute_type=compute_type,
)
except Exception as e:
if device != "cpu":
print(
f"[sidecar] Failed to load on {device}: {e}. Falling back to CPU.",
file=sys.stderr,
flush=True,
)
device = "cpu"
compute_type = "int8"
self._model = WhisperModel(
model_name,
device=device,
compute_type=compute_type,
)
else:
raise
self._current_model_name = model_name
self._current_device = device
self._current_compute_type = compute_type
return self._model
def transcribe(
self,
request_id: str,
file_path: str,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
language: str | None = None,
on_segment: Callable[[SegmentResult, int], None] | None = None,
) -> TranscriptionResult:
"""Transcribe an audio file with word-level timestamps.
Sends progress messages via IPC during processing.
"""
# Stage: loading model
write_message(progress_message(request_id, 0, "loading_model", f"Loading {model_name}..."))
model = self._ensure_model(model_name, device, compute_type)
# Stage: transcribing
write_message(progress_message(request_id, 10, "transcribing", "Starting transcription..."))
start_time = time.time()
segments_iter, info = model.transcribe(
file_path,
language=language,
word_timestamps=True,
vad_filter=True,
)
result = TranscriptionResult(
language=info.language,
language_probability=info.language_probability,
duration_ms=int(info.duration * 1000),
)
# Process segments with progress reporting
total_duration = info.duration if info.duration > 0 else 1.0
segment_count = 0
for segment in segments_iter:
segment_count += 1
progress_pct = min(10 + int((segment.end / total_duration) * 80), 90)
words = []
if segment.words:
for w in segment.words:
words.append(
WordResult(
word=w.word.strip(),
start_ms=int(w.start * 1000),
end_ms=int(w.end * 1000),
confidence=round(w.probability, 4),
)
)
result.segments.append(
SegmentResult(
text=segment.text.strip(),
start_ms=int(segment.start * 1000),
end_ms=int(segment.end * 1000),
words=words,
)
)
if on_segment:
on_segment(result.segments[-1], segment_count - 1)
write_message(
progress_message(
request_id,
progress_pct,
"transcribing",
f"Transcribing segment {segment_count} ({progress_pct}% of audio)...",
)
)
if segment_count % CHUNK_REPORT_SIZE == 0:
write_message(progress_message(
request_id, progress_pct, "transcribing",
f"Completed chunk of {CHUNK_REPORT_SIZE} segments "
f"({segment_count} total, {progress_pct}% of audio)..."))
elapsed = time.time() - start_time
print(
f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s",
file=sys.stderr,
flush=True,
)
write_message(progress_message(request_id, 100, "done", "Transcription complete"))
return result
def transcribe_chunked(
self,
request_id: str,
file_path: str,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
language: str | None = None,
on_segment: Callable[[SegmentResult, int], None] | None = None,
chunk_duration_sec: int = 300,
) -> TranscriptionResult:
"""Transcribe a large audio file by splitting into chunks.
Uses ffmpeg to split the file into chunks, transcribes each chunk,
then merges the results with corrected timestamps.
Falls back to standard transcribe() if ffmpeg is not available.
"""
import subprocess
import tempfile
# Get total duration via ffprobe
try:
probe_result = subprocess.run(
[get_ffprobe_path(), "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", file_path],
capture_output=True, text=True, check=True,
)
total_duration = float(probe_result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
# ffprobe not available or failed — fall back to standard transcription
write_message(progress_message(
request_id, 5, "transcribing",
"ffmpeg not available, using standard transcription..."))
return self.transcribe(request_id, file_path, model_name, device,
compute_type, language, on_segment=on_segment)
num_chunks = max(1, int(total_duration / chunk_duration_sec) + 1)
write_message(progress_message(
request_id, 5, "transcribing",
f"Splitting {total_duration:.0f}s file into {num_chunks} chunks..."))
merged_result = TranscriptionResult()
global_segment_index = 0
for chunk_idx in range(num_chunks):
chunk_start = chunk_idx * chunk_duration_sec
if chunk_start >= total_duration:
break
chunk_start_ms = int(chunk_start * 1000)
# Extract chunk to temp file
tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
tmp.close()
try:
subprocess.run(
[get_ffmpeg_path(), "-y", "-ss", str(chunk_start),
"-t", str(chunk_duration_sec),
"-i", file_path,
"-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
tmp.name],
capture_output=True, check=True,
)
# Wrap on_segment to offset the index
chunk_on_segment = None
if on_segment:
base_index = global_segment_index
def chunk_on_segment(seg: SegmentResult, idx: int, _base=base_index) -> None:
on_segment(seg, _base + idx)
chunk_result = self.transcribe(
request_id, tmp.name, model_name, device,
compute_type, language, on_segment=chunk_on_segment,
)
# Offset timestamps and merge
for seg in chunk_result.segments:
seg.start_ms += chunk_start_ms
seg.end_ms += chunk_start_ms
for word in seg.words:
word.start_ms += chunk_start_ms
word.end_ms += chunk_start_ms
merged_result.segments.append(seg)
global_segment_index += len(chunk_result.segments)
# Take language from first chunk
if chunk_idx == 0:
merged_result.language = chunk_result.language
merged_result.language_probability = chunk_result.language_probability
finally:
import os
os.unlink(tmp.name)
# Chunk progress
chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90)
write_message(progress_message(
request_id, chunk_pct, "transcribing",
f"Completed chunk {chunk_idx + 1}/{num_chunks}..."))
merged_result.duration_ms = int(total_duration * 1000)
write_message(progress_message(request_id, 100, "done", "Transcription complete"))
return merged_result
def result_to_payload(result: TranscriptionResult) -> dict[str, Any]:
"""Convert TranscriptionResult to IPC payload dict."""
return {
"segments": [
{
"text": seg.text,
"start_ms": seg.start_ms,
"end_ms": seg.end_ms,
"words": [
{
"word": w.word,
"start_ms": w.start_ms,
"end_ms": w.end_ms,
"confidence": w.confidence,
}
for w in seg.words
],
}
for seg in result.segments
],
"language": result.language,
"language_probability": result.language_probability,
"duration_ms": result.duration_ms,
}