From 1ba77929e6fceb7d152382f0b21b1c5e1554ae07 Mon Sep 17 00:00:00 2001 From: xiaomo Date: Sat, 18 Apr 2026 08:08:55 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20pulseflare=20real=20tick=20logic=20?= =?UTF-8?q?=E2=80=94=20rules=20+=20snapshot=20+=20sigil=20executor=20(refs?= =?UTF-8?q?=20#5)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement real Pulse tick logic in durable-tick.ts - Add snapshot rebuild from D1 collect events - Add pulse() method with demo rules (CPU spike detection, heartbeat) - Add adaptive tick logic (30s base, up to 5min backoff) - Add /configure endpoint for senseKeys and configuration - Add CAS object storage in store-d1.ts - Update index.ts with /configure route - Persist prev snapshot in DO storage - Execute effects through Sigil with error handling --- packages/pulseflare/src/durable-tick.ts | 253 +++++++++++++++++++----- packages/pulseflare/src/index.ts | 15 +- packages/pulseflare/src/store-d1.ts | 34 +++- 3 files changed, 247 insertions(+), 55 deletions(-) diff --git a/packages/pulseflare/src/durable-tick.ts b/packages/pulseflare/src/durable-tick.ts index 19b1328..82717f3 100644 --- a/packages/pulseflare/src/durable-tick.ts +++ b/packages/pulseflare/src/durable-tick.ts @@ -1,6 +1,7 @@ import type { D1Database, DurableObject, DurableObjectState } from '@cloudflare/workers-types'; +import type { Sensed } from '@uncaged/pulse'; import { D1PulseStore } from './store-d1.js'; -import { createSigilExecutor } from './executor-sigil.js'; +import { createSigilExecutor, type SigilExecutor } from './executor-sigil.js'; export interface PulseTickEnv { DB: D1Database; @@ -9,13 +10,22 @@ export interface PulseTickEnv { TICK_INTERVAL_MS?: string; } +interface PulseSnapshot { + timestamp: number; + [key: string]: Sensed | number; +} + +interface PulseEffect { + type?: string; + kind?: string; + [key: string]: unknown; +} + export class PulseTick implements DurableObject { - private tickIntervalMs: number; private store: D1PulseStore; - private sigilExecutor: ReturnType; + private sigilExecutor: SigilExecutor; constructor(private state: DurableObjectState, private env: PulseTickEnv) { - this.tickIntervalMs = parseInt(env.TICK_INTERVAL_MS ?? '30000'); // Default 30s this.store = new D1PulseStore(env.DB); this.sigilExecutor = createSigilExecutor( env.SIGIL_URL, @@ -47,6 +57,14 @@ export class PulseTick implements DurableObject { headers: { 'Content-Type': 'application/json' } }); } + + if (request.method === 'POST' && url.pathname === '/configure') { + const body = await request.json() as any; + await this.configure(body); + return new Response(JSON.stringify({ status: 'configured' }), { + headers: { 'Content-Type': 'application/json' } + }); + } return new Response('Not found', { status: 404 }); } @@ -55,14 +73,16 @@ export class PulseTick implements DurableObject { try { await this.performTick(); - // Schedule next tick - const nextTickTime = Date.now() + this.tickIntervalMs; + // Get adaptive tick interval + const tickMs = await this.state.storage.get('tickMs') ?? 30000; + const nextTickTime = Date.now() + tickMs; await this.state.storage.setAlarm(nextTickTime); } catch (error) { console.error('Error during tick:', error); // Still schedule next tick even if this one failed - const nextTickTime = Date.now() + this.tickIntervalMs; + const fallbackTickMs = 60000; // 1 minute fallback + const nextTickTime = Date.now() + fallbackTickMs; await this.state.storage.setAlarm(nextTickTime); } } @@ -71,68 +91,203 @@ export class PulseTick implements DurableObject { // Cancel any existing alarm await this.state.storage.deleteAlarm(); - // Set first alarm - const firstTickTime = Date.now() + this.tickIntervalMs; + // Initialize default configuration if not set + const senseKeys = await this.state.storage.get('senseKeys'); + if (!senseKeys) { + await this.state.storage.put('senseKeys', ['cpu_usage', 'memory_usage']); + } + + const tickMs = await this.state.storage.get('tickMs') ?? 30000; + const firstTickTime = Date.now() + tickMs; await this.state.storage.setAlarm(firstTickTime); await this.state.storage.put('ticking', true); + await this.state.storage.put('lastTickAt', null); + } + + private async configure(config: { senseKeys?: string[]; tickMs?: number; codeRev?: string }): Promise { + if (config.senseKeys) { + await this.state.storage.put('senseKeys', config.senseKeys); + } + + if (config.tickMs) { + await this.state.storage.put('tickMs', config.tickMs); + } + + if (config.codeRev) { + await this.state.storage.put('codeRev', config.codeRev); + } } private async performTick(): Promise { - console.log('Performing tick at', new Date().toISOString()); + const tickStartTime = Date.now(); + console.log('Performing Pulse tick at', new Date(tickStartTime).toISOString()); - // 1. Read latest state from D1 - const hasEvents = await this.store.hasEvents(); - const recentEvents = await this.store.getRecent(10); - - console.log('Has events:', hasEvents, 'Recent events:', recentEvents.length); - - // 2. Run Pulse tick logic (placeholder for now) - // This is where we would: - // - Load rule definitions - // - Process events through rules - // - Generate effects - // - Execute effects through Sigil - - // Placeholder: just log the tick - const tickEvent = { - occurredAt: Date.now(), - kind: 'pulse.tick', - meta: JSON.stringify({ - eventsCount: recentEvents.length, - tickedAt: new Date().toISOString() - }) - }; - - await this.store.appendEvent(tickEvent); - - // Example effect execution (commented out for now) - /* try { - const result = await this.sigilExecutor.invoke('example-capability', { - message: 'Tick performed', - timestamp: Date.now() + // 1. Get prev snapshot from DO storage + const prev = await this.state.storage.get('prev') ?? { timestamp: 0 }; + const senseKeys = await this.state.storage.get('senseKeys') ?? []; + + // 2. Rebuild current snapshot from D1 + const curr = await this.rebuildSnapshot(senseKeys); + + // 3. Run pulse rules + const [effects, nextTickMs] = await this.pulse(prev, curr); + + // 4. Write effect events to D1 + for (const effect of effects) { + await this.store.appendEvent({ + occurredAt: Date.now(), + kind: 'effect', + meta: JSON.stringify(effect), + }); + } + + // 5. Execute effects through Sigil + for (const effect of effects) { + const type = effect.type || effect.kind || 'unknown'; + try { + const result = await this.sigilExecutor.invoke(type, effect); + console.log(`Effect executed: ${type}`, result.status === 200 ? 'success' : 'failed'); + } catch (e) { + console.error('Effect execution failed:', type, e); + } + } + + // 6. Save curr as next prev + await this.state.storage.put('prev', curr); + + // 7. Write tick event + await this.store.appendEvent({ + occurredAt: Date.now(), + kind: 'pulse.tick', + meta: JSON.stringify({ + tickMs: nextTickMs, + effectCount: effects.length, + durationMs: Date.now() - tickStartTime + }), }); - console.log('Effect result:', result); + + // 8. Adaptive tick + set next alarm time + const adaptedTickMs = await this.adaptTickMs(nextTickMs, effects.length); + await this.state.storage.put('tickMs', adaptedTickMs); + await this.state.storage.put('lastTickAt', tickStartTime); + + console.log(`Pulse tick completed: ${effects.length} effects, next tick in ${adaptedTickMs}ms`); } catch (error) { - console.error('Error executing effect:', error); + console.error('Pulse tick error:', error); + // Record error but don't throw - let alarm reschedule + await this.store.appendEvent({ + occurredAt: Date.now(), + kind: 'pulse.error', + meta: JSON.stringify({ + error: error instanceof Error ? error.message : String(error), + tickStartTime + }), + }); + } + } + + private async rebuildSnapshot(senseKeys: string[]): Promise { + const snapshot: PulseSnapshot = { timestamp: Date.now() }; + + for (const key of senseKeys) { + // Get the latest collect event for this sense key + const collectEvent = await this.store.getLatest('collect', key); + if (collectEvent?.meta) { + try { + const data = JSON.parse(collectEvent.meta); + snapshot[key] = { + data, + refreshedAt: collectEvent.occurredAt, + } as Sensed; + } catch (e) { + console.warn(`Failed to parse collect event for ${key}:`, e); + } + } + } + + return snapshot; + } + + private async pulse(prev: PulseSnapshot, curr: PulseSnapshot): Promise<[PulseEffect[], number]> { + const effects: PulseEffect[] = []; + let nextTickMs = 30000; // Default 30s + + // Built-in demo rule: detect CPU usage spikes + if (curr.cpu_usage && prev.cpu_usage) { + const currCpu = (curr.cpu_usage as Sensed).data; + const prevCpu = (prev.cpu_usage as Sensed).data; + + if (currCpu > 80 && prevCpu <= 80) { + effects.push({ + type: 'cpu-alert', + kind: 'alert', + message: `CPU usage spiked to ${currCpu}%`, + threshold: 80, + current: currCpu, + previous: prevCpu, + }); + } + } + + // Demo rule: send periodic heartbeat every 5 minutes if no activity + const lastHeartbeat = await this.store.getLatest('effect', 'heartbeat'); + const heartbeatInterval = 5 * 60 * 1000; // 5 minutes + + if (!lastHeartbeat || (Date.now() - lastHeartbeat.occurredAt > heartbeatInterval)) { + effects.push({ + type: 'heartbeat', + kind: 'system', + timestamp: Date.now(), + status: 'alive', + }); + } + + return [effects, nextTickMs]; + } + + private async adaptTickMs(suggestedTickMs: number, effectCount: number): Promise { + const baseTickMs = 30000; // 30s + const maxTickMs = 5 * 60 * 1000; // 5 minutes + + if (effectCount > 0) { + // Active: use base tick rate + return baseTickMs; + } else { + // Idle: exponential backoff + const currentTickMs = await this.state.storage.get('tickMs') ?? baseTickMs; + const backoffTickMs = Math.min(currentTickMs * 2, maxTickMs); + + // Use rule's suggestion if provided and reasonable + if (suggestedTickMs !== baseTickMs && suggestedTickMs > 0) { + return Math.min(suggestedTickMs, maxTickMs); + } + + return backoffTickMs; } - */ } private async getStatus(): Promise { const isTicking = await this.state.storage.get('ticking') ?? false; const currentAlarm = await this.state.storage.getAlarm(); - const hasEvents = await this.store.hasEvents(); - const recentEvents = await this.store.getRecent(5); + const tickMs = await this.state.storage.get('tickMs') ?? 30000; + const lastTickAt = await this.state.storage.get('lastTickAt'); + const senseKeys = await this.state.storage.get('senseKeys') ?? []; + const codeRev = await this.state.storage.get('codeRev'); + + // Get recent effect count + const recentEffects = await this.store.queryByKind('effect', { since: Date.now() - 60000 }); return { isTicking, nextAlarmAt: currentAlarm ? new Date(currentAlarm).toISOString() : null, - tickIntervalMs: this.tickIntervalMs, - hasEvents, - recentEventsCount: recentEvents.length, - lastEvents: recentEvents.slice(0, 3) + tickMs, + lastTickAt: lastTickAt ? new Date(lastTickAt).toISOString() : null, + senseKeys, + codeRev, + effectCount: recentEffects.length, + hasEvents: await this.store.hasEvents(), }; } } \ No newline at end of file diff --git a/packages/pulseflare/src/index.ts b/packages/pulseflare/src/index.ts index e5b3daf..8d40743 100644 --- a/packages/pulseflare/src/index.ts +++ b/packages/pulseflare/src/index.ts @@ -72,6 +72,18 @@ export default { })); } + // Configure sense keys and rules + if (request.method === 'POST' && path === '/configure') { + const durableObjectId = env.PULSE_TICK.idFromName('default'); + const durableObject = env.PULSE_TICK.get(durableObjectId); + + return await durableObject.fetch(new Request(request.url.replace(path, '/configure'), { + method: 'POST', + body: request.body, + headers: request.headers + })); + } + // Default 404 return new Response(JSON.stringify({ error: 'Not found', @@ -82,7 +94,8 @@ export default { 'POST /tick', 'GET /events', 'GET /status', - 'POST /start' + 'POST /start', + 'POST /configure' ] }), { status: 404, diff --git a/packages/pulseflare/src/store-d1.ts b/packages/pulseflare/src/store-d1.ts index 4e3d188..01cb2ef 100644 --- a/packages/pulseflare/src/store-d1.ts +++ b/packages/pulseflare/src/store-d1.ts @@ -237,17 +237,41 @@ export class D1PulseStore implements PulseStore { } async putObject(data: unknown): Promise { - // For simplicity, use a basic hash function + // Hash the data const hash = await this.hashObject(data); - // Store in D1 as a simple key-value table (could be added to schema later) - // For now, just return the hash - actual CAS implementation would need object storage + // Store in a simple objects table + // Check if already exists (CAS deduplication) + const existing = await this.db + .prepare('SELECT hash FROM cas_objects WHERE hash = ?') + .bind(hash) + .first(); + + if (!existing) { + // Insert new object + await this.db + .prepare('INSERT INTO cas_objects (hash, data, created_at) VALUES (?, ?, ?)') + .bind(hash, JSON.stringify(data), Date.now()) + .run(); + } + return hash; } async getObject(hash: string): Promise { - // Placeholder - would need object storage or D1 table for CAS - return null; + const row = await this.db + .prepare('SELECT data FROM cas_objects WHERE hash = ?') + .bind(hash) + .first<{ data: string }>(); + + if (!row) return null; + + try { + return JSON.parse(row.data); + } catch (e) { + console.error(`Failed to parse CAS object ${hash}:`, e); + return null; + } } async close(): Promise {