"""JSON-line protocol reader/writer over stdin/stdout. IMPORTANT: stdout is reserved exclusively for IPC messages. At init time we save the real stdout, then redirect sys.stdout → stderr so that any rogue print() calls from libraries don't corrupt the IPC stream. """ from __future__ import annotations import io import json import os import sys from typing import Any from voice_to_notes.ipc.messages import IPCMessage # Save the real stdout fd for IPC before any library can pollute it. # Then redirect sys.stdout to stderr so library prints go to stderr. _ipc_out: io.TextIOWrapper | None = None def init_ipc() -> None: """Capture real stdout for IPC and redirect sys.stdout to stderr. Must be called once at sidecar startup, before importing any ML libraries. """ global _ipc_out if _ipc_out is not None: return # already initialised # Duplicate the real stdout fd so we keep it even after redirect real_stdout_fd = os.dup(sys.stdout.fileno()) _ipc_out = io.TextIOWrapper( io.BufferedWriter(io.FileIO(real_stdout_fd, "w")), encoding="utf-8", line_buffering=True, ) # Redirect sys.stdout → stderr so print() from libraries goes to stderr sys.stdout = sys.stderr def _get_ipc_out() -> io.TextIOWrapper: """Return the IPC output stream, falling back to sys.__stdout__.""" if _ipc_out is not None: return _ipc_out # Fallback if init_ipc() was never called (e.g. in tests) return sys.__stdout__ def read_message() -> IPCMessage | None: """Read a single JSON-line message from stdin. Returns None on EOF.""" try: line = sys.stdin.readline() if not line: return None # EOF line = line.strip() if not line: return None data = json.loads(line) return IPCMessage.from_dict(data) except json.JSONDecodeError as e: _log(f"Invalid JSON: {e}") return None except Exception as e: _log(f"Read error: {e}") return None def write_message(msg: IPCMessage) -> None: """Write a JSON-line message to the IPC channel (real stdout).""" out = _get_ipc_out() line = json.dumps(msg.to_dict(), separators=(",", ":")) out.write(line + "\n") out.flush() def write_dict(data: dict[str, Any]) -> None: """Write a raw dict as a JSON-line message to the IPC channel.""" out = _get_ipc_out() line = json.dumps(data, separators=(",", ":")) out.write(line + "\n") out.flush() def _log(message: str) -> None: """Log to stderr (stdout is reserved for IPC).""" print(f"[sidecar] {message}", file=sys.stderr, flush=True)