From 5609cd36f4f36f67259aa85e4aeb8aa5d3b95ffd Mon Sep 17 00:00:00 2001 From: xiaomo Date: Sat, 18 Apr 2026 10:04:11 +0000 Subject: [PATCH] refactor: pulseflare as passive event store + projection API (refs #5) - Remove durable-tick.ts and executor-sigil.ts (no more active ticking) - Remove Durable Objects bindings from wrangler.toml - Add CAS objects table to schema.sql - Rewrite index.ts as pure API service: * POST /events - push events with auto-generated occurredAt * GET /events?kind&key&since&limit - query events * GET /projections/:key - get latest collect event meta for key * POST /objects - CAS store objects, return hash * GET /objects/:hash - retrieve objects by hash * GET /health - service health check - No more /tick, /start, /configure, /status endpoints - Pure passive event store, no rules engine or Sigil integration --- packages/pulseflare/src/durable-tick.ts | 300 ---------------------- packages/pulseflare/src/executor-sigil.ts | 20 -- packages/pulseflare/src/index.ts | 285 +++++++++++++------- packages/pulseflare/src/schema.sql | 7 + packages/pulseflare/wrangler.toml | 14 +- 5 files changed, 206 insertions(+), 420 deletions(-) delete mode 100644 packages/pulseflare/src/durable-tick.ts delete mode 100644 packages/pulseflare/src/executor-sigil.ts diff --git a/packages/pulseflare/src/durable-tick.ts b/packages/pulseflare/src/durable-tick.ts deleted file mode 100644 index 4a193f5..0000000 --- a/packages/pulseflare/src/durable-tick.ts +++ /dev/null @@ -1,300 +0,0 @@ -import type { D1Database, DurableObject, DurableObjectState } from '@cloudflare/workers-types'; -import type { Sensed } from '@uncaged/pulse'; -import { D1PulseStore } from './store-d1.js'; -import { createSigilExecutor, type SigilExecutor } from './executor-sigil.js'; - -export interface PulseTickEnv { - DB: D1Database; - SIGIL_URL: string; - SIGIL_TOKEN?: string; - TICK_INTERVAL_MS?: string; -} - -interface PulseSnapshot { - timestamp: number; - [key: string]: Sensed | number; -} - -interface PulseEffect { - type?: string; - kind?: string; - [key: string]: unknown; -} - -export class PulseTick implements DurableObject { - private store: D1PulseStore; - private sigilExecutor: SigilExecutor; - - constructor(private state: DurableObjectState, private env: PulseTickEnv) { - this.store = new D1PulseStore(env.DB); - this.sigilExecutor = createSigilExecutor( - env.SIGIL_URL, - env.SIGIL_TOKEN ?? 'default-token' - ); - } - - async fetch(request: Request): Promise { - const url = new URL(request.url); - - if (request.method === 'POST' && url.pathname === '/tick') { - // Manual tick trigger - await this.performTick(); - return new Response(JSON.stringify({ status: 'tick performed' }), { - headers: { 'Content-Type': 'application/json' } - }); - } - - if (request.method === 'GET' && url.pathname === '/status') { - try { - const status = await this.getStatus(); - return new Response(JSON.stringify(status), { - headers: { 'Content-Type': 'application/json' } - }); - } catch (e: any) { - return new Response(JSON.stringify({ error: e.message, stack: e.stack }), { - status: 500, - headers: { 'Content-Type': 'application/json' } - }); - } - } - - if (request.method === 'POST' && url.pathname === '/start') { - await this.startTicking(); - return new Response(JSON.stringify({ status: 'ticking started' }), { - headers: { 'Content-Type': 'application/json' } - }); - } - - if (request.method === 'POST' && url.pathname === '/configure') { - const body = await request.json() as any; - await this.configure(body); - return new Response(JSON.stringify({ status: 'configured' }), { - headers: { 'Content-Type': 'application/json' } - }); - } - - return new Response('Not found', { status: 404 }); - } - - async alarm(): Promise { - try { - await this.performTick(); - - // Get adaptive tick interval - const tickMs = await this.state.storage.get('tickMs') ?? 30000; - const nextTickTime = Date.now() + tickMs; - await this.state.storage.setAlarm(nextTickTime); - } catch (error) { - console.error('Error during tick:', error); - - // Still schedule next tick even if this one failed - const fallbackTickMs = 60000; // 1 minute fallback - const nextTickTime = Date.now() + fallbackTickMs; - await this.state.storage.setAlarm(nextTickTime); - } - } - - private async startTicking(): Promise { - // Cancel any existing alarm - await this.state.storage.deleteAlarm(); - - // Initialize default configuration if not set - const senseKeys = await this.state.storage.get('senseKeys'); - if (!senseKeys) { - await this.state.storage.put('senseKeys', ['cpu_usage', 'memory_usage']); - } - - const tickMs = await this.state.storage.get('tickMs') ?? 30000; - const firstTickTime = Date.now() + tickMs; - await this.state.storage.setAlarm(firstTickTime); - - await this.state.storage.put('ticking', true); - await this.state.storage.put('lastTickAt', null); - } - - private async configure(config: { senseKeys?: string[]; tickMs?: number; codeRev?: string }): Promise { - if (config.senseKeys) { - await this.state.storage.put('senseKeys', config.senseKeys); - } - - if (config.tickMs) { - await this.state.storage.put('tickMs', config.tickMs); - } - - if (config.codeRev) { - await this.state.storage.put('codeRev', config.codeRev); - } - } - - private async performTick(): Promise { - const tickStartTime = Date.now(); - console.log('Performing Pulse tick at', new Date(tickStartTime).toISOString()); - - try { - // 1. Get prev snapshot from DO storage - const prev = await this.state.storage.get('prev') ?? { timestamp: 0 }; - const senseKeys = await this.state.storage.get('senseKeys') ?? []; - - // 2. Rebuild current snapshot from D1 - const curr = await this.rebuildSnapshot(senseKeys); - - // 3. Run pulse rules - const [effects, nextTickMs] = await this.pulse(prev, curr); - - // 4. Write effect events to D1 - for (const effect of effects) { - await this.store.appendEvent({ - occurredAt: Date.now(), - kind: 'effect', - meta: JSON.stringify(effect), - }); - } - - // 5. Execute effects through Sigil - for (const effect of effects) { - const type = effect.type || effect.kind || 'unknown'; - try { - const result = await this.sigilExecutor.invoke(type, effect); - console.log(`Effect executed: ${type}`, result.status === 200 ? 'success' : 'failed'); - } catch (e) { - console.error('Effect execution failed:', type, e); - } - } - - // 6. Save curr as next prev - await this.state.storage.put('prev', curr); - - // 7. Write tick event - await this.store.appendEvent({ - occurredAt: Date.now(), - kind: 'pulse.tick', - meta: JSON.stringify({ - tickMs: nextTickMs, - effectCount: effects.length, - durationMs: Date.now() - tickStartTime - }), - }); - - // 8. Adaptive tick + set next alarm time - const adaptedTickMs = await this.adaptTickMs(nextTickMs, effects.length); - await this.state.storage.put('tickMs', adaptedTickMs); - await this.state.storage.put('lastTickAt', tickStartTime); - - console.log(`Pulse tick completed: ${effects.length} effects, next tick in ${adaptedTickMs}ms`); - } catch (error) { - console.error('Pulse tick error:', error); - // Record error but don't throw - let alarm reschedule - await this.store.appendEvent({ - occurredAt: Date.now(), - kind: 'pulse.error', - meta: JSON.stringify({ - error: error instanceof Error ? error.message : String(error), - tickStartTime - }), - }); - } - } - - private async rebuildSnapshot(senseKeys: string[]): Promise { - const snapshot: PulseSnapshot = { timestamp: Date.now() }; - - for (const key of senseKeys) { - // Get the latest collect event for this sense key - const collectEvent = await this.store.getLatest('collect', key); - if (collectEvent?.meta) { - try { - const data = JSON.parse(collectEvent.meta); - snapshot[key] = { - data, - refreshedAt: collectEvent.occurredAt, - } as Sensed; - } catch (e) { - console.warn(`Failed to parse collect event for ${key}:`, e); - } - } - } - - return snapshot; - } - - private async pulse(prev: PulseSnapshot, curr: PulseSnapshot): Promise<[PulseEffect[], number]> { - const effects: PulseEffect[] = []; - let nextTickMs = 30000; // Default 30s - - // Built-in demo rule: detect CPU usage spikes - if (curr.cpu_usage && prev.cpu_usage) { - const currCpu = (curr.cpu_usage as Sensed).data; - const prevCpu = (prev.cpu_usage as Sensed).data; - - if (currCpu > 80 && prevCpu <= 80) { - effects.push({ - type: 'cpu-alert', - kind: 'alert', - message: `CPU usage spiked to ${currCpu}%`, - threshold: 80, - current: currCpu, - previous: prevCpu, - }); - } - } - - // Demo rule: send periodic heartbeat every 5 minutes if no activity - const lastHeartbeat = await this.store.getLatest('effect', 'heartbeat'); - const heartbeatInterval = 5 * 60 * 1000; // 5 minutes - - if (!lastHeartbeat || (Date.now() - lastHeartbeat.occurredAt > heartbeatInterval)) { - effects.push({ - type: 'heartbeat', - kind: 'system', - timestamp: Date.now(), - status: 'alive', - }); - } - - return [effects, nextTickMs]; - } - - private async adaptTickMs(suggestedTickMs: number, effectCount: number): Promise { - const baseTickMs = 30000; // 30s - const maxTickMs = 5 * 60 * 1000; // 5 minutes - - if (effectCount > 0) { - // Active: use base tick rate - return baseTickMs; - } else { - // Idle: exponential backoff - const currentTickMs = await this.state.storage.get('tickMs') ?? baseTickMs; - const backoffTickMs = Math.min(currentTickMs * 2, maxTickMs); - - // Use rule's suggestion if provided and reasonable - if (suggestedTickMs !== baseTickMs && suggestedTickMs > 0) { - return Math.min(suggestedTickMs, maxTickMs); - } - - return backoffTickMs; - } - } - - private async getStatus(): Promise { - const isTicking = await this.state.storage.get('ticking') ?? false; - const currentAlarm = await this.state.storage.getAlarm(); - const tickMs = await this.state.storage.get('tickMs') ?? 30000; - const lastTickAt = await this.state.storage.get('lastTickAt'); - const senseKeys = await this.state.storage.get('senseKeys') ?? []; - const codeRev = await this.state.storage.get('codeRev'); - - // Get recent effect count - const recentEffects = await this.store.queryByKind('effect', { since: Date.now() - 60000 }); - - return { - isTicking, - nextAlarmAt: currentAlarm ? new Date(currentAlarm).toISOString() : null, - tickMs, - lastTickAt: lastTickAt ? new Date(lastTickAt).toISOString() : null, - senseKeys, - codeRev, - effectCount: recentEffects.length, - hasEvents: await this.store.hasEvents(), - }; - } -} \ No newline at end of file diff --git a/packages/pulseflare/src/executor-sigil.ts b/packages/pulseflare/src/executor-sigil.ts deleted file mode 100644 index 00404a7..0000000 --- a/packages/pulseflare/src/executor-sigil.ts +++ /dev/null @@ -1,20 +0,0 @@ -export interface SigilExecutor { - invoke(capability: string, input: unknown): Promise<{ status: number; payload: unknown }>; -} - -export function createSigilExecutor(sigilUrl: string, token: string): SigilExecutor { - return { - async invoke(capability, input) { - const res = await fetch(`${sigilUrl}/_api/invoke`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${token}`, - }, - body: JSON.stringify({ capability, params: input }), - }); - const data = await res.json(); - return { status: res.status, payload: data }; - } - }; -} \ No newline at end of file diff --git a/packages/pulseflare/src/index.ts b/packages/pulseflare/src/index.ts index 8d40743..a9bfbc5 100644 --- a/packages/pulseflare/src/index.ts +++ b/packages/pulseflare/src/index.ts @@ -1,105 +1,216 @@ -import type { D1Database, DurableObjectNamespace } from '@cloudflare/workers-types'; +import type { D1Database } from '@cloudflare/workers-types'; import { D1PulseStore } from './store-d1.js'; -import { PulseTick } from './durable-tick.js'; - -export { PulseTick }; export interface Env { DB: D1Database; - PULSE_TICK: DurableObjectNamespace; - SIGIL_URL: string; - SIGIL_TOKEN?: string; +} + +interface EventInput { + kind: string; + key?: string; + meta?: string; + codeRev?: string; +} + +interface EventResponse { + id: number; + occurredAt: number; +} + +interface ObjectResponse { + hash: string; } export default { async fetch(request: Request, env: Env): Promise { const url = new URL(request.url); const path = url.pathname; + const store = new D1PulseStore(env.DB); - // Health check - if (request.method === 'GET' && path === '/health') { + try { + // Health check + if (request.method === 'GET' && path === '/health') { + return new Response(JSON.stringify({ + status: 'healthy', + timestamp: new Date().toISOString(), + service: 'pulseflare', + version: '2.0.0-passive-eventstore' + }), { + headers: { 'Content-Type': 'application/json' }, + status: 200 + }); + } + + // POST /events - Push a new event + if (request.method === 'POST' && path === '/events') { + const eventData: EventInput = await request.json(); + + if (!eventData.kind) { + return new Response(JSON.stringify({ error: 'kind is required' }), { + status: 400, + headers: { 'Content-Type': 'application/json' } + }); + } + + const event = await store.appendEvent({ + occurredAt: Date.now(), + kind: eventData.kind, + key: eventData.key, + meta: eventData.meta, + codeRev: eventData.codeRev + }); + + const response: EventResponse = { + id: event.id, + occurredAt: event.occurredAt + }; + + return new Response(JSON.stringify(response), { + headers: { 'Content-Type': 'application/json' }, + status: 201 + }); + } + + // GET /events - Query events with filtering + if (request.method === 'GET' && path === '/events') { + const kind = url.searchParams.get('kind'); + const key = url.searchParams.get('key'); + const since = url.searchParams.get('since'); + const limit = parseInt(url.searchParams.get('limit') ?? '20'); + + let events; + + if (kind) { + // Query by kind with optional filters + const queryOpts: any = { + limit: Math.min(Math.max(1, limit), 1000) // Cap at 1000 + }; + + if (key) queryOpts.key = key; + if (since) queryOpts.since = parseInt(since); + + events = await store.queryByKind(kind, queryOpts); + } else { + // Get recent events + events = await store.getRecent(Math.min(Math.max(1, limit), 1000)); + } + + return new Response(JSON.stringify({ + events, + count: events.length, + timestamp: new Date().toISOString() + }), { + headers: { 'Content-Type': 'application/json' }, + status: 200 + }); + } + + // GET /projections/:key - Get projection for a key + if (request.method === 'GET' && path.startsWith('/projections/')) { + const key = path.split('/projections/')[1]; + + if (!key) { + return new Response(JSON.stringify({ error: 'key is required' }), { + status: 400, + headers: { 'Content-Type': 'application/json' } + }); + } + + // Get the latest 'collect' event for this key + const latestCollectEvent = await store.getLatest('collect', key); + + if (!latestCollectEvent) { + return new Response(JSON.stringify({ + key, + meta: null, + message: 'No collect event found for this key' + }), { + status: 404, + headers: { 'Content-Type': 'application/json' } + }); + } + + return new Response(JSON.stringify({ + key, + meta: latestCollectEvent.meta ? JSON.parse(latestCollectEvent.meta) : null, + eventId: latestCollectEvent.id, + occurredAt: latestCollectEvent.occurredAt + }), { + headers: { 'Content-Type': 'application/json' }, + status: 200 + }); + } + + // POST /objects - Store object with CAS + if (request.method === 'POST' && path === '/objects') { + const objectData = await request.json(); + const hash = await store.putObject(objectData); + + const response: ObjectResponse = { hash }; + + return new Response(JSON.stringify(response), { + headers: { 'Content-Type': 'application/json' }, + status: 201 + }); + } + + // GET /objects/:hash - Get object by hash + if (request.method === 'GET' && path.startsWith('/objects/')) { + const hash = path.split('/objects/')[1]; + + if (!hash) { + return new Response(JSON.stringify({ error: 'hash is required' }), { + status: 400, + headers: { 'Content-Type': 'application/json' } + }); + } + + const object = await store.getObject(hash); + + if (object === null) { + return new Response(JSON.stringify({ + error: 'Object not found', + hash + }), { + status: 404, + headers: { 'Content-Type': 'application/json' } + }); + } + + return new Response(JSON.stringify(object), { + headers: { 'Content-Type': 'application/json' }, + status: 200 + }); + } + + // Default 404 return new Response(JSON.stringify({ - status: 'healthy', - timestamp: new Date().toISOString(), - service: 'pulseflare' + error: 'Not found', + path, + method: request.method, + available_endpoints: [ + 'GET /health', + 'POST /events', + 'GET /events?kind=xxx&key=xxx&since=xxx&limit=20', + 'GET /projections/:key', + 'POST /objects', + 'GET /objects/:hash' + ] }), { + status: 404, + headers: { 'Content-Type': 'application/json' } + }); + + } catch (error) { + console.error('Request error:', error); + + return new Response(JSON.stringify({ + error: 'Internal server error', + message: error instanceof Error ? error.message : 'Unknown error' + }), { + status: 500, headers: { 'Content-Type': 'application/json' } }); } - - // Manual tick trigger - if (request.method === 'POST' && path === '/tick') { - const durableObjectId = env.PULSE_TICK.idFromName('default'); - const durableObject = env.PULSE_TICK.get(durableObjectId); - - return await durableObject.fetch(new Request(request.url.replace(path, '/tick'), { - method: 'POST' - })); - } - - // Get events - if (request.method === 'GET' && path === '/events') { - const store = new D1PulseStore(env.DB); - const limit = parseInt(url.searchParams.get('limit') ?? '20'); - const events = await store.getRecent(limit); - - return new Response(JSON.stringify({ - events, - count: events.length, - timestamp: new Date().toISOString() - }), { - headers: { 'Content-Type': 'application/json' } - }); - } - - // Get status (from Durable Object) - if (request.method === 'GET' && path === '/status') { - const durableObjectId = env.PULSE_TICK.idFromName('default'); - const durableObject = env.PULSE_TICK.get(durableObjectId); - - return await durableObject.fetch(new Request(request.url.replace(path, '/status'), { - method: 'GET' - })); - } - - // Start ticking - if (request.method === 'POST' && path === '/start') { - const durableObjectId = env.PULSE_TICK.idFromName('default'); - const durableObject = env.PULSE_TICK.get(durableObjectId); - - return await durableObject.fetch(new Request(request.url.replace(path, '/start'), { - method: 'POST' - })); - } - - // Configure sense keys and rules - if (request.method === 'POST' && path === '/configure') { - const durableObjectId = env.PULSE_TICK.idFromName('default'); - const durableObject = env.PULSE_TICK.get(durableObjectId); - - return await durableObject.fetch(new Request(request.url.replace(path, '/configure'), { - method: 'POST', - body: request.body, - headers: request.headers - })); - } - - // Default 404 - return new Response(JSON.stringify({ - error: 'Not found', - path, - method: request.method, - available_endpoints: [ - 'GET /health', - 'POST /tick', - 'GET /events', - 'GET /status', - 'POST /start', - 'POST /configure' - ] - }), { - status: 404, - headers: { 'Content-Type': 'application/json' } - }); }, }; \ No newline at end of file diff --git a/packages/pulseflare/src/schema.sql b/packages/pulseflare/src/schema.sql index 5cb5b25..694b554 100644 --- a/packages/pulseflare/src/schema.sql +++ b/packages/pulseflare/src/schema.sql @@ -30,6 +30,13 @@ CREATE TABLE IF NOT EXISTS projections ( last_event_id INTEGER NOT NULL DEFAULT 0 ); +-- CAS Objects table (Content-addressable storage) +CREATE TABLE IF NOT EXISTS cas_objects ( + hash TEXT PRIMARY KEY, + data TEXT NOT NULL, + created_at INTEGER NOT NULL +); + -- Defs tables (from defs.js) CREATE TABLE IF NOT EXISTS event_defs ( id INTEGER PRIMARY KEY AUTOINCREMENT, diff --git a/packages/pulseflare/wrangler.toml b/packages/pulseflare/wrangler.toml index 62c7336..76d0aba 100644 --- a/packages/pulseflare/wrangler.toml +++ b/packages/pulseflare/wrangler.toml @@ -5,16 +5,4 @@ compatibility_date = "2024-09-25" [[d1_databases]] binding = "DB" database_name = "pulseflare-db" -database_id = "dee1eee4-99e7-4aa3-85ec-dd376d288c9e" - -[durable_objects] -bindings = [ - { name = "PULSE_TICK", class_name = "PulseTick" } -] - -[[migrations]] -tag = "v1" -new_classes = ["PulseTick"] - -[vars] -SIGIL_URL = "https://sigil.shazhou.workers.dev" \ No newline at end of file +database_id = "dee1eee4-99e7-4aa3-85ec-dd376d288c9e" \ No newline at end of file