Add chunked transcription for large audio files (>1 hour)

Split files >1 hour into 5-minute chunks via ffmpeg, transcribe each
chunk independently, then merge results with corrected timestamps.
Also add chunk-level progress markers every 10 segments for all files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Claude
2026-03-20 13:49:20 -07:00
parent d00281f0c7
commit 16f4b57771
4 changed files with 248 additions and 8 deletions

View File

@@ -0,0 +1,21 @@
{
"permissions": {
"allow": [
"Bash(git init:*)",
"Bash(git:*)",
"WebSearch",
"Bash(npm create:*)",
"Bash(cp:*)",
"Bash(npm install:*)",
"Bash(/home/jknapp/.cargo/bin/cargo test:*)",
"Bash(ruff:*)",
"Bash(npm run:*)",
"Bash(npx svelte-check:*)",
"Bash(pip install:*)",
"Bash(python3:*)",
"Bash(/home/jknapp/.cargo/bin/cargo check:*)",
"Bash(cargo check:*)",
"Bash(npm ls:*)"
]
}
}

View File

@@ -49,3 +49,80 @@ def test_result_to_payload_empty():
assert payload["segments"] == []
assert payload["language"] == ""
assert payload["duration_ms"] == 0
def test_chunk_report_size_progress():
"""Test CHUNK_REPORT_SIZE progress emission."""
from voice_to_notes.services.transcribe import CHUNK_REPORT_SIZE
assert CHUNK_REPORT_SIZE == 10
def test_transcribe_chunked_with_mocked_ffmpeg(monkeypatch):
"""Test transcribe_chunked with mocked ffmpeg/ffprobe and mocked WhisperModel."""
from unittest.mock import MagicMock, patch
from voice_to_notes.services.transcribe import TranscribeService, SegmentResult, WordResult
# Mock subprocess.run for ffprobe (returns duration of 700s = ~2 chunks at 300s each)
original_run = __import__("subprocess").run
def mock_subprocess_run(cmd, **kwargs):
if "ffprobe" in cmd:
result = MagicMock()
result.stdout = "700.0\n"
result.returncode = 0
return result
elif "ffmpeg" in cmd:
# Create an empty temp file (simulate chunk extraction)
# The output file is the last argument
import pathlib
output_file = cmd[-1]
pathlib.Path(output_file).touch()
result = MagicMock()
result.returncode = 0
return result
return original_run(cmd, **kwargs)
# Mock WhisperModel
mock_model = MagicMock()
def mock_transcribe_call(file_path, **kwargs):
mock_segments = []
for i in range(3):
seg = MagicMock()
seg.start = i * 1.0
seg.end = (i + 1) * 1.0
seg.text = f"Segment {i}"
seg.words = []
mock_segments.append(seg)
mock_info = MagicMock()
mock_info.language = "en"
mock_info.language_probability = 0.99
mock_info.duration = 300.0
return iter(mock_segments), mock_info
mock_model.transcribe = mock_transcribe_call
service = TranscribeService()
service._model = mock_model
service._current_model_name = "base"
service._current_device = "cpu"
service._current_compute_type = "int8"
written_messages = []
def mock_write(msg):
written_messages.append(msg)
with patch("subprocess.run", mock_subprocess_run), \
patch("voice_to_notes.services.transcribe.write_message", mock_write):
result = service.transcribe_chunked("req-1", "/fake/long_audio.wav")
# Should have segments from multiple chunks
assert len(result.segments) > 0
# Verify timestamp offsets — segments from chunk 1 should start at 0,
# segments from chunk 2 should be offset by 300000ms
if len(result.segments) > 3:
# Chunk 2 segments should have offset timestamps
assert result.segments[3].start_ms >= 300000
assert result.duration_ms == 700000
assert result.language == "en"

View File

