165 lines
5.2 KiB
Python
165 lines
5.2 KiB
Python
|
|
"""Noise suppression module for reducing background noise in audio."""
|
||
|
|
|
||
|
|
import warnings
|
||
|
|
# Suppress pkg_resources deprecation warning from webrtcvad
|
||
|
|
warnings.filterwarnings("ignore", message=".*pkg_resources.*", category=UserWarning)
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
import noisereduce as nr
|
||
|
|
import webrtcvad
|
||
|
|
from typing import Optional
|
||
|
|
|
||
|
|
|
||
|
|
class NoiseSuppressor:
    """Handles noise reduction and voice activity detection.

    Noise reduction is delegated to ``noisereduce`` (stationary mode) and
    speech detection to the WebRTC VAD.  Both are best-effort: any failure is
    reported with ``print`` and the audio is passed through / treated as
    speech instead of raising.
    """

    # Sample rates (Hz) the WebRTC VAD accepts: 8 kHz, 16 kHz, 32 kHz, 48 kHz.
    _VAD_SAMPLE_RATES = (8000, 16000, 32000, 48000)
    # WebRTC VAD aggressiveness: 0-3 (2 is balanced).
    _VAD_AGGRESSIVENESS = 2
    # VAD requires specific frame sizes (10, 20, or 30 ms).
    _VAD_FRAME_MS = 30
    # A chunk counts as speech when more than this fraction of frames is voiced.
    _SPEECH_FRAME_RATIO = 0.3

    def __init__(
        self,
        sample_rate: int = 16000,
        method: str = "noisereduce",
        strength: float = 0.7,
        use_vad: bool = True
    ):
        """
        Initialize noise suppressor.

        Args:
            sample_rate: Audio sample rate in Hz
            method: Noise reduction method ('noisereduce' or 'none')
            strength: Noise reduction strength (0.0 to 1.0); values outside
                the range are clamped
            use_vad: Whether to use Voice Activity Detection. If the VAD
                cannot be created (unsupported sample rate or library
                failure) a warning is printed and VAD is disabled.
        """
        self.sample_rate = sample_rate
        self.method = method
        self.strength = self._clamp_strength(strength)

        # The detector is only created when VAD is actually requested.
        self.vad = None
        self.use_vad = bool(use_vad) and self._init_vad()

        # Placeholder for a noise profile for adaptive reduction; no method
        # of this class reads it.
        self.noise_profile: Optional[np.ndarray] = None

    @staticmethod
    def _clamp_strength(strength: float) -> float:
        """Clamp *strength* to the closed interval [0, 1]."""
        return max(0.0, min(1.0, strength))

    def _init_vad(self) -> bool:
        """Create the WebRTC VAD if needed; return True when one is available."""
        if self.vad is not None:
            return True

        if self.sample_rate not in self._VAD_SAMPLE_RATES:
            print(f"Warning: VAD not supported for sample rate {self.sample_rate}Hz")
            return False

        try:
            self.vad = webrtcvad.Vad(self._VAD_AGGRESSIVENESS)
        except Exception as e:
            # Best-effort: a missing/broken VAD must not prevent construction.
            print(f"Warning: Failed to initialize VAD: {e}")
            return False
        return True

    def reduce_noise(self, audio: np.ndarray) -> np.ndarray:
        """
        Apply noise reduction to audio.

        Args:
            audio: Audio data as numpy array (float32, range [-1, 1])

        Returns:
            Noise-reduced audio as float32. The input object itself is
            returned when reduction is disabled (method 'none' or strength
            0.0); on any error the un-reduced audio is returned.
        """
        if self.method == "none" or self.strength == 0.0:
            return audio

        try:
            # Ensure audio is float32
            audio = audio.astype(np.float32)

            # Unknown methods and empty input are passed through unchanged.
            if self.method != "noisereduce" or audio.size == 0:
                return audio

            reduced = nr.reduce_noise(
                y=audio,
                sr=self.sample_rate,
                prop_decrease=self.strength,
                stationary=True
            )
            return reduced.astype(np.float32)

        except Exception as e:
            print(f"Error in noise reduction: {e}")
            return audio

    def is_speech(self, audio: np.ndarray) -> bool:
        """
        Detect if audio contains speech using VAD.

        The audio is split into consecutive 30 ms frames (a trailing partial
        frame is ignored) and counts as speech when more than 30% of the
        frames are classified as voiced.

        Args:
            audio: Audio data as numpy array (float32, range [-1, 1]);
                assumed to be one-dimensional (mono) - TODO confirm

        Returns:
            True if speech is detected, False otherwise. True is also
            returned whenever the VAD cannot decide: VAD disabled or
            unavailable, non-empty audio shorter than one frame, or an error
            during detection. Empty audio is never speech.
        """
        if not self.use_vad or self.vad is None:
            return True  # Assume speech if VAD not available

        try:
            samples = np.asarray(audio)
            if samples.size == 0:
                return False

            frame_size = int(self.sample_rate * self._VAD_FRAME_MS / 1000)
            num_frames = len(samples) // frame_size
            if num_frames == 0:
                # Not enough audio for a single frame: the VAD cannot
                # classify it, so keep it rather than silently dropping it.
                return True

            usable = samples[:num_frames * frame_size]
            # Clip before scaling: values outside [-1, 1] (or NaN) would
            # otherwise overflow int16 and hand corrupted PCM to the VAD.
            bounded = np.clip(np.nan_to_num(usable), -1.0, 1.0)
            pcm = (bounded * 32767).astype(np.int16)

            speech_frames = sum(
                1
                for start in range(0, num_frames * frame_size, frame_size)
                if self.vad.is_speech(
                    pcm[start:start + frame_size].tobytes(), self.sample_rate
                )
            )

            # Consider it speech if more than 30% of frames contain speech
            return speech_frames > (num_frames * self._SPEECH_FRAME_RATIO)

        except Exception as e:
            print(f"Error in VAD: {e}")
            return True  # Assume speech on error

    def process(self, audio: np.ndarray, skip_silent: bool = True) -> Optional[np.ndarray]:
        """
        Process audio with noise reduction and optional VAD filtering.

        Args:
            audio: Audio data as numpy array
            skip_silent: If True, return None for non-speech audio

        Returns:
            Processed audio or None if silent (when skip_silent=True)
        """
        # Check for speech first (before noise reduction) so that silent
        # chunks skip the reduction step entirely.
        if skip_silent and self.use_vad and not self.is_speech(audio):
            return None

        return self.reduce_noise(audio)

    def set_strength(self, strength: float) -> None:
        """
        Update noise reduction strength.

        Args:
            strength: New strength value (0.0 to 1.0); clamped to that range
        """
        self.strength = self._clamp_strength(strength)

    def set_vad_enabled(self, enabled: bool) -> None:
        """
        Enable or disable Voice Activity Detection.

        Enabling creates the detector on demand, so VAD can be turned on for
        an instance that was constructed with ``use_vad=False``. If the
        detector cannot be created a warning is printed and VAD stays off.

        Args:
            enabled: True to enable VAD, False to disable
        """
        self.use_vad = bool(enabled) and self._init_vad()

    def __repr__(self) -> str:
        return f"NoiseSuppressor(method={self.method}, strength={self.strength}, vad={self.use_vad})"
|