Files
Claude 33ca3e4a28
All checks were successful
Build Sidecars / Bump sidecar version and tag (push) Successful in 3s
Release / Bump version and tag (push) Successful in 3s
Build Sidecars / Build Sidecar (macOS) (push) Successful in 8m30s
Release / Build App (macOS) (push) Successful in 1m19s
Build Sidecars / Build Sidecar (Linux) (push) Successful in 12m9s
Release / Build App (Linux) (push) Successful in 3m36s
Build Sidecars / Build Sidecar (Windows) (push) Successful in 29m36s
Release / Build App (Windows) (push) Successful in 3m13s
Show chunk context in transcription progress for large files
Files >1 hour are split into 5-minute chunks. Previously each chunk
showed "Starting transcription..." making it look like a restart.
Now shows "Chunk 3/12: Starting transcription..." and
"Chunk 3/12: Transcribing segment 5 (42% of audio)..."

Also skips the "Loading model..." message for chunks after the first
since the model is already loaded.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-23 07:57:59 -07:00

337 lines
12 KiB
Python

"""Transcription service — faster-whisper pipeline with word-level timestamps."""
from __future__ import annotations
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from faster_whisper import WhisperModel
from voice_to_notes.ipc.messages import progress_message
from voice_to_notes.ipc.protocol import write_message
from voice_to_notes.utils.ffmpeg import get_ffmpeg_path, get_ffprobe_path
# Number of segments between the periodic "Completed chunk of N segments"
# progress messages emitted by TranscribeService.transcribe().
CHUNK_REPORT_SIZE = 10
# Duration threshold in seconds. Not referenced in the code shown here;
# presumably callers use it to decide when to switch to
# TranscribeService.transcribe_chunked() — TODO confirm against the caller.
LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour
@dataclass
class WordResult:
    """One transcribed word together with its timing and confidence."""

    # Word text.
    word: str
    # Start offset of the word, in milliseconds.
    start_ms: int
    # End offset of the word, in milliseconds.
    end_ms: int
    # Probability reported by the recognizer for this word.
    confidence: float
@dataclass
class SegmentResult:
    """One transcription segment and the words it contains."""

    # Segment text.
    text: str
    # Start offset of the segment, in milliseconds.
    start_ms: int
    # End offset of the segment, in milliseconds.
    end_ms: int
    # Word-level results for this segment; a fresh empty list by default.
    words: list[WordResult] = field(default_factory=list)
@dataclass
class TranscriptionResult:
    """Complete output of a transcription run."""

    # Transcribed segments, in the order they were produced.
    segments: list[SegmentResult] = field(default_factory=list)
    # Language reported for the audio; empty until a transcription sets it.
    language: str = ""
    # Probability reported for the language.
    language_probability: float = 0.0
    # Duration of the audio, in milliseconds.
    duration_ms: int = 0
