Files

337 lines
12 KiB
Python
Raw Permalink Normal View History

"""Transcription service — faster-whisper pipeline with word-level timestamps."""
Phase 1 foundation: Tauri shell, Python sidecar, SQLite database Tauri v2 + Svelte + TypeScript frontend: - App shell with workspace layout (waveform, transcript, speakers, AI chat) - Placeholder components for all major UI areas - Typed stores (project, transcript, playback, AI) - TypeScript interfaces matching the database schema - Tauri bridge service with typed invoke wrappers - svelte-check passes with 0 errors Rust backend: - Tauri v2 app entry point with command registration - SQLite database layer (rusqlite with bundled SQLite) - Full schema: projects, media_files, speakers, segments, words, ai_outputs, annotations (with indexes) - Model structs with serde serialization - CRUD queries for projects, speakers, segments, words - Segment text editing preserves original text - Schema versioning for future migrations - 6 tests passing - Command stubs for project, transcribe, export, AI, settings, system - App state management Python sidecar: - JSON-line IPC protocol (stdin/stdout) - Message types: IPCMessage, progress, error, ready - Handler registry with routing and error handling - Ping/pong handler for connectivity testing - Service stubs: transcribe, diarize, pipeline, AI, export - Provider stubs: local (llama-server), OpenAI, Anthropic, LiteLLM - Hardware detection stubs - 14 tests passing, ruff clean Also adds: - Testing strategy document (docs/TESTING.md) - Validation script (scripts/validate.sh) - Updated .gitignore for Svelte, Rust, Python artifacts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:16:06 -08:00
from __future__ import annotations
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
from faster_whisper import WhisperModel
from voice_to_notes.ipc.messages import progress_message
from voice_to_notes.ipc.protocol import write_message
from voice_to_notes.utils.ffmpeg import get_ffmpeg_path, get_ffprobe_path
# Every CHUNK_REPORT_SIZE-th segment, transcribe() emits an additional
# "Completed chunk of N segments" progress message.
CHUNK_REPORT_SIZE = 10
# Not referenced in the visible part of this file; presumably the duration
# above which callers switch to chunked transcription — TODO confirm.
LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour
@dataclass
class WordResult:
    """A single transcribed word with its timing and confidence.

    Attributes:
        word: The word text.
        start_ms: Start of the word, in milliseconds.
        end_ms: End of the word, in milliseconds.
        confidence: Probability reported by the model for this word.
    """

    word: str
    start_ms: int
    end_ms: int
    confidence: float
@dataclass
class SegmentResult:
    """One transcription segment together with its word-level breakdown.

    Attributes:
        text: The segment's text.
        start_ms: Start of the segment, in milliseconds.
        end_ms: End of the segment, in milliseconds.
        words: Word-level results for this segment; empty when none were
            produced. Each instance gets its own list.
    """

    text: str
    start_ms: int
    end_ms: int
    words: list[WordResult] = field(default_factory=list)
@dataclass
class TranscriptionResult:
    """Complete output of one transcription run.

    Attributes:
        segments: Transcribed segments in the order they were produced.
        language: Language reported by the model ("" until known).
        language_probability: Model's probability for ``language``.
        duration_ms: Duration of the transcribed audio, in milliseconds.
    """

    segments: list[SegmentResult] = field(default_factory=list)
    language: str = ""
    language_probability: float = 0.0
    duration_ms: int = 0
