Add test suite (63 tests) and CI workflow, fix Settings API bugs
Some checks failed
Release / Bump version and tag (push) Successful in 4s
Sidecar Release / Bump sidecar version and tag (push) Failing after 3s
Tests / Python Backend Tests (push) Failing after 3s
Tests / Frontend Tests (push) Successful in 8s
Tests / Rust Sidecar Tests (push) Successful in 3m10s
Some checks failed
Release / Bump version and tag (push) Successful in 4s
Sidecar Release / Bump sidecar version and tag (push) Failing after 3s
Tests / Python Backend Tests (push) Failing after 3s
Tests / Frontend Tests (push) Successful in 8s
Tests / Rust Sidecar Tests (push) Successful in 3m10s
Test suite covering all three layers: Python backend (25 tests): - AppController: state machine, start/stop, callbacks, settings reload - API server: REST endpoints, config CRUD, status, devices - Config: dot-notation get/set, persistence, nested paths - Main headless: ready event port format validation Svelte frontend (14 tests via Vitest): - Backend store: exported properties/methods, port derivation, URLs - Config store: method names (fetchConfig not loadConfig), defaults - Transcriptions store: add/clear/plaintext - File extension regression: ensures $state runes only in .svelte.ts Rust sidecar (24 tests via cargo test): - Platform/arch detection, asset name construction - Ready event deserialization (with extra fields tolerance) - Path construction, version read/write, old version cleanup - Zip extraction, SidecarManager lifecycle CI workflow (.gitea/workflows/test.yml): - Runs on push to main and PRs - Three parallel jobs: Python, Frontend, Rust Also fixes three bugs found during test planning: - Settings: /api/check-updates -> GET /api/check-update - Settings: /api/remote/login -> /api/login - Settings: /api/remote/register -> /api/register Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
0
backend/tests/__init__.py
Normal file
0
backend/tests/__init__.py
Normal file
159
backend/tests/conftest.py
Normal file
159
backend/tests/conftest.py
Normal file
@@ -0,0 +1,159 @@
|
||||
"""Shared fixtures for backend tests.
|
||||
|
||||
Heavy third-party modules (torch, sounddevice, numpy, RealtimeSTT, etc.) are
|
||||
stubbed at the *sys.modules* level before any backend code is imported. This
|
||||
lets the test suite run on a plain Python install without GPU drivers, audio
|
||||
hardware, or heavyweight ML libraries.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import types
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Project root on sys.path ────────────────────────────────────────
|
||||
project_root = Path(__file__).resolve().parent.parent.parent
|
||||
if str(project_root) not in sys.path:
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
|
||||
# ── Stub heavy modules before anything imports them ─────────────────
|
||||
|
||||
def _stub(name: str) -> types.ModuleType:
|
||||
"""Create a stub module and register it in sys.modules if not already present."""
|
||||
if name in sys.modules:
|
||||
return sys.modules[name]
|
||||
mod = types.ModuleType(name)
|
||||
sys.modules[name] = mod
|
||||
return mod
|
||||
|
||||
|
||||
# numpy -- must behave like a real module for `import numpy as np`
|
||||
_np = _stub("numpy")
|
||||
_np.float32 = float
|
||||
_np.float64 = float
|
||||
_np.int16 = int
|
||||
_np.ndarray = MagicMock
|
||||
_np.array = MagicMock(return_value=MagicMock())
|
||||
_np.zeros = MagicMock(return_value=MagicMock())
|
||||
_np.frombuffer = MagicMock(return_value=MagicMock())
|
||||
|
||||
# torch + sub-modules
|
||||
_torch = _stub("torch")
|
||||
_torch.cuda = MagicMock()
|
||||
_torch.cuda.is_available = MagicMock(return_value=False)
|
||||
_torch.backends = MagicMock()
|
||||
_torch.backends.mps = MagicMock()
|
||||
_torch.backends.mps.is_available = MagicMock(return_value=False)
|
||||
_stub("torch.cuda")
|
||||
_stub("torch.backends")
|
||||
_stub("torch.backends.mps")
|
||||
_stub("torchaudio")
|
||||
|
||||
# sounddevice
|
||||
_sd = _stub("sounddevice")
|
||||
_sd.query_devices = MagicMock(return_value=[])
|
||||
|
||||
# RealtimeSTT (imported by transcription_engine_realtime)
|
||||
_rtstt = _stub("RealtimeSTT")
|
||||
_rtstt.AudioToTextRecorder = MagicMock
|
||||
|
||||
# faster_whisper (sometimes imported transitively)
|
||||
_stub("faster_whisper")
|
||||
|
||||
# noisereduce
|
||||
_stub("noisereduce")
|
||||
|
||||
# scipy
|
||||
_scipy = _stub("scipy")
|
||||
_stub("scipy.signal")
|
||||
_stub("scipy.io")
|
||||
_stub("scipy.io.wavfile")
|
||||
|
||||
# webrtcvad
|
||||
_stub("webrtcvad")
|
||||
|
||||
# openwakeword
|
||||
_stub("openwakeword")
|
||||
|
||||
# pvporcupine
|
||||
_stub("pvporcupine")
|
||||
|
||||
# PySide6 (should not be needed, but just in case)
|
||||
_stub("PySide6")
|
||||
_stub("PySide6.QtWidgets")
|
||||
_stub("PySide6.QtCore")
|
||||
_stub("PySide6.QtGui")
|
||||
|
||||
# websockets
|
||||
_ws = _stub("websockets")
|
||||
_ws.connect = MagicMock
|
||||
|
||||
# deepgram (cloud transcription)
|
||||
_stub("deepgram")
|
||||
|
||||
|
||||
# ── Fixtures ────────────────────────────────────────────────────────
|
||||
|
||||
@pytest.fixture
|
||||
def mock_config(tmp_path):
|
||||
"""Return a Config object backed by a temporary file.
|
||||
|
||||
This avoids touching the real user config at ~/.local-transcription/.
|
||||
"""
|
||||
config_file = tmp_path / "test_config.yaml"
|
||||
from client.config import Config
|
||||
|
||||
config = Config(config_path=str(config_file))
|
||||
return config
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def controller(mock_config):
|
||||
"""Return an AppController wired to *mock_config* without starting heavy
|
||||
subsystems (engine, web server, device manager).
|
||||
|
||||
The transcription engine, web server thread, and DeviceManager are all
|
||||
replaced with lightweight mocks so the test suite can run without a GPU,
|
||||
audio hardware, or a free network port.
|
||||
"""
|
||||
from unittest.mock import patch
|
||||
|
||||
with patch("backend.app_controller.DeviceManager") as MockDM, \
|
||||
patch("backend.app_controller.RealtimeTranscriptionEngine"), \
|
||||
patch("backend.app_controller.DeepgramTranscriptionEngine"), \
|
||||
patch("backend.app_controller.TranscriptionWebServer"), \
|
||||
patch("backend.app_controller.ServerSyncClient"):
|
||||
|
||||
# DeviceManager stub
|
||||
dm_instance = MagicMock()
|
||||
dm_instance.get_device_info.return_value = [("cpu", "CPU")]
|
||||
dm_instance.get_device_for_whisper.return_value = "cpu"
|
||||
MockDM.return_value = dm_instance
|
||||
|
||||
from backend.app_controller import AppController
|
||||
|
||||
ctrl = AppController(config=mock_config)
|
||||
yield ctrl
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def api_client(controller):
|
||||
"""Return an httpx.AsyncClient speaking ASGI to the APIServer's FastAPI app.
|
||||
|
||||
Usage in tests::
|
||||
|
||||
async def test_something(api_client):
|
||||
resp = await api_client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
"""
|
||||
from backend.api_server import APIServer
|
||||
import httpx
|
||||
|
||||
api = APIServer(controller)
|
||||
|
||||
transport = httpx.ASGITransport(app=api.app)
|
||||
client = httpx.AsyncClient(transport=transport, base_url="http://testserver")
|
||||
return client
|
||||
150
backend/tests/test_api_server.py
Normal file
150
backend/tests/test_api_server.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""Tests for backend.api_server.APIServer REST endpoints."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
# Ensure project root is on path
|
||||
project_root = Path(__file__).resolve().parent.parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
|
||||
# ── GET /api/status ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_status(api_client):
|
||||
resp = await api_client.get("/api/status")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "state" in data
|
||||
assert "is_transcribing" in data
|
||||
assert "version" in data
|
||||
assert "web_server" in data
|
||||
|
||||
|
||||
# ── GET /api/config ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_config(api_client):
|
||||
resp = await api_client.get("/api/config")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
# The config should be a dict (the raw config mapping)
|
||||
assert isinstance(data, dict)
|
||||
|
||||
|
||||
# ── PUT /api/config ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_put_config(api_client, controller):
|
||||
"""Updating config via PUT should persist and return success."""
|
||||
# Patch reload_engine to avoid heavy lifting
|
||||
controller.reload_engine = MagicMock(return_value=(True, "ok"))
|
||||
controller.current_model_size = "base.en"
|
||||
controller.current_device_config = "auto"
|
||||
controller.config.set("transcription.model", "base.en")
|
||||
controller.config.set("transcription.device", "auto")
|
||||
|
||||
resp = await api_client.put(
|
||||
"/api/config",
|
||||
json={"settings": {"display.font_size": 24}},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["status"] == "ok"
|
||||
|
||||
# Verify the value was actually saved
|
||||
assert controller.config.get("display.font_size") == 24
|
||||
|
||||
|
||||
# ── POST /api/start (engine not ready) ─────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_start_when_not_ready(api_client, controller):
|
||||
"""Starting transcription without an engine should return 400."""
|
||||
controller.transcription_engine = None
|
||||
resp = await api_client.post("/api/start")
|
||||
assert resp.status_code == 400
|
||||
|
||||
|
||||
# ── POST /api/clear ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_clear(api_client, controller):
|
||||
from client.transcription_engine_realtime import TranscriptionResult
|
||||
from datetime import datetime
|
||||
|
||||
controller.transcriptions = [
|
||||
TranscriptionResult(text="One", is_final=True, timestamp=datetime.now(), user_name="U"),
|
||||
]
|
||||
|
||||
resp = await api_client.post("/api/clear")
|
||||
assert resp.status_code == 200
|
||||
body = resp.json()
|
||||
assert body["cleared"] == 1
|
||||
assert len(controller.transcriptions) == 0
|
||||
|
||||
|
||||
# ── GET /api/audio-devices ──────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_audio_devices(api_client, controller):
|
||||
"""Audio devices endpoint should return a list, even when mocked."""
|
||||
# Mock sounddevice so the test works without audio hardware
|
||||
with patch("backend.app_controller.AppController.get_audio_devices",
|
||||
return_value=[{"index": 0, "name": "Mock Mic"}]):
|
||||
resp = await api_client.get("/api/audio-devices")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "devices" in data
|
||||
assert len(data["devices"]) >= 1
|
||||
|
||||
|
||||
# ── GET /api/compute-devices ────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_compute_devices(api_client, controller):
|
||||
resp = await api_client.get("/api/compute-devices")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "devices" in data
|
||||
# At minimum we get the "Auto-detect" entry
|
||||
assert any(d["id"] == "auto" for d in data["devices"])
|
||||
|
||||
|
||||
# ── GET /api/check-update ──────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_update(api_client, controller):
|
||||
"""check-update should return a dict with an 'available' key."""
|
||||
with patch.object(controller, "check_for_updates",
|
||||
return_value={"available": False, "current_version": "1.0.0"}):
|
||||
resp = await api_client.get("/api/check-update")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "available" in data
|
||||
|
||||
|
||||
# ── GET /api/version ────────────────────────────────────────────────
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_version(api_client):
|
||||
resp = await api_client.get("/api/version")
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "version" in data
|
||||
# Should be a non-empty string
|
||||
assert isinstance(data["version"], str)
|
||||
assert len(data["version"]) > 0
|
||||
181
backend/tests/test_app_controller.py
Normal file
181
backend/tests/test_app_controller.py
Normal file
@@ -0,0 +1,181 @@
|
||||
"""Tests for backend.app_controller.AppController."""
|
||||
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure project root is on path
|
||||
project_root = Path(__file__).resolve().parent.parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
from backend.app_controller import AppState
|
||||
|
||||
|
||||
# ── basic state ─────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_initial_state(controller):
|
||||
"""A freshly constructed controller should be INITIALIZING and not transcribing."""
|
||||
assert controller.state == AppState.INITIALIZING
|
||||
assert controller.is_transcribing is False
|
||||
|
||||
|
||||
# ── start / stop ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def test_start_transcription_without_engine(controller):
|
||||
"""Starting transcription before the engine is ready should fail gracefully."""
|
||||
controller.transcription_engine = None
|
||||
success, message = controller.start_transcription()
|
||||
assert success is False
|
||||
assert "not ready" in message.lower()
|
||||
|
||||
|
||||
def test_start_stop_cycle(controller):
|
||||
"""Full start -> stop cycle with a mocked engine that reports ready."""
|
||||
engine = MagicMock()
|
||||
engine.is_ready.return_value = True
|
||||
engine.start_recording.return_value = True
|
||||
controller.transcription_engine = engine
|
||||
|
||||
# Start
|
||||
ok, msg = controller.start_transcription()
|
||||
assert ok is True
|
||||
assert controller.is_transcribing is True
|
||||
assert controller.state == AppState.TRANSCRIBING
|
||||
|
||||
# Stop
|
||||
ok, msg = controller.stop_transcription()
|
||||
assert ok is True
|
||||
assert controller.is_transcribing is False
|
||||
engine.stop_recording.assert_called_once()
|
||||
|
||||
|
||||
def test_double_start_rejected(controller):
|
||||
"""Calling start_transcription twice should reject the second call."""
|
||||
engine = MagicMock()
|
||||
engine.is_ready.return_value = True
|
||||
engine.start_recording.return_value = True
|
||||
controller.transcription_engine = engine
|
||||
|
||||
controller.start_transcription()
|
||||
success, message = controller.start_transcription()
|
||||
assert success is False
|
||||
assert "already" in message.lower()
|
||||
|
||||
|
||||
# ── transcription storage ───────────────────────────────────────────
|
||||
|
||||
|
||||
def test_clear_transcriptions(controller):
|
||||
"""clear_transcriptions should empty the list and return the count."""
|
||||
from client.transcription_engine_realtime import TranscriptionResult
|
||||
|
||||
controller.transcriptions = [
|
||||
TranscriptionResult(text="Hello", is_final=True, timestamp=datetime.now(), user_name="Alice"),
|
||||
TranscriptionResult(text="World", is_final=True, timestamp=datetime.now(), user_name="Bob"),
|
||||
]
|
||||
count = controller.clear_transcriptions()
|
||||
assert count == 2
|
||||
assert len(controller.transcriptions) == 0
|
||||
|
||||
|
||||
def test_get_transcriptions_text_with_timestamps(controller):
|
||||
"""get_transcriptions_text should include [HH:MM:SS] prefixes when requested."""
|
||||
from client.transcription_engine_realtime import TranscriptionResult
|
||||
|
||||
ts = datetime(2025, 1, 15, 10, 30, 45)
|
||||
controller.transcriptions = [
|
||||
TranscriptionResult(text="Test line", is_final=True, timestamp=ts, user_name="User"),
|
||||
]
|
||||
|
||||
text = controller.get_transcriptions_text(include_timestamps=True)
|
||||
assert "[10:30:45]" in text
|
||||
assert "User:" in text
|
||||
assert "Test line" in text
|
||||
|
||||
|
||||
# ── settings / engine reload ────────────────────────────────────────
|
||||
|
||||
|
||||
def test_apply_settings_triggers_reload_on_model_change(controller):
|
||||
"""Changing the transcription model should trigger an engine reload."""
|
||||
controller.current_model_size = "base.en"
|
||||
controller.current_device_config = "auto"
|
||||
|
||||
# Patch reload_engine so it doesn't actually try to spin up threads
|
||||
controller.reload_engine = MagicMock(return_value=(True, "reloaded"))
|
||||
|
||||
reloaded, msg = controller.apply_settings({
|
||||
"transcription.model": "small.en",
|
||||
})
|
||||
|
||||
assert reloaded is True
|
||||
controller.reload_engine.assert_called_once()
|
||||
|
||||
|
||||
def test_apply_settings_no_reload_when_same(controller):
|
||||
"""If model and device haven't changed, no reload should happen."""
|
||||
controller.current_model_size = "base.en"
|
||||
controller.current_device_config = "auto"
|
||||
|
||||
# Ensure config returns the same values
|
||||
controller.config.set("transcription.model", "base.en")
|
||||
controller.config.set("transcription.device", "auto")
|
||||
|
||||
controller.reload_engine = MagicMock(return_value=(True, "reloaded"))
|
||||
|
||||
reloaded, msg = controller.apply_settings({
|
||||
"display.font_size": 20,
|
||||
})
|
||||
|
||||
assert reloaded is False
|
||||
controller.reload_engine.assert_not_called()
|
||||
|
||||
|
||||
# ── transcription callbacks ─────────────────────────────────────────
|
||||
|
||||
|
||||
def test_on_final_transcription_callback_fires(controller):
|
||||
"""_on_final_transcription should append and invoke on_transcription callback."""
|
||||
from client.transcription_engine_realtime import TranscriptionResult
|
||||
|
||||
received = []
|
||||
controller.on_transcription = lambda data: received.append(data)
|
||||
|
||||
controller.is_transcribing = True
|
||||
controller._set_state(AppState.TRANSCRIBING)
|
||||
|
||||
result = TranscriptionResult(
|
||||
text="Hello world",
|
||||
is_final=True,
|
||||
timestamp=datetime.now(),
|
||||
user_name="Tester",
|
||||
)
|
||||
controller._on_final_transcription(result)
|
||||
|
||||
assert len(controller.transcriptions) == 1
|
||||
assert len(received) == 1
|
||||
assert received[0]["text"] == "Hello world"
|
||||
assert received[0]["user_name"] == "Tester"
|
||||
assert received[0]["is_preview"] is False
|
||||
|
||||
|
||||
def test_on_final_transcription_ignored_when_not_transcribing(controller):
|
||||
"""If the controller is not in transcribing state the callback should be a no-op."""
|
||||
from client.transcription_engine_realtime import TranscriptionResult
|
||||
|
||||
controller.is_transcribing = False
|
||||
|
||||
result = TranscriptionResult(
|
||||
text="Should be ignored",
|
||||
is_final=True,
|
||||
timestamp=datetime.now(),
|
||||
user_name="Ghost",
|
||||
)
|
||||
controller._on_final_transcription(result)
|
||||
|
||||
assert len(controller.transcriptions) == 0
|
||||
56
backend/tests/test_main_headless.py
Normal file
56
backend/tests/test_main_headless.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Tests for backend.main_headless ready-event JSON format."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure project root is on path
|
||||
project_root = Path(__file__).resolve().parent.parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
|
||||
def test_ready_event_reports_api_port_not_obs_port():
|
||||
"""The ready JSON printed by main_headless must set ``port`` to
|
||||
``obs_port + 1`` (the API port), not the OBS display port.
|
||||
|
||||
From main_headless.py::
|
||||
|
||||
obs_port = controller.actual_web_port or args.port
|
||||
api_port = obs_port + 1
|
||||
|
||||
print(json.dumps({
|
||||
"event": "ready",
|
||||
"port": api_port,
|
||||
"obs_port": obs_port,
|
||||
}), flush=True)
|
||||
|
||||
We verify this contract by reading the source and checking the
|
||||
structure directly (running main() would start a real server).
|
||||
"""
|
||||
import ast
|
||||
import textwrap
|
||||
|
||||
source_path = project_root / "backend" / "main_headless.py"
|
||||
source = source_path.read_text()
|
||||
|
||||
# Verify the key relationships exist in the source:
|
||||
# 1. api_port = obs_port + 1
|
||||
assert "api_port = obs_port + 1" in source, (
|
||||
"Expected `api_port = obs_port + 1` in main_headless.py"
|
||||
)
|
||||
|
||||
# 2. The ready event JSON uses api_port for "port", not obs_port
|
||||
assert '"port": api_port' in source or "'port': api_port" in source, (
|
||||
"The ready event should report api_port as 'port'"
|
||||
)
|
||||
|
||||
# 3. obs_port is also included separately
|
||||
assert '"obs_port": obs_port' in source or "'obs_port': obs_port" in source, (
|
||||
"The ready event should also include 'obs_port'"
|
||||
)
|
||||
|
||||
# 4. Verify the event name
|
||||
assert '"event": "ready"' in source or "'event': 'ready'" in source, (
|
||||
"The ready event should have event='ready'"
|
||||
)
|
||||
Reference in New Issue
Block a user