generated from nhcarrigan/template
fix: add mid-session watchdog to kill stuck Claude Code processes
Adds `pending_since` and `watchdog_generation` fields to `WslBridge`. When `send_message` has successfully written and flushed a message, `pending_since` is set to the current instant; when a `Result` message arrives, it is cleared. A watchdog thread spawned per session polls every 30 seconds and kills the process once `pending_since` has been set for at least 5 minutes with no `Result`, which triggers the existing disconnect flow so the user gets an error message and can reconnect. A generation counter ensures old watchdogs exit cleanly when `start()` is called again for a new session.
This commit is contained in:
+167
-5
@@ -1,9 +1,9 @@
|
||||
use std::io::{BufRead, BufReader, Write};
|
||||
use std::process::{Child, ChildStdin, Command, Stdio};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::thread;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use parking_lot::Mutex;
|
||||
use tauri::{AppHandle, Emitter};
|
||||
use tempfile::NamedTempFile;
|
||||
@@ -114,6 +114,11 @@ pub struct WslBridge {
|
||||
/// Set to true by stop()/interrupt() before killing the process so handle_stdout knows
|
||||
/// the disconnect was intentional and should not emit a second Disconnected event.
|
||||
intentional_stop: Arc<AtomicBool>,
|
||||
/// Tracks when the most recent user message was sent. Cleared when a `Result` message
|
||||
/// arrives. The mid-session watchdog uses this to detect a stuck process.
|
||||
pending_since: Arc<Mutex<Option<Instant>>>,
|
||||
/// Incremented each time `start()` is called so each session's watchdog knows when to exit.
|
||||
watchdog_generation: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
impl WslBridge {
|
||||
@@ -128,6 +133,8 @@ impl WslBridge {
|
||||
conversation_id: None,
|
||||
received_init: Arc::new(AtomicBool::new(false)),
|
||||
intentional_stop: Arc::new(AtomicBool::new(false)),
|
||||
pending_since: Arc::new(Mutex::new(None)),
|
||||
watchdog_generation: Arc::new(AtomicU64::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,6 +149,8 @@ impl WslBridge {
|
||||
conversation_id: Some(conversation_id),
|
||||
received_init: Arc::new(AtomicBool::new(false)),
|
||||
intentional_stop: Arc::new(AtomicBool::new(false)),
|
||||
pending_since: Arc::new(Mutex::new(None)),
|
||||
watchdog_generation: Arc::new(AtomicU64::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -442,6 +451,7 @@ impl WslBridge {
|
||||
let conv_id = self.conversation_id.clone();
|
||||
let received_init_clone = self.received_init.clone();
|
||||
let intentional_stop_clone = self.intentional_stop.clone();
|
||||
let pending_since_clone = self.pending_since.clone();
|
||||
thread::spawn(move || {
|
||||
handle_stdout(
|
||||
stdout,
|
||||
@@ -450,6 +460,7 @@ impl WslBridge {
|
||||
conv_id,
|
||||
received_init_clone,
|
||||
intentional_stop_clone,
|
||||
pending_since_clone,
|
||||
);
|
||||
});
|
||||
}
|
||||
@@ -487,6 +498,48 @@ impl WslBridge {
|
||||
}
|
||||
});
|
||||
|
||||
// Reset the pending-since tracker for this new session so stale state from a previous
|
||||
// session never triggers the mid-session watchdog immediately.
|
||||
*self.pending_since.lock() = None;
|
||||
|
||||
// Mid-session watchdog: if a user message is sent but no Result arrives within 5 minutes,
|
||||
// the Claude Code process is stuck. Kill it so the user gets a disconnect event and can
|
||||
// reconnect. The generation counter ensures old watchdogs from previous sessions exit
|
||||
// cleanly when `start()` is called again.
|
||||
let generation = self.watchdog_generation.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
let process_mid_watchdog = self.process.clone();
|
||||
let pending_since_watchdog = self.pending_since.clone();
|
||||
let generation_watchdog = self.watchdog_generation.clone();
|
||||
const STUCK_TIMEOUT: Duration = Duration::from_secs(5 * 60);
|
||||
const POLL_INTERVAL: Duration = Duration::from_secs(30);
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
thread::sleep(POLL_INTERVAL);
|
||||
// Exit if a newer session has started.
|
||||
if generation_watchdog.load(Ordering::SeqCst) != generation {
|
||||
break;
|
||||
}
|
||||
// Exit if the process has already been taken (killed or stopped).
|
||||
if process_mid_watchdog.lock().is_none() {
|
||||
break;
|
||||
}
|
||||
let elapsed = (*pending_since_watchdog.lock()).map(|t| t.elapsed());
|
||||
if let Some(elapsed) = elapsed {
|
||||
if elapsed >= STUCK_TIMEOUT {
|
||||
tracing::warn!(
|
||||
"Mid-session watchdog: no Result received in {:?}; killing stuck process",
|
||||
elapsed
|
||||
);
|
||||
if let Some(mut proc) = process_mid_watchdog.lock().take() {
|
||||
let _ = proc.kill();
|
||||
let _ = proc.wait();
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -523,6 +576,10 @@ impl WslBridge {
|
||||
.flush()
|
||||
.map_err(|e| format!("Failed to flush stdin: {}", e))?;
|
||||
|
||||
// Record the time this message was sent so the mid-session watchdog can detect
|
||||
// if no Result ever arrives (i.e. the process is stuck).
|
||||
*self.pending_since.lock() = Some(Instant::now());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -762,15 +819,21 @@ fn handle_stdout(
|
||||
conversation_id: Option<String>,
|
||||
received_init: Arc<AtomicBool>,
|
||||
intentional_stop: Arc<AtomicBool>,
|
||||
pending_since: Arc<Mutex<Option<Instant>>>,
|
||||
) {
|
||||
let reader = BufReader::new(stdout);
|
||||
|
||||
for line in reader.lines() {
|
||||
match line {
|
||||
Ok(line) if !line.is_empty() => {
|
||||
if let Err(e) =
|
||||
process_json_line(&line, &app, &stats, &conversation_id, &received_init)
|
||||
{
|
||||
if let Err(e) = process_json_line(
|
||||
&line,
|
||||
&app,
|
||||
&stats,
|
||||
&conversation_id,
|
||||
&received_init,
|
||||
&pending_since,
|
||||
) {
|
||||
tracing::error!("Error processing line: {}", e);
|
||||
}
|
||||
}
|
||||
@@ -1026,6 +1089,7 @@ fn process_json_line(
|
||||
stats: &Arc<RwLock<UsageStats>>,
|
||||
conversation_id: &Option<String>,
|
||||
received_init: &Arc<AtomicBool>,
|
||||
pending_since: &Arc<Mutex<Option<Instant>>>,
|
||||
) -> Result<(), String> {
|
||||
let message: ClaudeMessage = serde_json::from_str(line)
|
||||
.map_err(|e| format!("Failed to parse JSON: {} - Line: {}", e, line))?;
|
||||
@@ -1412,6 +1476,10 @@ fn process_json_line(
|
||||
permission_denials.as_ref().map(|d| d.len())
|
||||
);
|
||||
|
||||
// A Result means the turn is complete — clear pending_since so the mid-session
|
||||
// watchdog knows the process is not stuck.
|
||||
*pending_since.lock() = None;
|
||||
|
||||
let state = if subtype == "success" {
|
||||
CharacterState::Success
|
||||
} else {
|
||||
@@ -2493,4 +2561,98 @@ mod tests {
|
||||
panic!("Expected ToolResult variant");
|
||||
}
|
||||
}
|
||||
|
||||
// Mid-session watchdog: pending_since lifecycle tests
|
||||
|
||||
#[test]
fn test_pending_since_starts_as_none() {
    // A bridge built with `new()` has not sent anything yet, so the watchdog's
    // "message in flight" timestamp must be empty.
    let fresh = WslBridge::new();
    let in_flight = fresh.pending_since.lock();
    assert!(in_flight.is_none());
}
|
||||
|
||||
#[test]
fn test_watchdog_generation_starts_at_zero() {
    // The generation counter is only bumped when a session starts, so a bridge
    // built with `new()` must still report generation 0.
    let fresh = WslBridge::new();
    let generation = fresh.watchdog_generation.load(Ordering::SeqCst);
    assert_eq!(generation, 0);
}
|
||||
|
||||
#[test]
fn test_pending_since_set_reflects_elapsed_time() {
    // Checks that a freshly stamped `pending_since` yields a small elapsed time,
    // i.e. the value the watchdog compares against its timeout starts near zero.
    let pending_since: Arc<Mutex<Option<Instant>>> = Arc::new(Mutex::new(None));

    // Simulate send_message setting pending_since
    *pending_since.lock() = Some(Instant::now());

    // Copy the timestamp out of the lock; a missing value fails the test with a
    // descriptive message instead of a bare `unwrap` panic.
    let sent_at = (*pending_since.lock())
        .expect("pending_since should be Some right after a message is sent");

    // Elapsed should be tiny (< 1 second)
    assert!(sent_at.elapsed() < Duration::from_secs(1));
}
|
||||
|
||||
#[test]
fn test_pending_since_cleared_on_result_simulates_watchdog_safe() {
    // Walks the tracker through one full turn: stamped on send, cleared on Result,
    // after which the watchdog has no timestamp to measure against.
    let tracker: Arc<Mutex<Option<Instant>>> = Arc::new(Mutex::new(None));

    // Sending a message stamps the tracker.
    {
        let mut slot = tracker.lock();
        *slot = Some(Instant::now());
    }
    assert!(tracker.lock().is_some());

    // A Result message clears it again (as process_json_line does).
    {
        let mut slot = tracker.lock();
        *slot = None;
    }
    assert!(tracker.lock().is_none());

    // The watchdog's view: no timestamp means no elapsed time and therefore no kill.
    let snapshot: Option<Instant> = *tracker.lock();
    let waited = snapshot.as_ref().map(Instant::elapsed);
    assert!(waited.is_none());
}
|
||||
|
||||
#[test]
fn test_watchdog_generation_increments_per_session() {
    // Each session start claims the next generation via `fetch_add(1) + 1`;
    // two consecutive claims must yield 1 and then 2, and the shared counter
    // must agree with the claimed value after each step.
    let bridge = WslBridge::new();
    let counter = &bridge.watchdog_generation;
    assert_eq!(counter.load(Ordering::SeqCst), 0);

    for expected in 1..=2u64 {
        // fetch_add returns the previous value, so +1 gives this session's generation.
        let claimed = counter.fetch_add(1, Ordering::SeqCst) + 1;
        assert_eq!(claimed, expected);
        assert_eq!(counter.load(Ordering::SeqCst), expected);
    }
}
|
||||
|
||||
#[test]
fn test_watchdog_generation_mismatch_means_old_session() {
    // A watchdog remembers the generation that was current when it was spawned
    // and compares it against the shared counter on every poll. Once a newer
    // session bumps the counter, the two values differ and the old watchdog
    // should exit without killing anything.
    let spawned_with: u64 = 1;
    let shared_counter: Arc<AtomicU64> = Arc::new(AtomicU64::new(spawned_with));

    // Before any new session, the watchdog still belongs to the current one.
    let current = shared_counter.load(Ordering::SeqCst);
    assert_eq!(current, spawned_with, "same session");

    // A new session start increments the shared generation.
    shared_counter.fetch_add(1, Ordering::SeqCst);

    let current = shared_counter.load(Ordering::SeqCst);
    assert_ne!(
        current, spawned_with,
        "old watchdog detects new session and should exit"
    );
}
|
||||
|
||||
#[test]
fn test_stuck_timeout_threshold() {
    // Documents the five-minute threshold used by the mid-session watchdog and the
    // inclusive comparison (`elapsed >= STUCK_TIMEOUT`) it applies at the boundary.
    // The constant is restated here, so this pins the intended value rather than
    // reading it from the watchdog itself.
    const STUCK_TIMEOUT: Duration = Duration::from_secs(5 * 60);
    assert_eq!(STUCK_TIMEOUT, Duration::from_secs(300));

    // Same predicate the watchdog evaluates on each poll.
    let would_trigger = |waited: Duration| waited >= STUCK_TIMEOUT;

    // 4m59s of waiting is still below the threshold.
    assert!(!would_trigger(Duration::from_secs(299)));

    // Exactly 5m0s reaches the threshold and counts as stuck.
    assert!(would_trigger(Duration::from_secs(300)));
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user