48 lines
1.3 KiB
Python
48 lines
1.3 KiB
Python
|
|
"""JSON-line protocol reader/writer over stdin/stdout."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
import sys
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
from voice_to_notes.ipc.messages import IPCMessage
|
||
|
|
|
||
|
|
|
||
|
|
def read_message() -> IPCMessage | None:
|
||
|
|
"""Read a single JSON-line message from stdin. Returns None on EOF."""
|
||
|
|
try:
|
||
|
|
line = sys.stdin.readline()
|
||
|
|
if not line:
|
||
|
|
return None # EOF
|
||
|
|
line = line.strip()
|
||
|
|
if not line:
|
||
|
|
return None
|
||
|
|
data = json.loads(line)
|
||
|
|
return IPCMessage.from_dict(data)
|
||
|
|
except json.JSONDecodeError as e:
|
||
|
|
_log(f"Invalid JSON: {e}")
|
||
|
|
return None
|
||
|
|
except Exception as e:
|
||
|
|
_log(f"Read error: {e}")
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def write_message(msg: IPCMessage) -> None:
|
||
|
|
"""Write a JSON-line message to stdout."""
|
||
|
|
line = json.dumps(msg.to_dict(), separators=(",", ":"))
|
||
|
|
sys.stdout.write(line + "\n")
|
||
|
|
sys.stdout.flush()
|
||
|
|
|
||
|
|
|
||
|
|
def write_dict(data: dict[str, Any]) -> None:
|
||
|
|
"""Write a raw dict as a JSON-line message to stdout."""
|
||
|
|
line = json.dumps(data, separators=(",", ":"))
|
||
|
|
sys.stdout.write(line + "\n")
|
||
|
|
sys.stdout.flush()
|
||
|
|
|
||
|
|
|
||
|
|
def _log(message: str) -> None:
|
||
|
|
"""Log to stderr (stdout is reserved for IPC)."""
|
||
|
|
print(f"[sidecar] {message}", file=sys.stderr, flush=True)
|