Initial commit: Local Transcription App v1.0

Phase 1 Complete - Standalone Desktop Application

Features:
- Real-time speech-to-text with Whisper (faster-whisper)
- PySide6 desktop GUI with settings dialog
- Web server for OBS browser source integration
- Audio capture with automatic sample rate detection and resampling
- Noise suppression with Voice Activity Detection (VAD)
- Configurable display settings (font, timestamps, fade duration)
- Settings apply without restart (with automatic model reloading)
- Auto-fade for web display transcriptions
- CPU/GPU support with automatic device detection
- Standalone executable builds (PyInstaller)
- CUDA build support (works on systems without CUDA hardware)

Components:
- Audio capture with sounddevice
- Noise reduction with noisereduce + webrtcvad
- Transcription with faster-whisper
- GUI with PySide6
- Web server with FastAPI + WebSocket
- Configuration system with YAML

Build System:
- Standard builds (CPU-only): build.sh / build.bat
- CUDA builds (universal): build-cuda.sh / build-cuda.bat
- Comprehensive BUILD.md documentation
- Cross-platform support (Linux, Windows)

Documentation:
- README.md with project overview and quick start
- BUILD.md with detailed build instructions
- NEXT_STEPS.md with future enhancement roadmap
- INSTALL.md with setup instructions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-25 18:48:23 -08:00
commit 472233aec4
31 changed files with 5116 additions and 0 deletions

0
client/__init__.py Normal file
View File

246
client/audio_capture.py Normal file
View File

