feat(pulse): async effect execution — fire-and-forget executor loop

Co-authored-by: 小墨 <xiaomooo@shazhou.work>
This commit is contained in:
小橘 🍊
2026-04-15 19:01:42 +08:00
committed by GitHub
parent 4cfb45dc8b
commit 413c496f99
4 changed files with 460 additions and 30 deletions
+274
View File
@@ -0,0 +1,274 @@
/**
* @uncaged/pulse — executorLoop tests
*
* Tests for the fire-and-forget executor loop that decouples
* effect execution from the sense·think tick loop.
*/
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore, executorLoop, type PulseStore } from './index.js';
// ── Test helpers ───────────────────────────────────────────────
interface TestEffect {
type: string;
payload?: string;
}
function makeStore(tmpDir: string, name = 'events'): PulseStore {
return createStore({
eventsDbPath: join(tmpDir, `${name}.db`),
objectsDir: join(tmpDir, 'objects'),
});
}
function writeEffectEvent(store: PulseStore, effect: TestEffect): string {
const hash = store.putObject(effect);
const event = store.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
key: hash,
hash,
meta: JSON.stringify({ type: effect.type }),
});
return event.id;
}
// ── Tests ──────────────────────────────────────────────────────
describe('executorLoop', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-executor-'));
store = makeStore(tmpDir);
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
// ── Test 1: effect event picked up and executed ──────────────
it('detects effect event and calls execute', async () => {
const executed: TestEffect[] = [];
const ac = new AbortController();
const loopDone = executorLoop<TestEffect>({
store,
execute: async (effects) => {
executed.push(...effects);
},
scanIntervalMs: 20,
signal: ac.signal,
});
// Write effect event before the first scan
const effect: TestEffect = { type: 'send-message', payload: 'hello' };
writeEffectEvent(store, effect);
// Wait for executor loop to pick it up
await new Promise((r) => setTimeout(r, 100));
ac.abort();
await loopDone;
expect(executed.length).toBe(1);
expect(executed[0]?.type).toBe('send-message');
expect(executed[0]?.payload).toBe('hello');
});
// ── Test 2: long-running effect doesn't block tick ───────────
it('long-running effect does not block subsequent scans (non-blocking)', async () => {
let slowEffectStarted = false;
let slowEffectDone = false;
// Keep a reference to the inflight promise so we can await it after abort
let inflightResolve: (() => void) | null = null;
const inflightPromise = new Promise<void>((r) => {
inflightResolve = r;
});
const ac = new AbortController();
const loopDone = executorLoop<TestEffect>({
store,
execute: async () => {
slowEffectStarted = true;
// Simulate a 150ms slow effect
await new Promise<void>((r) => setTimeout(r, 150));
slowEffectDone = true;
inflightResolve?.();
},
scanIntervalMs: 30,
signal: ac.signal,
});
const effect: TestEffect = { type: 'slow-task' };
writeEffectEvent(store, effect);
const start = Date.now();
// Wait less than the effect duration — the loop should have started the effect
// but the tick loop must not be blocked waiting for it
await new Promise((r) => setTimeout(r, 80));
const elapsed = Date.now() - start;
// Effect should have started within the 80ms window
expect(slowEffectStarted).toBe(true);
// The scan should have returned quickly (well under 150ms effect duration)
expect(elapsed).toBeLessThan(150);
// Now wait for the slow effect to complete before aborting + closing store
await inflightPromise;
ac.abort();
await loopDone;
expect(slowEffectDone).toBe(true);
});
// ── Test 3: effect-executing / effect-acked events written ───
it('writes effect-executing and effect-acked events on success', async () => {
// Track when execution completes so we can wait before reading audit events
let resolveExec: (() => void) | null = null;
const execDone = new Promise<void>((r) => {
resolveExec = r;
});
const ac = new AbortController();
const loopDone = executorLoop<TestEffect>({
store,
execute: async () => {
await new Promise((r) => setTimeout(r, 10));
resolveExec?.();
},
scanIntervalMs: 20,
signal: ac.signal,
});
const effect: TestEffect = { type: 'notify' };
const effectEventId = writeEffectEvent(store, effect);
// Wait until execute has returned (so .then() has written effect-acked)
await execDone;
// Give the .then() microtask a tick to write the acked event
await new Promise((r) => setTimeout(r, 20));
ac.abort();
await loopDone;
// effect-executing should have been written with key = effectEventId
const executing = store.getLatest('effect-executing', effectEventId);
expect(executing).not.toBeNull();
expect(executing?.key).toBe(effectEventId);
// effect-acked should have been written with key = effectEventId
const acked = store.getLatest('effect-acked', effectEventId);
expect(acked).not.toBeNull();
expect(acked?.key).toBe(effectEventId);
});
// ── Test 4: executor failure writes effect-failed ────────────
it('writes effect-failed event when executor throws', async () => {
const ac = new AbortController();
const loopDone = executorLoop<TestEffect>({
store,
execute: async () => {
throw new Error('executor boom');
},
scanIntervalMs: 20,
signal: ac.signal,
});
const effect: TestEffect = { type: 'risky-op' };
const effectEventId = writeEffectEvent(store, effect);
await new Promise((r) => setTimeout(r, 150));
ac.abort();
await loopDone;
// effect-failed should be recorded
const failed = store.getLatest('effect-failed', effectEventId);
expect(failed).not.toBeNull();
expect(failed?.key).toBe(effectEventId);
// meta should contain the error message
const meta = JSON.parse(failed?.meta ?? '{}') as { error?: string };
expect(meta.error).toContain('executor boom');
// effect-acked must NOT be written
const acked = store.getLatest('effect-acked', effectEventId);
expect(acked).toBeNull();
});
// ── Test 5: same effect not executed twice (idempotent) ──────
it('does not execute the same effect event twice (idempotent)', async () => {
let execCount = 0;
const ac = new AbortController();
const loopDone = executorLoop<TestEffect>({
store,
execute: async () => {
execCount++;
},
scanIntervalMs: 20,
signal: ac.signal,
});
const effect: TestEffect = { type: 'once-only' };
writeEffectEvent(store, effect);
// Let the loop run for several scan intervals
await new Promise((r) => setTimeout(r, 150));
ac.abort();
await loopDone;
// Even with multiple scans, execute should be called exactly once
expect(execCount).toBe(1);
});
// ── Test 6: multiple effects executed concurrently ───────────
it('executes multiple concurrent effects in parallel', async () => {
const startTimes: number[] = [];
const doneTimes: number[] = [];
const ac = new AbortController();
const loopDone = executorLoop<TestEffect>({
store,
execute: async () => {
startTimes.push(Date.now());
// Each effect takes 80ms
await new Promise((r) => setTimeout(r, 80));
doneTimes.push(Date.now());
},
scanIntervalMs: 20,
signal: ac.signal,
});
// Write 3 effect events before the first scan
const e1: TestEffect = { type: 'task-a' };
const e2: TestEffect = { type: 'task-b' };
const e3: TestEffect = { type: 'task-c' };
writeEffectEvent(store, e1);
writeEffectEvent(store, e2);
writeEffectEvent(store, e3);
// Wait long enough for all 3 to complete if concurrent (80ms),
// but not if sequential (3×80=240ms)
await new Promise((r) => setTimeout(r, 200));
ac.abort();
await loopDone;
// All 3 should have started and completed
expect(startTimes.length).toBe(3);
expect(doneTimes.length).toBe(3);
// The total elapsed from first start to last done should be < 200ms
// (concurrent), not 240ms (sequential)
const totalElapsed = Math.max(...doneTimes) - Math.min(...startTimes);
expect(totalElapsed).toBeLessThan(200);
});
});
+6 -3
View File
@@ -358,9 +358,10 @@ describe('runPulse effects', () => {
rules: [mixedRule],
senseKeys: ['sys'],
defaultTickMs: 30,
executorScanIntervalMs: 30,
}).catch(() => {}); // loop will error when store closes
await new Promise((r) => setTimeout(r, 150));
await new Promise((r) => setTimeout(r, 300));
// execute should get ALL effects — including collect
expect(executedEffects.length >= 2).toBeTruthy();
@@ -942,9 +943,10 @@ describe('runPulse execute-driven collect', () => {
rules: [collectRule],
senseKeys: ['sys'],
defaultTickMs: 30,
executorScanIntervalMs: 30,
}).catch(() => {});
await new Promise((r) => setTimeout(r, 150));
await new Promise((r) => setTimeout(r, 300));
const collectEvents = store.queryByKind('collect', { key: 'sys' });
expect(collectEvents.length >= 2).toBeTruthy();
@@ -1126,9 +1128,10 @@ describe('runPulse with ScopedStore', () => {
rules: [rule],
senseKeys: [],
defaultTickMs: 30,
executorScanIntervalMs: 30,
}).catch(() => {});
await new Promise((r) => setTimeout(r, 150));
await new Promise((r) => setTimeout(r, 300));
expect(executedEffects.length >= 1).toBeTruthy();
expect(executedEffects.some((e) => e.kind === 'notify')).toBeTruthy();
+177 -25
View File
@@ -399,6 +399,8 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
defaultTickMs?: number;
codeRev?: string;
watchers?: WatcherDef[];
/** Scan interval for the background executor loop (ms). Default 1000. */
executorScanIntervalMs?: number;
}): Promise<never> {
const { execute, rules, senseKeys, defaultTickMs = 15000, codeRev } = options;
@@ -456,6 +458,15 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
startWatcher(def, vitalsStore, wakeTick);
}
// Start executor loop — picks up effect events and executes them asynchronously
const executorAbort = new AbortController();
startExecutorLoop({
store: systemStore,
execute,
scanIntervalMs: options.executorScanIntervalMs ?? 1000,
signal: executorAbort.signal,
});
// Future: Rule scopes declaration
// Rules will be able to declare which scopes they need:
// createRule({ scopes: ['_system', '_vitals', 'neko'], accessor, decide })
@@ -476,19 +487,19 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
const tickStart = Date.now();
const [effects, nextTickMs] = await pulse(prev, curr);
// ALL effects go to execute — including collect effects
// Write effect events to store (fire-and-forget — executorLoop picks them up)
if (effects.length > 0) {
await execute(effects);
// Record effects event in system store
const effectsHash = systemStore.putObject(effects);
systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
hash: effectsHash,
meta: JSON.stringify({ count: effects.length }),
codeRev,
});
for (const effect of effects) {
const effectHash = systemStore.putObject(effect);
systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
key: effectHash,
hash: effectHash,
meta: JSON.stringify({ type: (effect as any).type || 'unknown' }),
codeRev,
});
}
}
// Record tick event in system store
@@ -524,6 +535,8 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
defaultTickMs?: number;
codeRev: string;
watchers?: WatcherDef[];
/** Scan interval for the background executor loop (ms). Default 1000. */
executorScanIntervalMs?: number;
}): Promise<never> {
const {
scopedStore,
@@ -596,6 +609,15 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
}
}
// Start executor loop — picks up effect events and executes them asynchronously
const executorAbort = new AbortController();
startExecutorLoop({
store: scopedStore.scope('_system'),
execute,
scanIntervalMs: options.executorScanIntervalMs ?? 1000,
signal: executorAbort.signal,
});
// Main loop
while (true) {
await interruptibleSleep(tickMs);
@@ -622,20 +644,20 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
const tickStart = Date.now();
const [effects, nextTickMs] = await pulse(prev, curr);
// Execute effects
// Write effect events to store (fire-and-forget — executorLoop picks them up)
if (effects.length > 0) {
await execute(effects);
// Record effects event in system store
const systemStore = scopedStore.scope('_system');
const effectsHash = systemStore.putObject(effects);
systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
hash: effectsHash,
meta: JSON.stringify({ count: effects.length }),
codeRev,
});
const sysStore = scopedStore.scope('_system');
for (const effect of effects) {
const effectHash = sysStore.putObject(effect);
sysStore.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
key: effectHash,
hash: effectHash,
meta: JSON.stringify({ type: (effect as any).type || 'unknown' }),
codeRev,
});
}
}
// Record tick event in system store
@@ -657,6 +679,136 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
}
}
// ── Executor Loop (fire-and-forget) ────────────────────────────
/**
* Independent executor loop that scans for pending effect events
* and executes them asynchronously (fire-and-forget).
*
* Decouples effect execution from the sense·think tick loop:
* ticks write effect events, executorLoop picks them up and runs them.
*/
export async function executorLoop<E>(options: {
store: PulseStore;
execute: Executor<E>;
scanIntervalMs?: number;
signal?: AbortSignal;
}): Promise<void> {
const { store, execute, scanIntervalMs = 5000, signal } = options;
/** Set of effect event ids currently being executed (prevents double-exec). */
const inflight = new Set<string>();
while (!signal?.aborted) {
try {
// Find all pending effect events
const effectEvents = store.queryByKind('effect');
for (const effectEvent of effectEvents) {
if (signal?.aborted) break;
if (inflight.has(effectEvent.id)) continue;
// Check if already acked or failed
const acked = store.getLatest('effect-acked', effectEvent.id);
if (acked) continue;
const failed = store.getLatest('effect-failed', effectEvent.id);
if (failed) continue;
const executing = store.getLatest('effect-executing', effectEvent.id);
if (executing) continue;
// Retrieve the effect object from CAS
if (!effectEvent.hash) continue;
const effectObj = store.getObject(effectEvent.hash) as E | null;
if (effectObj === null) continue;
// Mark as inflight
inflight.add(effectEvent.id);
// Write effect-executing event
store.appendEvent({
occurredAt: Date.now(),
kind: 'effect-executing',
key: effectEvent.id,
hash: effectEvent.hash,
meta: effectEvent.meta,
codeRev: effectEvent.codeRev,
});
// Fire-and-forget: execute asynchronously
execute([effectObj])
.then(() => {
// Write effect-acked event
store.appendEvent({
occurredAt: Date.now(),
kind: 'effect-acked',
key: effectEvent.id,
hash: effectEvent.hash,
codeRev: effectEvent.codeRev,
});
})
.catch((err: unknown) => {
// Write effect-failed event
const errorMessage =
err instanceof Error ? err.message : String(err);
store.appendEvent({
occurredAt: Date.now(),
kind: 'effect-failed',
key: effectEvent.id,
hash: effectEvent.hash,
meta: JSON.stringify({ error: errorMessage }),
codeRev: effectEvent.codeRev,
});
})
.finally(() => {
inflight.delete(effectEvent.id);
});
}
} catch (err) {
// Database may have been closed (e.g. during test teardown) — exit gracefully
if (
err instanceof RangeError &&
String(err.message).includes('closed database')
) {
return;
}
throw err;
}
// Sleep until next scan (or abort)
await new Promise<void>((resolve) => {
if (signal?.aborted) {
resolve();
return;
}
const timer = setTimeout(resolve, scanIntervalMs);
signal?.addEventListener(
'abort',
() => {
clearTimeout(timer);
resolve();
},
{ once: true },
);
});
}
}
/**
* Start the executor loop in the background.
* Returns the AbortController so the caller can stop it.
*/
export function startExecutorLoop<E>(options: {
store: PulseStore;
execute: Executor<E>;
scanIntervalMs?: number;
signal?: AbortSignal;
}): void {
// Fire-and-forget — the loop runs until signal is aborted
executorLoop(options).catch((err) => {
console.error('[pulse] executorLoop crashed:', err);
});
}
// ── Helpers ────────────────────────────────────────────────────
/**
+3 -2
View File
@@ -405,10 +405,11 @@ describe('E2E Tick Loop', () => {
ruleDefs: [ruleDef],
defaultTickMs: 50,
codeRev: CODEREV,
executorScanIntervalMs: 30,
});
// Wait for at least one tick cycle
await new Promise((resolve) => setTimeout(resolve, 150));
// Wait for at least one tick cycle + executor loop scan
await new Promise((resolve) => setTimeout(resolve, 300));
// Verify that effects were executed
expect(executedEffects.length).toBeGreaterThan(0);