- transcribe: catch model load failures on CUDA and retry with CPU - hardware detect: test CUDA runtime actually works (torch.zeros on cuda) before recommending GPU, since CPU-only builds detect CUDA via driver but lack cublas/cuDNN libraries Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
331 lines
11 KiB
Python
331 lines
11 KiB
Python
"""Transcription service — faster-whisper pipeline with word-level timestamps."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import time
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from faster_whisper import WhisperModel
|
|
|
|
from voice_to_notes.ipc.messages import progress_message
|
|
from voice_to_notes.ipc.protocol import write_message
|
|
from voice_to_notes.utils.ffmpeg import get_ffmpeg_path, get_ffprobe_path
|
|
|
|
# transcribe() emits an extra "Completed chunk ..." progress message each time
# the running segment count reaches a multiple of this value.
CHUNK_REPORT_SIZE = 10

# One hour, expressed in seconds. Not referenced by the code visible in this
# file; presumably callers use it to choose transcribe_chunked() — TODO confirm.
LARGE_FILE_THRESHOLD_SEC = 60 * 60
|
|
|
|
|
|
@dataclass
class WordResult:
    """One recognised word together with its timing and confidence."""

    # The word text (transcribe() stores it with surrounding whitespace stripped).
    word: str
    # Start of the word, in milliseconds.
    start_ms: int
    # End of the word, in milliseconds.
    end_ms: int
    # Model probability for the word (transcribe() rounds it to 4 decimals).
    confidence: float
|
|
|
|
|
|
@dataclass
class SegmentResult:
    """One transcribed segment and the words it contains."""

    # Segment text (transcribe() stores it with surrounding whitespace stripped).
    text: str
    # Start of the segment, in milliseconds.
    start_ms: int
    # End of the segment, in milliseconds.
    end_ms: int
    # Word-level results for this segment; a fresh empty list per instance.
    words: list[WordResult] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class TranscriptionResult:
    """Complete output of a transcription run."""

    # Segments in the order they were produced; a fresh empty list per instance.
    segments: list[SegmentResult] = field(default_factory=list)
    # Language reported for the audio; empty string until filled in.
    language: str = ""
    # Probability reported alongside the language.
    language_probability: float = 0.0
    # Total audio duration, in milliseconds.
    duration_ms: int = 0
|
|
|
|
|
|
class TranscribeService:
    """Handles audio transcription via faster-whisper.

    One Whisper model is kept loaded on the instance and reused while the
    requested model name, device and compute type stay the same. If loading
    on a non-CPU device fails, the model is loaded on CPU instead and that
    fallback is remembered, so later requests for the same failing device
    reuse the CPU model instead of reloading it.
    """

    def __init__(self) -> None:
        self._model: WhisperModel | None = None
        self._current_model_name: str = ""
        # Device / compute type the loaded model is actually running on.
        self._current_device: str = ""
        self._current_compute_type: str = ""
        # Device / compute type the caller asked for when the model was loaded.
        # These differ from the two values above only after a CPU fallback.
        self._requested_device: str = ""
        self._requested_compute_type: str = ""

    def _ensure_model(
        self,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
    ) -> WhisperModel:
        """Load or reuse the Whisper model.

        Args:
            model_name: Model identifier passed to ``WhisperModel``.
            device: Device string passed to ``WhisperModel``.
            compute_type: Compute type passed to ``WhisperModel``.

        Returns:
            The loaded model. It may be running on CPU with ``int8`` even
            though another device was requested, if loading on that device
            failed.

        Raises:
            Exception: Whatever ``WhisperModel`` raises when loading on CPU
                fails, either directly or during the fallback attempt.
        """
        if self._model is not None and self._current_model_name == model_name:
            wanted = (device, compute_type)
            loaded = (self._current_device, self._current_compute_type)
            asked = (self._requested_device, self._requested_compute_type)
            # A request matches either what is really loaded or what was
            # originally asked for before a CPU fallback. Without the second
            # check, every call that names a broken GPU device would retry it,
            # fail, and reload the model on CPU again.
            if wanted in (loaded, asked):
                return self._model

        requested_device = device
        requested_compute_type = compute_type

        print(
            f"[sidecar] Loading model {model_name} on {device} ({compute_type})",
            file=sys.stderr,
            flush=True,
        )
        try:
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
            )
        except Exception as e:
            # Deliberately broad: any load failure on a non-CPU device gets
            # one retry on CPU (for example a CUDA driver that is detected
            # while the runtime libraries are missing).
            if device == "cpu":
                raise
            print(
                f"[sidecar] Failed to load on {device}: {e}. Falling back to CPU.",
                file=sys.stderr,
                flush=True,
            )
            device = "cpu"
            compute_type = "int8"
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
            )

        self._model = model
        self._current_model_name = model_name
        self._current_device = device
        self._current_compute_type = compute_type
        self._requested_device = requested_device
        self._requested_compute_type = requested_compute_type
        return model

    @staticmethod
    def _chunk_count(total_duration: float, chunk_duration_sec: float) -> int:
        """Return how many chunks of ``chunk_duration_sec`` cover ``total_duration`` (at least 1)."""
        import math

        # ceil, not floor + 1: a duration that is an exact multiple of the
        # chunk length must not be counted as needing one extra empty chunk.
        return max(1, math.ceil(total_duration / chunk_duration_sec))

    def transcribe(
        self,
        request_id: str,
        file_path: str,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str | None = None,
        on_segment: Callable[[SegmentResult, int], None] | None = None,
    ) -> TranscriptionResult:
        """Transcribe an audio file with word-level timestamps.

        Sends progress messages via IPC during processing: 0 while the model
        loads, 10 when transcription starts, a value capped at 90 after each
        segment, and 100 when finished.

        Args:
            request_id: Identifier attached to every progress message.
            file_path: Path of the audio file handed to the model.
            model_name: Model to load; see ``_ensure_model``.
            device: Requested device; see ``_ensure_model``.
            compute_type: Requested compute type; see ``_ensure_model``.
            language: Language passed to the model; ``None`` leaves the choice
                to the model.
            on_segment: Optional callback invoked after each segment with the
                ``SegmentResult`` and its zero-based index.

        Returns:
            A ``TranscriptionResult`` with all segments, the language info
            reported by the model, and the duration in milliseconds.
        """
        # Stage: loading model
        write_message(progress_message(request_id, 0, "loading_model", f"Loading {model_name}..."))
        model = self._ensure_model(model_name, device, compute_type)

        # Stage: transcribing
        write_message(progress_message(request_id, 10, "transcribing", "Starting transcription..."))

        # Monotonic clock: the elapsed time is only logged and must not be
        # affected by wall-clock adjustments.
        start_time = time.monotonic()
        segments_iter, info = model.transcribe(
            file_path,
            language=language,
            word_timestamps=True,
            vad_filter=True,
        )

        result = TranscriptionResult(
            language=info.language,
            language_probability=info.language_probability,
            duration_ms=int(info.duration * 1000),
        )

        # Guard the per-segment percentage against a zero or negative duration.
        total_duration = info.duration if info.duration > 0 else 1.0
        segment_count = 0

        # The loop below is what drives the transcription: segments are
        # pulled from the iterator one at a time.
        for segment_count, segment in enumerate(segments_iter, start=1):
            progress_pct = min(10 + int((segment.end / total_duration) * 80), 90)

            words = [
                WordResult(
                    word=w.word.strip(),
                    start_ms=int(w.start * 1000),
                    end_ms=int(w.end * 1000),
                    confidence=round(w.probability, 4),
                )
                for w in (segment.words or [])
            ]

            segment_result = SegmentResult(
                text=segment.text.strip(),
                start_ms=int(segment.start * 1000),
                end_ms=int(segment.end * 1000),
                words=words,
            )
            result.segments.append(segment_result)

            if on_segment:
                on_segment(segment_result, segment_count - 1)

            write_message(
                progress_message(
                    request_id,
                    progress_pct,
                    "transcribing",
                    f"Transcribing segment {segment_count} ({progress_pct}% of audio)...",
                )
            )

            if segment_count % CHUNK_REPORT_SIZE == 0:
                write_message(
                    progress_message(
                        request_id,
                        progress_pct,
                        "transcribing",
                        f"Completed chunk of {CHUNK_REPORT_SIZE} segments "
                        f"({segment_count} total, {progress_pct}% of audio)...",
                    )
                )

        elapsed = time.monotonic() - start_time
        print(
            f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s",
            file=sys.stderr,
            flush=True,
        )

        write_message(progress_message(request_id, 100, "done", "Transcription complete"))
        return result

    def transcribe_chunked(
        self,
        request_id: str,
        file_path: str,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str | None = None,
        on_segment: Callable[[SegmentResult, int], None] | None = None,
        chunk_duration_sec: int = 300,
    ) -> TranscriptionResult:
        """Transcribe a large audio file by splitting into chunks.

        Uses ffmpeg to split the file into chunks, transcribes each chunk,
        then merges the results with corrected timestamps.

        Falls back to standard transcribe() if ffprobe cannot be run or its
        output cannot be parsed as a duration.

        Args:
            request_id: Identifier attached to every progress message.
            file_path: Path of the source audio file.
            model_name: Model to load; see ``_ensure_model``.
            device: Requested device; see ``_ensure_model``.
            compute_type: Requested compute type; see ``_ensure_model``.
            language: Language passed to the model for every chunk.
            on_segment: Optional callback invoked per segment with an index
                that keeps counting across chunks.
            chunk_duration_sec: Length of each chunk in seconds; must be
                positive.

        Returns:
            The merged ``TranscriptionResult``. Language fields come from the
            first chunk and the duration from ffprobe.

        Raises:
            ValueError: If ``chunk_duration_sec`` is not positive.
            subprocess.CalledProcessError: If ffmpeg fails to extract a chunk.
        """
        import os
        import subprocess
        import tempfile

        if chunk_duration_sec <= 0:
            raise ValueError("chunk_duration_sec must be positive")

        # Get total duration via ffprobe
        try:
            probe_result = subprocess.run(
                [get_ffprobe_path(), "-v", "quiet", "-show_entries", "format=duration",
                 "-of", "default=noprint_wrappers=1:nokey=1", file_path],
                capture_output=True, text=True, check=True,
            )
            total_duration = float(probe_result.stdout.strip())
        except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
            # ffprobe not available or failed — fall back to standard transcription
            write_message(progress_message(
                request_id, 5, "transcribing",
                "ffmpeg not available, using standard transcription..."))
            return self.transcribe(request_id, file_path, model_name, device,
                                   compute_type, language, on_segment=on_segment)

        num_chunks = self._chunk_count(total_duration, chunk_duration_sec)
        write_message(progress_message(
            request_id, 5, "transcribing",
            f"Splitting {total_duration:.0f}s file into {num_chunks} chunks..."))

        merged_result = TranscriptionResult()
        global_segment_index = 0

        for chunk_idx in range(num_chunks):
            chunk_start = chunk_idx * chunk_duration_sec
            if chunk_start >= total_duration:
                break

            chunk_start_ms = int(chunk_start * 1000)

            # Reserve a temp file name for the chunk. The handle is closed
            # right away so ffmpeg can write to the path; delete=False keeps
            # the file until the explicit unlink below.
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                chunk_path = tmp.name

            try:
                subprocess.run(
                    [get_ffmpeg_path(), "-y", "-ss", str(chunk_start),
                     "-t", str(chunk_duration_sec),
                     "-i", file_path,
                     "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
                     chunk_path],
                    capture_output=True, check=True,
                )

                # Wrap on_segment to offset the index
                chunk_on_segment = None
                if on_segment:
                    # Bind the base index as a default so the wrapper keeps
                    # this chunk's offset.
                    def chunk_on_segment(
                        seg: SegmentResult, idx: int, _base: int = global_segment_index
                    ) -> None:
                        on_segment(seg, _base + idx)

                # NOTE(review): transcribe() emits its own 0..100 progress
                # sequence (including a "done" message) for every chunk, so
                # clients see progress reset per chunk — confirm this is
                # acceptable to the consumer.
                chunk_result = self.transcribe(
                    request_id, chunk_path, model_name, device,
                    compute_type, language, on_segment=chunk_on_segment,
                )

                # Offset timestamps and merge
                for seg in chunk_result.segments:
                    seg.start_ms += chunk_start_ms
                    seg.end_ms += chunk_start_ms
                    for word in seg.words:
                        word.start_ms += chunk_start_ms
                        word.end_ms += chunk_start_ms
                    merged_result.segments.append(seg)

                global_segment_index += len(chunk_result.segments)

                # Take language from first chunk
                if chunk_idx == 0:
                    merged_result.language = chunk_result.language
                    merged_result.language_probability = chunk_result.language_probability

            finally:
                # Best-effort cleanup: a failed unlink must not mask an
                # exception raised while extracting or transcribing the chunk.
                try:
                    os.unlink(chunk_path)
                except OSError as cleanup_error:
                    print(
                        f"[sidecar] Could not remove temp chunk {chunk_path}: {cleanup_error}",
                        file=sys.stderr,
                        flush=True,
                    )

            # Chunk progress
            chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90)
            write_message(progress_message(
                request_id, chunk_pct, "transcribing",
                f"Completed chunk {chunk_idx + 1}/{num_chunks}..."))

        merged_result.duration_ms = int(total_duration * 1000)
        write_message(progress_message(request_id, 100, "done", "Transcription complete"))
        return merged_result
|
|
|
|
|
|
def result_to_payload(result: TranscriptionResult) -> dict[str, Any]:
    """Flatten a TranscriptionResult into plain dicts and lists for the IPC payload.

    Each segment becomes a dict with ``text``, ``start_ms``, ``end_ms`` and a
    ``words`` list; each word becomes a dict with ``word``, ``start_ms``,
    ``end_ms`` and ``confidence``. Language and duration fields are copied
    through unchanged.
    """
    segment_dicts = []
    for segment in result.segments:
        word_dicts = []
        for item in segment.words:
            word_dicts.append(
                {
                    "word": item.word,
                    "start_ms": item.start_ms,
                    "end_ms": item.end_ms,
                    "confidence": item.confidence,
                }
            )
        segment_dicts.append(
            {
                "text": segment.text,
                "start_ms": segment.start_ms,
                "end_ms": segment.end_ms,
                "words": word_dicts,
            }
        )

    payload: dict[str, Any] = {"segments": segment_dicts}
    payload["language"] = result.language
    payload["language_probability"] = result.language_probability
    payload["duration_ms"] = result.duration_ms
    return payload
|