Files
voice-to-notes/src-tauri/src/sidecar/mod.rs

293 lines
10 KiB
Rust
Raw Normal View History

pub mod ipc;
pub mod messages;
use std::io::{BufRead, BufReader, Write};
use std::process::{Child, ChildStdin, Command, Stdio};
use std::sync::{Mutex, OnceLock};
use crate::sidecar::messages::IPCMessage;
/// Return the process-wide [`SidecarManager`], creating it on first use.
///
/// The manager is kept in a function-local `static` guarded by a `OnceLock`,
/// so every caller receives a reference to the same instance and the
/// constructor runs at most once.
pub fn sidecar() -> &'static SidecarManager {
    static MANAGER: OnceLock<SidecarManager> = OnceLock::new();
    MANAGER.get_or_init(SidecarManager::new)
}
/// Manages the Python sidecar process lifecycle.
///
/// Uses separated stdin/stdout ownership to avoid `BufReader` conflicts: the
/// child handle, the write end of its stdin and the buffered read end of its
/// stdout each live behind their own mutex, so every field can be locked
/// through a shared `&self` reference.
#[derive(Debug)]
pub struct SidecarManager {
    /// Handle to the spawned child process; `None` while no sidecar is attached.
    process: Mutex<Option<Child>>,
    /// Write end of the child's stdin, used to send requests.
    stdin: Mutex<Option<ChildStdin>>,
    /// Buffered read end of the child's stdout, used to read responses line by line.
    reader: Mutex<Option<BufReader<std::process::ChildStdout>>>,
}
impl SidecarManager {
pub fn new() -> Self {
Self {
process: Mutex::new(None),
stdin: Mutex::new(None),
reader: Mutex::new(None),
}
}
/// Ensure the sidecar is running, starting it if needed.
pub fn ensure_running(&self) -> Result<(), String> {
if self.is_running() {
return Ok(());
}
let python_path = std::env::current_dir()
.map_err(|e| e.to_string())?
.join("../python")
.canonicalize()
.map_err(|e| format!("Cannot find python directory: {e}"))?;
self.start(&python_path.to_string_lossy())
}
/// Spawn the Python sidecar process.
pub fn start(&self, python_path: &str) -> Result<(), String> {
// Stop existing process if any
self.stop().ok();
let mut child = Command::new("python3")
.arg("-m")
.arg("voice_to_notes.main")
.current_dir(python_path)
.env("PYTHONPATH", python_path)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::inherit())
.spawn()
.map_err(|e| format!("Failed to start sidecar: {e}"))?;
// Take ownership of stdin and stdout separately
let stdin = child.stdin.take().ok_or("Failed to get sidecar stdin")?;
let stdout = child.stdout.take().ok_or("Failed to get sidecar stdout")?;
let buf_reader = BufReader::new(stdout);
{
let mut proc = self.process.lock().map_err(|e| e.to_string())?;
*proc = Some(child);
}
{
let mut s = self.stdin.lock().map_err(|e| e.to_string())?;
*s = Some(stdin);
}
{
let mut r = self.reader.lock().map_err(|e| e.to_string())?;
*r = Some(buf_reader);
}
// Wait for the "ready" message
self.wait_for_ready()?;
Ok(())
}
/// Wait for the sidecar to send its ready message.
fn wait_for_ready(&self) -> Result<(), String> {
let mut reader_guard = self.reader.lock().map_err(|e| e.to_string())?;
if let Some(ref mut reader) = *reader_guard {
let mut line = String::new();
loop {
line.clear();
let bytes = reader
.read_line(&mut line)
.map_err(|e| format!("Read error: {e}"))?;
if bytes == 0 {
return Err("Sidecar closed stdout before sending ready".to_string());
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(msg) = serde_json::from_str::<IPCMessage>(trimmed) {
if msg.msg_type == "ready" {
return Ok(());
}
}
// Non-JSON or non-ready line — skip and keep waiting
eprintln!(
"[sidecar-rs] Skipping pre-ready line: {}",
&trimmed[..trimmed.len().min(200)]
);
continue;
}
}
Err("Sidecar did not send ready message".to_string())
}
/// Send a message to the sidecar and read the response.
/// This is a blocking call. Progress messages are skipped.
pub fn send_and_receive(&self, msg: &IPCMessage) -> Result<IPCMessage, String> {
self.send_and_receive_with_progress(msg, |_| {})
}
/// Send a message and read the response, calling on_progress for each progress message.
pub fn send_and_receive_with_progress(
&self,
msg: &IPCMessage,
on_progress: impl Fn(&IPCMessage),
) -> Result<IPCMessage, String> {
// Write to stdin
{
let mut stdin_guard = self.stdin.lock().map_err(|e| e.to_string())?;
if let Some(ref mut stdin) = *stdin_guard {
let json = serde_json::to_string(msg).map_err(|e| e.to_string())?;
stdin
.write_all(json.as_bytes())
.map_err(|e| format!("Write error: {e}"))?;
stdin
.write_all(b"\n")
.map_err(|e| format!("Write error: {e}"))?;
stdin.flush().map_err(|e| format!("Flush error: {e}"))?;
} else {
return Err("Sidecar stdin not available".to_string());
}
}
// Read from stdout
{
let mut reader_guard = self.reader.lock().map_err(|e| e.to_string())?;
if let Some(ref mut reader) = *reader_guard {
let mut line = String::new();
loop {
line.clear();
let bytes_read = reader
.read_line(&mut line)
.map_err(|e| format!("Read error: {e}"))?;
if bytes_read == 0 {
return Err("Sidecar closed stdout".to_string());
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Skip non-JSON lines (library output that leaked to stdout)
let response: IPCMessage = match serde_json::from_str(trimmed) {
Ok(msg) => msg,
Err(_) => {
eprintln!(
"[sidecar-rs] Skipping non-JSON line: {}",
&trimmed[..trimmed.len().min(200)]
);
continue;
}
};
if response.msg_type == "progress" {
on_progress(&response);
continue;
}
return Ok(response);
}
} else {
Err("Sidecar stdout not available".to_string())
}
}
}
/// Send a message and receive the response, calling a callback for intermediate messages.
/// Intermediate messages include progress, pipeline.segment, and pipeline.speaker_update.
pub fn send_and_receive_with_progress<F>(
&self,
msg: &IPCMessage,
on_intermediate: F,
) -> Result<IPCMessage, String>
where
F: Fn(&IPCMessage),
{
// Write to stdin
{
let mut stdin_guard = self.stdin.lock().map_err(|e| e.to_string())?;
if let Some(ref mut stdin) = *stdin_guard {
let json = serde_json::to_string(msg).map_err(|e| e.to_string())?;
stdin
.write_all(json.as_bytes())
.map_err(|e| format!("Write error: {e}"))?;
stdin
.write_all(b"\n")
.map_err(|e| format!("Write error: {e}"))?;
stdin.flush().map_err(|e| format!("Flush error: {e}"))?;
} else {
return Err("Sidecar stdin not available".to_string());
}
}
// Read from stdout
{
let mut reader_guard = self.reader.lock().map_err(|e| e.to_string())?;
if let Some(ref mut reader) = *reader_guard {
let mut line = String::new();
loop {
line.clear();
let bytes_read = reader
.read_line(&mut line)
.map_err(|e| format!("Read error: {e}"))?;
if bytes_read == 0 {
return Err("Sidecar closed stdout".to_string());
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let response: IPCMessage =
serde_json::from_str(trimmed).map_err(|e| format!("Parse error: {e}"))?;
// Forward intermediate messages via callback, return the final result/error
let is_intermediate = matches!(
response.msg_type.as_str(),
"progress" | "pipeline.segment" | "pipeline.speaker_update"
);
if is_intermediate {
on_intermediate(&response);
} else {
return Ok(response);
}
}
} else {
Err("Sidecar stdout not available".to_string())
}
}
}
/// Stop the sidecar process.
pub fn stop(&self) -> Result<(), String> {
// Drop stdin to signal EOF
{
let mut stdin_guard = self.stdin.lock().map_err(|e| e.to_string())?;
*stdin_guard = None;
}
// Drop reader
{
let mut reader_guard = self.reader.lock().map_err(|e| e.to_string())?;
*reader_guard = None;
}
// Wait for process to exit
{
let mut proc = self.process.lock().map_err(|e| e.to_string())?;
if let Some(ref mut child) = proc.take() {
match child.wait() {
Ok(_) => {}
Err(_) => {
let _ = child.kill();
}
}
}
}
Ok(())
}
pub fn is_running(&self) -> bool {
let proc = self.process.lock().ok();
proc.map_or(false, |p| p.is_some())
}
}
/// Stops the sidecar when a manager value goes out of scope.
///
/// Note that Rust does not run destructors for values stored in a `static`,
/// so the global instance returned by `sidecar()` is never dropped; callers
/// that need a clean shutdown of that instance must call `stop()` themselves.
impl Drop for SidecarManager {
    fn drop(&mut self) {
        // Best effort: a destructor has no way to report a failure.
        self.stop().ok();
    }
}