diff --git a/src-tauri/src/wsl_bridge.rs b/src-tauri/src/wsl_bridge.rs index c2da718..945a11d 100644 --- a/src-tauri/src/wsl_bridge.rs +++ b/src-tauri/src/wsl_bridge.rs @@ -1,9 +1,9 @@ use std::io::{BufRead, BufReader, Write}; use std::process::{Child, ChildStdin, Command, Stdio}; -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; use std::thread; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use parking_lot::Mutex; use tauri::{AppHandle, Emitter}; use tempfile::NamedTempFile; @@ -114,6 +114,11 @@ pub struct WslBridge { /// Set to true by stop()/interrupt() before killing the process so handle_stdout knows /// the disconnect was intentional and should not emit a second Disconnected event. intentional_stop: Arc, + /// Tracks when the most recent user message was sent. Cleared when a `Result` message + /// arrives. The mid-session watchdog uses this to detect a stuck process. + pending_since: Arc>>, + /// Incremented each time `start()` is called so each session's watchdog knows when to exit. + watchdog_generation: Arc, } impl WslBridge { @@ -128,6 +133,8 @@ impl WslBridge { conversation_id: None, received_init: Arc::new(AtomicBool::new(false)), intentional_stop: Arc::new(AtomicBool::new(false)), + pending_since: Arc::new(Mutex::new(None)), + watchdog_generation: Arc::new(AtomicU64::new(0)), } } @@ -142,6 +149,8 @@ impl WslBridge { conversation_id: Some(conversation_id), received_init: Arc::new(AtomicBool::new(false)), intentional_stop: Arc::new(AtomicBool::new(false)), + pending_since: Arc::new(Mutex::new(None)), + watchdog_generation: Arc::new(AtomicU64::new(0)), } } @@ -442,6 +451,7 @@ impl WslBridge { let conv_id = self.conversation_id.clone(); let received_init_clone = self.received_init.clone(); let intentional_stop_clone = self.intentional_stop.clone(); + let pending_since_clone = self.pending_since.clone(); thread::spawn(move || { handle_stdout( stdout, @@ -450,6 +460,7 @@ impl WslBridge { conv_id, received_init_clone, intentional_stop_clone, + pending_since_clone, ); }); } @@ -487,6 +498,48 @@ impl WslBridge { } }); + // Reset the pending-since tracker for this new session so stale state from a previous + // session never triggers the mid-session watchdog immediately. + *self.pending_since.lock() = None; + + // Mid-session watchdog: if a user message is sent but no Result arrives within 5 minutes, + // the Claude Code process is stuck. Kill it so the user gets a disconnect event and can + // reconnect. The generation counter ensures old watchdogs from previous sessions exit + // cleanly when `start()` is called again. + let generation = self.watchdog_generation.fetch_add(1, Ordering::SeqCst) + 1; + let process_mid_watchdog = self.process.clone(); + let pending_since_watchdog = self.pending_since.clone(); + let generation_watchdog = self.watchdog_generation.clone(); + const STUCK_TIMEOUT: Duration = Duration::from_secs(5 * 60); + const POLL_INTERVAL: Duration = Duration::from_secs(30); + thread::spawn(move || { + loop { + thread::sleep(POLL_INTERVAL); + // Exit if a newer session has started. + if generation_watchdog.load(Ordering::SeqCst) != generation { + break; + } + // Exit if the process has already been taken (killed or stopped). + if process_mid_watchdog.lock().is_none() { + break; + } + let elapsed = (*pending_since_watchdog.lock()).map(|t| t.elapsed()); + if let Some(elapsed) = elapsed { + if elapsed >= STUCK_TIMEOUT { + tracing::warn!( + "Mid-session watchdog: no Result received in {:?}; killing stuck process", + elapsed + ); + if let Some(mut proc) = process_mid_watchdog.lock().take() { + let _ = proc.kill(); + let _ = proc.wait(); + } + break; + } + } + } + }); + Ok(()) } @@ -523,6 +576,10 @@ impl WslBridge { .flush() .map_err(|e| format!("Failed to flush stdin: {}", e))?; + // Record the time this message was sent so the mid-session watchdog can detect + // if no Result ever arrives (i.e. the process is stuck). + *self.pending_since.lock() = Some(Instant::now()); + Ok(()) } @@ -762,15 +819,21 @@ fn handle_stdout( conversation_id: Option, received_init: Arc, intentional_stop: Arc, + pending_since: Arc>>, ) { let reader = BufReader::new(stdout); for line in reader.lines() { match line { Ok(line) if !line.is_empty() => { - if let Err(e) = - process_json_line(&line, &app, &stats, &conversation_id, &received_init) - { + if let Err(e) = process_json_line( + &line, + &app, + &stats, + &conversation_id, + &received_init, + &pending_since, + ) { tracing::error!("Error processing line: {}", e); } } @@ -1026,6 +1089,7 @@ fn process_json_line( stats: &Arc>, conversation_id: &Option, received_init: &Arc, + pending_since: &Arc>>, ) -> Result<(), String> { let message: ClaudeMessage = serde_json::from_str(line) .map_err(|e| format!("Failed to parse JSON: {} - Line: {}", e, line))?; @@ -1412,6 +1476,10 @@ fn process_json_line( permission_denials.as_ref().map(|d| d.len()) ); + // A Result means the turn is complete — clear pending_since so the mid-session + // watchdog knows the process is not stuck. + *pending_since.lock() = None; + let state = if subtype == "success" { CharacterState::Success } else { @@ -2493,4 +2561,98 @@ mod tests { panic!("Expected ToolResult variant"); } } + + // Mid-session watchdog: pending_since lifecycle tests + + #[test] + fn test_pending_since_starts_as_none() { + let bridge = WslBridge::new(); + assert!(bridge.pending_since.lock().is_none()); + } + + #[test] + fn test_watchdog_generation_starts_at_zero() { + let bridge = WslBridge::new(); + assert_eq!(bridge.watchdog_generation.load(Ordering::SeqCst), 0); + } + + #[test] + fn test_pending_since_set_reflects_elapsed_time() { + let pending_since: Arc>> = Arc::new(Mutex::new(None)); + + // Simulate send_message setting pending_since + *pending_since.lock() = Some(Instant::now()); + + // Should be Some and elapsed should be tiny (< 1 second) + let elapsed = (*pending_since.lock()).map(|t| t.elapsed()); + assert!(elapsed.is_some()); + assert!(elapsed.unwrap() < Duration::from_secs(1)); + } + + #[test] + fn test_pending_since_cleared_on_result_simulates_watchdog_safe() { + let pending_since: Arc>> = Arc::new(Mutex::new(None)); + + // Simulate send_message + *pending_since.lock() = Some(Instant::now()); + assert!(pending_since.lock().is_some()); + + // Simulate Result message arriving (as process_json_line does) + *pending_since.lock() = None; + assert!(pending_since.lock().is_none()); + + // Watchdog check: elapsed is None → no kill triggered + let elapsed = (*pending_since.lock()).map(|t| t.elapsed()); + assert!(elapsed.is_none()); + } + + #[test] + fn test_watchdog_generation_increments_per_session() { + let bridge = WslBridge::new(); + assert_eq!(bridge.watchdog_generation.load(Ordering::SeqCst), 0); + + // Simulate what start() does: fetch_add(1) returns old value, +1 gives new generation + let gen1 = bridge.watchdog_generation.fetch_add(1, Ordering::SeqCst) + 1; + assert_eq!(gen1, 1); + assert_eq!(bridge.watchdog_generation.load(Ordering::SeqCst), 1); + + let gen2 = bridge.watchdog_generation.fetch_add(1, Ordering::SeqCst) + 1; + assert_eq!(gen2, 2); + assert_eq!(bridge.watchdog_generation.load(Ordering::SeqCst), 2); + } + + #[test] + fn test_watchdog_generation_mismatch_means_old_session() { + // Simulate: watchdog captured generation=1, but start() was called again → generation=2. + // The watchdog should detect this and exit without killing. + let generation_arc: Arc = Arc::new(AtomicU64::new(1)); + let captured_generation: u64 = 1; + + assert_eq!(generation_arc.load(Ordering::SeqCst), captured_generation, "same session"); + + // New start() call increments generation + generation_arc.fetch_add(1, Ordering::SeqCst); + + assert_ne!( + generation_arc.load(Ordering::SeqCst), + captured_generation, + "old watchdog detects new session and should exit" + ); + } + + #[test] + fn test_stuck_timeout_threshold() { + // The timeout constant used in the mid-session watchdog is 5 minutes. + // This test documents and validates that threshold. + const STUCK_TIMEOUT: Duration = Duration::from_secs(5 * 60); + assert_eq!(STUCK_TIMEOUT, Duration::from_secs(300)); + + // A message sent 4m59s ago should NOT trigger the watchdog + let just_under = Duration::from_secs(299); + assert!(just_under < STUCK_TIMEOUT); + + // A message sent 5m0s ago SHOULD trigger the watchdog + let exactly_at = Duration::from_secs(300); + assert!(exactly_at >= STUCK_TIMEOUT); + } } diff --git a/src/lib/stores/conversations.test.ts b/src/lib/stores/conversations.test.ts index c5cc6c7..2a5d9e7 100644 --- a/src/lib/stores/conversations.test.ts +++ b/src/lib/stores/conversations.test.ts @@ -607,7 +607,9 @@ describe("isProcessing state management", () => { }); it("setProcessingForConversation does nothing for unknown conversation", () => { - const conversations = new Map([["conv-1", { isProcessing: false, lastActivityAt: new Date(0) }]]); + const conversations = new Map([ + ["conv-1", { isProcessing: false, lastActivityAt: new Date(0) }], + ]); const setProcessingForConversation = (conversationId: string, processing: boolean) => { const conv = conversations.get(conversationId); diff --git a/src/lib/tauri.ts b/src/lib/tauri.ts index 910002c..274d7f1 100644 --- a/src/lib/tauri.ts +++ b/src/lib/tauri.ts @@ -236,9 +236,10 @@ export async function initializeTauriListeners() { ); } - // Update character state for this conversation + // Update character state and processing state for this conversation if (targetConversationId) { claudeStore.setCharacterStateForConversation(targetConversationId, "idle"); + claudeStore.setProcessingForConversation(targetConversationId, false); } } else if (status === "error") { const targetConversationId = conversation_id || get(claudeStore.activeConversationId);