Files
local-transcription/client/audio_capture.py
Josh Knapp 0ba84e6ddd Improve transcription accuracy with overlapping audio chunks
Changes:
1. Changed UI text from "Recording" to "Transcribing" for clarity
2. Implemented overlapping audio chunks to prevent word cutoff

Audio Overlap Feature:
- Added overlap_duration parameter (default: 0.5 seconds)
- Audio chunks now overlap by 0.5s to capture words at boundaries
- Prevents missed words when chunks are processed separately
- Configurable via audio.overlap_duration in config.yaml

How it works:
- Each 3-second chunk includes 0.5s from the previous chunk
- Buffer advances by (chunk_size - overlap_size) instead of full chunk
- Ensures words at chunk boundaries are captured in at least one chunk
- No duplicate transcription due to Whisper's context handling

Example with 3s chunks and 0.5s overlap:
  Chunk 1: [0.0s - 3.0s]
  Chunk 2: [2.5s - 5.5s]  <- 0.5s overlap
  Chunk 3: [5.0s - 8.0s]  <- 0.5s overlap

Files modified:
- client/audio_capture.py: Implemented overlapping buffer logic
- config/default_config.yaml: Added overlap_duration setting
- gui/main_window_qt.py: Updated UI text, passed overlap param
- main_cli.py: Passed overlap param

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-26 08:47:19 -08:00

256 lines
8.5 KiB
Python

