From 07013f80093119e1a069fa78213e656a126b2443 Mon Sep 17 00:00:00 2001 From: jedarden Date: Thu, 25 Jun 2026 08:17:59 -0400 Subject: [PATCH] feat(bf-2w7): add self-pipe signaling to watchdog timeout mechanism This improvement ensures that when a watchdog timeout occurs, the event loop wakes up immediately (via self-pipe write) rather than waiting for the poll timeout. This allows for faster and more responsive cleanup on timeout, ensuring temp dirs and FIFOs are removed promptly. Co-Authored-By: Claude --- src/session.rs | 9 +++++++-- src/watchdog.rs | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/session.rs b/src/session.rs index 45123be..708a487 100644 --- a/src/session.rs +++ b/src/session.rs @@ -10,7 +10,8 @@ use crate::watchdog::{Watchdog, WatchdogConfig, TimeoutType}; use nix::sys::signal::{self, SigHandler}; use nix::sys::wait::waitpid; use std::ffi::{CString, OsString}; -use std::os::unix::io::AsRawFd; +use std::os::fd::AsRawFd; +use std::os::unix::io::AsRawFd as UnixAsRawFd; use std::path::{Path, PathBuf}; use std::process::Command; use std::thread; @@ -166,7 +167,11 @@ impl Session { // Get temp directory path for stream-json monitoring // The watchdog will monitor /transcript.jsonl for stream-json output let temp_dir_path = installer.dir_path().to_path_buf(); - let watchdog = Watchdog::new(watchdog_config, spawner.child_pid, Some(temp_dir_path)); + + // Get the raw fd for the self-pipe write end for the watchdog to signal timeout + let watchdog_self_pipe_fd = Some(self_pipe_write.as_raw_fd()); + + let watchdog = Watchdog::new(watchdog_config, spawner.child_pid, Some(temp_dir_path), watchdog_self_pipe_fd); let watchdog_state = watchdog.state(); diff --git a/src/watchdog.rs b/src/watchdog.rs index fa027d9..fa80812 100644 --- a/src/watchdog.rs +++ b/src/watchdog.rs @@ -9,7 +9,9 @@ //! The watchdog ensures that hung child processes are terminated with //! proper cleanup (SIGTERM → SIGKILL) and clear diagnostics. +use std::os::fd::{AsRawFd, BorrowedFd}; use std::path::PathBuf; +use std::os::unix::io::OwnedFd; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::thread; @@ -205,6 +207,8 @@ pub struct Watchdog { child_pid: nix::unistd::Pid, /// Temp directory path where transcript will be written. temp_dir_path: Option, + /// Self-pipe write end raw fd for signaling the event loop on timeout. + self_pipe_write_fd: Option, } impl Watchdog { @@ -213,12 +217,14 @@ impl Watchdog { config: WatchdogConfig, child_pid: nix::unistd::Pid, temp_dir_path: Option, + self_pipe_write_fd: Option, ) -> Self { Self { config, state: WatchdogState::new(), child_pid, temp_dir_path, + self_pipe_write_fd, } } @@ -247,6 +253,8 @@ impl Watchdog { let prompt_injected_at = Arc::clone(&self.state.prompt_injected_at); let session_start = Arc::clone(&self.state.session_start); let temp_dir_path = self.temp_dir_path.clone(); + // Copy the raw fd for signaling the event loop + let self_pipe_write_fd = self.self_pipe_write_fd; thread::spawn(move || { let session_start_time = Instant::now(); @@ -282,6 +290,13 @@ impl Watchdog { let _ = nix::sys::signal::kill(child_pid, nix::sys::signal::Signal::SIGTERM); timeout_fired.store(true, Ordering::SeqCst); timeout_type.store(1, Ordering::SeqCst); // PtyFirstOutput + // Signal the event loop via self-pipe + if let Some(fd) = self_pipe_write_fd { + let byte: [u8; 1] = [1]; + unsafe { + let _ = libc::write(fd as i32, byte.as_ptr() as *const libc::c_void, 1); + } + } return; } } @@ -292,6 +307,13 @@ impl Watchdog { let _ = nix::sys::signal::kill(child_pid, nix::sys::signal::Signal::SIGTERM); timeout_fired.store(true, Ordering::SeqCst); timeout_type.store(2, Ordering::SeqCst); // StreamJsonFirstOutput + // Signal the event loop via self-pipe + if let Some(fd) = self_pipe_write_fd { + let byte: [u8; 1] = [1]; + unsafe { + let _ = libc::write(fd as i32, byte.as_ptr() as *const libc::c_void, 1); + } + } return; } } @@ -302,6 +324,13 @@ impl Watchdog { let _ = nix::sys::signal::kill(child_pid, nix::sys::signal::Signal::SIGTERM); timeout_fired.store(true, Ordering::SeqCst); timeout_type.store(3, Ordering::SeqCst); // OverallTimeout + // Signal the event loop via self-pipe + if let Some(fd) = self_pipe_write_fd { + let byte: [u8; 1] = [1]; + unsafe { + let _ = libc::write(fd as i32, byte.as_ptr() as *const libc::c_void, 1); + } + } return; } } @@ -314,6 +343,13 @@ impl Watchdog { let _ = nix::sys::signal::kill(child_pid, nix::sys::signal::Signal::SIGTERM); timeout_fired.store(true, Ordering::SeqCst); timeout_type.store(4, Ordering::SeqCst); // StopHookTimeout + // Signal the event loop via self-pipe + if let Some(fd) = self_pipe_write_fd { + let byte: [u8; 1] = [1]; + unsafe { + let _ = libc::write(fd as i32, byte.as_ptr() as *const libc::c_void, 1); + } + } return; } }