class TranscribeService:
    """Handles audio transcription via faster-whisper.

    One Whisper model is kept loaded between calls and is only reloaded when
    a different model name, device, or compute type is requested.
    """

    def __init__(self) -> None:
        self._model: WhisperModel | None = None
        # Configuration of the model that is actually loaded.
        self._current_model_name: str = ""
        self._current_device: str = ""
        self._current_compute_type: str = ""
        # Configuration the caller asked for when the model was loaded. It
        # differs from the "current" values after a fallback to CPU, and lets
        # repeated requests for the unavailable device reuse the CPU model
        # instead of retrying the failing load on every call.
        self._requested_device: str = ""
        self._requested_compute_type: str = ""

    def _is_model_cached(self, model_name: str, device: str, compute_type: str) -> bool:
        """Return True if the loaded model can serve this request as-is.

        A request matches when the model name is the same and the
        device/compute type equal either what is actually loaded or what was
        originally requested for the loaded model (the CPU-fallback case).
        """
        if self._model is None or self._current_model_name != model_name:
            return False
        return (device, compute_type) in (
            (self._current_device, self._current_compute_type),
            (self._requested_device, self._requested_compute_type),
        )

    @staticmethod
    def _chunk_count(total_duration: float, chunk_duration_sec: float) -> int:
        """Return how many chunks are needed to cover total_duration seconds.

        Always at least 1. A duration that is an exact multiple of the chunk
        length does not produce a trailing empty chunk.

        Raises:
            ValueError: if chunk_duration_sec is not positive.
        """
        import math

        if chunk_duration_sec <= 0:
            raise ValueError("chunk_duration_sec must be positive")
        return max(1, math.ceil(total_duration / chunk_duration_sec))

    def _ensure_model(
        self,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
    ) -> WhisperModel:
        """Load or reuse the Whisper model.

        If loading on a non-CPU device fails, the model is loaded on CPU with
        int8 compute instead; the original exception is re-raised only when
        the failing device was already "cpu".
        """
        if self._is_model_cached(model_name, device, compute_type):
            return self._model

        requested_device = device
        requested_compute_type = compute_type
        print(
            f"[sidecar] Loading model {model_name} on {device} ({compute_type})",
            file=sys.stderr,
            flush=True,
        )
        try:
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
            )
        except Exception as e:
            if device == "cpu":
                raise
            print(
                f"[sidecar] Failed to load on {device}: {e}. Falling back to CPU.",
                file=sys.stderr,
                flush=True,
            )
            device = "cpu"
            compute_type = "int8"
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
            )

        # Only commit state once a model has actually been constructed, so a
        # failed load leaves the previously loaded model usable.
        self._model = model
        self._current_model_name = model_name
        self._current_device = device
        self._current_compute_type = compute_type
        self._requested_device = requested_device
        self._requested_compute_type = requested_compute_type
        return model

    def transcribe(
        self,
        request_id: str,
        file_path: str,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str | None = None,
        on_segment: Callable[[SegmentResult, int], None] | None = None,
        chunk_label: str | None = None,
    ) -> TranscriptionResult:
        """Transcribe an audio file with word-level timestamps.

        Sends progress messages via IPC during processing.

        Args:
            request_id: Identifier attached to every progress message.
            file_path: Path of the audio file to transcribe.
            model_name: Whisper model to load.
            device: Device to load the model on.
            compute_type: Compute type to load the model with.
            language: Language passed to the model; None leaves it unset.
            on_segment: Optional callback invoked with each finished segment
                and its zero-based index.
            chunk_label: If set (e.g. "Chunk 3/12"), this call handles one
                chunk of a larger file: messages are prefixed with the label
                and the final "done" message is left to the caller.

        Returns:
            The segments, detected language, and duration of the audio.
        """
        prefix = f"{chunk_label}: " if chunk_label else ""

        # Stage: loading model. For chunks, only announce it when a load will
        # really happen (normally just the first chunk).
        if not chunk_label or not self._is_model_cached(model_name, device, compute_type):
            write_message(progress_message(request_id, 0, "loading_model", f"Loading {model_name}..."))
        model = self._ensure_model(model_name, device, compute_type)

        # Stage: transcribing
        write_message(progress_message(request_id, 10, "transcribing", f"{prefix}Starting transcription..."))
        start_time = time.time()
        segments_iter, info = model.transcribe(
            file_path,
            language=language,
            word_timestamps=True,
            vad_filter=True,
        )
        result = TranscriptionResult(
            language=info.language,
            language_probability=info.language_probability,
            duration_ms=int(info.duration * 1000),
        )

        # Guard against a zero/negative reported duration so the progress
        # division below cannot fail.
        total_duration = info.duration if info.duration > 0 else 1.0
        segment_count = 0
        for segment in segments_iter:
            segment_count += 1
            audio_fraction = segment.end / total_duration
            # Progress bar value: the 10-90 band is reserved for transcription.
            progress_pct = min(10 + int(audio_fraction * 80), 90)
            # Share of the audio processed so far, as shown in the message text.
            audio_pct = min(int(audio_fraction * 100), 100)

            words = [
                WordResult(
                    word=w.word.strip(),
                    start_ms=int(w.start * 1000),
                    end_ms=int(w.end * 1000),
                    confidence=round(w.probability, 4),
                )
                for w in (segment.words or [])
            ]
            segment_result = SegmentResult(
                text=segment.text.strip(),
                start_ms=int(segment.start * 1000),
                end_ms=int(segment.end * 1000),
                words=words,
            )
            result.segments.append(segment_result)
            if on_segment:
                on_segment(segment_result, segment_count - 1)

            write_message(
                progress_message(
                    request_id,
                    progress_pct,
                    "transcribing",
                    f"{prefix}Transcribing segment {segment_count} ({audio_pct}% of audio)...",
                )
            )
            if segment_count % CHUNK_REPORT_SIZE == 0:
                write_message(
                    progress_message(
                        request_id,
                        progress_pct,
                        "transcribing",
                        f"{prefix}Completed chunk of {CHUNK_REPORT_SIZE} segments "
                        f"({segment_count} total, {audio_pct}% of audio)...",
                    )
                )

        elapsed = time.time() - start_time
        print(
            f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s",
            file=sys.stderr,
            flush=True,
        )
        # A single chunk finishing is not the end of the job; the chunked
        # caller reports "done" itself after the last chunk.
        if not chunk_label:
            write_message(progress_message(request_id, 100, "done", "Transcription complete"))
        return result

    def transcribe_chunked(
        self,
        request_id: str,
        file_path: str,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str | None = None,
        on_segment: Callable[[SegmentResult, int], None] | None = None,
        chunk_duration_sec: int = 300,
    ) -> TranscriptionResult:
        """Transcribe a large audio file by splitting it into chunks.

        Uses ffprobe to read the total duration and ffmpeg to extract each
        chunk to a temporary 16 kHz mono WAV file, transcribes the chunks one
        by one, then merges the results with timestamps and segment indices
        shifted to be relative to the whole file.

        Falls back to standard transcribe() if ffprobe is missing, fails, or
        returns a duration that cannot be parsed.

        Raises:
            ValueError: if chunk_duration_sec is not positive.
            subprocess.CalledProcessError: if ffmpeg fails to extract a chunk.
        """
        import os
        import subprocess
        import tempfile

        # Get total duration via ffprobe
        try:
            probe_result = subprocess.run(
                [
                    get_ffprobe_path(), "-v", "quiet",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    file_path,
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            total_duration = float(probe_result.stdout.strip())
        except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
            # ffprobe not available or failed — fall back to standard transcription
            write_message(progress_message(
                request_id, 5, "transcribing",
                "ffmpeg not available, using standard transcription..."))
            return self.transcribe(
                request_id, file_path, model_name, device,
                compute_type, language, on_segment=on_segment,
            )

        num_chunks = self._chunk_count(total_duration, chunk_duration_sec)
        write_message(progress_message(
            request_id, 5, "transcribing",
            f"Splitting {total_duration:.0f}s file into {num_chunks} chunks..."))

        merged_result = TranscriptionResult()
        global_segment_index = 0
        for chunk_idx in range(num_chunks):
            chunk_start = chunk_idx * chunk_duration_sec
            if chunk_start >= total_duration:
                break
            chunk_start_ms = int(chunk_start * 1000)

            # Reserve a temp file name; ffmpeg writes the chunk into it (-y
            # overwrites the empty placeholder).
            with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
                chunk_path = tmp.name
            try:
                subprocess.run(
                    [
                        get_ffmpeg_path(), "-y",
                        "-ss", str(chunk_start),
                        "-t", str(chunk_duration_sec),
                        "-i", file_path,
                        "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
                        chunk_path,
                    ],
                    capture_output=True,
                    check=True,
                )

                # Wrap on_segment so the callback sees file-wide indices.
                chunk_on_segment: Callable[[SegmentResult, int], None] | None = None
                if on_segment:
                    def chunk_on_segment(
                        seg: SegmentResult, idx: int, _base: int = global_segment_index
                    ) -> None:
                        on_segment(seg, _base + idx)

                chunk_result = self.transcribe(
                    request_id, chunk_path, model_name, device,
                    compute_type, language, on_segment=chunk_on_segment,
                    chunk_label=f"Chunk {chunk_idx + 1}/{num_chunks}",
                )

                # Offset timestamps and merge
                for seg in chunk_result.segments:
                    seg.start_ms += chunk_start_ms
                    seg.end_ms += chunk_start_ms
                    for word in seg.words:
                        word.start_ms += chunk_start_ms
                        word.end_ms += chunk_start_ms
                    merged_result.segments.append(seg)
                global_segment_index += len(chunk_result.segments)

                # Take language from first chunk
                if chunk_idx == 0:
                    merged_result.language = chunk_result.language
                    merged_result.language_probability = chunk_result.language_probability
            finally:
                os.unlink(chunk_path)

            # Chunk progress
            chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90)
            write_message(progress_message(
                request_id, chunk_pct, "transcribing",
                f"Completed chunk {chunk_idx + 1}/{num_chunks}..."))

        merged_result.duration_ms = int(total_duration * 1000)
        write_message(progress_message(request_id, 100, "done", "Transcription complete"))
        return merged_result
def result_to_payload(result: TranscriptionResult) -> dict[str, Any]:
    """Flatten a TranscriptionResult into a plain dict for the IPC payload.

    The returned dict has the keys "segments", "language",
    "language_probability" and "duration_ms". Each segment entry carries
    "text", "start_ms", "end_ms" and a "words" list whose entries carry
    "word", "start_ms", "end_ms" and "confidence".
    """

    def word_entry(word: Any) -> dict[str, Any]:
        # One word as a plain dict.
        return {
            "word": word.word,
            "start_ms": word.start_ms,
            "end_ms": word.end_ms,
            "confidence": word.confidence,
        }

    segment_entries = []
    for segment in result.segments:
        segment_entries.append(
            {
                "text": segment.text,
                "start_ms": segment.start_ms,
                "end_ms": segment.end_ms,
                "words": [word_entry(item) for item in segment.words],
            }
        )

    payload: dict[str, Any] = {"segments": segment_entries}
    payload["language"] = result.language
    payload["language_probability"] = result.language_probability
    payload["duration_ms"] = result.duration_ms
    return payload