"""Audio capture module for recording microphone or system audio."""
import numpy as np
import sounddevice as sd
from scipy import signal
from typing import Callable, Optional, List, Tuple
from threading import Thread, Event
import queue
class AudioCapture:
    """Captures audio from input devices and provides chunks for processing.

    Audio is read from a ``sounddevice`` input stream at a sample rate the
    hardware supports, accumulated into fixed-length chunks that overlap by
    ``overlap_duration`` seconds, resampled to the target rate when the
    hardware rate differs, and handed to a user-supplied callback from a
    background thread.
    """

    def __init__(
        self,
        sample_rate: int = 16000,
        chunk_duration: float = 3.0,
        overlap_duration: float = 0.5,
        device: Optional[int] = None
    ):
        """
        Initialize audio capture.

        Args:
            sample_rate: Target audio sample rate in Hz (16000 for Whisper)
            chunk_duration: Duration of each audio chunk in seconds
            overlap_duration: Duration of overlap between chunks in seconds
                (prevents word cutoff). Must be >= 0 and strictly smaller
                than chunk_duration.
            device: Input device index, or None for default

        Raises:
            ValueError: If sample_rate or chunk_duration is not positive, or
                if overlap_duration is negative or not smaller than
                chunk_duration.
        """
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate}")
        if chunk_duration <= 0:
            raise ValueError(
                f"chunk_duration must be positive, got {chunk_duration}"
            )
        # An overlap >= the chunk length gives a stride of zero or less, so
        # the buffer would never advance past the audio already emitted.
        if not 0 <= overlap_duration < chunk_duration:
            raise ValueError(
                "overlap_duration must be >= 0 and smaller than "
                f"chunk_duration ({chunk_duration}), got {overlap_duration}"
            )

        self.target_sample_rate = sample_rate
        self.chunk_duration = chunk_duration
        self.overlap_duration = overlap_duration
        self.device = device

        # Chunk and overlap lengths in samples at the target rate.
        self.chunk_size = int(sample_rate * chunk_duration)
        self.overlap_size = int(sample_rate * overlap_duration)

        # Hardware sample rate (auto-detected in start_recording).
        self.hardware_sample_rate = None

        # Raw blocks produced by the stream callback, consumed by the
        # recording thread (and by get_audio_chunk).
        self.audio_queue = queue.Queue()
        self.is_recording = False
        self.stop_event = Event()
        self.recording_thread: Optional[Thread] = None

    def _detect_sample_rate(self) -> int:
        """
        Detect a supported sample rate for the audio device.

        Each candidate rate is probed by briefly opening a test input
        stream; the first rate that opens without a PortAudio error wins.

        Returns:
            Supported sample rate (48000 if no candidate could be opened)
        """
        # Try common sample rates in order of preference. dict.fromkeys
        # removes duplicates while keeping order, so a target rate that is
        # also in the fallback list is not probed twice.
        candidate_rates = list(dict.fromkeys(
            [self.target_sample_rate, 48000, 44100, 22050, 32000, 8000]
        ))

        for rate in candidate_rates:
            try:
                # Try to create a test stream
                with sd.InputStream(
                    device=self.device,
                    channels=1,
                    samplerate=rate,
                    blocksize=1024
                ):
                    print(f"Using hardware sample rate: {rate} Hz")
                    return rate
            except sd.PortAudioError:
                continue

        # If nothing works, default to 48000
        print("Warning: Could not detect sample rate, defaulting to 48000 Hz")
        return 48000

    def _resample(self, audio: np.ndarray, from_rate: int, to_rate: int) -> np.ndarray:
        """
        Resample audio from one sample rate to another.

        Args:
            audio: Input audio data
            from_rate: Source sample rate
            to_rate: Target sample rate

        Returns:
            Resampled audio as float32, or the input array itself (unchanged)
            when both rates are equal
        """
        if from_rate == to_rate:
            return audio

        # Nothing to resample; avoid handing an empty signal to scipy.
        if len(audio) == 0:
            return np.zeros(0, dtype=np.float32)

        # Number of output samples for the new rate
        num_samples = int(len(audio) * to_rate / from_rate)

        # Use scipy's FFT-based resample
        resampled = signal.resample(audio, num_samples)
        return resampled.astype(np.float32)

    @staticmethod
    def get_input_devices() -> List[Tuple[int, str]]:
        """
        Get list of available input audio devices.

        Returns:
            List of (device_index, device_name) tuples, one per device that
            reports at least one input channel
        """
        return [
            (index, device['name'])
            for index, device in enumerate(sd.query_devices())
            if device['max_input_channels'] > 0
        ]

    @staticmethod
    def get_default_device() -> Optional[Tuple[int, str]]:
        """
        Get the default input device.

        Returns:
            (device_index, device_name) tuple, or None if the default input
            device cannot be queried or located
        """
        try:
            default_device = sd.query_devices(kind='input')

            # Prefer the index reported by the device dict when available:
            # matching by name alone is ambiguous if several entries share
            # the same name.
            default_index = default_device.get('index')
            if default_index is not None:
                return (default_index, default_device['name'])

            # Fallback: locate the first device with the same name.
            for index, device in enumerate(sd.query_devices()):
                if device['name'] == default_device['name']:
                    return (index, device['name'])
        except Exception:
            # Best effort: a failed query means "no default device".
            # Unlike a bare except, KeyboardInterrupt/SystemExit propagate.
            return None
        return None

    def _audio_callback(self, indata, frames, time_info, status):
        """Callback function for sounddevice stream.

        Prints any reported stream status and queues a flattened copy of
        the incoming block for the recording thread.
        """
        if status:
            print(f"Audio status: {status}")

        # ndarray.flatten() always returns a copy, so the queued data stays
        # valid after this callback returns; no extra .copy() is needed.
        self.audio_queue.put(indata.flatten())

    def start_recording(self, callback: Optional[Callable[[np.ndarray], None]] = None):
        """
        Start recording audio.

        Does nothing if recording is already active. Otherwise detects a
        usable hardware sample rate and starts a daemon thread that reads
        the input stream and emits overlapping chunks.

        Args:
            callback: Optional callback function to receive audio chunks.
                It is invoked on the recording thread; without a callback
                completed chunks are discarded.
        """
        if self.is_recording:
            return

        # Detect supported sample rate
        self.hardware_sample_rate = self._detect_sample_rate()

        # Drop blocks left over from a previous session so stale audio is
        # not prepended to the first chunk of this one.
        self.clear_queue()

        self.is_recording = True
        self.stop_event.clear()

        self.recording_thread = Thread(
            target=self._record_loop,
            args=(callback,),
            daemon=True
        )
        self.recording_thread.start()

    def _record_loop(self, callback: Optional[Callable[[np.ndarray], None]]):
        """Recording loop that runs in a separate thread.

        Opens the input stream, accumulates queued blocks into a buffer and
        emits chunks of chunk_duration seconds that overlap the previous
        chunk by overlap_duration seconds, until stop_event is set.
        """
        hardware_rate = self.hardware_sample_rate
        buffer = np.array([], dtype=np.float32)

        # Chunk and stride in samples at the hardware rate. Both are clamped
        # to at least one sample so the drain loop below always terminates.
        hardware_chunk_size = max(1, int(hardware_rate * self.chunk_duration))
        hardware_overlap_size = int(hardware_rate * self.overlap_duration)
        hardware_stride = max(1, hardware_chunk_size - hardware_overlap_size)
        needs_resample = hardware_rate != self.target_sample_rate

        try:
            with sd.InputStream(
                device=self.device,
                channels=1,
                samplerate=hardware_rate,
                callback=self._audio_callback,
                blocksize=int(hardware_rate * 0.1)  # 100ms blocks
            ):
                while not self.stop_event.is_set():
                    try:
                        # Get audio data from queue (with timeout so the
                        # stop event is re-checked regularly)
                        audio_chunk = self.audio_queue.get(timeout=0.1)
                    except queue.Empty:
                        continue

                    try:
                        buffer = np.concatenate([buffer, audio_chunk])

                        # Emit every complete chunk currently buffered. A
                        # single `if` would emit at most one chunk per
                        # incoming block and let the buffer grow without
                        # bound when the stride is shorter than a block.
                        while (
                            len(buffer) >= hardware_chunk_size
                            and not self.stop_event.is_set()
                        ):
                            chunk = buffer[:hardware_chunk_size]

                            # Advance by the stride (not the full chunk) so
                            # consecutive chunks overlap.
                            buffer = buffer[hardware_stride:]

                            # Resample to target rate if needed
                            if needs_resample:
                                chunk = self._resample(
                                    chunk,
                                    hardware_rate,
                                    self.target_sample_rate
                                )

                            # Send to callback if provided
                            if callback:
                                callback(chunk)
                    except Exception as e:
                        # Keep capturing even if processing or the user
                        # callback fails for one chunk.
                        print(f"Error in recording loop: {e}")
        except Exception as e:
            print(f"Error opening audio stream: {e}")
            self.is_recording = False

    def stop_recording(self):
        """Stop recording audio.

        Signals the recording thread to stop and waits up to two seconds
        for it to finish. Does nothing if recording is not active.
        """
        if not self.is_recording:
            return

        self.is_recording = False
        self.stop_event.set()

        if self.recording_thread:
            self.recording_thread.join(timeout=2.0)
            self.recording_thread = None

    def get_audio_chunk(self, timeout: float = 1.0) -> Optional[np.ndarray]:
        """
        Get the next audio block from the queue.

        The queue holds the raw blocks pushed by the stream callback and is
        the same queue the recording thread consumes, so a block returned
        here is not seen by the recording loop.

        Args:
            timeout: Maximum time to wait for a block, in seconds

        Returns:
            Audio block as numpy array or None if timeout
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def is_recording_active(self) -> bool:
        """Check if recording is currently active."""
        return self.is_recording

    def clear_queue(self):
        """Clear any pending audio blocks from the queue."""
        while True:
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                break

    def __del__(self):
        """Cleanup when object is destroyed."""
        # __del__ also runs on partially constructed instances (e.g. when
        # __init__ raised), where the attributes may not exist yet.
        if getattr(self, "is_recording", False):
            self.stop_recording()