@@ -0,0 +1,246 @@
"""Audio capture module for recording microphone or system audio."""
import numpy as np
import sounddevice as sd
from scipy import signal
from typing import Callable, Optional, List, Tuple
from threading import Thread, Event
import queue
class AudioCapture:
    """Captures audio from input devices and provides chunks for processing.

    Audio is read from the device at a sample rate the hardware accepts,
    accumulated into ``chunk_duration``-second chunks, resampled to the
    target rate when the two rates differ, and handed to the callback
    given to :meth:`start_recording`.
    """

    def __init__(
        self,
        sample_rate: int = 16000,
        chunk_duration: float = 3.0,
        device: Optional[int] = None
    ):
        """
        Initialize audio capture.

        Args:
            sample_rate: Target audio sample rate in Hz (16000 for Whisper)
            chunk_duration: Duration of each audio chunk in seconds
            device: Input device index, or None for default
        """
        self.target_sample_rate = sample_rate
        self.chunk_duration = chunk_duration
        self.device = device
        # Chunk length in samples at the *target* rate.
        self.chunk_size = int(sample_rate * chunk_duration)

        # Hardware sample rate (auto-detected when recording starts)
        self.hardware_sample_rate = None

        # Raw blocks written by the stream callback, drained by the
        # recording thread.
        self.audio_queue = queue.Queue()
        self.is_recording = False
        self.stop_event = Event()
        self.recording_thread: Optional[Thread] = None

    def _detect_sample_rate(self) -> int:
        """
        Detect a supported sample rate for the audio device.

        Each candidate rate is probed by briefly opening an input stream.

        Returns:
            Supported sample rate (48000 if no candidate could be opened)
        """
        # Try the target rate first, then common rates in order of
        # preference. dict.fromkeys() drops a duplicate of the target rate
        # while keeping the order, so no rate is probed twice.
        candidate_rates = dict.fromkeys(
            [self.target_sample_rate, 48000, 44100, 22050, 32000, 8000]
        )

        for rate in candidate_rates:
            try:
                # Try to create a test stream
                with sd.InputStream(
                    device=self.device,
                    channels=1,
                    samplerate=rate,
                    blocksize=1024
                ):
                    print(f"Using hardware sample rate: {rate} Hz")
                    return rate
            except sd.PortAudioError:
                continue

        # If nothing works, default to 48000
        print("Warning: Could not detect sample rate, defaulting to 48000 Hz")
        return 48000

    def _resample(self, audio: np.ndarray, from_rate: int, to_rate: int) -> np.ndarray:
        """
        Resample audio from one sample rate to another.

        Args:
            audio: Input audio data
            from_rate: Source sample rate
            to_rate: Target sample rate

        Returns:
            Resampled audio as float32 (the input itself, unchanged, when
            both rates are equal)
        """
        if from_rate == to_rate:
            return audio

        # Number of samples the audio occupies at the target rate
        num_samples = int(len(audio) * to_rate / from_rate)

        # Empty or extremely short input would ask the FFT-based resampler
        # for a zero-length result; return an empty array instead.
        if num_samples <= 0:
            return np.zeros(0, dtype=np.float32)

        # Use scipy's resample for high-quality resampling
        resampled = signal.resample(audio, num_samples)
        return resampled.astype(np.float32)

    @staticmethod
    def get_input_devices() -> List[Tuple[int, str]]:
        """
        Get list of available input audio devices.

        Returns:
            List of (device_index, device_name) tuples
        """
        # Only include devices with input channels
        return [
            (index, device['name'])
            for index, device in enumerate(sd.query_devices())
            if device['max_input_channels'] > 0
        ]

    @staticmethod
    def get_default_device() -> Optional[Tuple[int, str]]:
        """
        Get the default input device.

        Returns:
            (device_index, device_name) tuple or None if it cannot be
            determined
        """
        try:
            default_device = sd.query_devices(kind='input')
            for index, device in enumerate(sd.query_devices()):
                # Match by name, but only against devices that can record,
                # so an output-only device sharing the name is not returned.
                if (
                    device['name'] == default_device['name']
                    and device['max_input_channels'] > 0
                ):
                    return (index, device['name'])
        except Exception as e:
            # Best effort: no usable default device is not fatal. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            print(f"Warning: Could not query default input device: {e}")
        return None

    def _audio_callback(self, indata, frames, time_info, status):
        """Callback function for sounddevice stream."""
        if status:
            print(f"Audio status: {status}")

        # Copy the block (the stream's buffer is only valid during the
        # callback) and hand it to the recording thread.
        audio_data = indata.copy().flatten()
        self.audio_queue.put(audio_data)

    def _record_loop(self, callback: Optional[Callable[[np.ndarray], None]]) -> None:
        """Recording loop that runs in a separate thread.

        Opens the input stream, accumulates the raw blocks queued by
        ``_audio_callback`` and emits one chunk per ``chunk_duration``
        seconds of audio until ``stop_event`` is set.
        """
        buffer = np.array([], dtype=np.float32)

        # Chunk length in samples at the hardware rate. At least 1 so the
        # extraction loop below always makes progress.
        hardware_chunk_size = max(
            1, int(self.hardware_sample_rate * self.chunk_duration)
        )

        try:
            with sd.InputStream(
                device=self.device,
                channels=1,
                samplerate=self.hardware_sample_rate,
                callback=self._audio_callback,
                blocksize=int(self.hardware_sample_rate * 0.1)  # 100ms blocks
            ):
                while not self.stop_event.is_set():
                    try:
                        # Get audio data from queue (with timeout so the
                        # stop event is checked regularly)
                        audio_chunk = self.audio_queue.get(timeout=0.1)
                    except queue.Empty:
                        continue

                    try:
                        buffer = np.concatenate([buffer, audio_chunk])

                        # Emit every complete chunk currently buffered. A
                        # loop (not a single `if`) keeps the buffer bounded
                        # when a chunk is shorter than one stream block.
                        while len(buffer) >= hardware_chunk_size:
                            chunk = buffer[:hardware_chunk_size]
                            buffer = buffer[hardware_chunk_size:]

                            # Resample to target rate if needed
                            if self.hardware_sample_rate != self.target_sample_rate:
                                chunk = self._resample(
                                    chunk,
                                    self.hardware_sample_rate,
                                    self.target_sample_rate
                                )

                            # Send to callback if provided
                            if callback:
                                callback(chunk)
                    except Exception as e:
                        # A failing callback must not kill the capture thread
                        print(f"Error in recording loop: {e}")
        except Exception as e:
            print(f"Error opening audio stream: {e}")
            self.is_recording = False

    def start_recording(self, callback: Optional[Callable[[np.ndarray], None]] = None):
        """
        Start recording audio.

        Does nothing if recording is already active.

        Args:
            callback: Optional callback function to receive audio chunks.
                It is invoked on the recording thread.
        """
        if self.is_recording:
            return

        # Detect supported sample rate
        self.hardware_sample_rate = self._detect_sample_rate()

        self.is_recording = True
        self.stop_event.clear()

        self.recording_thread = Thread(
            target=self._record_loop, args=(callback,), daemon=True
        )
        self.recording_thread.start()

    def stop_recording(self):
        """Stop recording audio and wait (up to 2 seconds) for the thread."""
        if not self.is_recording:
            return

        self.is_recording = False
        self.stop_event.set()

        if self.recording_thread:
            self.recording_thread.join(timeout=2.0)
            self.recording_thread = None

    def get_audio_chunk(self, timeout: float = 1.0) -> Optional[np.ndarray]:
        """
        Get the next item from the audio queue.

        NOTE(review): the queue holds the raw blocks written by the stream
        callback, and the recording thread drains the same queue. Calling
        this while recording is active therefore competes with that thread.

        Args:
            timeout: Maximum time to wait for an item

        Returns:
            Audio data as numpy array or None if timeout
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except queue.Empty:
            return None

    def is_recording_active(self) -> bool:
        """Check if recording is currently active."""
        return self.is_recording

    def clear_queue(self):
        """Clear any pending audio data from the queue."""
        while not self.audio_queue.empty():
            try:
                self.audio_queue.get_nowait()
            except queue.Empty:
                break

    def __del__(self):
        """Cleanup when object is destroyed."""
        # __del__ can run on a partially constructed object or during
        # interpreter shutdown; cleanup must never raise from here.
        try:
            self.stop_recording()
        except Exception:
            pass

141
client/config.py Normal file
View File

@@ -0,0 +1,141 @@
"""Configuration management for the local transcription application."""
import os
import yaml
from pathlib import Path
from typing import Any, Dict, Optional
class Config:
    """Manages application configuration with YAML file storage.

    Values are kept in a nested dict and addressed with dot-separated key
    paths such as ``"audio.sample_rate"``.
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Initialize configuration.

        Creates the application directory in the user's home if needed and
        loads (or creates) the configuration file.

        Args:
            config_path: Path to configuration file. If None, uses default location.
        """
        self.app_dir = Path.home() / ".local-transcription"
        self.app_dir.mkdir(parents=True, exist_ok=True)

        if config_path is None:
            self.config_path = self.app_dir / "config.yaml"
        else:
            self.config_path = Path(config_path)

        self.config: Dict[str, Any] = {}
        self.load()

    @staticmethod
    def _read_yaml(path: Path) -> Dict[str, Any]:
        """Read a YAML file and return its top-level mapping.

        An empty file, or one whose top level is not a mapping, yields an
        empty dict so the rest of the class can rely on ``self.config``
        being a dict.
        """
        with open(path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        return data if isinstance(data, dict) else {}

    def load(self) -> None:
        """Load configuration from file or create default if not exists."""
        if self.config_path.exists():
            self.config = self._read_yaml(self.config_path)
            return

        # No user file yet: start from the bundled defaults if present,
        # otherwise from the hardcoded ones.
        default_config_path = Path(__file__).parent.parent / "config" / "default_config.yaml"
        if default_config_path.exists():
            self.config = self._read_yaml(default_config_path)
        else:
            self.config = self._get_default_config()

        # Save the default configuration
        self.save()

    def save(self) -> None:
        """Save current configuration to file."""
        # A custom config_path may point into a directory that does not
        # exist yet.
        self.config_path.parent.mkdir(parents=True, exist_ok=True)
        with open(self.config_path, 'w', encoding='utf-8') as f:
            yaml.dump(self.config, f, default_flow_style=False, indent=2)

    def get(self, key_path: str, default: Any = None) -> Any:
        """
        Get configuration value using dot notation.

        Args:
            key_path: Dot-separated path to config value (e.g., "audio.sample_rate")
            default: Default value if key not found

        Returns:
            Configuration value or default
        """
        value = self.config
        for key in key_path.split('.'):
            # A missing key, or a non-dict in the middle of the path, means
            # the value is not set.
            if not isinstance(value, dict) or key not in value:
                return default
            value = value[key]
        return value

    def set(self, key_path: str, value: Any) -> None:
        """
        Set configuration value using dot notation and save to file.

        Missing intermediate sections are created. An intermediate key that
        currently holds a non-dict value is replaced by a new section, which
        mirrors how ``get`` treats such a path as unset.

        Args:
            key_path: Dot-separated path to config value (e.g., "audio.sample_rate")
            value: Value to set
        """
        *parent_keys, last_key = key_path.split('.')

        # Navigate to the parent dict
        node = self.config
        for key in parent_keys:
            child = node.get(key)
            if not isinstance(child, dict):
                child = {}
                node[key] = child
            node = child

        # Set the value
        node[last_key] = value
        self.save()

    def _get_default_config(self) -> Dict[str, Any]:
        """Get hardcoded default configuration (a fresh dict on every call)."""
        return {
            'user': {
                'name': 'User',
                'id': ''
            },
            'audio': {
                'input_device': 'default',
                'sample_rate': 16000,
                'chunk_duration': 3.0
            },
            'noise_suppression': {
                'enabled': True,
                'strength': 0.7,
                'method': 'noisereduce'
            },
            'transcription': {
                'model': 'base',
                'device': 'auto',
                'language': 'en',
                'task': 'transcribe'
            },
            'processing': {
                'use_vad': True,
                'min_confidence': 0.5
            },
            'server_sync': {
                'enabled': False,
                'url': 'ws://localhost:8000',
                'api_key': ''
            },
            'display': {
                'show_timestamps': True,
                'max_lines': 100,
                'font_size': 12,
                'theme': 'dark'
            }
        }

    def reset_to_default(self) -> None:
        """Reset configuration to default values and save to file."""
        self.config = self._get_default_config()
        self.save()

    def __repr__(self) -> str:
        return f"Config(path={self.config_path})"

128
client/device_utils.py Normal file
View File

@@ -0,0 +1,128 @@
"""Utilities for detecting and managing compute devices (CPU/GPU)."""
import torch
from typing import List, Tuple
class DeviceManager:
    """Manages device detection and selection for transcription."""

    def __init__(self):
        """Initialize device manager and detect available devices."""
        self.available_devices = self._detect_devices()
        # "cpu" is always first in the detected list, so this starts on CPU
        # until set_device() is called.
        self.current_device = self.available_devices[0] if self.available_devices else "cpu"

    def _detect_devices(self) -> List[str]:
        """
        Detect available compute devices.

        Returns:
            List of available device names; always starts with "cpu"
        """
        devices = ["cpu"]

        # Check for CUDA (NVIDIA GPU)
        if torch.cuda.is_available():
            devices.append("cuda")

        # Check for MPS (Apple Silicon GPU); the hasattr guard covers torch
        # builds that have no mps backend attribute.
        if hasattr(torch.backends, 'mps') and torch.backends.mps.is_available():
            devices.append("mps")

        return devices

    def get_device_info(self) -> List[Tuple[str, str]]:
        """
        Get detailed information about available devices.

        Returns:
            List of (device_name, device_description) tuples
        """
        info = []
        for device in self.available_devices:
            if device == "cpu":
                info.append(("cpu", "CPU"))
            elif device == "cuda":
                try:
                    gpu_name = torch.cuda.get_device_name(0)
                    info.append(("cuda", f"CUDA GPU: {gpu_name}"))
                except Exception:
                    # The GPU name is only cosmetic; fall back to a generic
                    # label. Catching Exception (not a bare except) lets
                    # KeyboardInterrupt and SystemExit propagate.
                    info.append(("cuda", "CUDA GPU"))
            elif device == "mps":
                info.append(("mps", "Apple Silicon GPU (MPS)"))
        return info

    def set_device(self, device: str) -> bool:
        """
        Set the current device for transcription.

        Args:
            device: Device name ('cpu', 'cuda', 'mps', or 'auto')

        Returns:
            True if device was set successfully, False otherwise (the
            current device is then left unchanged)
        """
        if device == "auto":
            # Auto-select best available device: CUDA, then MPS, then CPU
            if "cuda" in self.available_devices:
                self.current_device = "cuda"
            elif "mps" in self.available_devices:
                self.current_device = "mps"
            else:
                self.current_device = "cpu"
            return True

        if device in self.available_devices:
            self.current_device = device
            return True

        return False

    def get_device(self) -> str:
        """
        Get the currently selected device.

        Returns:
            Current device name
        """
        return self.current_device

    def is_gpu_available(self) -> bool:
        """
        Check if any GPU is available.

        Returns:
            True if CUDA or MPS is available
        """
        return "cuda" in self.available_devices or "mps" in self.available_devices

    def get_device_for_whisper(self) -> str:
        """
        Get device string formatted for faster-whisper.

        Returns:
            Device string for faster-whisper ('cpu', 'cuda', etc.)
        """
        if self.current_device == "mps":
            # faster-whisper doesn't support MPS, fall back to CPU
            return "cpu"
        return self.current_device

    def get_compute_type(self) -> str:
        """
        Get the appropriate compute type for the current device.

        Returns:
            Compute type string for faster-whisper
        """
        if self.current_device == "cuda":
            # Use float16 for GPU for better performance
            return "float16"
        # Use int8 for CPU for better performance. This also covers "mps",
        # which get_device_for_whisper() maps to CPU.
        return "int8"

    def __repr__(self) -> str:
        return f"DeviceManager(current={self.current_device}, available={self.available_devices})"

164
client/noise_suppression.py Normal file
View File

@@ -0,0 +1,164 @@
"""Noise suppression module for reducing background noise in audio."""
import warnings
# Suppress pkg_resources deprecation warning from webrtcvad
warnings.filterwarnings("ignore", message=".*pkg_resources.*", category=UserWarning)
import numpy as np
import noisereduce as nr
import webrtcvad
from typing import Optional
class NoiseSuppressor:
    """Handles noise reduction and voice activity detection."""

    # Sample rates for which this class creates a WebRTC VAD instance.
    _VAD_SAMPLE_RATES = (8000, 16000, 32000, 48000)

    def __init__(
        self,
        sample_rate: int = 16000,
        method: str = "noisereduce",
        strength: float = 0.7,
        use_vad: bool = True
    ):
        """
        Initialize noise suppressor.

        Args:
            sample_rate: Audio sample rate in Hz
            method: Noise reduction method ('noisereduce' or 'none')
            strength: Noise reduction strength (0.0 to 1.0)
            use_vad: Whether to use Voice Activity Detection
        """
        self.sample_rate = sample_rate
        self.method = method
        self.strength = max(0.0, min(1.0, strength))  # Clamp to [0, 1]
        self.use_vad = use_vad

        # Initialize VAD if requested; VAD is switched off when it cannot
        # be created.
        self.vad = None
        if use_vad:
            self._init_vad()
            self.use_vad = self.vad is not None

        # Store noise profile for adaptive reduction
        self.noise_profile: Optional[np.ndarray] = None

    def _init_vad(self) -> None:
        """Create the WebRTC VAD instance, leaving ``self.vad`` as None on failure."""
        try:
            # WebRTC VAD supports 8kHz, 16kHz, 32kHz, and 48kHz
            if self.sample_rate in self._VAD_SAMPLE_RATES:
                self.vad = webrtcvad.Vad(2)  # Aggressiveness: 0-3 (2 is balanced)
            else:
                print(f"Warning: VAD not supported for sample rate {self.sample_rate}Hz")
        except Exception as e:
            print(f"Warning: Failed to initialize VAD: {e}")

    def reduce_noise(self, audio: np.ndarray) -> np.ndarray:
        """
        Apply noise reduction to audio.

        Args:
            audio: Audio data as numpy array (float32, range [-1, 1])

        Returns:
            Noise-reduced audio; the input is returned when reduction is
            disabled or fails
        """
        if self.method == "none" or self.strength == 0.0:
            return audio

        try:
            # Ensure audio is float32
            audio = audio.astype(np.float32)

            if self.method == "noisereduce":
                # Apply noisereduce noise reduction
                reduced = nr.reduce_noise(
                    y=audio,
                    sr=self.sample_rate,
                    prop_decrease=self.strength,
                    stationary=True
                )
                return reduced.astype(np.float32)

            # Unknown method: pass the audio through
            return audio
        except Exception as e:
            print(f"Error in noise reduction: {e}")
            return audio

    def is_speech(self, audio: np.ndarray) -> bool:
        """
        Detect if audio contains speech using VAD.

        The audio is split into 30 ms frames; it counts as speech when more
        than 30% of the frames are classified as speech. Audio shorter than
        one frame has no frames to classify and is reported as non-speech.

        Args:
            audio: Audio data as numpy array (float32, range [-1, 1])

        Returns:
            True if speech is detected, False otherwise
        """
        if not self.use_vad or self.vad is None:
            return True  # Assume speech if VAD not available

        try:
            # Convert float32 audio to int16 for VAD. Clip first: samples
            # outside [-1, 1] would otherwise wrap around in int16 and turn
            # loud peaks into garbage.
            audio_int16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)

            # VAD requires specific frame sizes (10, 20, or 30 ms)
            frame_duration_ms = 30
            frame_size = int(self.sample_rate * frame_duration_ms / 1000)

            # Process audio in whole frames; a trailing partial frame is
            # ignored
            num_frames = len(audio_int16) // frame_size
            speech_frames = 0
            for i in range(num_frames):
                frame = audio_int16[i * frame_size:(i + 1) * frame_size]
                if self.vad.is_speech(frame.tobytes(), self.sample_rate):
                    speech_frames += 1

            # Consider it speech if more than 30% of frames contain speech
            return speech_frames > (num_frames * 0.3)
        except Exception as e:
            print(f"Error in VAD: {e}")
            return True  # Assume speech on error

    def process(self, audio: np.ndarray, skip_silent: bool = True) -> Optional[np.ndarray]:
        """
        Process audio with noise reduction and optional VAD filtering.

        Args:
            audio: Audio data as numpy array
            skip_silent: If True, return None for non-speech audio

        Returns:
            Processed audio or None if silent (when skip_silent=True)
        """
        # Check for speech first (before noise reduction)
        if skip_silent and self.use_vad and not self.is_speech(audio):
            return None

        # Apply noise reduction
        return self.reduce_noise(audio)

    def set_strength(self, strength: float):
        """
        Update noise reduction strength.

        Args:
            strength: New strength value (0.0 to 1.0); values outside the
                range are clamped
        """
        self.strength = max(0.0, min(1.0, strength))

    def set_vad_enabled(self, enabled: bool):
        """
        Enable or disable Voice Activity Detection.

        If VAD was not created at construction time, enabling it creates
        the detector now. VAD stays disabled when the detector cannot be
        created.

        Args:
            enabled: True to enable VAD, False to disable
        """
        if enabled and self.vad is None:
            self._init_vad()
        self.use_vad = enabled and self.vad is not None

    def __repr__(self) -> str:
        return f"NoiseSuppressor(method={self.method}, strength={self.strength}, vad={self.use_vad})"

View File

@@ -0,0 +1,232 @@
"""Transcription engine using faster-whisper for speech-to-text."""
import numpy as np
from faster_whisper import WhisperModel
from typing import Optional, List, Tuple
from datetime import datetime
import threading
class TranscriptionResult:
    """A single piece of transcribed text together with its metadata."""

    def __init__(self, text: str, confidence: float, timestamp: datetime, user_name: str = ""):
        """
        Store the transcription and its metadata.

        Args:
            text: Transcribed text (surrounding whitespace is removed)
            confidence: Confidence score (0.0 to 1.0)
            timestamp: Timestamp of transcription
            user_name: Name of the user/speaker
        """
        self.user_name = user_name
        self.timestamp = timestamp
        self.confidence = confidence
        self.text = text.strip()

    def __repr__(self) -> str:
        # "[HH:MM:SS] text", with "name: " placed before the text when a
        # speaker name is set.
        stamp = self.timestamp.strftime("%H:%M:%S")
        speaker = f"{self.user_name}: " if self.user_name else ""
        return f"[{stamp}] {speaker}{self.text}"

    def to_dict(self) -> dict:
        """Return the result as a dict, with the timestamp in ISO format."""
        payload = {}
        payload['text'] = self.text
        payload['confidence'] = self.confidence
        payload['timestamp'] = self.timestamp.isoformat()
        payload['user_name'] = self.user_name
        return payload
class TranscriptionEngine:
    """Handles speech-to-text transcription using faster-whisper.

    All access to the model (loading, unloading, decoding) is serialized
    through ``model_lock``.
    """

    def __init__(
        self,
        model_size: str = "base",
        device: str = "cpu",
        compute_type: str = "int8",
        language: str = "en",
        min_confidence: float = 0.5
    ):
        """
        Initialize transcription engine.

        The model itself is not loaded here; call ``load_model()``.

        Args:
            model_size: Whisper model size ('tiny', 'base', 'small', 'medium', 'large')
            device: Device to use ('cpu', 'cuda', 'auto')
            compute_type: Compute type ('int8', 'float16', 'float32')
            language: Language code for transcription
            min_confidence: Minimum confidence threshold for transcriptions
        """
        self.model_size = model_size
        self.device = device
        self.compute_type = compute_type
        self.language = language
        self.min_confidence = min_confidence
        self.model: Optional[WhisperModel] = None
        self.model_lock = threading.Lock()
        self.is_loaded = False

    def load_model(self) -> bool:
        """
        Load the Whisper model.

        Returns:
            True if model loaded successfully, False otherwise
        """
        try:
            print(f"Loading Whisper {self.model_size} model on {self.device}...")
            with self.model_lock:
                self.model = WhisperModel(
                    self.model_size,
                    device=self.device,
                    compute_type=self.compute_type
                )
                self.is_loaded = True
            print("Model loaded successfully!")
            return True
        except Exception as e:
            print(f"Error loading model: {e}")
            self.is_loaded = False
            return False

    def transcribe(
        self,
        audio: np.ndarray,
        sample_rate: int = 16000,
        user_name: str = ""
    ) -> "Optional[TranscriptionResult]":
        """
        Transcribe audio to text.

        Args:
            audio: Audio data as numpy array (float32)
            sample_rate: Audio sample rate in Hz. Currently not used by this
                method; the audio is passed to the model as-is.
            user_name: Name of the user/speaker

        Returns:
            TranscriptionResult or None if transcription failed or confidence too low
        """
        if not self.is_loaded or self.model is None:
            print("Model not loaded")
            return None

        try:
            # Ensure audio is float32
            audio = audio.astype(np.float32)

            with self.model_lock:
                # Re-check under the lock: the model may have been unloaded
                # or swapped since the check above.
                model = self.model
                if model is None:
                    print("Model not loaded")
                    return None

                # Transcribe using faster-whisper
                segments, _info = model.transcribe(
                    audio,
                    language=self.language if self.language != "auto" else None,
                    vad_filter=True,  # Use built-in VAD
                    vad_parameters=dict(
                        min_silence_duration_ms=500
                    )
                )
                # faster-whisper returns a lazy generator: decoding only
                # happens while it is iterated, so it must be consumed
                # while the lock is still held.
                segments = list(segments)

            if not segments:
                return None

            # Convert the mean segment log probability to an approximate
            # confidence in the 0-1 range (avg_logprob is typically in
            # [-1, 0], so exp() maps it into (0, 1]).
            mean_logprob = sum(segment.avg_logprob for segment in segments) / len(segments)
            avg_confidence = float(np.exp(mean_logprob))

            # Filter by minimum confidence
            if avg_confidence < self.min_confidence:
                return None

            # Join segment texts and clean up
            text = " ".join(segment.text for segment in segments).strip()
            if not text:
                return None

            return TranscriptionResult(
                text=text,
                confidence=avg_confidence,
                timestamp=datetime.now(),
                user_name=user_name
            )
        except Exception as e:
            print(f"Error during transcription: {e}")
            return None

    def change_model(self, model_size: str) -> bool:
        """
        Change to a different model size.

        The current model is released before the new one is loaded.

        Args:
            model_size: New model size

        Returns:
            True if model changed successfully
        """
        self.model_size = model_size
        # unload_model() takes the lock, so a transcription that is in
        # progress finishes before the model is dropped.
        self.unload_model()
        return self.load_model()

    def change_device(self, device: str, compute_type: Optional[str] = None) -> bool:
        """
        Change compute device.

        The current model is released and reloaded on the new device.

        Args:
            device: New device ('cpu', 'cuda', etc.)
            compute_type: Optional new compute type

        Returns:
            True if device changed successfully
        """
        self.device = device
        if compute_type:
            self.compute_type = compute_type
        self.unload_model()
        return self.load_model()

    def change_language(self, language: str):
        """
        Change transcription language.

        Args:
            language: Language code or 'auto'
        """
        self.language = language

    def unload_model(self):
        """Unload the model from memory."""
        with self.model_lock:
            self.model = None
            self.is_loaded = False

    def __repr__(self) -> str:
        return f"TranscriptionEngine(model={self.model_size}, device={self.device}, loaded={self.is_loaded})"

    def __del__(self):
        """Cleanup when object is destroyed."""
        # __del__ can run on a partially constructed object or during
        # interpreter shutdown; cleanup must never raise from here.
        try:
            self.unload_model()
        except Exception:
            pass