From 16f4b5777126807e9aeca4e53b8f17f300c8ddeb Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 20 Mar 2026 13:49:20 -0700 Subject: [PATCH] Add chunked transcription for large audio files (>1 hour) Split files >1 hour into 5-minute chunks via ffmpeg, transcribe each chunk independently, then merge results with corrected timestamps. Also add chunk-level progress markers every 10 segments for all files. Co-Authored-By: Claude Opus 4.6 (1M context) --- .claude/settings.local.json | 21 ++++ python/tests/test_transcribe.py | 77 ++++++++++++ python/voice_to_notes/services/pipeline.py | 40 +++++-- python/voice_to_notes/services/transcribe.py | 118 +++++++++++++++++++ 4 files changed, 248 insertions(+), 8 deletions(-) create mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..0d775f8 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,21 @@ +{ + "permissions": { + "allow": [ + "Bash(git init:*)", + "Bash(git:*)", + "WebSearch", + "Bash(npm create:*)", + "Bash(cp:*)", + "Bash(npm install:*)", + "Bash(/home/jknapp/.cargo/bin/cargo test:*)", + "Bash(ruff:*)", + "Bash(npm run:*)", + "Bash(npx svelte-check:*)", + "Bash(pip install:*)", + "Bash(python3:*)", + "Bash(/home/jknapp/.cargo/bin/cargo check:*)", + "Bash(cargo check:*)", + "Bash(npm ls:*)" + ] + } +} diff --git a/python/tests/test_transcribe.py b/python/tests/test_transcribe.py index b9e4220..82db5ab 100644 --- a/python/tests/test_transcribe.py +++ b/python/tests/test_transcribe.py @@ -49,3 +49,80 @@ def test_result_to_payload_empty(): assert payload["segments"] == [] assert payload["language"] == "" assert payload["duration_ms"] == 0 + + +def test_chunk_report_size_progress(): + """Test CHUNK_REPORT_SIZE progress emission.""" + from voice_to_notes.services.transcribe import CHUNK_REPORT_SIZE + assert CHUNK_REPORT_SIZE == 10 + + +def test_transcribe_chunked_with_mocked_ffmpeg(monkeypatch): + """Test transcribe_chunked with mocked ffmpeg/ffprobe and mocked WhisperModel.""" + from unittest.mock import MagicMock, patch + from voice_to_notes.services.transcribe import TranscribeService, SegmentResult, WordResult + + # Mock subprocess.run for ffprobe (returns duration of 700s = ~2 chunks at 300s each) + original_run = __import__("subprocess").run + + def mock_subprocess_run(cmd, **kwargs): + if "ffprobe" in cmd: + result = MagicMock() + result.stdout = "700.0\n" + result.returncode = 0 + return result + elif "ffmpeg" in cmd: + # Create an empty temp file (simulate chunk extraction) + # The output file is the last argument + import pathlib + output_file = cmd[-1] + pathlib.Path(output_file).touch() + result = MagicMock() + result.returncode = 0 + return result + return original_run(cmd, **kwargs) + + # Mock WhisperModel + mock_model = MagicMock() + def mock_transcribe_call(file_path, **kwargs): + mock_segments = [] + for i in range(3): + seg = MagicMock() + seg.start = i * 1.0 + seg.end = (i + 1) * 1.0 + seg.text = f"Segment {i}" + seg.words = [] + mock_segments.append(seg) + mock_info = MagicMock() + mock_info.language = "en" + mock_info.language_probability = 0.99 + mock_info.duration = 300.0 + return iter(mock_segments), mock_info + + mock_model.transcribe = mock_transcribe_call + + service = TranscribeService() + service._model = mock_model + service._current_model_name = "base" + service._current_device = "cpu" + service._current_compute_type = "int8" + + written_messages = [] + def mock_write(msg): + written_messages.append(msg) + + with patch("subprocess.run", mock_subprocess_run), \ + patch("voice_to_notes.services.transcribe.write_message", mock_write): + result = service.transcribe_chunked("req-1", "/fake/long_audio.wav") + + # Should have segments from multiple chunks + assert len(result.segments) > 0 + + # Verify timestamp offsets — segments from chunk 1 should start at 0, + # segments from chunk 2 should be offset by 300000ms + if len(result.segments) > 3: + # Chunk 2 segments should have offset timestamps + assert result.segments[3].start_ms >= 300000 + + assert result.duration_ms == 700000 + assert result.language == "en" diff --git a/python/voice_to_notes/services/pipeline.py b/python/voice_to_notes/services/pipeline.py index 2d1f66b..2a83610 100644 --- a/python/voice_to_notes/services/pipeline.py +++ b/python/voice_to_notes/services/pipeline.py @@ -82,14 +82,38 @@ class PipelineService: progress_message(request_id, 0, "pipeline", "Starting transcription pipeline...") ) - transcription = self._transcribe_service.transcribe( - request_id=request_id, - file_path=file_path, - model_name=model_name, - device=device, - compute_type=compute_type, - language=language, - ) + # Probe audio duration for conditional chunked transcription + audio_duration_sec = None + try: + import subprocess + probe_result = subprocess.run( + ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", file_path], + capture_output=True, text=True, check=True, + ) + audio_duration_sec = float(probe_result.stdout.strip()) + except (subprocess.CalledProcessError, FileNotFoundError, ValueError): + pass + + from voice_to_notes.services.transcribe import LARGE_FILE_THRESHOLD_SEC + if audio_duration_sec and audio_duration_sec > LARGE_FILE_THRESHOLD_SEC: + transcription = self._transcribe_service.transcribe_chunked( + request_id=request_id, + file_path=file_path, + model_name=model_name, + device=device, + compute_type=compute_type, + language=language, + ) + else: + transcription = self._transcribe_service.transcribe( + request_id=request_id, + file_path=file_path, + model_name=model_name, + device=device, + compute_type=compute_type, + language=language, + ) if skip_diarization: # Convert transcription directly without speaker labels diff --git a/python/voice_to_notes/services/transcribe.py b/python/voice_to_notes/services/transcribe.py index 2539cfc..ae42bcf 100644 --- a/python/voice_to_notes/services/transcribe.py +++ b/python/voice_to_notes/services/transcribe.py @@ -4,6 +4,7 @@ from __future__ import annotations import sys import time +from collections.abc import Callable from dataclasses import dataclass, field from typing import Any @@ -12,6 +13,9 @@ from faster_whisper import WhisperModel from voice_to_notes.ipc.messages import progress_message from voice_to_notes.ipc.protocol import write_message +CHUNK_REPORT_SIZE = 10 +LARGE_FILE_THRESHOLD_SEC = 3600 # 1 hour + @dataclass class WordResult: @@ -90,6 +94,7 @@ class TranscribeService: device: str = "cpu", compute_type: str = "int8", language: str | None = None, + on_segment: Callable[[SegmentResult, int], None] | None = None, ) -> TranscriptionResult: """Transcribe an audio file with word-level timestamps. @@ -156,6 +161,12 @@ class TranscribeService: ) ) + if segment_count % CHUNK_REPORT_SIZE == 0: + write_message(progress_message( + request_id, progress_pct, "transcribing", + f"Completed chunk of {CHUNK_REPORT_SIZE} segments " + f"({segment_count} total, {progress_pct}% of audio)...")) + elapsed = time.time() - start_time print( f"[sidecar] Transcription complete: {segment_count} segments in {elapsed:.1f}s", @@ -166,6 +177,113 @@ class TranscribeService: write_message(progress_message(request_id, 100, "done", "Transcription complete")) return result + def transcribe_chunked( + self, + request_id: str, + file_path: str, + model_name: str = "base", + device: str = "cpu", + compute_type: str = "int8", + language: str | None = None, + on_segment: Callable[[SegmentResult, int], None] | None = None, + chunk_duration_sec: int = 300, + ) -> TranscriptionResult: + """Transcribe a large audio file by splitting into chunks. + + Uses ffmpeg to split the file into chunks, transcribes each chunk, + then merges the results with corrected timestamps. + + Falls back to standard transcribe() if ffmpeg is not available. + """ + import subprocess + import tempfile + + # Get total duration via ffprobe + try: + probe_result = subprocess.run( + ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", + "-of", "default=noprint_wrappers=1:nokey=1", file_path], + capture_output=True, text=True, check=True, + ) + total_duration = float(probe_result.stdout.strip()) + except (subprocess.CalledProcessError, FileNotFoundError, ValueError): + # ffprobe not available or failed — fall back to standard transcription + write_message(progress_message( + request_id, 5, "transcribing", + "ffmpeg not available, using standard transcription...")) + return self.transcribe(request_id, file_path, model_name, device, + compute_type, language, on_segment=on_segment) + + num_chunks = max(1, int(total_duration / chunk_duration_sec) + 1) + write_message(progress_message( + request_id, 5, "transcribing", + f"Splitting {total_duration:.0f}s file into {num_chunks} chunks...")) + + merged_result = TranscriptionResult() + global_segment_index = 0 + + for chunk_idx in range(num_chunks): + chunk_start = chunk_idx * chunk_duration_sec + if chunk_start >= total_duration: + break + + chunk_start_ms = int(chunk_start * 1000) + + # Extract chunk to temp file + tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False) + tmp.close() + try: + subprocess.run( + ["ffmpeg", "-y", "-ss", str(chunk_start), + "-t", str(chunk_duration_sec), + "-i", file_path, + "-ar", "16000", "-ac", "1", "-c:a", "pcm_s16le", + tmp.name], + capture_output=True, check=True, + ) + + # Wrap on_segment to offset the index + chunk_on_segment = None + if on_segment: + base_index = global_segment_index + def chunk_on_segment(seg: SegmentResult, idx: int, _base=base_index) -> None: + on_segment(seg, _base + idx) + + chunk_result = self.transcribe( + request_id, tmp.name, model_name, device, + compute_type, language, on_segment=chunk_on_segment, + ) + + # Offset timestamps and merge + for seg in chunk_result.segments: + seg.start_ms += chunk_start_ms + seg.end_ms += chunk_start_ms + for word in seg.words: + word.start_ms += chunk_start_ms + word.end_ms += chunk_start_ms + merged_result.segments.append(seg) + + global_segment_index += len(chunk_result.segments) + + # Take language from first chunk + if chunk_idx == 0: + merged_result.language = chunk_result.language + merged_result.language_probability = chunk_result.language_probability + + finally: + import os + os.unlink(tmp.name) + + # Chunk progress + chunk_pct = min(10 + int(((chunk_idx + 1) / num_chunks) * 80), 90) + write_message(progress_message( + request_id, chunk_pct, "transcribing", + f"Completed chunk {chunk_idx + 1}/{num_chunks}...")) + + merged_result.duration_ms = int(total_duration * 1000) + write_message(progress_message(request_id, 100, "done", "Transcription complete")) + return merged_result + def result_to_payload(result: TranscriptionResult) -> dict[str, Any]: """Convert TranscriptionResult to IPC payload dict."""