feat(pulseflare): complete D1 event store Worker
CI / test (push) Has been cancelled

- Add Bearer token auth (API_TOKEN secret)
- Add POST /events/batch (max 100)
- Add GET /events?after_id= for cursor-based pagination
- Add POST/GET /instances for object instances
- Add POST /maintenance/archive for event cleanup
- Add D1 migration (0001_init)
- D1PulseStore fully implements PulseStore interface
- Type checks pass
This commit is contained in:
2026-04-20 01:38:03 +00:00
parent 7993ecc6d6
commit a7fd543648
4 changed files with 222 additions and 159 deletions
@@ -0,0 +1,63 @@
-- Events table
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
occurred_at INTEGER NOT NULL,
kind TEXT NOT NULL,
key TEXT,
hash TEXT,
code_rev TEXT,
meta TEXT,
object_id INTEGER REFERENCES objects(id)
);
CREATE INDEX IF NOT EXISTS idx_occurred ON events(occurred_at);
CREATE INDEX IF NOT EXISTS idx_kind_key ON events(kind, key, occurred_at);
CREATE INDEX IF NOT EXISTS idx_code_rev ON events(code_rev, occurred_at);
-- Objects table
CREATE TABLE IF NOT EXISTS objects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
object_type TEXT NOT NULL,
external_id TEXT,
created_at INTEGER NOT NULL,
code_rev TEXT NOT NULL,
UNIQUE(object_type, external_id)
);
-- Projections table (from projection-engine.js)
CREATE TABLE IF NOT EXISTS projections (
name TEXT PRIMARY KEY,
last_event_id INTEGER NOT NULL DEFAULT 0
);
-- CAS Objects table (Content-addressable storage)
CREATE TABLE IF NOT EXISTS cas_objects (
hash TEXT PRIMARY KEY,
data TEXT NOT NULL,
created_at INTEGER NOT NULL
);
-- Defs tables (from defs.js)
CREATE TABLE IF NOT EXISTS event_defs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code_hash TEXT NOT NULL UNIQUE,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS rule_defs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code_hash TEXT NOT NULL UNIQUE,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS executor_defs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code_hash TEXT NOT NULL UNIQUE,
data TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS projection_defs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
code_hash TEXT NOT NULL UNIQUE,
data TEXT NOT NULL
);
+154 -155
View File
@@ -3,214 +3,213 @@ import { D1PulseStore } from './store-d1.js';
export interface Env {
DB: D1Database;
API_TOKEN: string; // set via wrangler secret
}
interface EventInput {
kind: string;
key?: string;
meta?: string;
codeRev?: string;
// ── Helpers ─────────────────────────────────────────────────────
function json(data: unknown, status = 200): Response {
return new Response(JSON.stringify(data), {
status,
headers: { 'Content-Type': 'application/json' },
});
}
interface EventResponse {
id: number;
occurredAt: number;
function authorize(request: Request, env: Env): Response | null {
const token = request.headers.get('Authorization')?.replace('Bearer ', '');
if (!env.API_TOKEN) return null; // no token configured = open (dev mode)
if (token === env.API_TOKEN) return null;
return json({ error: 'Unauthorized' }, 401);
}
interface ObjectResponse {
hash: string;
}
// ── Router ──────────────────────────────────────────────────────
export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
const path = url.pathname;
// Health — no auth required
if (request.method === 'GET' && path === '/health') {
return json({
status: 'healthy',
timestamp: new Date().toISOString(),
service: 'pulseflare',
version: '0.2.0',
});
}
// Auth check for all other routes
const authErr = authorize(request, env);
if (authErr) return authErr;
const store = new D1PulseStore(env.DB);
try {
// Health check
if (request.method === 'GET' && path === '/health') {
return new Response(JSON.stringify({
status: 'healthy',
timestamp: new Date().toISOString(),
service: 'pulseflare',
version: '2.0.0-passive-eventstore'
}), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
}
// ── Events ──────────────────────────────────────────────
// POST /events - Push a new event
// POST /events — single event
if (request.method === 'POST' && path === '/events') {
const eventData: EventInput = await request.json();
if (!eventData.kind) {
return new Response(JSON.stringify({ error: 'kind is required' }), {
status: 400,
headers: { 'Content-Type': 'application/json' }
});
}
const body = await request.json() as any;
if (!body.kind) return json({ error: 'kind is required' }, 400);
const event = await store.appendEvent({
occurredAt: Date.now(),
kind: eventData.kind,
key: eventData.key,
meta: eventData.meta,
codeRev: eventData.codeRev
occurredAt: body.occurredAt ?? Date.now(),
kind: body.kind,
key: body.key,
meta: body.meta,
codeRev: body.codeRev,
});
const response: EventResponse = {
id: event.id,
occurredAt: event.occurredAt
};
return new Response(JSON.stringify(response), {
headers: { 'Content-Type': 'application/json' },
status: 201
});
return json({ id: event.id, occurredAt: event.occurredAt }, 201);
}
// GET /events - Query events with filtering
// POST /events/batch — multiple events
if (request.method === 'POST' && path === '/events/batch') {
const body = await request.json() as any;
if (!Array.isArray(body.events) || body.events.length === 0) {
return json({ error: 'events array is required' }, 400);
}
if (body.events.length > 100) {
return json({ error: 'max 100 events per batch' }, 400);
}
const now = Date.now();
const events = await store.appendEvents(
body.events.map((e: any) => ({
occurredAt: e.occurredAt ?? now,
kind: e.kind,
key: e.key,
meta: e.meta,
codeRev: e.codeRev,
})),
);
return json({
events: events.map(e => ({ id: e.id, occurredAt: e.occurredAt })),
count: events.length,
}, 201);
}
// GET /events — query
if (request.method === 'GET' && path === '/events') {
const kind = url.searchParams.get('kind');
const key = url.searchParams.get('key');
const since = url.searchParams.get('since');
const limit = parseInt(url.searchParams.get('limit') ?? '20');
const afterId = url.searchParams.get('after_id');
const limit = Math.min(Math.max(1, parseInt(url.searchParams.get('limit') ?? '20')), 1000);
let events;
if (kind) {
// Query by kind with optional filters
const queryOpts: any = {
limit: Math.min(Math.max(1, limit), 1000) // Cap at 1000
};
if (key) queryOpts.key = key;
if (since) queryOpts.since = parseInt(since);
events = await store.queryByKind(kind, queryOpts);
if (afterId) {
events = await store.getAfter(parseInt(afterId), {
kind: kind ?? undefined,
key: key ?? undefined,
});
} else if (kind) {
events = await store.queryByKind(kind, {
key: key ?? undefined,
since: since ? parseInt(since) : undefined,
limit,
});
} else {
// Get recent events
events = await store.getRecent(Math.min(Math.max(1, limit), 1000));
events = await store.getRecent(limit);
}
return new Response(JSON.stringify({
events,
count: events.length,
timestamp: new Date().toISOString()
}), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
return json({ events, count: events.length });
}
// GET /projections/:key - Get projection for a key
// ── Projections (latest collect event) ──────────────────
if (request.method === 'GET' && path.startsWith('/projections/')) {
const key = path.split('/projections/')[1];
if (!key) {
return new Response(JSON.stringify({ error: 'key is required' }), {
status: 400,
headers: { 'Content-Type': 'application/json' }
});
}
const key = path.slice('/projections/'.length);
if (!key) return json({ error: 'key is required' }, 400);
// Get the latest 'collect' event for this key
const latestCollectEvent = await store.getLatest('collect', key);
if (!latestCollectEvent) {
return new Response(JSON.stringify({
key,
meta: null,
message: 'No collect event found for this key'
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
}
const latest = await store.getLatest('collect', key);
if (!latest) return json({ key, meta: null, message: 'No collect event found' }, 404);
return new Response(JSON.stringify({
return json({
key,
meta: latestCollectEvent.meta ? JSON.parse(latestCollectEvent.meta) : null,
eventId: latestCollectEvent.id,
occurredAt: latestCollectEvent.occurredAt
}), {
headers: { 'Content-Type': 'application/json' },
status: 200
meta: latest.meta ? JSON.parse(latest.meta) : null,
eventId: latest.id,
occurredAt: latest.occurredAt,
});
}
// POST /objects - Store object with CAS
// ── CAS Objects ─────────────────────────────────────────
if (request.method === 'POST' && path === '/objects') {
const objectData = await request.json();
const hash = await store.putObject(objectData);
const response: ObjectResponse = { hash };
return new Response(JSON.stringify(response), {
headers: { 'Content-Type': 'application/json' },
status: 201
});
const data = await request.json();
const hash = await store.putObject(data);
return json({ hash }, 201);
}
// GET /objects/:hash - Get object by hash
if (request.method === 'GET' && path.startsWith('/objects/')) {
const hash = path.split('/objects/')[1];
if (!hash) {
return new Response(JSON.stringify({ error: 'hash is required' }), {
status: 400,
headers: { 'Content-Type': 'application/json' }
});
}
const hash = path.slice('/objects/'.length);
if (!hash) return json({ error: 'hash is required' }, 400);
const object = await store.getObject(hash);
if (object === null) {
return new Response(JSON.stringify({
error: 'Object not found',
hash
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
}
return new Response(JSON.stringify(object), {
headers: { 'Content-Type': 'application/json' },
status: 200
});
const obj = await store.getObject(hash);
if (obj === null) return json({ error: 'Not found', hash }, 404);
return json(obj);
}
// Default 404
return new Response(JSON.stringify({
// ── Object Instances ────────────────────────────────────
if (request.method === 'POST' && path === '/instances') {
const body = await request.json() as any;
if (!body.objectType || !body.codeRev) {
return json({ error: 'objectType and codeRev are required' }, 400);
}
const id = await store.createObject({
objectType: body.objectType,
externalId: body.externalId,
codeRev: body.codeRev,
});
return json({ id }, 201);
}
if (request.method === 'GET' && path.startsWith('/instances/')) {
const id = parseInt(path.slice('/instances/'.length));
if (isNaN(id)) return json({ error: 'invalid id' }, 400);
const instance = await store.getObjectInstance(id);
if (!instance) return json({ error: 'Not found' }, 404);
return json(instance);
}
// ── Maintenance ─────────────────────────────────────────
if (request.method === 'POST' && path === '/maintenance/archive') {
const body = await request.json() as any;
const olderThan = body.olderThan ?? Date.now() - 30 * 24 * 3600 * 1000; // default 30d
const deleted = await store.archiveEvents(olderThan);
return json({ deleted, olderThan });
}
// ── 404 ─────────────────────────────────────────────────
return json({
error: 'Not found',
path,
method: request.method,
available_endpoints: [
'GET /health',
endpoints: [
'GET /health',
'POST /events',
'GET /events?kind=xxx&key=xxx&since=xxx&limit=20',
'GET /projections/:key',
'POST /events/batch',
'GET /events?kind=&key=&since=&after_id=&limit=',
'GET /projections/:key',
'POST /objects',
'GET /objects/:hash'
]
}), {
status: 404,
headers: { 'Content-Type': 'application/json' }
});
'GET /objects/:hash',
'POST /instances',
'GET /instances/:id',
'POST /maintenance/archive',
],
}, 404);
} catch (error) {
console.error('Request error:', error);
return new Response(JSON.stringify({
return json({
error: 'Internal server error',
message: error instanceof Error ? error.message : 'Unknown error'
}), {
status: 500,
headers: { 'Content-Type': 'application/json' }
});
message: error instanceof Error ? error.message : 'Unknown error',
}, 500);
}
},
};
};
+4
View File
@@ -265,6 +265,10 @@ export class D1PulseStore implements PulseStore {
}
}
getDatabase(): never {
throw new Error('getDatabase() is not supported on D1 — use D1 bindings directly');
}
async close(): Promise<void> {
// D1 doesn't need explicit closing
}
+1 -4
View File
@@ -6,7 +6,4 @@ compatibility_date = "2024-09-25"
binding = "DB"
database_name = "pulseflare-db"
database_id = "dee1eee4-99e7-4aa3-85ec-dd376d288c9e"
[[migrations]]
tag = "v2"
deleted_classes = ["PulseTick"]
migrations_dir = "migrations"