@@ -82,14 +82,38 @@ class PipelineService:
progress_message(request_id, 0, "pipeline", "Starting transcription pipeline...")
)
transcription = self._transcribe_service.transcribe(
request_id=request_id,
file_path=file_path,
model_name=model_name,
device=device,
compute_type=compute_type,
language=language,
)
# Probe audio duration for conditional chunked transcription
audio_duration_sec = None
try:
import subprocess
probe_result = subprocess.run(
["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", file_path],
capture_output=True, text=True, check=True,
)
audio_duration_sec = float(probe_result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
pass
from voice_to_notes.services.transcribe import LARGE_FILE_THRESHOLD_SEC
if audio_duration_sec and audio_duration_sec > LARGE_FILE_THRESHOLD_SEC:
transcription = self._transcribe_service.transcribe_chunked(
request_id=request_id,
file_path=file_path,
model_name=model_name,
device=device,
compute_type=compute_type,
language=language,
)
else:
transcription = self._transcribe_service.transcribe(
request_id=request_id,
file_path=file_path,
model_name=model_name,
device=device,
compute_type=compute_type,
language=language,
)
if skip_diarization:
# Convert transcription directly without speaker labels

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import sys
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any
@@ -12,6 +13,9 @@ from faster_whisper import WhisperModel
from voice_to_notes.ipc.messages import progress_message
from voice_to_notes.ipc.protocol import write_message
CHUNK_REPORT_SIZE = 10
LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour
@dataclass
class WordResult:
@@ -90,6 +94,7 @@ class TranscribeService:
device: str = "cpu",
compute_type: str = "int8",
language: str | None = None,
on_segment: Callable[[SegmentResult, int], None] | None = None,
) -> TranscriptionResult:
"""Transcribe an audio file with word-level timestamps.
@@ -156,6 +161,12 @@ class TranscribeService:
)
)
if segment_count % CHUNK_REPORT_SIZE == 0:
write_message(progress_message(
request_id, progress_pct, "transcribing",
f"Completed chunk of {CHUNK_REPORT_SIZE} segments "
f"({segment_count} total, {progress_pct}% of audio)..."))
elapsed = time.time() - start_time
print(
f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s",
@@ -166,6 +177,113 @@ class TranscribeService:
write_message(progress_message(request_id, 100, "done", "Transcription complete"))
return result
def transcribe_chunked(
self,
request_id: str,
file_path: str,
model_name: str = "base",
device: str = "cpu",
compute_type: str = "int8",
language: str | None = None,
on_segment: Callable[[SegmentResult, int], None] | None = None,
chunk_duration_sec: int = 300,
) -> TranscriptionResult:
"""Transcribe a large audio file by splitting into chunks.
Uses ffmpeg to split the file into chunks, transcribes each chunk,
then merges the results with corrected timestamps.
Falls back to standard transcribe() if ffmpeg is not available.
"""
import subprocess
import tempfile
# Get total duration via ffprobe
try:
probe_result = subprocess.run(
["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
"-of", "default=noprint_wrappers=1:nokey=1", file_path],
capture_output=True, text=True, check=True,
)
total_duration = float(probe_result.stdout.strip())
except (subprocess.CalledProcessError, FileNotFoundError, ValueError):
# ffprobe not available or failed — fall back to standard transcription
write_message(progress_message(
request_id, 5, "transcribing",
"ffmpeg not available, using standard transcription..."))
return self.transcribe(request_id, file_path, model_name, device,
compute_type, language, on_segment=on_segment)
num_chunks = max(1, int(total_duration / chunk_duration_sec) + 1)
write_message(progress_message(
request_id, 5, "transcribing",
f"Splitting {total_duration:.0f}s file into {num_chunks} chunks..."))
merged_result = TranscriptionResult()
global_segment_index = 0
for chunk_idx in range(num_chunks):
chunk_start = chunk_idx * chunk_duration_sec
if chunk_start >= total_duration:
break
chunk_start_ms = int(chunk_start * 1000)
# Extract chunk to temp file
tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
tmp.close()
try:
subprocess.run(
["ffmpeg", "-y", "-ss", str(chunk_start),
"-t", str(chunk_duration_sec),
"-i", file_path,
"-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le",
tmp.name],
capture_output=True, check=True,
)
# Wrap on_segment to offset the index
chunk_on_segment = None
if on_segment:
base_index = global_segment_index
def chunk_on_segment(seg: SegmentResult, idx: int, _base=base_index) -> None:
on_segment(seg, _base + idx)
chunk_result = self.transcribe(
request_id, tmp.name, model_name, device,
compute_type, language, on_segment=chunk_on_segment,
)
# Offset timestamps and merge
for seg in chunk_result.segments:
seg.start_ms += chunk_start_ms
seg.end_ms += chunk_start_ms
for word in seg.words:
word.start_ms += chunk_start_ms
word.end_ms += chunk_start_ms
merged_result.segments.append(seg)
global_segment_index += len(chunk_result.segments)
# Take language from first chunk
if chunk_idx == 0:
merged_result.language = chunk_result.language
merged_result.language_probability = chunk_result.language_probability
finally:
import os
os.unlink(tmp.name)
# Chunk progress
chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90)
write_message(progress_message(
request_id, chunk_pct, "transcribing",
f"Completed chunk {chunk_idx + 1}/{num_chunks}..."))
merged_result.duration_ms = int(total_duration * 1000)
write_message(progress_message(request_id, 100, "done", "Transcription complete"))
return merged_result
def result_to_payload(result: TranscriptionResult) -> dict[str, Any]:
"""Convert TranscriptionResult to IPC payload dict."""