diff --git a/Cargo.lock b/Cargo.lock index 1c7c26bd0..47549fbe9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1497,12 +1497,12 @@ dependencies = [ "egui_inspection", "egui_kittest", "image", + "interprocess", "rmcp", "rmp-serde", "schemars", "serde", "serde_json", - "tempfile", "tokio", "tracing", "tracing-subscriber", @@ -2545,8 +2545,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "069323743400cb7ab06a8fe5c1ed911d36b6919ec531661d034c89083629595b" dependencies = [ "doctest-file", + "futures-core", "libc", "recvmsg", + "tokio", "widestring", "windows-sys 0.61.2", ] diff --git a/crates/egui_kittest_mcp/Cargo.toml b/crates/egui_kittest_mcp/Cargo.toml index 6435da073..003eef1af 100644 --- a/crates/egui_kittest_mcp/Cargo.toml +++ b/crates/egui_kittest_mcp/Cargo.toml @@ -18,8 +18,9 @@ path = "src/main.rs" [dependencies] egui_kittest = { workspace = true, features = ["inspector_api", "wgpu", "snapshot"] } -egui_inspection = { workspace = true, features = ["protocol"] } +egui_inspection = { workspace = true, features = ["protocol", "transport"] } egui.workspace = true +interprocess = { version = "2.4", features = ["tokio"] } accesskit.workspace = true accesskit_consumer.workspace = true image = { workspace = true, features = ["png"] } @@ -28,7 +29,6 @@ serde = { workspace = true, features = ["derive"] } serde_json = "1.0" schemars = "1.0" rmcp = { version = "1.7", features = ["server", "macros", "transport-io", "schemars"] } -tempfile.workspace = true tokio = { version = "1.49", features = [ "rt-multi-thread", "io-std", diff --git a/crates/egui_kittest_mcp/src/bridge.rs b/crates/egui_kittest_mcp/src/bridge.rs index 8928c173d..95dd2c4c4 100644 --- a/crates/egui_kittest_mcp/src/bridge.rs +++ b/crates/egui_kittest_mcp/src/bridge.rs @@ -1,7 +1,7 @@ //! Bridge between the MCP server and a running kittest harness child process. //! //! Lifecycle: -//! 1. [`Bridge::launch`] binds a unix domain socket, spawns the target binary with +//! 1. [`Bridge::launch`] binds a local socket, spawns the target binary with //! [`crate::HANDSHAKE_ENV_VAR`] + `KITTEST_INSPECTOR=1` + //! `KITTEST_INSPECTOR_PATH=`, and waits for the shim to connect. //! 2. A reader task decodes [`HarnessMessage`]s from the socket and updates [`SharedState`]. @@ -16,9 +16,11 @@ use std::time::Duration; use anyhow::{Context as _, anyhow, bail}; use egui_inspection::protocol::{Frame, HarnessMessage, InspectorCommand, SourceView}; +use egui_inspection::transport::{SocketTarget, generate_socket_target, socket_name}; +use interprocess::local_socket::ListenerOptions; +use interprocess::local_socket::tokio::{Listener, RecvHalf, SendHalf, prelude::*}; use serde::Serialize; -use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; -use tokio::net::UnixListener; +use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; use tokio::process::{Child, Command}; use tokio::sync::{Mutex, Notify, mpsc}; use tokio::task::JoinHandle; @@ -39,8 +41,9 @@ pub struct Bridge { /// `Child` wrapped in a `Mutex` so a `kill` tool can take it. `None` in attach mode — /// we don't own the lifecycle of an externally-started app. child: Arc>>, - /// Temp dir holding the unix socket — kept alive while the bridge is. - _socket_dir: tempfile::TempDir, + /// Local-socket target — kept alive while the bridge is so its backing socket file + /// (on unix) survives. + _socket_target: SocketTarget, /// How this bridge was created (informational). pub peer_info: PeerInfo, } @@ -55,7 +58,7 @@ pub enum PeerInfo { pid: u32, }, /// Bridge bound a socket and accepted an incoming connection from a live app. - Attached { socket_path: PathBuf }, + Attached { socket: String }, } /// Mutable state observed by MCP tool handlers. @@ -92,7 +95,10 @@ pub struct FinishedInfo { pub struct StateSnapshot { /// Peer identity + capabilities, captured at connect time. Used by tool handlers to /// gate commands the peer doesn't honor (Step/Run/Pause against a live app, etc.). - #[expect(dead_code, reason = "consumed by upcoming capability-gating in tool handlers")] + #[cfg_attr( + not(test), + expect(dead_code, reason = "consumed by upcoming capability-gating in tool handlers") + )] pub hello: Option, pub frame: Option>, pub blocked: bool, @@ -144,20 +150,20 @@ impl Bridge { let self_path = std::env::current_exe() .context("get current_exe for KITTEST_INSPECTOR_PATH")?; - let socket_dir = tempfile::Builder::new() - .prefix("kittest-mcp-") - .tempdir() - .context("create temp dir for handshake socket")?; - let socket_path = socket_dir.path().join("kittest.sock"); - - let listener = UnixListener::bind(&socket_path) - .with_context(|| format!("bind {}", socket_path.display()))?; + let socket_target = + generate_socket_target().context("allocate handshake socket")?; + let name = socket_name(&socket_target.name) + .with_context(|| format!("parse socket name {}", socket_target.name))?; + let listener = ListenerOptions::new() + .name(name) + .create_tokio() + .with_context(|| format!("bind {}", socket_target.name))?; let mut cmd = Command::new(&bin); cmd.args(&args) .env("KITTEST_INSPECTOR", "1") .env("KITTEST_INSPECTOR_PATH", &self_path) - .env(crate::HANDSHAKE_ENV_VAR, &socket_path) + .env(crate::HANDSHAKE_ENV_VAR, &socket_target.name) .stdin(std::process::Stdio::null()) // Harness inspector path: the child's stdout/stderr aren't ours — they get // captured by the shim. We don't need them in the MCP server. @@ -178,8 +184,8 @@ impl Bridge { // Accept with a short timeout. If the binary fails to start, exits early, or // doesn't have the inspector wired up, we surface that instead of hanging forever. - let (stream, _addr) = match timeout(Duration::from_secs(10), listener.accept()).await { - Ok(Ok(pair)) => pair, + let stream = match timeout(Duration::from_secs(10), listener.accept()).await { + Ok(Ok(stream)) => stream, Ok(Err(e)) => { let _ = child.kill().await; bail!("accept on handshake socket: {e}"); @@ -195,7 +201,7 @@ impl Bridge { } }; - let (reader, writer) = stream.into_split(); + let (reader, writer) = stream.split(); let state = SharedState::new(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let child_arc: Arc>> = Arc::new(Mutex::new(Some(child))); @@ -209,25 +215,26 @@ impl Bridge { _reader_task: reader_task, _writer_task: writer_task, child: child_arc, - _socket_dir: socket_dir, + _socket_target: socket_target, peer_info: PeerInfo::Launched { bin, args, pid }, }) } - /// Bind a unix socket and return the path immediately. The caller is responsible for - /// starting the app with `EGUI_INSPECTION_SOCKET` set to this path. Call - /// [`Self::accept_pending`] once the app is running. + /// Bind a local socket and return it immediately. The caller is responsible for + /// starting the app with `EGUI_INSPECTION_SOCKET` set to the returned target's name. + /// Call [`Self::accept_pending`] once the app is running. /// - /// Returns the temp-dir handle (must be kept alive) and the listener. - pub async fn prepare_attach() -> anyhow::Result<(tempfile::TempDir, UnixListener, PathBuf)> { - let socket_dir = tempfile::Builder::new() - .prefix("egui-inspection-") - .tempdir() - .context("create temp dir for inspection socket")?; - let socket_path = socket_dir.path().join("inspection.sock"); - let listener = UnixListener::bind(&socket_path) - .with_context(|| format!("bind {}", socket_path.display()))?; - Ok((socket_dir, listener, socket_path)) + /// Returns the listener and the socket target (must be kept alive on unix). + pub async fn prepare_attach() -> anyhow::Result<(Listener, SocketTarget)> { + let socket_target = + generate_socket_target().context("allocate inspection socket")?; + let name = socket_name(&socket_target.name) + .with_context(|| format!("parse socket name {}", socket_target.name))?; + let listener = ListenerOptions::new() + .name(name) + .create_tokio() + .with_context(|| format!("bind {}", socket_target.name))?; + Ok((listener, socket_target)) } /// Finish an attach started with [`Self::prepare_attach`] — wait for an inbound @@ -237,19 +244,22 @@ impl Bridge { /// pre-set. Passing it here lets `kill` reach it and `kill_on_drop` clean up if the /// bridge is dropped. pub async fn accept_pending( - socket_dir: tempfile::TempDir, - listener: UnixListener, - socket_path: PathBuf, + listener: Listener, + socket_target: SocketTarget, child: Option, accept_timeout: Duration, ) -> anyhow::Result { - let (stream, _addr) = match timeout(accept_timeout, listener.accept()).await { - Ok(Ok(pair)) => pair, + let stream = match timeout(accept_timeout, listener.accept()).await { + Ok(Ok(stream)) => stream, Ok(Err(e)) => bail!("accept on inspection socket: {e}"), - Err(_) => bail!("timed out waiting for inbound connection at {}", socket_path.display()), + Err(_) => bail!( + "timed out waiting for inbound connection at {}", + socket_target.name + ), }; - let (reader, writer) = stream.into_split(); + let socket = socket_target.name.clone(); + let (reader, writer) = stream.split(); let state = SharedState::new(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let child_arc: Arc>> = Arc::new(Mutex::new(child)); @@ -262,8 +272,8 @@ impl Bridge { _reader_task: reader_task, _writer_task: writer_task, child: child_arc, - _socket_dir: socket_dir, - peer_info: PeerInfo::Attached { socket_path }, + _socket_target: socket_target, + peer_info: PeerInfo::Attached { socket }, }) } @@ -320,7 +330,7 @@ impl Drop for Bridge { } async fn read_loop( - mut reader: tokio::net::unix::OwnedReadHalf, + mut reader: RecvHalf, state: Arc, child: Arc>>, ) { @@ -399,7 +409,7 @@ impl accesskit_consumer::TreeChangeHandler for NoopChangeHandler { } async fn write_loop( - mut writer: tokio::net::unix::OwnedWriteHalf, + mut writer: SendHalf, mut rx: mpsc::UnboundedReceiver, ) { while let Some(cmd) = rx.recv().await { @@ -410,7 +420,7 @@ async fn write_loop( } } -async fn read_message(stream: &mut tokio::net::unix::OwnedReadHalf) -> anyhow::Result { +async fn read_message(stream: &mut R) -> anyhow::Result { let mut len_buf = [0u8; 4]; stream.read_exact(&mut len_buf).await?; let len = u32::from_be_bytes(len_buf) as usize; @@ -422,8 +432,8 @@ async fn read_message(stream: &mut tokio::net::unix::OwnedReadHalf) -> anyhow::R rmp_serde::from_slice(&buf).map_err(|e| anyhow!("decode: {e}")) } -async fn write_message( - stream: &mut tokio::net::unix::OwnedWriteHalf, +async fn write_message( + stream: &mut W, msg: &InspectorCommand, ) -> anyhow::Result<()> { let bytes = rmp_serde::to_vec(msg).map_err(|e| anyhow!("encode: {e}"))?; @@ -433,3 +443,58 @@ async fn write_message( stream.flush().await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use egui_inspection::protocol::{ + Capabilities, PROTOCOL_VERSION, PeerHello, PeerKind, write_message, + }; + use interprocess::local_socket::Stream; + use interprocess::local_socket::prelude::*; + + /// Full cross-platform transport round-trip: a tokio `interprocess` listener (the bridge + /// side) accepts a connection from a sync `interprocess` client (the plugin / shim side), + /// and a framed `Hello` is decoded into shared state. Runs against whatever local-socket + /// backend the host uses (unix domain socket on unix, named pipe on Windows). + #[tokio::test] + async fn handshake_roundtrip() { + let (listener, target) = Bridge::prepare_attach().await.unwrap(); + let name = target.name.clone(); + + // Connect + write from a blocking thread, mirroring how the plugin/shim dial in. + let client = std::thread::spawn(move || { + let n = socket_name(&name).unwrap(); + let mut stream = Stream::connect(n).unwrap(); + let hello = HarnessMessage::Hello(PeerHello { + protocol_version: PROTOCOL_VERSION, + peer_kind: PeerKind::Live, + capabilities: Capabilities::LIVE, + continuous_screenshots: false, + label: Some("test".to_owned()), + }); + write_message(&mut stream, &hello).unwrap(); + // Hold the connection open until the bridge has read the message. + std::thread::sleep(Duration::from_millis(500)); + }); + + let bridge = Bridge::accept_pending(listener, target, None, Duration::from_secs(5)) + .await + .unwrap(); + + // The reader task applies the Hello asynchronously; poll briefly for it. + let mut hello = None; + for _ in 0..50 { + if let Some(h) = bridge.state.snapshot().await.hello { + hello = Some(h); + break; + } + tokio::time::sleep(Duration::from_millis(20)).await; + } + client.join().unwrap(); + + let hello = hello.expect("bridge should receive Hello over the transport"); + assert_eq!(hello.peer_kind, PeerKind::Live); + assert_eq!(hello.label.as_deref(), Some("test")); + } +} diff --git a/crates/egui_kittest_mcp/src/shim.rs b/crates/egui_kittest_mcp/src/shim.rs index 93bf0c1c6..9da9710a8 100644 --- a/crates/egui_kittest_mcp/src/shim.rs +++ b/crates/egui_kittest_mcp/src/shim.rs @@ -1,7 +1,7 @@ //! Inspector shim role. //! -//! Connects to the MCP server's unix domain socket and relays bytes in both directions -//! between the harness's stdio and that socket. +//! Connects to the MCP server's local socket and relays bytes in both directions between +//! the harness's stdio and that socket. //! //! From the harness's perspective we're an ordinary `kittest_inspector` (msgpack framed //! messages on stdin/stdout). The MCP server sees the same framed bytes on the other end of @@ -9,14 +9,15 @@ //! independent of protocol revisions. use std::io::{Read as _, Write as _}; -use std::os::unix::net::UnixStream; use std::thread; -pub fn run(socket_path: &str) -> anyhow::Result<()> { - let stream = UnixStream::connect(socket_path) - .map_err(|e| anyhow::anyhow!("connect {socket_path}: {e}"))?; - let stream_to_stdout = stream.try_clone()?; - let mut stdin_to_socket = stream; +use egui_inspection::transport::socket_name; +use interprocess::local_socket::{Stream, prelude::*}; + +pub fn run(socket: &str) -> anyhow::Result<()> { + let name = socket_name(socket).map_err(|e| anyhow::anyhow!("socket name {socket}: {e}"))?; + let stream = Stream::connect(name).map_err(|e| anyhow::anyhow!("connect {socket}: {e}"))?; + let (mut reader, mut stdin_to_socket) = stream.split(); // Thread A: stdin (from harness) → socket (to MCP server). let t_in = thread::Builder::new() @@ -24,15 +25,15 @@ pub fn run(socket_path: &str) -> anyhow::Result<()> { .spawn(move || { let mut stdin = std::io::stdin().lock(); let _ = std::io::copy(&mut stdin, &mut stdin_to_socket); - // EOF on stdin or write error → shutdown write side so peer sees EOF. - let _ = stdin_to_socket.shutdown(std::net::Shutdown::Write); + // EOF on stdin or write error → drop the send half so the peer sees EOF on the + // write direction. + drop(stdin_to_socket); })?; // Thread B: socket (from MCP server) → stdout (to harness). // Runs on main thread so the process exits when stdout closes. let mut stdout = std::io::stdout().lock(); let mut buf = vec![0u8; 64 * 1024]; - let mut reader = stream_to_stdout; loop { match reader.read(&mut buf) { Ok(0) | Err(_) => break, diff --git a/crates/egui_kittest_mcp/src/tools.rs b/crates/egui_kittest_mcp/src/tools.rs index 4ceb7090f..76faea139 100644 --- a/crates/egui_kittest_mcp/src/tools.rs +++ b/crates/egui_kittest_mcp/src/tools.rs @@ -441,7 +441,7 @@ impl Server { "an app is already running — call `kill` first before attaching", )); } - let (socket_dir, listener, socket_path) = match Bridge::prepare_attach().await { + let (listener, socket_target) = match Bridge::prepare_attach().await { Ok(t) => t, Err(e) => return Ok(text_error(format!("attach prepare failed: {e:#}"))), }; @@ -450,7 +450,7 @@ impl Server { if let Some(bin) = args.bin.clone() { let mut cmd = tokio::process::Command::new(&bin); cmd.args(&args.args) - .env(egui_inspection::INSPECTION_SOCKET_ENV_VAR, &socket_path) + .env(egui_inspection::INSPECTION_SOCKET_ENV_VAR, &socket_target.name) .stdin(std::process::Stdio::null()) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) @@ -468,14 +468,7 @@ impl Server { } let timeout = Duration::from_secs(args.timeout_secs); - let bridge = match Bridge::accept_pending( - socket_dir, - listener, - socket_path.clone(), - spawned, - timeout, - ) - .await + let bridge = match Bridge::accept_pending(listener, socket_target, spawned, timeout).await { Ok(b) => b, Err(e) => return Ok(text_error(format!("attach failed: {e:#}"))),