"""Audio capture module for recording microphone or system audio."""

import queue
from threading import Event, Thread, current_thread
from typing import Callable, List, Optional, Tuple

import numpy as np
import sounddevice as sd
from scipy import signal


class AudioCapture:
    """Captures audio from input devices and provides chunks for processing.

    Audio is read from the device in small blocks by a sounddevice stream,
    assembled into chunks of ``chunk_duration`` seconds by a background
    thread, and resampled to ``sample_rate`` when the hardware runs at a
    different rate.

    Finished chunks are delivered in one of two ways:

    * to the ``callback`` passed to :meth:`start_recording`, or
    * when no callback is given, into ``audio_queue``, from which they can be
      fetched with :meth:`get_audio_chunk`. They accumulate there until they
      are consumed or :meth:`clear_queue` is called.
    """

    def __init__(
        self,
        sample_rate: int = 16000,
        chunk_duration: float = 3.0,
        device: Optional[int] = None
    ):
        """
        Initialize audio capture.

        Args:
            sample_rate: Target audio sample rate in Hz (16000 for Whisper)
            chunk_duration: Duration of each audio chunk in seconds
            device: Input device index, or None for default

        Raises:
            ValueError: If sample_rate or chunk_duration is not positive
        """
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate}")
        if chunk_duration <= 0:
            raise ValueError(
                f"chunk_duration must be positive, got {chunk_duration}"
            )

        self.target_sample_rate = sample_rate
        self.chunk_duration = chunk_duration
        self.device = device
        self.chunk_size = int(sample_rate * chunk_duration)

        # Hardware sample rate (auto-detected when recording starts)
        self.hardware_sample_rate: Optional[int] = None

        # Finished chunks (target sample rate) when no callback is in use.
        self.audio_queue = queue.Queue()
        # Raw device blocks handed from the stream callback to the recording
        # thread. Kept separate from audio_queue so that get_audio_chunk()
        # never competes with the recording thread for raw blocks.
        self._raw_queue = queue.Queue()

        self.is_recording = False
        self.stop_event = Event()
        self.recording_thread: Optional[Thread] = None

    def _detect_sample_rate(self) -> int:
        """
        Detect a supported sample rate for the audio device.

        Each candidate rate is probed by briefly opening an input stream.

        Returns:
            Supported sample rate (48000 if no candidate could be opened)
        """
        # Common sample rates in order of preference; dict.fromkeys drops a
        # duplicate of the target rate while keeping the order.
        candidate_rates = list(dict.fromkeys(
            [self.target_sample_rate, 48000, 44100, 22050, 32000, 8000]
        ))

        for rate in candidate_rates:
            try:
                # Opening a test stream fails if the rate is unsupported.
                with sd.InputStream(
                    device=self.device,
                    channels=1,
                    samplerate=rate,
                    blocksize=1024
                ):
                    print(f"Using hardware sample rate: {rate} Hz")
                    return rate
            except sd.PortAudioError:
                continue

        # If nothing works, default to 48000
        print("Warning: Could not detect sample rate, defaulting to 48000 Hz")
        return 48000

    def _resample(self, audio: np.ndarray, from_rate: int, to_rate: int) -> np.ndarray:
        """
        Resample audio from one sample rate to another.

        Args:
            audio: Input audio data
            from_rate: Source sample rate
            to_rate: Target sample rate

        Returns:
            Resampled audio as float32, or the input unchanged when the two
            rates are equal
        """
        if from_rate == to_rate:
            return audio

        # Number of samples the audio occupies at the target rate
        num_samples = int(len(audio) * to_rate / from_rate)
        if num_samples == 0:
            # Nothing to resample (empty or extremely short input); avoid
            # handing a zero-length request to the FFT-based resampler.
            return np.empty(0, dtype=np.float32)

        # Use scipy's resample for high-quality resampling
        resampled = signal.resample(audio, num_samples)
        return resampled.astype(np.float32)

    @staticmethod
    def get_input_devices() -> List[Tuple[int, str]]:
        """
        Get list of available input audio devices.

        Returns:
            List of (device_index, device_name) tuples
        """
        # Only include devices with input channels
        return [
            (index, info['name'])
            for index, info in enumerate(sd.query_devices())
            if info['max_input_channels'] > 0
        ]

    @staticmethod
    def get_default_device() -> Optional[Tuple[int, str]]:
        """
        Get the default input device.

        Returns:
            (device_index, device_name) tuple, or None if the default input
            device cannot be determined
        """
        try:
            default_info = sd.query_devices(kind='input')

            # Prefer the index reported by sounddevice itself when present.
            index = default_info.get('index')
            if index is not None:
                return (index, default_info['name'])

            # Otherwise search the device list. Matching on the name alone
            # can pick an output device or the same device under another
            # host API, so require input channels and the same host API.
            for i, info in enumerate(sd.query_devices()):
                if (
                    info['name'] == default_info['name']
                    and info['hostapi'] == default_info['hostapi']
                    and info['max_input_channels'] > 0
                ):
                    return (i, info['name'])
        except Exception as e:
            # Best effort: a missing or unqueryable default device is
            # reported as "no default" rather than raised to the caller.
            print(f"Could not query default input device: {e}")
        return None

    def _audio_callback(self, indata, frames, time_info, status):
        """Callback function for sounddevice stream."""
        if status:
            print(f"Audio status: {status}")
        # flatten() always returns a copy, so the block stays valid after the
        # stream reuses its input buffer.
        self._raw_queue.put(indata.flatten())

    def start_recording(self, callback: Optional[Callable[[np.ndarray], None]] = None):
        """
        Start recording audio in a background thread.

        Does nothing if recording is already active.

        Args:
            callback: Optional callback function to receive audio chunks.
                When omitted, chunks are placed in ``audio_queue`` and can be
                fetched with get_audio_chunk().
        """
        if self.is_recording:
            return

        # Detect supported sample rate
        self.hardware_sample_rate = self._detect_sample_rate()

        # Drop raw blocks left over from a previous session; they may have
        # been captured at a different hardware rate.
        self._drain(self._raw_queue)

        self.is_recording = True
        self.stop_event.clear()

        self.recording_thread = Thread(
            target=self._record_loop, args=(callback,), daemon=True
        )
        self.recording_thread.start()

    def _record_loop(self, callback: Optional[Callable[[np.ndarray], None]]):
        """Recording loop that runs in a separate thread."""
        hardware_rate = self.hardware_sample_rate
        # At least one sample per chunk, otherwise the extraction loop below
        # would never terminate.
        hardware_chunk_size = max(1, int(hardware_rate * self.chunk_duration))
        buffer = np.array([], dtype=np.float32)

        try:
            with sd.InputStream(
                device=self.device,
                channels=1,
                samplerate=hardware_rate,
                callback=self._audio_callback,
                blocksize=int(hardware_rate * 0.1)  # 100ms blocks
            ):
                while not self.stop_event.is_set():
                    try:
                        # Wait briefly so the stop event is checked regularly
                        block = self._raw_queue.get(timeout=0.1)
                        buffer = np.concatenate([buffer, block])

                        # Emit every complete chunk currently buffered; a
                        # single block can hold more than one short chunk.
                        while len(buffer) >= hardware_chunk_size:
                            chunk = buffer[:hardware_chunk_size]
                            buffer = buffer[hardware_chunk_size:]
                            self._deliver_chunk(chunk, hardware_rate, callback)
                    except queue.Empty:
                        continue
                    except Exception as e:
                        # Keep recording even if one chunk fails to process.
                        print(f"Error in recording loop: {e}")
        except Exception as e:
            print(f"Error opening audio stream: {e}")
            self.is_recording = False

    def _deliver_chunk(
        self,
        chunk: np.ndarray,
        hardware_rate: int,
        callback: Optional[Callable[[np.ndarray], None]]
    ):
        """Resample a chunk if needed and hand it to the callback or queue."""
        if hardware_rate != self.target_sample_rate:
            chunk = self._resample(chunk, hardware_rate, self.target_sample_rate)

        if callback is not None:
            callback(chunk)
        else:
            self.audio_queue.put(chunk)

    def stop_recording(self):
        """Stop recording audio and wait briefly for the thread to finish."""
        if not self.is_recording:
            return

        self.is_recording = False
        self.stop_event.set()

        thread = self.recording_thread
        self.recording_thread = None
        # A thread cannot join itself: when called from inside the chunk
        # callback, the stop event alone ends the loop.
        if thread is not None and thread is not current_thread():
            thread.join(timeout=2.0)

    def get_audio_chunk(self, timeout: float = 1.0) -> Optional[np.ndarray]:
        """
        Get the next audio chunk from the queue.

        Chunks are only queued when recording was started without a callback.

        Args:
            timeout: Maximum time to wait for a chunk

        Returns:
            Audio chunk as numpy array or None if timeout
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def is_recording_active(self) -> bool:
        """Check if recording is currently active."""
        return self.is_recording

    def clear_queue(self):
        """Clear any pending audio (finished chunks and raw blocks)."""
        self._drain(self.audio_queue)
        self._drain(self._raw_queue)

    @staticmethod
    def _drain(pending: "queue.Queue"):
        """Remove every item currently in the given queue."""
        while True:
            try:
                pending.get_nowait()
            except queue.Empty:
                return

    def __del__(self):
        """Cleanup when object is destroyed."""
        # __init__ may have raised before the attributes were assigned.
        if getattr(self, "is_recording", False):
            self.stop_recording()