refactor: pulseflare as passive event store + projection API (refs #5)
CI / test (push) Has been cancelled

- Remove durable-tick.ts and executor-sigil.ts (no more active ticking)
- Remove Durable Objects bindings from wrangler.toml
- Add CAS objects table to schema.sql
- Rewrite index.ts as pure API service:
  * POST /events - push events with auto-generated occurredAt
  * GET /events?kind&key&since&limit - query events
  * GET /projections/:key - get latest collect event meta for key
  * POST /objects - CAS store objects, return hash
  * GET /objects/:hash - retrieve objects by hash
  * GET /health - service health check
- No more /tick, /start, /configure, /status endpoints
- Pure passive event store, no rules engine or Sigil integration
This commit is contained in:
2026-04-18 10:04:11 +00:00
parent 40a7804dfb
commit 5609cd36f4
5 changed files with 206 additions and 420 deletions
-300
View File
@@ -1,300 +0,0 @@
import type { D1Database, DurableObject, DurableObjectState } from '@cloudflare/workers-types';
import type { Sensed } from '@uncaged/pulse';
import { D1PulseStore } from './store-d1.js';
import { createSigilExecutor, type SigilExecutor } from './executor-sigil.js';
export interface PulseTickEnv {
DB: D1Database;
SIGIL_URL: string;
SIGIL_TOKEN?: string;
TICK_INTERVAL_MS?: string;
}
interface PulseSnapshot {
timestamp: number;
[key: string]: Sensed<unknown> | number;
}
interface PulseEffect {
type?: string;
kind?: string;
[key: string]: unknown;
}
export class PulseTick implements DurableObject {
private store: D1PulseStore;
private sigilExecutor: SigilExecutor;
constructor(private state: DurableObjectState, private env: PulseTickEnv) {
this.store = new D1PulseStore(env.DB);
this.sigilExecutor = createSigilExecutor(
env.SIGIL_URL,
env.SIGIL_TOKEN ?? 'default-token'
);
}
async fetch(request: Request): Promise<Response> {
const url = new URL(request.url);
if (request.method === 'POST' && url.pathname === '/tick') {
// Manual tick trigger
await this.performTick();
return new Response(JSON.stringify({ status: 'tick performed' }), {
headers: { 'Content-Type': 'application/json' }
});
}
if (request.method === 'GET' && url.pathname === '/status') {
try {
const status = await this.getStatus();
return new Response(JSON.stringify(status), {
headers: { 'Content-Type': 'application/json' }
});
} catch (e: any) {
return new Response(JSON.stringify({ error: e.message, stack: e.stack }), {
status: 500,
headers: { 'Content-Type': 'application/json' }
});
}
}
if (request.method === 'POST' && url.pathname === '/start') {
await this.startTicking();
return new Response(JSON.stringify({ status: 'ticking started' }), {
headers: { 'Content-Type': 'application/json' }
});
}
if (request.method === 'POST' && url.pathname === '/configure') {
const body = await request.json() as any;
await this.configure(body);
return new Response(JSON.stringify({ status: 'configured' }), {
headers: { 'Content-Type': 'application/json' }
});
}
return new Response('Not found', { status: 404 });
}
async alarm(): Promise<void> {
try {
await this.performTick();
// Get adaptive tick interval
const tickMs = await this.state.storage.get<number>('tickMs') ?? 30000;
const nextTickTime = Date.now() + tickMs;
await this.state.storage.setAlarm(nextTickTime);
} catch (error) {
console.error('Error during tick:', error);
// Still schedule next tick even if this one failed
const fallbackTickMs = 60000; // 1 minute fallback
const nextTickTime = Date.now() + fallbackTickMs;
await this.state.storage.setAlarm(nextTickTime);
}
}
private async startTicking(): Promise<void> {
// Cancel any existing alarm
await this.state.storage.deleteAlarm();
// Initialize default configuration if not set
const senseKeys = await this.state.storage.get<string[]>('senseKeys');
if (!senseKeys) {
await this.state.storage.put('senseKeys', ['cpu_usage', 'memory_usage']);
}
const tickMs = await this.state.storage.get<number>('tickMs') ?? 30000;
const firstTickTime = Date.now() + tickMs;
await this.state.storage.setAlarm(firstTickTime);
await this.state.storage.put('ticking', true);
await this.state.storage.put('lastTickAt', null);
}
private async configure(config: { senseKeys?: string[]; tickMs?: number; codeRev?: string }): Promise<void> {
if (config.senseKeys) {
await this.state.storage.put('senseKeys', config.senseKeys);
}
if (config.tickMs) {
await this.state.storage.put('tickMs', config.tickMs);
}
if (config.codeRev) {
await this.state.storage.put('codeRev', config.codeRev);
}
}
private async performTick(): Promise<void> {
const tickStartTime = Date.now();
console.log('Performing Pulse tick at', new Date(tickStartTime).toISOString());
try {
// 1. Get prev snapshot from DO storage
const prev = await this.state.storage.get<PulseSnapshot>('prev') ?? { timestamp: 0 };
const senseKeys = await this.state.storage.get<string[]>('senseKeys') ?? [];
// 2. Rebuild current snapshot from D1
const curr = await this.rebuildSnapshot(senseKeys);
// 3. Run pulse rules
const [effects, nextTickMs] = await this.pulse(prev, curr);
// 4. Write effect events to D1
for (const effect of effects) {
await this.store.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
meta: JSON.stringify(effect),
});
}
// 5. Execute effects through Sigil
for (const effect of effects) {
const type = effect.type || effect.kind || 'unknown';
try {
const result = await this.sigilExecutor.invoke(type, effect);
console.log(`Effect executed: ${type}`, result.status === 200 ? 'success' : 'failed');
} catch (e) {
console.error('Effect execution failed:', type, e);
}
}
// 6. Save curr as next prev
await this.state.storage.put('prev', curr);
// 7. Write tick event
await this.store.appendEvent({
occurredAt: Date.now(),
kind: 'pulse.tick',
meta: JSON.stringify({
tickMs: nextTickMs,
effectCount: effects.length,
durationMs: Date.now() - tickStartTime
}),
});
// 8. Adaptive tick + set next alarm time
const adaptedTickMs = await this.adaptTickMs(nextTickMs, effects.length);
await this.state.storage.put('tickMs', adaptedTickMs);
await this.state.storage.put('lastTickAt', tickStartTime);
console.log(`Pulse tick completed: ${effects.length} effects, next tick in ${adaptedTickMs}ms`);
} catch (error) {
console.error('Pulse tick error:', error);
// Record error but don't throw - let alarm reschedule
await this.store.appendEvent({
occurredAt: Date.now(),
kind: 'pulse.error',
meta: JSON.stringify({
error: error instanceof Error ? error.message : String(error),
tickStartTime
}),
});
}
}
private async rebuildSnapshot(senseKeys: string[]): Promise<PulseSnapshot> {
const snapshot: PulseSnapshot = { timestamp: Date.now() };
for (const key of senseKeys) {
// Get the latest collect event for this sense key
const collectEvent = await this.store.getLatest('collect', key);
if (collectEvent?.meta) {
try {
const data = JSON.parse(collectEvent.meta);
snapshot[key] = {
data,
refreshedAt: collectEvent.occurredAt,
} as Sensed<unknown>;
} catch (e) {
console.warn(`Failed to parse collect event for ${key}:`, e);
}
}
}
return snapshot;
}
private async pulse(prev: PulseSnapshot, curr: PulseSnapshot): Promise<[PulseEffect[], number]> {
const effects: PulseEffect[] = [];
let nextTickMs = 30000; // Default 30s
// Built-in demo rule: detect CPU usage spikes
if (curr.cpu_usage && prev.cpu_usage) {
const currCpu = (curr.cpu_usage as Sensed<number>).data;
const prevCpu = (prev.cpu_usage as Sensed<number>).data;
if (currCpu > 80 && prevCpu <= 80) {
effects.push({
type: 'cpu-alert',
kind: 'alert',
message: `CPU usage spiked to ${currCpu}%`,
threshold: 80,
current: currCpu,
previous: prevCpu,
});
}
}
// Demo rule: send periodic heartbeat every 5 minutes if no activity
const lastHeartbeat = await this.store.getLatest('effect', 'heartbeat');
const heartbeatInterval = 5 * 60 * 1000; // 5 minutes
if (!lastHeartbeat || (Date.now() - lastHeartbeat.occurredAt > heartbeatInterval)) {
effects.push({
type: 'heartbeat',
kind: 'system',
timestamp: Date.now(),
status: 'alive',
});
}
return [effects, nextTickMs];
}
private async adaptTickMs(suggestedTickMs: number, effectCount: number): Promise<number> {
const baseTickMs = 30000; // 30s
const maxTickMs = 5 * 60 * 1000; // 5 minutes
if (effectCount > 0) {
// Active: use base tick rate
return baseTickMs;
} else {
// Idle: exponential backoff
const currentTickMs = await this.state.storage.get<number>('tickMs') ?? baseTickMs;
const backoffTickMs = Math.min(currentTickMs * 2, maxTickMs);
// Use rule's suggestion if provided and reasonable
if (suggestedTickMs !== baseTickMs && suggestedTickMs > 0) {
return Math.min(suggestedTickMs, maxTickMs);
}
return backoffTickMs;
}
}
private async getStatus(): Promise<any> {
const isTicking = await this.state.storage.get<boolean>('ticking') ?? false;
const currentAlarm = await this.state.storage.getAlarm();
const tickMs = await this.state.storage.get<number>('tickMs') ?? 30000;
const lastTickAt = await this.state.storage.get<number>('lastTickAt');
const senseKeys = await this.state.storage.get<string[]>('senseKeys') ?? [];
const codeRev = await this.state.storage.get<string>('codeRev');
// Get recent effect count
const recentEffects = await this.store.queryByKind('effect', { since: Date.now() - 60000 });
return {
isTicking,
nextAlarmAt: currentAlarm ? new Date(currentAlarm).toISOString() : null,
tickMs,
lastTickAt: lastTickAt ? new Date(lastTickAt).toISOString() : null,
senseKeys,
codeRev,
effectCount: recentEffects.length,
hasEvents: await this.store.hasEvents(),
};
}
}
-20
View File
@@ -1,20 +0,0 @@
export interface SigilExecutor {
invoke(capability: string, input: unknown): Promise<{ status: number; payload: unknown }>;
}
export function createSigilExecutor(sigilUrl: string, token: string): SigilExecutor {
return {
async invoke(capability, input) {
const res = await fetch(`${sigilUrl}/_api/invoke`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`,
},
body: JSON.stringify({ capability, params: input }),
});
const data = await res.json();
return { status: res.status, payload: data };
}
};
}
+198 -87
View File
@@ -1,105 +1,216 @@
import type { D1Database, DurableObjectNamespace } from '@cloudflare/workers-types';
import type { D1Database } from '@cloudflare/workers-types';
import { D1PulseStore } from './store-d1.js';
import { PulseTick } from './durable-tick.js';
export { PulseTick };
export interface Env {
DB: D1Database;
PULSE_TICK: DurableObjectNamespace;
SIGIL_URL: string;
SIGIL_TOKEN?: string;
}
interface EventInput {
kind: string;
key?: string;
meta?: string;
codeRev?: string;
}
interface EventResponse {
id: number;
occurredAt: number;
}
interface ObjectResponse {
hash: string;
}
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
const path = url.pathname;
const store = new D1PulseStore(env.DB);
// Health check
if (request.method === 'GET' && path === '/health') {
try {
// Health check
if (request.method === 'GET' && path === '/health') {
return new Response(JSON.stringify({
status: 'healthy',
timestamp: new Date().toISOString(),
service: 'pulseflare',
version: '2.0.0-passive-eventstore'
}), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
}
// POST /events - Push a new event
if (request.method === 'POST' && path === '/events') {
const eventData: EventInput = await request.json();
if (!eventData.kind) {
return new Response(JSON.stringify({ error: 'kind is required' }), {
status: 400,
headers: { 'Content-Type': 'application/json' }
});
}
const event = await store.appendEvent({
occurredAt: Date.now(),
kind: eventData.kind,
key: eventData.key,
meta: eventData.meta,
codeRev: eventData.codeRev
});
const response: EventResponse = {
id: event.id,
occurredAt: event.occurredAt
};
return new Response(JSON.stringify(response), {
headers: { 'Content-Type': 'application/json' },
status: 201
});
}
// GET /events - Query events with filtering
if (request.method === 'GET' && path === '/events') {
const kind = url.searchParams.get('kind');
const key = url.searchParams.get('key');
const since = url.searchParams.get('since');
const limit = parseInt(url.searchParams.get('limit') ?? '20');
let events;
if (kind) {
// Query by kind with optional filters
const queryOpts: any = {
limit: Math.min(Math.max(1, limit), 1000) // Cap at 1000
};
if (key) queryOpts.key = key;
if (since) queryOpts.since = parseInt(since);
events = await store.queryByKind(kind, queryOpts);
} else {
// Get recent events
events = await store.getRecent(Math.min(Math.max(1, limit), 1000));
}
return new Response(JSON.stringify({
events,
count: events.length,
timestamp: new Date().toISOString()
}), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
}
// GET /projections/:key - Get projection for a key
if (request.method === 'GET' && path.startsWith('/projections/')) {
const key = path.split('/projections/')[1];
if (!key) {
return new Response(JSON.stringify({ error: 'key is required' }), {
status: 400,
headers: { 'Content-Type': 'application/json' }
});
}
// Get the latest 'collect' event for this key
const latestCollectEvent = await store.getLatest('collect', key);
if (!latestCollectEvent) {
return new Response(JSON.stringify({
key,
meta: null,
message: 'No collect event found for this key'
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
}
return new Response(JSON.stringify({
key,
meta: latestCollectEvent.meta ? JSON.parse(latestCollectEvent.meta) : null,
eventId: latestCollectEvent.id,
occurredAt: latestCollectEvent.occurredAt
}), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
}
// POST /objects - Store object with CAS
if (request.method === 'POST' && path === '/objects') {
const objectData = await request.json();
const hash = await store.putObject(objectData);
const response: ObjectResponse = { hash };
return new Response(JSON.stringify(response), {
headers: { 'Content-Type': 'application/json' },
status: 201
});
}
// GET /objects/:hash - Get object by hash
if (request.method === 'GET' && path.startsWith('/objects/')) {
const hash = path.split('/objects/')[1];
if (!hash) {
return new Response(JSON.stringify({ error: 'hash is required' }), {
status: 400,
headers: { 'Content-Type': 'application/json' }
});
}
const object = await store.getObject(hash);
if (object === null) {
return new Response(JSON.stringify({
error: 'Object not found',
hash
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
}
return new Response(JSON.stringify(object), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
}
// Default 404
return new Response(JSON.stringify({
status: 'healthy',
timestamp: new Date().toISOString(),
service: 'pulseflare'
error: 'Not found',
path,
method: request.method,
available_endpoints: [
'GET /health',
'POST /events',
'GET /events?kind=xxx&key=xxx&since=xxx&limit=20',
'GET /projections/:key',
'POST /objects',
'GET /objects/:hash'
]
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
} catch (error) {
console.error('Request error:', error);
return new Response(JSON.stringify({
error: 'Internal server error',
message: error instanceof Error ? error.message : 'Unknown error'
}), {
status: 500,
headers: { 'Content-Type': 'application/json' }
});
}
// Manual tick trigger
if (request.method === 'POST' && path === '/tick') {
const durableObjectId = env.PULSE_TICK.idFromName('default');
const durableObject = env.PULSE_TICK.get(durableObjectId);
return await durableObject.fetch(new Request(request.url.replace(path, '/tick'), {
method: 'POST'
}));
}
// Get events
if (request.method === 'GET' && path === '/events') {
const store = new D1PulseStore(env.DB);
const limit = parseInt(url.searchParams.get('limit') ?? '20');
const events = await store.getRecent(limit);
return new Response(JSON.stringify({
events,
count: events.length,
timestamp: new Date().toISOString()
}), {
headers: { 'Content-Type': 'application/json' }
});
}
// Get status (from Durable Object)
if (request.method === 'GET' && path === '/status') {
const durableObjectId = env.PULSE_TICK.idFromName('default');
const durableObject = env.PULSE_TICK.get(durableObjectId);
return await durableObject.fetch(new Request(request.url.replace(path, '/status'), {
method: 'GET'
}));
}
// Start ticking
if (request.method === 'POST' && path === '/start') {
const durableObjectId = env.PULSE_TICK.idFromName('default');
const durableObject = env.PULSE_TICK.get(durableObjectId);
return await durableObject.fetch(new Request(request.url.replace(path, '/start'), {
method: 'POST'
}));
}
// Configure sense keys and rules
if (request.method === 'POST' && path === '/configure') {
const durableObjectId = env.PULSE_TICK.idFromName('default');
const durableObject = env.PULSE_TICK.get(durableObjectId);
return await durableObject.fetch(new Request(request.url.replace(path, '/configure'), {
method: 'POST',
body: request.body,
headers: request.headers
}));
}
// Default 404
return new Response(JSON.stringify({
error: 'Not found',
path,
method: request.method,
available_endpoints: [
'GET /health',
'POST /tick',
'GET /events',
'GET /status',
'POST /start',
'POST /configure'
]
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
},
};
+7
View File
@@ -30,6 +30,13 @@ CREATE TABLE IF NOT EXISTS projections (
last_event_id INTEGER NOT NULL DEFAULT 0
);
-- CAS Objects table (Content-addressable storage)
CREATE TABLE IF NOT EXISTS cas_objects (
hash TEXT PRIMARY KEY,
data TEXT NOT NULL,
created_at INTEGER NOT NULL
);
-- Defs tables (from defs.js)
CREATE TABLE IF NOT EXISTS event_defs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
+1 -13
View File
@@ -5,16 +5,4 @@ compatibility_date = "2024-09-25"
[[d1_databases]]
binding = "DB"
database_name = "pulseflare-db"
database_id = "dee1eee4-99e7-4aa3-85ec-dd376d288c9e"
[durable_objects]
bindings = [
{ name = "PULSE_TICK", class_name = "PulseTick" }
]
[[migrations]]
tag = "v1"
new_classes = ["PulseTick"]
[vars]
SIGIL_URL = "https://sigil.shazhou.workers.dev"
database_id = "dee1eee4-99e7-4aa3-85ec-dd376d288c9e"