feat: pulseflare real tick logic — rules + snapshot + sigil executor (refs #5)
CI / test (push) Has been cancelled

- Implement real Pulse tick logic in durable-tick.ts
- Add snapshot rebuild from D1 collect events
- Add pulse() method with demo rules (CPU spike detection, heartbeat)
- Add adaptive tick logic (30s base, up to 5min backoff)
- Add /configure endpoint for senseKeys and configuration
- Add CAS object storage in store-d1.ts
- Update index.ts with /configure route
- Persist prev snapshot in DO storage
- Execute effects through Sigil with error handling
This commit is contained in:
2026-04-18 08:08:55 +00:00
parent 7168b68896
commit 1ba77929e6
3 changed files with 247 additions and 55 deletions
+204 -49
View File
@@ -1,6 +1,7 @@
import type { D1Database, DurableObject, DurableObjectState } from '@cloudflare/workers-types';
import type { Sensed } from '@uncaged/pulse';
import { D1PulseStore } from './store-d1.js';
import { createSigilExecutor } from './executor-sigil.js';
import { createSigilExecutor, type SigilExecutor } from './executor-sigil.js';
export interface PulseTickEnv {
DB: D1Database;
@@ -9,13 +10,22 @@ export interface PulseTickEnv {
TICK_INTERVAL_MS?: string;
}
interface PulseSnapshot {
timestamp: number;
[key: string]: Sensed<unknown> | number;
}
interface PulseEffect {
type?: string;
kind?: string;
[key: string]: unknown;
}
export class PulseTick implements DurableObject {
private tickIntervalMs: number;
private store: D1PulseStore;
private sigilExecutor: ReturnType<typeof createSigilExecutor>;
private sigilExecutor: SigilExecutor;
constructor(private state: DurableObjectState, private env: PulseTickEnv) {
this.tickIntervalMs = parseInt(env.TICK_INTERVAL_MS ?? '30000'); // Default 30s
this.store = new D1PulseStore(env.DB);
this.sigilExecutor = createSigilExecutor(
env.SIGIL_URL,
@@ -47,6 +57,14 @@ export class PulseTick implements DurableObject {
headers: { 'Content-Type': 'application/json' }
});
}
if (request.method === 'POST' && url.pathname === '/configure') {
const body = await request.json() as any;
await this.configure(body);
return new Response(JSON.stringify({ status: 'configured' }), {
headers: { 'Content-Type': 'application/json' }
});
}
return new Response('Not found', { status: 404 });
}
@@ -55,14 +73,16 @@ export class PulseTick implements DurableObject {
try {
await this.performTick();
// Schedule next tick
const nextTickTime = Date.now() + this.tickIntervalMs;
// Get adaptive tick interval
const tickMs = await this.state.storage.get<number>('tickMs') ?? 30000;
const nextTickTime = Date.now() + tickMs;
await this.state.storage.setAlarm(nextTickTime);
} catch (error) {
console.error('Error during tick:', error);
// Still schedule next tick even if this one failed
const nextTickTime = Date.now() + this.tickIntervalMs;
const fallbackTickMs = 60000; // 1 minute fallback
const nextTickTime = Date.now() + fallbackTickMs;
await this.state.storage.setAlarm(nextTickTime);
}
}
@@ -71,68 +91,203 @@ export class PulseTick implements DurableObject {
// Cancel any existing alarm
await this.state.storage.deleteAlarm();
// Set first alarm
const firstTickTime = Date.now() + this.tickIntervalMs;
// Initialize default configuration if not set
const senseKeys = await this.state.storage.get<string[]>('senseKeys');
if (!senseKeys) {
await this.state.storage.put('senseKeys', ['cpu_usage', 'memory_usage']);
}
const tickMs = await this.state.storage.get<number>('tickMs') ?? 30000;
const firstTickTime = Date.now() + tickMs;
await this.state.storage.setAlarm(firstTickTime);
await this.state.storage.put('ticking', true);
await this.state.storage.put('lastTickAt', null);
}
private async configure(config: { senseKeys?: string[]; tickMs?: number; codeRev?: string }): Promise<void> {
if (config.senseKeys) {
await this.state.storage.put('senseKeys', config.senseKeys);
}
if (config.tickMs) {
await this.state.storage.put('tickMs', config.tickMs);
}
if (config.codeRev) {
await this.state.storage.put('codeRev', config.codeRev);
}
}
private async performTick(): Promise<void> {
console.log('Performing tick at', new Date().toISOString());
const tickStartTime = Date.now();
console.log('Performing Pulse tick at', new Date(tickStartTime).toISOString());
// 1. Read latest state from D1
const hasEvents = await this.store.hasEvents();
const recentEvents = await this.store.getRecent(10);
console.log('Has events:', hasEvents, 'Recent events:', recentEvents.length);
// 2. Run Pulse tick logic (placeholder for now)
// This is where we would:
// - Load rule definitions
// - Process events through rules
// - Generate effects
// - Execute effects through Sigil
// Placeholder: just log the tick
const tickEvent = {
occurredAt: Date.now(),
kind: 'pulse.tick',
meta: JSON.stringify({
eventsCount: recentEvents.length,
tickedAt: new Date().toISOString()
})
};
await this.store.appendEvent(tickEvent);
// Example effect execution (commented out for now)
/*
try {
const result = await this.sigilExecutor.invoke('example-capability', {
message: 'Tick performed',
timestamp: Date.now()
// 1. Get prev snapshot from DO storage
const prev = await this.state.storage.get<PulseSnapshot>('prev') ?? { timestamp: 0 };
const senseKeys = await this.state.storage.get<string[]>('senseKeys') ?? [];
// 2. Rebuild current snapshot from D1
const curr = await this.rebuildSnapshot(senseKeys);
// 3. Run pulse rules
const [effects, nextTickMs] = await this.pulse(prev, curr);
// 4. Write effect events to D1
for (const effect of effects) {
await this.store.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
meta: JSON.stringify(effect),
});
}
// 5. Execute effects through Sigil
for (const effect of effects) {
const type = effect.type || effect.kind || 'unknown';
try {
const result = await this.sigilExecutor.invoke(type, effect);
console.log(`Effect executed: ${type}`, result.status === 200 ? 'success' : 'failed');
} catch (e) {
console.error('Effect execution failed:', type, e);
}
}
// 6. Save curr as next prev
await this.state.storage.put('prev', curr);
// 7. Write tick event
await this.store.appendEvent({
occurredAt: Date.now(),
kind: 'pulse.tick',
meta: JSON.stringify({
tickMs: nextTickMs,
effectCount: effects.length,
durationMs: Date.now() - tickStartTime
}),
});
console.log('Effect result:', result);
// 8. Adaptive tick + set next alarm time
const adaptedTickMs = await this.adaptTickMs(nextTickMs, effects.length);
await this.state.storage.put('tickMs', adaptedTickMs);
await this.state.storage.put('lastTickAt', tickStartTime);
console.log(`Pulse tick completed: ${effects.length} effects, next tick in ${adaptedTickMs}ms`);
} catch (error) {
console.error('Error executing effect:', error);
console.error('Pulse tick error:', error);
// Record error but don't throw - let alarm reschedule
await this.store.appendEvent({
occurredAt: Date.now(),
kind: 'pulse.error',
meta: JSON.stringify({
error: error instanceof Error ? error.message : String(error),
tickStartTime
}),
});
}
}
private async rebuildSnapshot(senseKeys: string[]): Promise<PulseSnapshot> {
const snapshot: PulseSnapshot = { timestamp: Date.now() };
for (const key of senseKeys) {
// Get the latest collect event for this sense key
const collectEvent = await this.store.getLatest('collect', key);
if (collectEvent?.meta) {
try {
const data = JSON.parse(collectEvent.meta);
snapshot[key] = {
data,
refreshedAt: collectEvent.occurredAt,
} as Sensed<unknown>;
} catch (e) {
console.warn(`Failed to parse collect event for ${key}:`, e);
}
}
}
return snapshot;
}
private async pulse(prev: PulseSnapshot, curr: PulseSnapshot): Promise<[PulseEffect[], number]> {
const effects: PulseEffect[] = [];
let nextTickMs = 30000; // Default 30s
// Built-in demo rule: detect CPU usage spikes
if (curr.cpu_usage && prev.cpu_usage) {
const currCpu = (curr.cpu_usage as Sensed<number>).data;
const prevCpu = (prev.cpu_usage as Sensed<number>).data;
if (currCpu > 80 && prevCpu <= 80) {
effects.push({
type: 'cpu-alert',
kind: 'alert',
message: `CPU usage spiked to ${currCpu}%`,
threshold: 80,
current: currCpu,
previous: prevCpu,
});
}
}
// Demo rule: send periodic heartbeat every 5 minutes if no activity
const lastHeartbeat = await this.store.getLatest('effect', 'heartbeat');
const heartbeatInterval = 5 * 60 * 1000; // 5 minutes
if (!lastHeartbeat || (Date.now() - lastHeartbeat.occurredAt > heartbeatInterval)) {
effects.push({
type: 'heartbeat',
kind: 'system',
timestamp: Date.now(),
status: 'alive',
});
}
return [effects, nextTickMs];
}
private async adaptTickMs(suggestedTickMs: number, effectCount: number): Promise<number> {
const baseTickMs = 30000; // 30s
const maxTickMs = 5 * 60 * 1000; // 5 minutes
if (effectCount > 0) {
// Active: use base tick rate
return baseTickMs;
} else {
// Idle: exponential backoff
const currentTickMs = await this.state.storage.get<number>('tickMs') ?? baseTickMs;
const backoffTickMs = Math.min(currentTickMs * 2, maxTickMs);
// Use rule's suggestion if provided and reasonable
if (suggestedTickMs !== baseTickMs && suggestedTickMs > 0) {
return Math.min(suggestedTickMs, maxTickMs);
}
return backoffTickMs;
}
*/
}
private async getStatus(): Promise<any> {
const isTicking = await this.state.storage.get<boolean>('ticking') ?? false;
const currentAlarm = await this.state.storage.getAlarm();
const hasEvents = await this.store.hasEvents();
const recentEvents = await this.store.getRecent(5);
const tickMs = await this.state.storage.get<number>('tickMs') ?? 30000;
const lastTickAt = await this.state.storage.get<number>('lastTickAt');
const senseKeys = await this.state.storage.get<string[]>('senseKeys') ?? [];
const codeRev = await this.state.storage.get<string>('codeRev');
// Get recent effect count
const recentEffects = await this.store.queryByKind('effect', { since: Date.now() - 60000 });
return {
isTicking,
nextAlarmAt: currentAlarm ? new Date(currentAlarm).toISOString() : null,
tickIntervalMs: this.tickIntervalMs,
hasEvents,
recentEventsCount: recentEvents.length,
lastEvents: recentEvents.slice(0, 3)
tickMs,
lastTickAt: lastTickAt ? new Date(lastTickAt).toISOString() : null,
senseKeys,
codeRev,
effectCount: recentEffects.length,
hasEvents: await this.store.hasEvents(),
};
}
}
+14 -1
View File
@@ -72,6 +72,18 @@ export default {
}));
}
// Configure sense keys and rules
if (request.method === 'POST' && path === '/configure') {
const durableObjectId = env.PULSE_TICK.idFromName('default');
const durableObject = env.PULSE_TICK.get(durableObjectId);
return await durableObject.fetch(new Request(request.url.replace(path, '/configure'), {
method: 'POST',
body: request.body,
headers: request.headers
}));
}
// Default 404
return new Response(JSON.stringify({
error: 'Not found',
@@ -82,7 +94,8 @@ export default {
'POST /tick',
'GET /events',
'GET /status',
'POST /start'
'POST /start',
'POST /configure'
]
}), {
status: 404,
+29 -5
View File
@@ -237,17 +237,41 @@ export class D1PulseStore implements PulseStore {
}
async putObject(data: unknown): Promise<string> {
// For simplicity, use a basic hash function
// Hash the data
const hash = await this.hashObject(data);
// Store in D1 as a simple key-value table (could be added to schema later)
// For now, just return the hash - actual CAS implementation would need object storage
// Store in a simple objects table
// Check if already exists (CAS deduplication)
const existing = await this.db
.prepare('SELECT hash FROM cas_objects WHERE hash = ?')
.bind(hash)
.first();
if (!existing) {
// Insert new object
await this.db
.prepare('INSERT INTO cas_objects (hash, data, created_at) VALUES (?, ?, ?)')
.bind(hash, JSON.stringify(data), Date.now())
.run();
}
return hash;
}
async getObject(hash: string): Promise<unknown | null> {
// Placeholder - would need object storage or D1 table for CAS
return null;
const row = await this.db
.prepare('SELECT data FROM cas_objects WHERE hash = ?')
.bind(hash)
.first<{ data: string }>();
if (!row) return null;
try {
return JSON.parse(row.data);
} catch (e) {
console.error(`Failed to parse CAS object ${hash}:`, e);
return null;
}
}
async close(): Promise<void> {