Phase 1 foundation: Tauri shell, Python sidecar, SQLite database Tauri v2 + Svelte + TypeScript frontend: - App shell with workspace layout (waveform, transcript, speakers, AI chat) - Placeholder components for all major UI areas - Typed stores (project, transcript, playback, AI) - TypeScript interfaces matching the database schema - Tauri bridge service with typed invoke wrappers - svelte-check passes with 0 errors Rust backend: - Tauri v2 app entry point with command registration - SQLite database layer (rusqlite with bundled SQLite) - Full schema: projects, media_files, speakers, segments, words, ai_outputs, annotations (with indexes) - Model structs with serde serialization - CRUD queries for projects, speakers, segments, words - Segment text editing preserves original text - Schema versioning for future migrations - 6 tests passing - Command stubs for project, transcribe, export, AI, settings, system - App state management Python sidecar: - JSON-line IPC protocol (stdin/stdout) - Message types: IPCMessage, progress, error, ready - Handler registry with routing and error handling - Ping/pong handler for connectivity testing - Service stubs: transcribe, diarize, pipeline, AI, export - Provider stubs: local (llama-server), OpenAI, Anthropic, LiteLLM - Hardware detection stubs - 14 tests passing, ruff clean Also adds: - Testing strategy document (docs/TESTING.md) - Validation script (scripts/validate.sh) - Updated .gitignore for Svelte, Rust, Python artifacts Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 15:16:06 -08:00
class TranscribeService:
    """Handles audio transcription via faster-whisper.

    The loaded ``WhisperModel`` is cached on the instance and reused for as
    long as callers keep asking for the same model/device/compute type.
    Progress is reported to the host through IPC progress messages.
    """

    def __init__(self) -> None:
        self._model: WhisperModel | None = None
        # Configuration the cached model is actually running with.
        self._current_model_name: str = ""
        self._current_device: str = ""
        self._current_compute_type: str = ""
        # Configuration that was *requested* when the cached model was loaded.
        # It differs from the _current_* values after a CPU fallback.
        self._requested_config: tuple[str, str, str] | None = None

    def _ensure_model(
        self,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
    ) -> WhisperModel:
        """Load the Whisper model, or reuse the cached one.

        If loading on a non-CPU device fails, the model is loaded on CPU with
        ``int8`` instead. The original request is remembered so that repeating
        it (e.g. once per chunk) reuses the fallback model rather than
        retrying the failing load and reloading the CPU model every time.

        Raises:
            Exception: Whatever ``WhisperModel`` raises when the CPU load
                itself fails.
        """
        requested = (model_name, device, compute_type)
        effective = (
            self._current_model_name,
            self._current_device,
            self._current_compute_type,
        )
        if self._model is not None and requested in (effective, self._requested_config):
            return self._model

        print(
            f"[sidecar] Loading model {model_name} on {device} ({compute_type})",
            file=sys.stderr,
            flush=True,
        )
        try:
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
            )
        except Exception as e:
            if device == "cpu":
                raise
            print(
                f"[sidecar] Failed to load on {device}: {e}. Falling back to CPU.",
                file=sys.stderr,
                flush=True,
            )
            device = "cpu"
            compute_type = "int8"
            model = WhisperModel(
                model_name,
                device=device,
                compute_type=compute_type,
            )

        # Only touch the cache once a model has actually been created, so a
        # failed load leaves the previous model (if any) usable.
        self._model = model
        self._current_model_name = model_name
        self._current_device = device
        self._current_compute_type = compute_type
        self._requested_config = requested
        return model

    def transcribe(
        self,
        request_id: str,
        file_path: str,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str | None = None,
        on_segment: Callable[[SegmentResult, int], None] | None = None,
        chunk_label: str | None = None,
    ) -> TranscriptionResult:
        """Transcribe an audio file with word-level timestamps.

        Sends progress messages via IPC during processing.

        Args:
            request_id: Identifier attached to every progress message.
            file_path: Path of the audio file to transcribe.
            model_name: Whisper model to load.
            device: Device to load the model on.
            compute_type: Compute type to load the model with.
            language: Language passed to the model; ``None`` is passed through
                unchanged.
            on_segment: Optional callback invoked with each finished segment
                and its zero-based index within this call.
            chunk_label: If set (e.g. "chunk 3/12"), progress messages are
                prefixed with it, and the "loading_model" and final "done"
                messages are skipped because the caller is driving a larger
                multi-chunk request and reports completion itself.

        Returns:
            The segments, detected language and duration of the audio.
        """
        prefix = f"{chunk_label}: " if chunk_label else ""

        # Stage: loading model (skip for chunks — model already loaded)
        if not chunk_label:
            write_message(
                progress_message(request_id, 0, "loading_model", f"Loading {model_name}...")
            )
        model = self._ensure_model(model_name, device, compute_type)

        # Stage: transcribing
        write_message(
            progress_message(
                request_id, 10, "transcribing", f"{prefix}Starting transcription..."
            )
        )
        start_time = time.monotonic()
        segments_iter, info = model.transcribe(
            file_path,
            language=language,
            word_timestamps=True,
            vad_filter=True,
        )
        result = TranscriptionResult(
            language=info.language,
            language_probability=info.language_probability,
            duration_ms=int(info.duration * 1000),
        )

        # Guard against a zero duration so the percentage math cannot divide by 0.
        total_duration = info.duration if info.duration > 0 else 1.0
        segment_count = 0
        for segment in segments_iter:
            segment_count += 1
            # Map position in the audio onto the 10..90 band of the progress bar.
            progress_pct = min(10 + int((segment.end / total_duration) * 80), 90)
            words = [
                WordResult(
                    word=w.word.strip(),
                    start_ms=int(w.start * 1000),
                    end_ms=int(w.end * 1000),
                    confidence=round(w.probability, 4),
                )
                for w in (segment.words or [])
            ]
            segment_result = SegmentResult(
                text=segment.text.strip(),
                start_ms=int(segment.start * 1000),
                end_ms=int(segment.end * 1000),
                words=words,
            )
            result.segments.append(segment_result)
            if on_segment:
                on_segment(segment_result, segment_count - 1)
            write_message(
                progress_message(
                    request_id,
                    progress_pct,
                    "transcribing",
                    f"{prefix}Transcribing segment {segment_count} "
                    f"({progress_pct}% of audio)...",
                )
            )
            if segment_count % CHUNK_REPORT_SIZE == 0:
                write_message(
                    progress_message(
                        request_id,
                        progress_pct,
                        "transcribing",
                        f"{prefix}Completed chunk of {CHUNK_REPORT_SIZE} segments "
                        f"({segment_count} total, {progress_pct}% of audio)...",
                    )
                )

        elapsed = time.monotonic() - start_time
        print(
            f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s",
            file=sys.stderr,
            flush=True,
        )
        # A finished chunk is not a finished request: only announce "done"
        # when this call is the whole job.
        if not chunk_label:
            write_message(progress_message(request_id, 100, "done", "Transcription complete"))
        return result

    @staticmethod
    def _probe_duration_sec(file_path: str) -> float | None:
        """Return the media duration in seconds via ffprobe, or None if unusable.

        ``None`` covers ffprobe being missing or failing, unparsable output,
        and durations that are not finite and positive.
        """
        import math
        import subprocess

        try:
            probe = subprocess.run(
                [
                    get_ffprobe_path(), "-v", "quiet",
                    "-show_entries", "format=duration",
                    "-of", "default=noprint_wrappers=1:nokey=1",
                    file_path,
                ],
                capture_output=True,
                text=True,
                check=True,
            )
            duration = float(probe.stdout.strip())
        except (subprocess.CalledProcessError, OSError, ValueError):
            return None
        if not math.isfinite(duration) or duration <= 0:
            return None
        return duration

    @staticmethod
    def _offset_on_segment(
        on_segment: Callable[[SegmentResult, int], None],
        base_index: int,
    ) -> Callable[[SegmentResult, int], None]:
        """Wrap ``on_segment`` so chunk-local indices become file-wide indices."""

        def shifted(seg: SegmentResult, idx: int) -> None:
            on_segment(seg, base_index + idx)

        return shifted

    def transcribe_chunked(
        self,
        request_id: str,
        file_path: str,
        model_name: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str | None = None,
        on_segment: Callable[[SegmentResult, int], None] | None = None,
        chunk_duration_sec: int = 300,
    ) -> TranscriptionResult:
        """Transcribe a large audio file by splitting it into chunks.

        Uses ffmpeg to extract each chunk to a temporary 16 kHz mono WAV file,
        transcribes the chunks one by one, then merges the results with
        timestamps shifted back onto the original timeline.

        Falls back to standard transcribe() when ffprobe cannot provide a
        usable duration.

        Args:
            request_id: Identifier attached to every progress message.
            file_path: Path of the audio file to transcribe.
            model_name: Whisper model to load.
            device: Device to load the model on.
            compute_type: Compute type to load the model with.
            language: Language passed to the model for every chunk.
            on_segment: Optional callback invoked with each finished segment
                and its zero-based index across the whole file.
            chunk_duration_sec: Length of each chunk in seconds; must be > 0.

        Returns:
            The merged result. Language fields come from the first chunk and
            ``duration_ms`` from the probed file duration.

        Raises:
            ValueError: If ``chunk_duration_sec`` is not positive.
            subprocess.CalledProcessError: If ffmpeg fails to extract a chunk.
        """
        import math
        import os
        import subprocess
        import tempfile

        if chunk_duration_sec <= 0:
            raise ValueError("chunk_duration_sec must be positive")

        total_duration = self._probe_duration_sec(file_path)
        if total_duration is None:
            # ffprobe not available or failed — fall back to standard transcription
            write_message(
                progress_message(
                    request_id, 5, "transcribing",
                    "ffmpeg not available, using standard transcription...",
                )
            )
            return self.transcribe(
                request_id, file_path, model_name, device,
                compute_type, language, on_segment=on_segment,
            )

        # ceil() rather than int()+1: a duration that is an exact multiple of
        # the chunk length must not be counted as one extra (empty) chunk.
        num_chunks = max(1, math.ceil(total_duration / chunk_duration_sec))
        write_message(
            progress_message(
                request_id, 5, "transcribing",
                f"Splitting {total_duration:.0f}s file into {num_chunks} chunks...",
            )
        )

        merged_result = TranscriptionResult()
        global_segment_index = 0
        for chunk_idx in range(num_chunks):
            chunk_start = chunk_idx * chunk_duration_sec
            chunk_start_ms = int(chunk_start * 1000)

            # Reserve a temp path, closed right away so ffmpeg can write to it.
            tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
            tmp.close()
            try:
                subprocess.run(
                    [
                        get_ffmpeg_path(), "-y", "-ss", str(chunk_start),
                        "-t", str(chunk_duration_sec),
                        "-i", file_path,
                        "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
                        tmp.name,
                    ],
                    capture_output=True,
                    check=True,
                )

                chunk_on_segment = (
                    self._offset_on_segment(on_segment, global_segment_index)
                    if on_segment
                    else None
                )
                chunk_result = self.transcribe(
                    request_id, tmp.name, model_name, device,
                    compute_type, language, on_segment=chunk_on_segment,
                    chunk_label=f"Chunk {chunk_idx + 1}/{num_chunks}",
                )

                # Offset timestamps and merge
                for seg in chunk_result.segments:
                    seg.start_ms += chunk_start_ms
                    seg.end_ms += chunk_start_ms
                    for word in seg.words:
                        word.start_ms += chunk_start_ms
                        word.end_ms += chunk_start_ms
                    merged_result.segments.append(seg)
                global_segment_index += len(chunk_result.segments)

                # Take language from first chunk
                if chunk_idx == 0:
                    merged_result.language = chunk_result.language
                    merged_result.language_probability = chunk_result.language_probability
            finally:
                os.unlink(tmp.name)

            # Chunk progress
            chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90)
            write_message(
                progress_message(
                    request_id, chunk_pct, "transcribing",
                    f"Completed chunk {chunk_idx + 1}/{num_chunks}...",
                )
            )

        merged_result.duration_ms = int(total_duration * 1000)
        write_message(progress_message(request_id, 100, "done", "Transcription complete"))
        return merged_result
def result_to_payload(result: TranscriptionResult) -> dict[str, Any]:
    """Convert a TranscriptionResult into a plain dict for the IPC payload.

    Args:
        result: The transcription result to serialise.

    Returns:
        A dict with keys ``segments``, ``language``, ``language_probability``
        and ``duration_ms``. Each segment is a dict with ``text``,
        ``start_ms``, ``end_ms`` and ``words``; each word is a dict with
        ``word``, ``start_ms``, ``end_ms`` and ``confidence``.
    """
    segments_payload = []
    for seg in result.segments:
        words_payload = [
            {
                "word": w.word,
                "start_ms": w.start_ms,
                "end_ms": w.end_ms,
                "confidence": w.confidence,
            }
            for w in seg.words
        ]
        segments_payload.append(
            {
                "text": seg.text,
                "start_ms": seg.start_ms,
                "end_ms": seg.end_ms,
                "words": words_payload,
            }
        )
    return {
        "segments": segments_payload,
        "language": result.language,
        "language_probability": result.language_probability,
        "duration_ms": result.duration_ms,
    }