generated from nhcarrigan/template
fix: watchdog for hung wsl bridge process (#166)
Adds a 60-second watchdog that silently kills the Claude Code process if system:init never arrives, preventing the UI from being stuck in a "Connected" state indefinitely. Refactors process handle to Arc<Mutex<Option<Child>>> for cross-thread access, and removes the unused CommandExt import.
This commit is contained in:
+75
-21
@@ -1,14 +1,13 @@
|
|||||||
use std::io::{BufRead, BufReader, Write};
|
use std::io::{BufRead, BufReader, Write};
|
||||||
use std::process::{Child, ChildStdin, Command, Stdio};
|
use std::process::{Child, ChildStdin, Command, Stdio};
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||||
|
use parking_lot::Mutex;
|
||||||
use tauri::{AppHandle, Emitter};
|
use tauri::{AppHandle, Emitter};
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
|
|
||||||
#[cfg(target_os = "windows")]
|
|
||||||
use std::os::windows::process::CommandExt;
|
|
||||||
|
|
||||||
use crate::achievements::{get_achievement_info, AchievementUnlockedEvent};
|
use crate::achievements::{get_achievement_info, AchievementUnlockedEvent};
|
||||||
use crate::commands::record_cost;
|
use crate::commands::record_cost;
|
||||||
use crate::config::ClaudeStartOptions;
|
use crate::config::ClaudeStartOptions;
|
||||||
@@ -103,52 +102,58 @@ fn find_claude_binary() -> Option<String> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct WslBridge {
|
pub struct WslBridge {
|
||||||
process: Option<Child>,
|
process: Arc<Mutex<Option<Child>>>,
|
||||||
stdin: Option<ChildStdin>,
|
stdin: Option<ChildStdin>,
|
||||||
working_directory: String,
|
working_directory: String,
|
||||||
session_id: Option<String>,
|
session_id: Option<String>,
|
||||||
mcp_config_file: Option<NamedTempFile>,
|
mcp_config_file: Option<NamedTempFile>,
|
||||||
stats: Arc<RwLock<UsageStats>>,
|
stats: Arc<RwLock<UsageStats>>,
|
||||||
conversation_id: Option<String>,
|
conversation_id: Option<String>,
|
||||||
|
/// Set to true once the `system:init` message arrives, false at the start of every new session.
|
||||||
|
received_init: Arc<AtomicBool>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WslBridge {
|
impl WslBridge {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
WslBridge {
|
WslBridge {
|
||||||
process: None,
|
process: Arc::new(Mutex::new(None)),
|
||||||
stdin: None,
|
stdin: None,
|
||||||
working_directory: String::new(),
|
working_directory: String::new(),
|
||||||
session_id: None,
|
session_id: None,
|
||||||
mcp_config_file: None,
|
mcp_config_file: None,
|
||||||
stats: Arc::new(RwLock::new(UsageStats::new())),
|
stats: Arc::new(RwLock::new(UsageStats::new())),
|
||||||
conversation_id: None,
|
conversation_id: None,
|
||||||
|
received_init: Arc::new(AtomicBool::new(false)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_with_conversation_id(conversation_id: String) -> Self {
|
pub fn new_with_conversation_id(conversation_id: String) -> Self {
|
||||||
WslBridge {
|
WslBridge {
|
||||||
process: None,
|
process: Arc::new(Mutex::new(None)),
|
||||||
stdin: None,
|
stdin: None,
|
||||||
working_directory: String::new(),
|
working_directory: String::new(),
|
||||||
session_id: None,
|
session_id: None,
|
||||||
mcp_config_file: None,
|
mcp_config_file: None,
|
||||||
stats: Arc::new(RwLock::new(UsageStats::new())),
|
stats: Arc::new(RwLock::new(UsageStats::new())),
|
||||||
conversation_id: Some(conversation_id),
|
conversation_id: Some(conversation_id),
|
||||||
|
received_init: Arc::new(AtomicBool::new(false)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn start(&mut self, app: AppHandle, options: ClaudeStartOptions) -> Result<(), String> {
|
pub fn start(&mut self, app: AppHandle, options: ClaudeStartOptions) -> Result<(), String> {
|
||||||
// If a process handle exists but the process has already exited (e.g. due to a
|
// If a process handle exists but the process has already exited (e.g. due to a
|
||||||
// failed working directory), clean up the stale handle so we can restart cleanly.
|
// failed working directory), clean up the stale handle so we can restart cleanly.
|
||||||
if let Some(ref mut process) = self.process {
|
{
|
||||||
if process.try_wait().map(|s| s.is_some()).unwrap_or(false) {
|
let mut proc_guard = self.process.lock();
|
||||||
self.process = None;
|
if let Some(ref mut proc) = *proc_guard {
|
||||||
self.stdin = None;
|
if proc.try_wait().map(|s| s.is_some()).unwrap_or(false) {
|
||||||
|
*proc_guard = None;
|
||||||
|
self.stdin = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if proc_guard.is_some() {
|
||||||
|
return Err("Process already running".to_string());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if self.process.is_some() {
|
|
||||||
return Err("Process already running".to_string());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load saved achievements and stats when starting a new session
|
// Load saved achievements and stats when starting a new session
|
||||||
@@ -399,7 +404,10 @@ impl WslBridge {
|
|||||||
let stderr = child.stderr.take();
|
let stderr = child.stderr.take();
|
||||||
|
|
||||||
self.stdin = stdin;
|
self.stdin = stdin;
|
||||||
self.process = Some(child);
|
*self.process.lock() = Some(child);
|
||||||
|
|
||||||
|
// Reset the init flag so the watchdog and stdout handler start fresh.
|
||||||
|
self.received_init.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
// Note: We no longer reset stats here - stats persist across reconnects
|
// Note: We no longer reset stats here - stats persist across reconnects
|
||||||
// Stats are only reset when explicitly disconnecting via stop()
|
// Stats are only reset when explicitly disconnecting via stop()
|
||||||
@@ -416,8 +424,9 @@ impl WslBridge {
|
|||||||
let app_clone = app.clone();
|
let app_clone = app.clone();
|
||||||
let stats_clone = self.stats.clone();
|
let stats_clone = self.stats.clone();
|
||||||
let conv_id = self.conversation_id.clone();
|
let conv_id = self.conversation_id.clone();
|
||||||
|
let received_init_clone = self.received_init.clone();
|
||||||
thread::spawn(move || {
|
thread::spawn(move || {
|
||||||
handle_stdout(stdout, app_clone, stats_clone, conv_id);
|
handle_stdout(stdout, app_clone, stats_clone, conv_id, received_init_clone);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -429,12 +438,31 @@ impl WslBridge {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Emit Connected immediately so the frontend can send the greeting message.
|
||||||
|
// This is intentionally optimistic — Claude Code buffers stdout until stdin receives
|
||||||
|
// data on Windows/WSL, so we must send something to stdin first or system:init never
|
||||||
|
// arrives. The received_init flag below tracks whether init actually arrived.
|
||||||
emit_connection_status(
|
emit_connection_status(
|
||||||
&app,
|
&app,
|
||||||
ConnectionStatus::Connected,
|
ConnectionStatus::Connected,
|
||||||
self.conversation_id.clone(),
|
self.conversation_id.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Watchdog: if system:init never arrives the process is truly hung (e.g. a silent crash
|
||||||
|
// after spawning). After 60 seconds we kill it so the user isn't stuck forever.
|
||||||
|
// handle_stdout will surface the error when stdout closes after the kill.
|
||||||
|
let process_watchdog = self.process.clone();
|
||||||
|
let received_init_watchdog = self.received_init.clone();
|
||||||
|
thread::spawn(move || {
|
||||||
|
thread::sleep(Duration::from_secs(60));
|
||||||
|
if !received_init_watchdog.load(Ordering::SeqCst) {
|
||||||
|
if let Some(mut proc) = process_watchdog.lock().take() {
|
||||||
|
let _ = proc.kill();
|
||||||
|
let _ = proc.wait();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -513,7 +541,10 @@ impl WslBridge {
|
|||||||
// Due to persistent bug in Claude Code where ESC/Ctrl+C doesn't work,
|
// Due to persistent bug in Claude Code where ESC/Ctrl+C doesn't work,
|
||||||
// we have to kill the process. This is the only reliable way to stop it.
|
// we have to kill the process. This is the only reliable way to stop it.
|
||||||
// See: https://github.com/anthropics/claude-code/issues/3455
|
// See: https://github.com/anthropics/claude-code/issues/3455
|
||||||
if let Some(mut process) = self.process.take() {
|
// Extract the process first so the MutexGuard is dropped before we mutably
|
||||||
|
// borrow `self` again via estimate_interrupted_request_cost.
|
||||||
|
let maybe_process = self.process.lock().take();
|
||||||
|
if let Some(mut process) = maybe_process {
|
||||||
// Estimate cost for interrupted request before killing
|
// Estimate cost for interrupted request before killing
|
||||||
self.estimate_interrupted_request_cost(app);
|
self.estimate_interrupted_request_cost(app);
|
||||||
|
|
||||||
@@ -643,7 +674,7 @@ impl WslBridge {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop(&mut self, app: &AppHandle) {
|
pub fn stop(&mut self, app: &AppHandle) {
|
||||||
if let Some(mut process) = self.process.take() {
|
if let Some(mut process) = self.process.lock().take() {
|
||||||
let _ = process.kill();
|
let _ = process.kill();
|
||||||
let _ = process.wait();
|
let _ = process.wait();
|
||||||
}
|
}
|
||||||
@@ -674,7 +705,7 @@ impl WslBridge {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_running(&self) -> bool {
|
pub fn is_running(&self) -> bool {
|
||||||
self.process.is_some()
|
self.process.lock().is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_working_directory(&self) -> &str {
|
pub fn get_working_directory(&self) -> &str {
|
||||||
@@ -697,13 +728,16 @@ fn handle_stdout(
|
|||||||
app: AppHandle,
|
app: AppHandle,
|
||||||
stats: Arc<RwLock<UsageStats>>,
|
stats: Arc<RwLock<UsageStats>>,
|
||||||
conversation_id: Option<String>,
|
conversation_id: Option<String>,
|
||||||
|
received_init: Arc<AtomicBool>,
|
||||||
) {
|
) {
|
||||||
let reader = BufReader::new(stdout);
|
let reader = BufReader::new(stdout);
|
||||||
|
|
||||||
for line in reader.lines() {
|
for line in reader.lines() {
|
||||||
match line {
|
match line {
|
||||||
Ok(line) if !line.is_empty() => {
|
Ok(line) if !line.is_empty() => {
|
||||||
if let Err(e) = process_json_line(&line, &app, &stats, &conversation_id) {
|
if let Err(e) =
|
||||||
|
process_json_line(&line, &app, &stats, &conversation_id, &received_init)
|
||||||
|
{
|
||||||
tracing::error!("Error processing line: {}", e);
|
tracing::error!("Error processing line: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -715,6 +749,22 @@ fn handle_stdout(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If stdout closed before system:init arrived the process exited without initialising.
|
||||||
|
// Emit an error line so the user understands why the connection failed.
|
||||||
|
if !received_init.load(Ordering::SeqCst) {
|
||||||
|
let _ = app.emit(
|
||||||
|
"claude:output",
|
||||||
|
OutputEvent {
|
||||||
|
line_type: "error".to_string(),
|
||||||
|
content: "Claude Code exited before initialising. Check the working directory and Claude Code installation, then try connecting again.".to_string(),
|
||||||
|
tool_name: None,
|
||||||
|
conversation_id: conversation_id.clone(),
|
||||||
|
cost: None,
|
||||||
|
parent_tool_use_id: None,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
emit_connection_status(&app, ConnectionStatus::Disconnected, conversation_id);
|
emit_connection_status(&app, ConnectionStatus::Disconnected, conversation_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -919,6 +969,7 @@ fn process_json_line(
|
|||||||
app: &AppHandle,
|
app: &AppHandle,
|
||||||
stats: &Arc<RwLock<UsageStats>>,
|
stats: &Arc<RwLock<UsageStats>>,
|
||||||
conversation_id: &Option<String>,
|
conversation_id: &Option<String>,
|
||||||
|
received_init: &Arc<AtomicBool>,
|
||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
let message: ClaudeMessage = serde_json::from_str(line)
|
let message: ClaudeMessage = serde_json::from_str(line)
|
||||||
.map_err(|e| format!("Failed to parse JSON: {} - Line: {}", e, line))?;
|
.map_err(|e| format!("Failed to parse JSON: {} - Line: {}", e, line))?;
|
||||||
@@ -931,6 +982,9 @@ fn process_json_line(
|
|||||||
..
|
..
|
||||||
} => {
|
} => {
|
||||||
if subtype == "init" {
|
if subtype == "init" {
|
||||||
|
// Mark as initialised so the watchdog knows the process is healthy.
|
||||||
|
received_init.store(true, Ordering::SeqCst);
|
||||||
|
|
||||||
if let Some(id) = session_id {
|
if let Some(id) = session_id {
|
||||||
let _ = app.emit(
|
let _ = app.emit(
|
||||||
"claude:session",
|
"claude:session",
|
||||||
|
|||||||
Reference in New Issue
Block a user