fix(daemon): unskip and fix 16 daemon tests #215

Merged
xiaomo merged 1 commits from fix/213-daemon-tests into main 2026-04-28 04:50:32 +00:00
4 changed files with 78 additions and 72 deletions
@@ -42,25 +42,19 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
};
}
/** Poll until predicate returns true, with a timeout. */
/** Poll until predicate returns true (async, yields to the event loop each step). */
async function pollUntil(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
intervalMs = 25,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
const deadline = Date.now() + timeoutMs;
while (!predicate()) {
if (Date.now() >= deadline) {
throw new Error(`pollUntil timed out after ${timeoutMs}ms`);
}
await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
}
}
describe("kernel integration — real child processes", () => {
@@ -121,7 +115,7 @@ describe("kernel integration — real child processes", () => {
expect(kernel.senseCount).toBe(3);
});
it.skip("workers start and respond to compute messages with signals", async () => {
it("workers start and respond to compute messages with signals", async () => {
const config = makeConfig();
kernel = createKernel(config, nerveRoot, {
workerScript: MOCK_WORKER,
@@ -140,13 +134,13 @@ describe("kernel integration — real child processes", () => {
kernel.triggerCompute("cpu-usage");
// Poll until a signal arrives on the bus (event-driven, no fixed delay)
await pollUntil(() => received.length > 0, 3000);
await pollUntil(() => received.length > 0, 10_000);
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 10_000);
}, 15_000);
it("graceful shutdown: stop() resolves after all workers exit", async () => {
const config = makeConfig({
@@ -185,7 +179,7 @@ describe("kernel integration — real child processes", () => {
await expect(stopPromise).resolves.toBeUndefined();
}, 10_000);
it.skip("compute round-trip: worker receives compute and sends signal back through bus", async () => {
it("compute round-trip: worker receives compute and sends signal back through bus", async () => {
const config = makeConfig();
kernel = createKernel(config, nerveRoot, {
workerScript: MOCK_WORKER,
@@ -204,15 +198,15 @@ describe("kernel integration — real child processes", () => {
kernel.triggerCompute("cpu-usage");
// Poll for the signal on the bus (no fixed delay)
await pollUntil(() => received.length > 0, 3000);
await pollUntil(() => received.length > 0, 10_000);
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 10_000);
}, 15_000);
it.skip("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
const config = makeConfig();
kernel = createKernel(config, nerveRoot, {
workerScript: MOCK_WORKER,
@@ -233,7 +227,7 @@ describe("kernel integration — real child processes", () => {
await pollUntil(() => {
const pid = kernel?.getWorkerPid("system");
return pid !== null && pid !== originalPid;
}, 5000);
}, 12_000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
@@ -250,7 +244,7 @@ describe("kernel integration — real child processes", () => {
kernel.triggerCompute("cpu-usage");
// Poll for the signal — verifies the new worker is fully functional
await pollUntil(() => postRespawnSignals.length > 0, 5000);
await pollUntil(() => postRespawnSignals.length > 0, 15_000);
expect(postRespawnSignals).toHaveLength(1);
expect(postRespawnSignals[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
@@ -260,5 +254,5 @@ describe("kernel integration — real child processes", () => {
// Kernel should still stop gracefully after respawn
await kernel.stop();
kernel = null;
}, 15_000);
}, 30_000);
});
@@ -35,17 +35,26 @@ function makeMockChild(pid = 1): MockChild {
child.connected = true;
child.exitCode = null;
child.pid = pid;
// Align with fixtures/mock-worker.mjs: announce readiness once after spawn.
setImmediate(() => {
child.emit("message", { type: "ready" });
});
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
if (msg === null || typeof msg !== "object") return;
const m = msg as Record<string, unknown>;
if (m.type === "shutdown") {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
return;
}
// Sense IPC: reply to compute so scheduler completes (onComputeComplete).
if (m.type === "compute" && typeof m.sense === "string") {
setImmediate(() => {
child.emit("message", { type: "signal", sense: m.sense, payload: 42 });
});
}
});
child.kill = vi.fn((_signal?: string) => {
@@ -132,7 +141,7 @@ describe("kernel + workflowManager integration", () => {
});
describe("sense compute triggers workflow via return value", () => {
it.skip("calls workflowManager.startWorkflow when a sense compute returns a workflow launch", async () => {
it("calls workflowManager.startWorkflow when a sense compute returns a workflow launch", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
@@ -190,7 +199,7 @@ describe("kernel + workflowManager integration", () => {
await stopPromise;
});
it.skip("passes prompt and maxRounds from the workflow field to the workflow", async () => {
it("passes prompt and maxRounds from the workflow field to the workflow", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
@@ -254,7 +263,7 @@ describe("kernel + workflowManager integration", () => {
await stopPromise;
});
it.skip("logs sense signal before workflow-launch when both are present", async () => {
it("logs sense signal before workflow-launch when both are present", async () => {
const logStore = makeLogStore();
const config = makeConfig({
workflows: { "order-wf": { concurrency: 1, overflow: "drop" } },
@@ -296,7 +305,7 @@ describe("kernel + workflowManager integration", () => {
await stopPromise;
});
it.skip("does not trigger workflow when signal senseId is not in 'on' list", async () => {
it("does not trigger workflow when signal senseId is not in 'on' list", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
@@ -355,7 +364,7 @@ describe("kernel + workflowManager integration", () => {
});
describe("workflow events are logged", () => {
it.skip("logs a 'started' event when workflow thread is triggered via sense compute", async () => {
it("logs a 'started' event when workflow thread is triggered via sense compute", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
@@ -407,7 +416,7 @@ describe("kernel + workflowManager integration", () => {
});
describe("reloadConfig handles workflow changes", () => {
it.skip("new workflows are available after reloadConfig", async () => {
it("new workflows are available after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
@@ -484,7 +493,7 @@ describe("kernel + workflowManager integration", () => {
await stopPromise;
});
it.skip("old workflows are removed after reloadConfig", async () => {
it("old workflows are removed after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
@@ -566,7 +575,7 @@ describe("kernel + workflowManager integration", () => {
});
describe("graceful shutdown stops workflow workers", () => {
it.skip("stop() resolves after workflow workers exit", async () => {
it("stop() resolves after workflow workers exit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
+18 -9
View File
@@ -20,14 +20,20 @@ type MockChild = EventEmitter & {
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
setImmediate(() => {
child.emit("message", { type: "ready" });
});
child.send = vi.fn((msg: unknown) => {
// Auto-exit on shutdown so stop() resolves
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
if (msg === null || typeof msg !== "object") return;
const m = msg as Record<string, unknown>;
if (m.type === "shutdown") {
setImmediate(() => child.emit("exit", 0, null));
return;
}
if (m.type === "compute" && typeof m.sense === "string") {
setImmediate(() => {
child.emit("message", { type: "signal", sense: m.sense, payload: 42 });
});
}
});
child.kill = vi.fn((_signal?: string) => {
@@ -91,7 +97,7 @@ describe("kernel — message routing", () => {
rmSync(nerveRoot, { recursive: true, force: true });
});
it.skip("routes signal message to bus without throwing", async () => {
it("routes signal message to bus without throwing", async () => {
const config = makeConfig({
senses: {
"cpu-usage": {
@@ -106,6 +112,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1);
const child = mockChildren[0];
@@ -115,9 +122,10 @@ describe("kernel — message routing", () => {
}).not.toThrow();
await kernel.stop();
await vi.runAllTimersAsync();
});
it.skip("persists emitted signals as sense/signal log entries", async () => {
it("persists emitted signals as sense/signal log entries", async () => {
const tmpDir = mkdtempSync(join(tmpdir(), "nerve-kernel-sig-"));
const logStore = createLogStore(join(tmpDir, "logs.db"));
try {
@@ -135,13 +143,14 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, tmpDir, { logStore });
await vi.runAllTimersAsync();
const child = mockChildren[0];
child.emit("message", { type: "ready" });
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 123 });
const rows = logStore.query({ source: "sense", type: "signal", refId: "cpu-usage" });
expect(rows).toHaveLength(1);
expect(rows[0].payload).toBe(JSON.stringify(123));
await kernel.stop();
await vi.runAllTimersAsync();
} finally {
rmSync(tmpDir, { recursive: true, force: true });
}
@@ -42,21 +42,15 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
async function pollUntil(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
intervalMs = 25,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
const deadline = Date.now() + timeoutMs;
while (!predicate()) {
if (Date.now() >= deadline) {
throw new Error(`pollUntil timed out after ${timeoutMs}ms`);
}
await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
}
}
// ---------------------------------------------------------------------------
@@ -79,7 +73,7 @@ describe("phase6 — restartGroup", () => {
rmSync(nerveRoot, { recursive: true, force: true });
});
it.skip("restartGroup stops old worker and spawns a new one", async () => {
it("restartGroup stops old worker and spawns a new one", async () => {
const config = makeConfig();
kernel = createKernel(config, nerveRoot, {
workerScript: MOCK_WORKER,
@@ -96,7 +90,7 @@ describe("phase6 — restartGroup", () => {
await pollUntil(() => {
const newPid = kernel?.getWorkerPid("system");
return newPid !== null && newPid !== oldPid;
}, 5000);
}, 12_000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
@@ -106,10 +100,10 @@ describe("phase6 — restartGroup", () => {
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("cpu-usage");
await pollUntil(() => received.length > 0, 3000);
await pollUntil(() => received.length > 0, 10_000);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 15_000);
}, 35_000);
it("restartGroup on nonexistent group does nothing", async () => {
const config = makeConfig();
@@ -263,7 +257,7 @@ describe("phase6 — error isolation", () => {
rmSync(nerveRoot, { recursive: true, force: true });
});
it.skip("error from one sense does not crash the worker — other senses still work", async () => {
it("error from one sense does not crash the worker — other senses still work", async () => {
const config: NerveConfig = {
senses: {
"good-sense": {
@@ -300,11 +294,11 @@ describe("phase6 — error isolation", () => {
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("good-sense");
await pollUntil(() => received.length > 0, 3000);
await pollUntil(() => received.length > 0, 10_000);
expect(received[0]).toMatchObject({ senseId: "good-sense" });
kernel.triggerCompute("bad-sense");
await pollUntil(() => received.length > 1, 3000);
await pollUntil(() => received.length > 1, 10_000);
expect(received[1]).toMatchObject({ senseId: "bad-sense" });
unsub();
@@ -466,7 +460,7 @@ describe("phase6 — auto-respawn on worker crash", () => {
rmSync(nerveRoot, { recursive: true, force: true });
});
it.skip("kernel auto-respawns worker and new worker is functional", async () => {
it("kernel auto-respawns worker and new worker is functional", async () => {
const config = makeConfig();
kernel = createKernel(config, nerveRoot, {
workerScript: MOCK_WORKER,
@@ -483,7 +477,7 @@ describe("phase6 — auto-respawn on worker crash", () => {
await pollUntil(() => {
const pid = kernel?.getWorkerPid("system");
return pid !== null && pid !== originalPid;
}, 5000);
}, 12_000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
@@ -493,11 +487,11 @@ describe("phase6 — auto-respawn on worker crash", () => {
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("cpu-usage");
await pollUntil(() => received.length > 0, 5000);
await pollUntil(() => received.length > 0, 10_000);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
await kernel.stop();
kernel = null;
}, 15_000);
}, 30_000);
});