1
0
mirror of https://github.com/emilk/egui.git synced 2026-06-26 22:53:14 -04:00

egui_kittest_mcp: cross-platform local socket via interprocess

Port the bridge listener (was tokio::net::UnixListener) and the shim client
(was std::os::unix::net::UnixStream) to the interprocess crate, so the MCP
inspection transport builds and runs on Windows (named pipe) as well as
unix/macOS (unix domain socket).

- bridge: bind via interprocess local_socket::tokio Listener + ListenerOptions;
  accept() yields a single Stream, split into async halves. read/write_message
  are now generic over AsyncRead/AsyncWrite.
- shim: connect via interprocess sync Stream and split for the byte relay;
  closing the write direction is done by dropping the send half.
- Allocate/parse socket names through egui_inspection::transport, so both ends
  agree on the platform-specific mapping. Drop the now-unused tempfile dep.
- Add a transport round-trip test (tokio listener <-> sync client).
This commit is contained in:
lucasmerlin
2026-05-26 14:26:10 +02:00
parent e15ba138d6
commit 2fced8012c
5 changed files with 132 additions and 71 deletions

View File

@@ -1497,12 +1497,12 @@ dependencies = [
"egui_inspection",
"egui_kittest",
"image",
"interprocess",
"rmcp",
"rmp-serde",
"schemars",
"serde",
"serde_json",
"tempfile",
"tokio",
"tracing",
"tracing-subscriber",
@@ -2545,8 +2545,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069323743400cb7ab06a8fe5c1ed911d36b6919ec531661d034c89083629595b"
dependencies = [
"doctest-file",
"futures-core",
"libc",
"recvmsg",
"tokio",
"widestring",
"windows-sys 0.61.2",
]

View File

@@ -18,8 +18,9 @@ path = "src/main.rs"
[dependencies]
egui_kittest = { workspace = true, features = ["inspector_api", "wgpu", "snapshot"] }
egui_inspection = { workspace = true, features = ["protocol"] }
egui_inspection = { workspace = true, features = ["protocol", "transport"] }
egui.workspace = true
interprocess = { version = "2.4", features = ["tokio"] }
accesskit.workspace = true
accesskit_consumer.workspace = true
image = { workspace = true, features = ["png"] }
@@ -28,7 +29,6 @@ serde = { workspace = true, features = ["derive"] }
serde_json = "1.0"
schemars = "1.0"
rmcp = { version = "1.7", features = ["server", "macros", "transport-io", "schemars"] }
tempfile.workspace = true
tokio = { version = "1.49", features = [
"rt-multi-thread",
"io-std",

View File

@@ -1,7 +1,7 @@
//! Bridge between the MCP server and a running kittest harness child process.
//!
//! Lifecycle:
//! 1. [`Bridge::launch`] binds a unix domain socket, spawns the target binary with
//! 1. [`Bridge::launch`] binds a local socket, spawns the target binary with
//! [`crate::HANDSHAKE_ENV_VAR`] + `KITTEST_INSPECTOR=1` +
//! `KITTEST_INSPECTOR_PATH=<self>`, and waits for the shim to connect.
//! 2. A reader task decodes [`HarnessMessage`]s from the socket and updates [`SharedState`].
@@ -16,9 +16,11 @@ use std::time::Duration;
use anyhow::{Context as _, anyhow, bail};
use egui_inspection::protocol::{Frame, HarnessMessage, InspectorCommand, SourceView};
use egui_inspection::transport::{SocketTarget, generate_socket_target, socket_name};
use interprocess::local_socket::ListenerOptions;
use interprocess::local_socket::tokio::{Listener, RecvHalf, SendHalf, prelude::*};
use serde::Serialize;
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio::net::UnixListener;
use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _};
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, Notify, mpsc};
use tokio::task::JoinHandle;
@@ -39,8 +41,9 @@ pub struct Bridge {
/// `Child` wrapped in a `Mutex` so a `kill` tool can take it. `None` in attach mode —
/// we don't own the lifecycle of an externally-started app.
child: Arc<Mutex<Option<Child>>>,
/// Temp dir holding the unix socket — kept alive while the bridge is.
_socket_dir: tempfile::TempDir,
/// Local-socket target — kept alive while the bridge is so its backing socket file
/// (on unix) survives.
_socket_target: SocketTarget,
/// How this bridge was created (informational).
pub peer_info: PeerInfo,
}
@@ -55,7 +58,7 @@ pub enum PeerInfo {
pid: u32,
},
/// Bridge bound a socket and accepted an incoming connection from a live app.
Attached { socket_path: PathBuf },
Attached { socket: String },
}
/// Mutable state observed by MCP tool handlers.
@@ -92,7 +95,10 @@ pub struct FinishedInfo {
pub struct StateSnapshot {
/// Peer identity + capabilities, captured at connect time. Used by tool handlers to
/// gate commands the peer doesn't honor (Step/Run/Pause against a live app, etc.).
#[expect(dead_code, reason = "consumed by upcoming capability-gating in tool handlers")]
#[cfg_attr(
not(test),
expect(dead_code, reason = "consumed by upcoming capability-gating in tool handlers")
)]
pub hello: Option<egui_inspection::protocol::PeerHello>,
pub frame: Option<Box<Frame>>,
pub blocked: bool,
@@ -144,20 +150,20 @@ impl Bridge {
let self_path = std::env::current_exe()
.context("get current_exe for KITTEST_INSPECTOR_PATH")?;
let socket_dir = tempfile::Builder::new()
.prefix("kittest-mcp-")
.tempdir()
.context("create temp dir for handshake socket")?;
let socket_path = socket_dir.path().join("kittest.sock");
let listener = UnixListener::bind(&socket_path)
.with_context(|| format!("bind {}", socket_path.display()))?;
let socket_target =
generate_socket_target().context("allocate handshake socket")?;
let name = socket_name(&socket_target.name)
.with_context(|| format!("parse socket name {}", socket_target.name))?;
let listener = ListenerOptions::new()
.name(name)
.create_tokio()
.with_context(|| format!("bind {}", socket_target.name))?;
let mut cmd = Command::new(&bin);
cmd.args(&args)
.env("KITTEST_INSPECTOR", "1")
.env("KITTEST_INSPECTOR_PATH", &self_path)
.env(crate::HANDSHAKE_ENV_VAR, &socket_path)
.env(crate::HANDSHAKE_ENV_VAR, &socket_target.name)
.stdin(std::process::Stdio::null())
// Harness inspector path: the child's stdout/stderr aren't ours — they get
// captured by the shim. We don't need them in the MCP server.
@@ -178,8 +184,8 @@ impl Bridge {
// Accept with a short timeout. If the binary fails to start, exits early, or
// doesn't have the inspector wired up, we surface that instead of hanging forever.
let (stream, _addr) = match timeout(Duration::from_secs(10), listener.accept()).await {
Ok(Ok(pair)) => pair,
let stream = match timeout(Duration::from_secs(10), listener.accept()).await {
Ok(Ok(stream)) => stream,
Ok(Err(e)) => {
let _ = child.kill().await;
bail!("accept on handshake socket: {e}");
@@ -195,7 +201,7 @@ impl Bridge {
}
};
let (reader, writer) = stream.into_split();
let (reader, writer) = stream.split();
let state = SharedState::new();
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let child_arc: Arc<Mutex<Option<Child>>> = Arc::new(Mutex::new(Some(child)));
@@ -209,25 +215,26 @@ impl Bridge {
_reader_task: reader_task,
_writer_task: writer_task,
child: child_arc,
_socket_dir: socket_dir,
_socket_target: socket_target,
peer_info: PeerInfo::Launched { bin, args, pid },
})
}
/// Bind a unix socket and return the path immediately. The caller is responsible for
/// starting the app with `EGUI_INSPECTION_SOCKET` set to this path. Call
/// [`Self::accept_pending`] once the app is running.
/// Bind a local socket and return it immediately. The caller is responsible for
/// starting the app with `EGUI_INSPECTION_SOCKET` set to the returned target's name.
/// Call [`Self::accept_pending`] once the app is running.
///
/// Returns the temp-dir handle (must be kept alive) and the listener.
pub async fn prepare_attach() -> anyhow::Result<(tempfile::TempDir, UnixListener, PathBuf)> {
let socket_dir = tempfile::Builder::new()
.prefix("egui-inspection-")
.tempdir()
.context("create temp dir for inspection socket")?;
let socket_path = socket_dir.path().join("inspection.sock");
let listener = UnixListener::bind(&socket_path)
.with_context(|| format!("bind {}", socket_path.display()))?;
Ok((socket_dir, listener, socket_path))
/// Returns the listener and the socket target (must be kept alive on unix).
pub async fn prepare_attach() -> anyhow::Result<(Listener, SocketTarget)> {
let socket_target =
generate_socket_target().context("allocate inspection socket")?;
let name = socket_name(&socket_target.name)
.with_context(|| format!("parse socket name {}", socket_target.name))?;
let listener = ListenerOptions::new()
.name(name)
.create_tokio()
.with_context(|| format!("bind {}", socket_target.name))?;
Ok((listener, socket_target))
}
/// Finish an attach started with [`Self::prepare_attach`] — wait for an inbound
@@ -237,19 +244,22 @@ impl Bridge {
/// pre-set. Passing it here lets `kill` reach it and `kill_on_drop` clean up if the
/// bridge is dropped.
pub async fn accept_pending(
socket_dir: tempfile::TempDir,
listener: UnixListener,
socket_path: PathBuf,
listener: Listener,
socket_target: SocketTarget,
child: Option<Child>,
accept_timeout: Duration,
) -> anyhow::Result<Self> {
let (stream, _addr) = match timeout(accept_timeout, listener.accept()).await {
Ok(Ok(pair)) => pair,
let stream = match timeout(accept_timeout, listener.accept()).await {
Ok(Ok(stream)) => stream,
Ok(Err(e)) => bail!("accept on inspection socket: {e}"),
Err(_) => bail!("timed out waiting for inbound connection at {}", socket_path.display()),
Err(_) => bail!(
"timed out waiting for inbound connection at {}",
socket_target.name
),
};
let (reader, writer) = stream.into_split();
let socket = socket_target.name.clone();
let (reader, writer) = stream.split();
let state = SharedState::new();
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let child_arc: Arc<Mutex<Option<Child>>> = Arc::new(Mutex::new(child));
@@ -262,8 +272,8 @@ impl Bridge {
_reader_task: reader_task,
_writer_task: writer_task,
child: child_arc,
_socket_dir: socket_dir,
peer_info: PeerInfo::Attached { socket_path },
_socket_target: socket_target,
peer_info: PeerInfo::Attached { socket },
})
}
@@ -320,7 +330,7 @@ impl Drop for Bridge {
}
async fn read_loop(
mut reader: tokio::net::unix::OwnedReadHalf,
mut reader: RecvHalf,
state: Arc<SharedState>,
child: Arc<Mutex<Option<Child>>>,
) {
@@ -399,7 +409,7 @@ impl accesskit_consumer::TreeChangeHandler for NoopChangeHandler {
}
async fn write_loop(
mut writer: tokio::net::unix::OwnedWriteHalf,
mut writer: SendHalf,
mut rx: mpsc::UnboundedReceiver<InspectorCommand>,
) {
while let Some(cmd) = rx.recv().await {
@@ -410,7 +420,7 @@ async fn write_loop(
}
}
async fn read_message(stream: &mut tokio::net::unix::OwnedReadHalf) -> anyhow::Result<HarnessMessage> {
async fn read_message<R: AsyncRead + Unpin>(stream: &mut R) -> anyhow::Result<HarnessMessage> {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf) as usize;
@@ -422,8 +432,8 @@ async fn read_message(stream: &mut tokio::net::unix::OwnedReadHalf) -> anyhow::R
rmp_serde::from_slice(&buf).map_err(|e| anyhow!("decode: {e}"))
}
async fn write_message(
stream: &mut tokio::net::unix::OwnedWriteHalf,
async fn write_message<W: AsyncWrite + Unpin>(
stream: &mut W,
msg: &InspectorCommand,
) -> anyhow::Result<()> {
let bytes = rmp_serde::to_vec(msg).map_err(|e| anyhow!("encode: {e}"))?;
@@ -433,3 +443,58 @@ async fn write_message(
stream.flush().await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use egui_inspection::protocol::{
Capabilities, PROTOCOL_VERSION, PeerHello, PeerKind, write_message,
};
use interprocess::local_socket::Stream;
use interprocess::local_socket::prelude::*;
/// Full cross-platform transport round-trip: a tokio `interprocess` listener (the bridge
/// side) accepts a connection from a sync `interprocess` client (the plugin / shim side),
/// and a framed `Hello` is decoded into shared state. Runs against whatever local-socket
/// backend the host uses (unix domain socket on unix, named pipe on Windows).
#[tokio::test]
async fn handshake_roundtrip() {
let (listener, target) = Bridge::prepare_attach().await.unwrap();
let name = target.name.clone();
// Connect + write from a blocking thread, mirroring how the plugin/shim dial in.
let client = std::thread::spawn(move || {
let n = socket_name(&name).unwrap();
let mut stream = Stream::connect(n).unwrap();
let hello = HarnessMessage::Hello(PeerHello {
protocol_version: PROTOCOL_VERSION,
peer_kind: PeerKind::Live,
capabilities: Capabilities::LIVE,
continuous_screenshots: false,
label: Some("test".to_owned()),
});
write_message(&mut stream, &hello).unwrap();
// Hold the connection open until the bridge has read the message.
std::thread::sleep(Duration::from_millis(500));
});
let bridge = Bridge::accept_pending(listener, target, None, Duration::from_secs(5))
.await
.unwrap();
// The reader task applies the Hello asynchronously; poll briefly for it.
let mut hello = None;
for _ in 0..50 {
if let Some(h) = bridge.state.snapshot().await.hello {
hello = Some(h);
break;
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
client.join().unwrap();
let hello = hello.expect("bridge should receive Hello over the transport");
assert_eq!(hello.peer_kind, PeerKind::Live);
assert_eq!(hello.label.as_deref(), Some("test"));
}
}

View File

@@ -1,7 +1,7 @@
//! Inspector shim role.
//!
//! Connects to the MCP server's unix domain socket and relays bytes in both directions
//! between the harness's stdio and that socket.
//! Connects to the MCP server's local socket and relays bytes in both directions between
//! the harness's stdio and that socket.
//!
//! From the harness's perspective we're an ordinary `kittest_inspector` (msgpack framed
//! messages on stdin/stdout). The MCP server sees the same framed bytes on the other end of
@@ -9,14 +9,15 @@
//! independent of protocol revisions.
use std::io::{Read as _, Write as _};
use std::os::unix::net::UnixStream;
use std::thread;
pub fn run(socket_path: &str) -> anyhow::Result<()> {
let stream = UnixStream::connect(socket_path)
.map_err(|e| anyhow::anyhow!("connect {socket_path}: {e}"))?;
let stream_to_stdout = stream.try_clone()?;
let mut stdin_to_socket = stream;
use egui_inspection::transport::socket_name;
use interprocess::local_socket::{Stream, prelude::*};
pub fn run(socket: &str) -> anyhow::Result<()> {
let name = socket_name(socket).map_err(|e| anyhow::anyhow!("socket name {socket}: {e}"))?;
let stream = Stream::connect(name).map_err(|e| anyhow::anyhow!("connect {socket}: {e}"))?;
let (mut reader, mut stdin_to_socket) = stream.split();
// Thread A: stdin (from harness) → socket (to MCP server).
let t_in = thread::Builder::new()
@@ -24,15 +25,15 @@ pub fn run(socket_path: &str) -> anyhow::Result<()> {
.spawn(move || {
let mut stdin = std::io::stdin().lock();
let _ = std::io::copy(&mut stdin, &mut stdin_to_socket);
// EOF on stdin or write error → shutdown write side so peer sees EOF.
let _ = stdin_to_socket.shutdown(std::net::Shutdown::Write);
// EOF on stdin or write error → drop the send half so the peer sees EOF on the
// write direction.
drop(stdin_to_socket);
})?;
// Thread B: socket (from MCP server) → stdout (to harness).
// Runs on main thread so the process exits when stdout closes.
let mut stdout = std::io::stdout().lock();
let mut buf = vec![0u8; 64 * 1024];
let mut reader = stream_to_stdout;
loop {
match reader.read(&mut buf) {
Ok(0) | Err(_) => break,

View File

@@ -441,7 +441,7 @@ impl Server {
"an app is already running — call `kill` first before attaching",
));
}
let (socket_dir, listener, socket_path) = match Bridge::prepare_attach().await {
let (listener, socket_target) = match Bridge::prepare_attach().await {
Ok(t) => t,
Err(e) => return Ok(text_error(format!("attach prepare failed: {e:#}"))),
};
@@ -450,7 +450,7 @@ impl Server {
if let Some(bin) = args.bin.clone() {
let mut cmd = tokio::process::Command::new(&bin);
cmd.args(&args.args)
.env(egui_inspection::INSPECTION_SOCKET_ENV_VAR, &socket_path)
.env(egui_inspection::INSPECTION_SOCKET_ENV_VAR, &socket_target.name)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
@@ -468,14 +468,7 @@ impl Server {
}
let timeout = Duration::from_secs(args.timeout_secs);
let bridge = match Bridge::accept_pending(
socket_dir,
listener,
socket_path.clone(),
spawned,
timeout,
)
.await
let bridge = match Bridge::accept_pending(listener, socket_target, spawned, timeout).await
{
Ok(b) => b,
Err(e) => return Ok(text_error(format!("attach failed: {e:#}"))),