feat: add wave-based parallel task execution to Task Loop

This commit is contained in:
2026-03-07 01:02:14 -08:00
committed by Naomi Carrigan
parent 7a07958b65
commit f60e45e486
4 changed files with 439 additions and 117 deletions
+158 -72
View File
@@ -4,7 +4,9 @@
import { invoke } from "@tauri-apps/api/core"; import { invoke } from "@tauri-apps/api/core";
import { import {
taskLoopStore, taskLoopStore,
findNextPendingIndex, getReadyTasks,
computeWaves,
isTaskBlocked,
buildTaskPrompt, buildTaskPrompt,
normalizeToUnixPath, normalizeToUnixPath,
type TaskLoopTask, type TaskLoopTask,
@@ -23,55 +25,62 @@
const tasks = $derived(taskLoopStore.tasks); const tasks = $derived(taskLoopStore.tasks);
const loopStatus = $derived(taskLoopStore.loopStatus); const loopStatus = $derived(taskLoopStore.loopStatus);
const currentTaskIndex = $derived(taskLoopStore.currentTaskIndex);
const sourceFile = $derived(taskLoopStore.sourceFile); const sourceFile = $derived(taskLoopStore.sourceFile);
const conversations = $derived(claudeStore.conversations); const conversations = $derived(claudeStore.conversations);
const concurrencyLimit = $derived(taskLoopStore.concurrencyLimit);
// Orchestration phase (panel-local, not persisted) // Per-task orchestration phases (panel-local, not persisted)
type LoopPhase = "waiting_for_connection" | "waiting_for_completion"; type LoopPhase = "waiting_for_connection" | "waiting_for_completion";
let loopPhase = $state<LoopPhase | null>(null); let activePhases = $state<Record<number, LoopPhase>>({});
let taskEverStarted = $state(false); let taskEverStartedMap = $state<Record<number, boolean>>({});
let isLoading = $state(false); let isLoading = $state(false);
let errorMessage = $state<string | null>(null); let errorMessage = $state<string | null>(null);
const completedCount = $derived($tasks.filter((t) => t.status === "completed").length); const completedCount = $derived($tasks.filter((t) => t.status === "completed").length);
const failedCount = $derived($tasks.filter((t) => t.status === "failed").length); const failedCount = $derived($tasks.filter((t) => t.status === "failed").length);
const blockedCount = $derived($tasks.filter((t) => t.status === "blocked").length);
const runningCount = $derived($tasks.filter((t) => t.status === "running").length);
const totalCount = $derived($tasks.length); const totalCount = $derived($tasks.length);
const waves = $derived(computeWaves($tasks));
const multiWave = $derived(waves.length > 1);
const workingStates: CharacterState[] = ["thinking", "typing", "coding", "searching", "mcp"]; const workingStates: CharacterState[] = ["thinking", "typing", "coding", "searching", "mcp"];
// Watch the current task's conversation for state transitions // Watch all active tasks' conversations for state transitions
$effect(() => { $effect(() => {
const phase = loopPhase; for (const [idxStr, phase] of Object.entries(activePhases)) {
if (!phase) return; const taskIdx = Number(idxStr);
const taskIdx = $currentTaskIndex;
const taskList = $tasks; const taskList = $tasks;
if (taskIdx < 0 || taskIdx >= taskList.length) return; if (taskIdx < 0 || taskIdx >= taskList.length) continue;
const currentTask = taskList[taskIdx]; const currentTask = taskList[taskIdx];
if (!currentTask.conversationId) return; if (!currentTask.conversationId) continue;
const conv = $conversations.get(currentTask.conversationId); const conv = $conversations.get(currentTask.conversationId);
if (!conv) return; if (!conv) continue;
if (phase === "waiting_for_connection" && conv.connectionStatus === "connected") { if (phase === "waiting_for_connection" && conv.connectionStatus === "connected") {
loopPhase = "waiting_for_completion"; activePhases = { ...activePhases, [taskIdx]: "waiting_for_completion" };
taskEverStarted = false; taskEverStartedMap = { ...taskEverStartedMap, [taskIdx]: false };
void sendTaskPrompt(currentTask, taskIdx, taskList.length); void sendTaskPrompt(currentTask, taskIdx, taskList.length);
return; continue;
} }
if (phase === "waiting_for_completion") { if (phase === "waiting_for_completion") {
if (workingStates.includes(conv.characterState)) { if (workingStates.includes(conv.characterState)) {
taskEverStarted = true; taskEverStartedMap = { ...taskEverStartedMap, [taskIdx]: true };
} }
if (taskEverStarted && conv.characterState === "idle") { if (taskEverStartedMap[taskIdx] && conv.characterState === "idle") {
taskEverStarted = false; activePhases = Object.fromEntries(
loopPhase = null; Object.entries(activePhases).filter(([k]) => Number(k) !== taskIdx)
);
taskEverStartedMap = Object.fromEntries(
Object.entries(taskEverStartedMap).filter(([k]) => Number(k) !== taskIdx)
);
void onTaskCompleted(taskIdx, "completed"); void onTaskCompleted(taskIdx, "completed");
} }
} }
}
}); });
async function sendTaskPrompt(task: TaskLoopTask, taskIdx: number, total: number): Promise<void> { async function sendTaskPrompt(task: TaskLoopTask, taskIdx: number, total: number): Promise<void> {
@@ -83,7 +92,9 @@
}); });
} catch (error) { } catch (error) {
console.error("Failed to send task prompt:", error); console.error("Failed to send task prompt:", error);
loopPhase = null; activePhases = Object.fromEntries(
Object.entries(activePhases).filter(([k]) => Number(k) !== taskIdx)
);
void onTaskCompleted(taskIdx, "failed"); void onTaskCompleted(taskIdx, "failed");
} }
} }
@@ -94,16 +105,34 @@
const currentLoopStatus = get(taskLoopStore.loopStatus); const currentLoopStatus = get(taskLoopStore.loopStatus);
if (currentLoopStatus !== "running") return; if (currentLoopStatus !== "running") return;
const taskList = get(taskLoopStore.tasks); // If any tasks are still active, wait for them
const nextIdx = findNextPendingIndex(taskList); if (Object.keys(activePhases).length > 0) return;
if (nextIdx === -1) { await advanceToNextWave();
}
async function advanceToNextWave(): Promise<void> {
const currentLoopStatus = get(taskLoopStore.loopStatus);
if (currentLoopStatus !== "running") return;
// Mark any newly-blocked tasks
const taskList = get(taskLoopStore.tasks);
taskList.forEach((task, i) => {
if (task.status === "pending" && isTaskBlocked(task, taskList)) {
taskLoopStore.setTaskStatus(i, "blocked");
}
});
const updatedTaskList = get(taskLoopStore.tasks);
const limit = get(taskLoopStore.concurrencyLimit);
const readyIndices = getReadyTasks(updatedTaskList, limit);
if (readyIndices.length === 0) {
taskLoopStore.setLoopStatus("stopped"); taskLoopStore.setLoopStatus("stopped");
taskLoopStore.setCurrentTaskIndex(-1);
return; return;
} }
await startTask(nextIdx, taskList); await Promise.all(readyIndices.map((i) => startTask(i, updatedTaskList)));
} }
async function startTask(taskIdx: number, taskList: TaskLoopTask[]): Promise<void> { async function startTask(taskIdx: number, taskList: TaskLoopTask[]): Promise<void> {
@@ -113,19 +142,17 @@
...new Set([...get(claudeStore.grantedTools), ...(config.auto_granted_tools ?? [])]), ...new Set([...get(claudeStore.grantedTools), ...(config.auto_granted_tools ?? [])]),
]; ];
// sourceFile is already normalised to a Unix path at import time.
const filePath = get(taskLoopStore.sourceFile); const filePath = get(taskLoopStore.sourceFile);
const workingDir = filePath.split("/").slice(0, -1).join("/"); const workingDir = filePath.split("/").slice(0, -1).join("/");
// Create a new conversation for this task
const conversationId = claudeStore.createConversation(task.title); const conversationId = claudeStore.createConversation(task.title);
void claudeStore.switchConversation(conversationId); void claudeStore.switchConversation(conversationId);
taskLoopStore.setTaskConversationId(taskIdx, conversationId); taskLoopStore.setTaskConversationId(taskIdx, conversationId);
taskLoopStore.setTaskStatus(taskIdx, "running"); taskLoopStore.setTaskStatus(taskIdx, "running");
taskLoopStore.setCurrentTaskIndex(taskIdx);
loopPhase = "waiting_for_connection"; activePhases = { ...activePhases, [taskIdx]: "waiting_for_connection" };
taskEverStarted = false; taskEverStartedMap = { ...taskEverStartedMap, [taskIdx]: false };
try { try {
await invoke("start_claude", { await invoke("start_claude", {
@@ -144,7 +171,9 @@
}); });
} catch (error) { } catch (error) {
console.error("Failed to start Claude for task:", error); console.error("Failed to start Claude for task:", error);
loopPhase = null; activePhases = Object.fromEntries(
Object.entries(activePhases).filter(([k]) => Number(k) !== taskIdx)
);
void onTaskCompleted(taskIdx, "failed"); void onTaskCompleted(taskIdx, "failed");
} }
} }
@@ -170,58 +199,54 @@
async function handleStart(): Promise<void> { async function handleStart(): Promise<void> {
const taskList = get(taskLoopStore.tasks); const taskList = get(taskLoopStore.tasks);
const nextIdx = findNextPendingIndex(taskList); const limit = get(taskLoopStore.concurrencyLimit);
if (nextIdx === -1) return; const readyIndices = getReadyTasks(taskList, limit);
if (readyIndices.length === 0) return;
taskLoopStore.setLoopStatus("running"); taskLoopStore.setLoopStatus("running");
await startTask(nextIdx, taskList); await Promise.all(readyIndices.map((i) => startTask(i, taskList)));
} }
function handlePause(): void { function handlePause(): void {
taskLoopStore.setLoopStatus("paused"); taskLoopStore.setLoopStatus("paused");
} }
function handleResume(): void { async function handleResume(): Promise<void> {
taskLoopStore.setLoopStatus("running"); taskLoopStore.setLoopStatus("running");
// If we're between tasks (no active phase), advance immediately if (Object.keys(activePhases).length === 0) {
if (!loopPhase) { await advanceToNextWave();
const taskList = get(taskLoopStore.tasks);
const nextIdx = findNextPendingIndex(taskList);
if (nextIdx !== -1) {
void startTask(nextIdx, taskList);
} else {
taskLoopStore.setLoopStatus("stopped");
}
} }
} }
async function handleStop(): Promise<void> { async function handleStop(): Promise<void> {
const taskIdx = get(taskLoopStore.currentTaskIndex);
const taskList = get(taskLoopStore.tasks);
const currentTask = taskIdx >= 0 ? taskList[taskIdx] : null;
taskLoopStore.setLoopStatus("stopped"); taskLoopStore.setLoopStatus("stopped");
loopPhase = null;
// Stop Claude process for the current task if running // Stop all active Claude processes
if (currentTask?.conversationId) { const taskList = get(taskLoopStore.tasks);
const stopPromises = Object.keys(activePhases).map(async (idxStr) => {
const taskIdx = Number(idxStr);
const task = taskList[taskIdx];
if (task?.conversationId) {
try { try {
await invoke("stop_claude", { conversationId: currentTask.conversationId }); await invoke("stop_claude", { conversationId: task.conversationId });
} catch (error) { } catch (error) {
console.error("Failed to stop Claude for current task:", error); console.error("Failed to stop Claude for task:", error);
} }
if (currentTask.status === "running") { if (task.status === "running") {
taskLoopStore.setTaskStatus(taskIdx, "failed"); taskLoopStore.setTaskStatus(taskIdx, "failed");
} }
} }
});
await Promise.all(stopPromises);
taskLoopStore.setCurrentTaskIndex(-1); activePhases = {};
taskEverStartedMap = {};
} }
function handleReset(): void { function handleReset(): void {
taskLoopStore.reset(); taskLoopStore.reset();
loopPhase = null; activePhases = {};
taskEverStarted = false; taskEverStartedMap = {};
errorMessage = null; errorMessage = null;
} }
@@ -235,6 +260,8 @@
return "text-green-400"; return "text-green-400";
case "failed": case "failed":
return "text-red-400"; return "text-red-400";
case "blocked":
return "text-[var(--text-tertiary)] opacity-50";
} }
} }
@@ -248,6 +275,8 @@
return "✓"; return "✓";
case "failed": case "failed":
return "✗"; return "✗";
case "blocked":
return "⊘";
} }
} }
@@ -261,6 +290,8 @@
return "bg-green-500/20 text-green-400 border-green-500/30"; return "bg-green-500/20 text-green-400 border-green-500/30";
} }
} }
const hasPendingTasks = $derived($tasks.some((t) => t.status === "pending"));
</script> </script>
<div <div
@@ -288,9 +319,7 @@
<span <span
class="text-xs px-2 py-0.5 rounded-full bg-blue-500/20 text-blue-400 border border-blue-500/30 animate-pulse" class="text-xs px-2 py-0.5 rounded-full bg-blue-500/20 text-blue-400 border border-blue-500/30 animate-pulse"
> >
Running {completedCount + {runningCount} running · {completedCount}/{totalCount} done
failedCount +
($loopStatus === "running" ? 1 : 0)}/{totalCount}
</span> </span>
{:else if $loopStatus === "paused"} {:else if $loopStatus === "paused"}
<span <span
@@ -304,7 +333,7 @@
> >
{completedCount}/{totalCount} completed{failedCount > 0 {completedCount}/{totalCount} completed{failedCount > 0
? `, ${failedCount} failed` ? `, ${failedCount} failed`
: ""} : ""}{blockedCount > 0 ? `, ${blockedCount} blocked` : ""}
</span> </span>
{/if} {/if}
</div> </div>
@@ -373,20 +402,42 @@
{$sourceFile} {$sourceFile}
</div> </div>
<!-- Task list --> <!-- Wave-grouped task list -->
<div class="flex flex-col gap-4">
{#each waves as waveIndices, waveIdx (waveIdx)}
<div>
{#if multiWave}
<div class="flex items-center gap-2 mb-2">
<span
class="text-xs font-semibold text-[var(--text-tertiary)] uppercase tracking-wide"
>
Wave {waveIdx + 1}
</span>
{#if waveIndices.length > 1}
<span class="text-xs text-[var(--text-tertiary)]">
({waveIndices.length} parallel)
</span>
{/if}
<div class="flex-1 border-t border-[var(--border-color)]"></div>
</div>
{/if}
<div class="flex flex-col gap-2"> <div class="flex flex-col gap-2">
{#each $tasks as task, index (task.id)} {#each waveIndices as taskIdx (taskIdx)}
{@const task = $tasks[taskIdx]}
{#if task}
<div <div
class="bg-[var(--bg-secondary)] border border-[var(--border-color)] rounded-lg p-3 flex items-start gap-3 {$currentTaskIndex === class="bg-[var(--bg-secondary)] border border-[var(--border-color)] rounded-lg p-3 flex items-start gap-3 {task.status ===
index && $loopStatus === 'running' 'running'
? 'border-blue-500/40 bg-blue-500/5' ? 'border-blue-500/40 bg-blue-500/5'
: task.status === 'blocked'
? 'opacity-50'
: ''}" : ''}"
> >
<!-- Status icon --> <!-- Status icon -->
<span <span
class="text-sm font-mono mt-0.5 w-4 text-center shrink-0 {statusColour( class="text-sm font-mono mt-0.5 w-4 text-center shrink-0 {statusColour(
task.status task.status
)}" )} {task.status === 'running' ? 'animate-spin' : ''}"
> >
{statusIcon(task.status)} {statusIcon(task.status)}
</span> </span>
@@ -403,19 +454,30 @@
> >
{task.priority} {task.priority}
</span> </span>
{#if $currentTaskIndex === index && $loopStatus === "running"} {#if task.status === "running"}
<span class="text-xs text-blue-400 animate-pulse shrink-0">● running</span> <span class="text-xs text-blue-400 animate-pulse shrink-0"
>● running</span
>
{:else if task.status === "blocked"}
<span class="text-xs text-[var(--text-tertiary)] shrink-0">blocked</span
>
{/if} {/if}
</div> </div>
<p class="text-xs text-[var(--text-tertiary)] mt-0.5 line-clamp-2 font-mono"> <p
class="text-xs text-[var(--text-tertiary)] mt-0.5 line-clamp-2 font-mono"
>
{task.prompt} {task.prompt}
</p> </p>
</div> </div>
<!-- Task number --> <!-- Task number -->
<span class="text-xs text-[var(--text-tertiary)] font-mono shrink-0" <span class="text-xs text-[var(--text-tertiary)] font-mono shrink-0"
>#{index + 1}</span >#{taskIdx + 1}</span
> >
</div> </div>
{/if}
{/each}
</div>
</div>
{/each} {/each}
</div> </div>
{/if} {/if}
@@ -447,13 +509,37 @@
Reset Reset
</button> </button>
{/if} {/if}
<!-- Concurrency limit control -->
{#if totalCount > 0}
<div class="flex items-center gap-1 ml-2">
<span class="text-xs text-[var(--text-tertiary)]">Parallel:</span>
<button
onclick={() => taskLoopStore.setConcurrencyLimit($concurrencyLimit - 1)}
class="w-5 h-5 flex items-center justify-center text-xs text-[var(--text-secondary)] hover:text-[var(--text-primary)] bg-[var(--bg-secondary)] hover:bg-[var(--bg-tertiary)] border border-[var(--border-color)] rounded transition-colors"
aria-label="Decrease concurrency limit"
>
</button>
<span class="text-xs font-mono text-[var(--text-primary)] w-4 text-center"
>{$concurrencyLimit}</span
>
<button
onclick={() => taskLoopStore.setConcurrencyLimit($concurrencyLimit + 1)}
class="w-5 h-5 flex items-center justify-center text-xs text-[var(--text-secondary)] hover:text-[var(--text-primary)] bg-[var(--bg-secondary)] hover:bg-[var(--bg-tertiary)] border border-[var(--border-color)] rounded transition-colors"
aria-label="Increase concurrency limit"
>
+
</button>
</div>
{/if}
</div> </div>
<div class="flex items-center gap-2"> <div class="flex items-center gap-2">
{#if totalCount === 0} {#if totalCount === 0}
<!-- no actions until tasks are loaded --> <!-- no actions until tasks are loaded -->
{:else if $loopStatus === "idle" || $loopStatus === "stopped"} {:else if $loopStatus === "idle" || $loopStatus === "stopped"}
{#if findNextPendingIndex($tasks) !== -1} {#if hasPendingTasks}
<button <button
onclick={handleStart} onclick={handleStart}
class="px-4 py-1.5 text-sm btn-trans-gradient rounded-lg transition-colors" class="px-4 py-1.5 text-sm btn-trans-gradient rounded-lg transition-colors"
+4 -1
View File
@@ -6,6 +6,7 @@ export interface PrdTask {
title: string; title: string;
prompt: string; prompt: string;
priority: "high" | "medium" | "low"; priority: "high" | "medium" | "low";
dependsOn?: string[];
} }
export interface PrdFile { export interface PrdFile {
@@ -31,7 +32,8 @@ Write the file to \`${workingDirectory}/hikari-tasks.json\` containing valid JSO
"id": "task-1", "id": "task-1",
"title": "<short descriptive title>", "title": "<short descriptive title>",
"prompt": "<detailed prompt that Claude Code can execute to complete this task>", "prompt": "<detailed prompt that Claude Code can execute to complete this task>",
"priority": "<high|medium|low>" "priority": "<high|medium|low>",
"dependsOn": []
} }
] ]
} }
@@ -43,6 +45,7 @@ Guidelines:
- Prompts should be specific and actionable, not vague - Prompts should be specific and actionable, not vague
- Order tasks logically (dependencies first) - Order tasks logically (dependencies first)
- Assign priority: high for critical path, medium for features, low for polish/cleanup - Assign priority: high for critical path, medium for features, low for polish/cleanup
- Fill in \`dependsOn\` with IDs of tasks that must complete before this one (use \`[]\` if none)
- Write only the JSON file — no explanations needed`; - Write only the JSON file — no explanations needed`;
} }
+146 -1
View File
@@ -4,15 +4,23 @@ import {
countByStatus, countByStatus,
buildTaskPrompt, buildTaskPrompt,
normalizeToUnixPath, normalizeToUnixPath,
isTaskBlocked,
getReadyTasks,
computeWaves,
type TaskLoopTask, type TaskLoopTask,
} from "./taskLoop"; } from "./taskLoop";
const makeTask = (id: string, status: TaskLoopTask["status"] = "pending"): TaskLoopTask => ({ const makeTask = (
id: string,
status: TaskLoopTask["status"] = "pending",
dependsOn?: string[]
): TaskLoopTask => ({
id, id,
title: `Task ${id}`, title: `Task ${id}`,
prompt: `Do the thing for ${id}`, prompt: `Do the thing for ${id}`,
priority: "medium", priority: "medium",
status, status,
dependsOn,
}); });
describe("findNextPendingIndex", () => { describe("findNextPendingIndex", () => {
@@ -125,3 +133,140 @@ describe("normalizeToUnixPath", () => {
expect(normalizeToUnixPath("")).toBe(""); expect(normalizeToUnixPath("")).toBe("");
}); });
}); });
describe("isTaskBlocked", () => {
it("returns false when dependsOn is empty", () => {
const task = makeTask("a", "pending", []);
expect(isTaskBlocked(task, [task])).toBe(false);
});
it("returns false when dependsOn is undefined", () => {
const task = makeTask("a", "pending");
expect(isTaskBlocked(task, [task])).toBe(false);
});
it("returns false when all dependencies are completed", () => {
const dep = makeTask("dep", "completed");
const task = makeTask("a", "pending", ["dep"]);
expect(isTaskBlocked(task, [dep, task])).toBe(false);
});
it("returns true when a dependency has failed", () => {
const dep = makeTask("dep", "failed");
const task = makeTask("a", "pending", ["dep"]);
expect(isTaskBlocked(task, [dep, task])).toBe(true);
});
it("returns true when a dependency is blocked", () => {
const dep = makeTask("dep", "blocked");
const task = makeTask("a", "pending", ["dep"]);
expect(isTaskBlocked(task, [dep, task])).toBe(true);
});
it("returns false when a dependency is still pending (not yet failed)", () => {
const dep = makeTask("dep", "pending");
const task = makeTask("a", "pending", ["dep"]);
expect(isTaskBlocked(task, [dep, task])).toBe(false);
});
it("returns false when dependency ID does not exist in task list", () => {
const task = makeTask("a", "pending", ["nonexistent"]);
expect(isTaskBlocked(task, [task])).toBe(false);
});
});
describe("getReadyTasks", () => {
it("returns empty array when task list is empty", () => {
expect(getReadyTasks([], 3)).toEqual([]);
});
it("returns all pending tasks with no deps when under limit", () => {
const tasks = [makeTask("a", "pending"), makeTask("b", "pending"), makeTask("c", "pending")];
expect(getReadyTasks(tasks, 5)).toEqual([0, 1, 2]);
});
it("respects the concurrency limit", () => {
const tasks = [makeTask("a", "pending"), makeTask("b", "pending"), makeTask("c", "pending")];
expect(getReadyTasks(tasks, 2)).toEqual([0, 1]);
});
it("skips tasks whose dependencies are not completed", () => {
const tasks = [makeTask("a", "pending"), makeTask("b", "pending", ["a"])];
// b depends on a which is pending, not completed — so only a is ready
expect(getReadyTasks(tasks, 5)).toEqual([0]);
});
it("includes task when all its dependencies are completed", () => {
const tasks = [makeTask("a", "completed"), makeTask("b", "pending", ["a"])];
expect(getReadyTasks(tasks, 5)).toEqual([1]);
});
it("skips running, completed, failed, and blocked tasks", () => {
const tasks = [
makeTask("a", "running"),
makeTask("b", "completed"),
makeTask("c", "failed"),
makeTask("d", "blocked"),
makeTask("e", "pending"),
];
expect(getReadyTasks(tasks, 5)).toEqual([4]);
});
it("returns empty when limit is 0", () => {
const tasks = [makeTask("a", "pending")];
expect(getReadyTasks(tasks, 0)).toEqual([]);
});
});
describe("computeWaves", () => {
it("returns empty array for empty task list", () => {
expect(computeWaves([])).toEqual([]);
});
it("puts all independent tasks in a single wave", () => {
const tasks = [makeTask("a", "pending"), makeTask("b", "pending"), makeTask("c", "pending")];
expect(computeWaves(tasks)).toEqual([[0, 1, 2]]);
});
it("creates one wave per task for a linear chain", () => {
const tasks = [
makeTask("a", "pending"),
makeTask("b", "pending", ["a"]),
makeTask("c", "pending", ["b"]),
];
expect(computeWaves(tasks)).toEqual([[0], [1], [2]]);
});
it("handles diamond dependency: A → B,C → D", () => {
// A has no deps, B and C depend on A, D depends on B and C
const tasks = [
makeTask("a", "pending"),
makeTask("b", "pending", ["a"]),
makeTask("c", "pending", ["a"]),
makeTask("d", "pending", ["b", "c"]),
];
const waves = computeWaves(tasks);
expect(waves).toHaveLength(3);
expect(waves[0]).toEqual([0]);
expect(waves[1]).toEqual([1, 2]);
expect(waves[2]).toEqual([3]);
});
it("groups circular dependencies into a final overflow wave", () => {
// a→b, b→a — circular; c has no deps so goes in wave 0
const tasks = [
makeTask("a", "pending", ["b"]),
makeTask("b", "pending", ["a"]),
makeTask("c", "pending"),
];
const waves = computeWaves(tasks);
// c goes in wave 0, then a+b get dumped in overflow
expect(waves[0]).toEqual([2]);
expect(waves[1]).toEqual([0, 1]);
});
it("ignores unknown dependency IDs (treats them as satisfied)", () => {
const tasks = [makeTask("a", "pending", ["nonexistent"])];
expect(computeWaves(tasks)).toEqual([[0]]);
});
});
+89 -1
View File
@@ -2,7 +2,7 @@ import { writable } from "svelte/store";
import { invoke } from "@tauri-apps/api/core"; import { invoke } from "@tauri-apps/api/core";
import type { PrdTask, PrdFile } from "./prd"; import type { PrdTask, PrdFile } from "./prd";
export type TaskStatus = "pending" | "running" | "completed" | "failed"; export type TaskStatus = "pending" | "running" | "completed" | "failed" | "blocked";
export type LoopStatus = "idle" | "running" | "paused" | "stopped"; export type LoopStatus = "idle" | "running" | "paused" | "stopped";
export interface TaskLoopTask extends PrdTask { export interface TaskLoopTask extends PrdTask {
@@ -20,6 +20,87 @@ export function countByStatus(tasks: TaskLoopTask[], status: TaskStatus): number
return tasks.filter((t) => t.status === status).length; return tasks.filter((t) => t.status === status).length;
} }
/**
* Returns true if a task is blocked — i.e. any of its `dependsOn` IDs refer to a
* task that has failed or is already blocked.
*/
export function isTaskBlocked(task: TaskLoopTask, allTasks: TaskLoopTask[]): boolean {
if (!task.dependsOn || task.dependsOn.length === 0) return false;
return task.dependsOn.some((depId) => {
const dep = allTasks.find((t) => t.id === depId);
return dep !== undefined && (dep.status === "failed" || dep.status === "blocked");
});
}
/**
* Returns indices of tasks that are ready to start: status is `pending` and all
* `dependsOn` tasks are `completed`. Respects `limit` (concurrency cap).
*/
export function getReadyTasks(tasks: TaskLoopTask[], limit: number): number[] {
const ready: number[] = [];
for (let i = 0; i < tasks.length; i++) {
if (ready.length >= limit) break;
const task = tasks[i];
if (task.status !== "pending") continue;
const depsAllDone =
!task.dependsOn ||
task.dependsOn.length === 0 ||
task.dependsOn.every((depId) => {
const dep = tasks.find((t) => t.id === depId);
return dep === undefined || dep.status === "completed";
});
if (depsAllDone) {
ready.push(i);
}
}
return ready;
}
/**
* Groups task indices into waves for UI display. Tasks with no pending dependencies
* form wave 0; tasks whose deps are all in earlier waves form the next wave, etc.
* Circular dependencies are collected into a final "overflow" wave.
*/
export function computeWaves(tasks: TaskLoopTask[]): number[][] {
const waves: number[][] = [];
const assigned = new Set<number>();
// Build an id→index map
const idToIndex = new Map<string, number>();
tasks.forEach((t, i) => idToIndex.set(t.id, i));
let remaining = tasks.map((_, i) => i).filter((i) => !assigned.has(i));
while (remaining.length > 0) {
const wave: number[] = [];
for (const i of remaining) {
const task = tasks[i];
const depsAllAssigned =
!task.dependsOn ||
task.dependsOn.length === 0 ||
task.dependsOn.every((depId) => {
const depIdx = idToIndex.get(depId);
return depIdx === undefined || assigned.has(depIdx);
});
if (depsAllAssigned) {
wave.push(i);
}
}
if (wave.length === 0) {
// Circular dependency — dump all remaining into a single wave
waves.push(remaining);
break;
}
wave.forEach((i) => assigned.add(i));
waves.push(wave);
remaining = remaining.filter((i) => !assigned.has(i));
}
return waves;
}
/** /**
* Normalises a file-picker path to a Unix path. * Normalises a file-picker path to a Unix path.
* *
@@ -51,6 +132,7 @@ function createTaskLoopStore() {
const loopStatus = writable<LoopStatus>("idle"); const loopStatus = writable<LoopStatus>("idle");
const currentTaskIndex = writable<number>(-1); const currentTaskIndex = writable<number>(-1);
const sourceFile = writable<string>(""); const sourceFile = writable<string>("");
const concurrencyLimit = writable<number>(3);
async function loadFile(path: string): Promise<void> { async function loadFile(path: string): Promise<void> {
const content = await invoke<string>("read_file_content", { path }); const content = await invoke<string>("read_file_content", { path });
@@ -90,6 +172,10 @@ function createTaskLoopStore() {
currentTaskIndex.set(index); currentTaskIndex.set(index);
} }
function setConcurrencyLimit(limit: number): void {
concurrencyLimit.set(Math.max(1, limit));
}
function reset(): void { function reset(): void {
tasks.set([]); tasks.set([]);
loopStatus.set("idle"); loopStatus.set("idle");
@@ -102,11 +188,13 @@ function createTaskLoopStore() {
loopStatus: { subscribe: loopStatus.subscribe }, loopStatus: { subscribe: loopStatus.subscribe },
currentTaskIndex: { subscribe: currentTaskIndex.subscribe }, currentTaskIndex: { subscribe: currentTaskIndex.subscribe },
sourceFile: { subscribe: sourceFile.subscribe }, sourceFile: { subscribe: sourceFile.subscribe },
concurrencyLimit: { subscribe: concurrencyLimit.subscribe },
loadFile, loadFile,
setTaskStatus, setTaskStatus,
setTaskConversationId, setTaskConversationId,
setLoopStatus, setLoopStatus,
setCurrentTaskIndex, setCurrentTaskIndex,
setConcurrencyLimit,
reset, reset,
}; };
} }