docs: add Khala MVP implementation plan

This commit is contained in:
2026-04-25 04:30:23 +00:00
parent ce20d73ab6
commit 0d0b139890
+558
View File
@@ -0,0 +1,558 @@
# Khala MVP Implementation Plan
> **For Hermes:** Use subagent-driven-development skill to implement this plan task-by-task.
**Goal:** Build Khala — a Cloudflare Workers + D1 + Durable Objects cloud workflow orchestrator that lets agents coordinate multi-agent workflows as a stateless worker pool.
**Architecture:** Khala is a CF Worker that receives events from agents via REST API. Each workflow thread runs in a Durable Object with a JSONata moderator. Agents poll a task queue for unclaimed turns, execute locally, and POST results back. Thread messages are stored in D1.
**Tech Stack:** Cloudflare Workers, D1 (SQLite), Durable Objects, Hono (routing), JSONata (moderator engine), TypeScript
---
## Phase 0: Project Scaffolding
### Task 0.1: Create khala package
**Objective:** Set up the `packages/khala` CF Worker project with wrangler, Hono, and D1 binding.
**Files:**
- Create: `packages/khala/package.json`
- Create: `packages/khala/wrangler.toml`
- Create: `packages/khala/tsconfig.json`
- Create: `packages/khala/src/index.ts`
**Step 1: Create package.json**
```json
{
"name": "@uncaged/khala",
"version": "0.0.1",
"private": true,
"scripts": {
"dev": "wrangler dev",
"deploy": "wrangler deploy",
"test": "vitest run"
},
"dependencies": {
"hono": "^4.7.0",
"jsonata": "^2.0.5"
},
"devDependencies": {
"@cloudflare/workers-types": "^4.20250410.0",
"vitest": "^4.1.5",
"wrangler": "^4.14.0"
}
}
```
**Step 2: Create wrangler.toml**
```toml
name = "khala"
main = "src/index.ts"
compatibility_date = "2025-04-01"
[[d1_databases]]
binding = "DB"
database_name = "khala"
database_id = "placeholder"
[durable_objects]
bindings = [
{ name = "THREAD", class_name = "ThreadDO" }
]
[[migrations]]
tag = "v1"
new_classes = ["ThreadDO"]
```
**Step 3: Create minimal Hono entrypoint**
```typescript
// src/index.ts
import { Hono } from "hono";
export type Env = {
DB: D1Database;
THREAD: DurableObjectNamespace;
};
const app = new Hono<{ Bindings: Env }>();
app.get("/health", (c) => c.json({ ok: true }));
export default app;
```
**Step 4: Create tsconfig.json**
```json
{
"compilerOptions": {
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "bundler",
"strict": true,
"noUncheckedIndexedAccess": true,
"types": ["@cloudflare/workers-types"]
},
"include": ["src"]
}
```
**Step 5: Install dependencies and verify**
```bash
cd packages/khala && pnpm install
pnpm exec wrangler types # generates worker-configuration.d.ts
```
**Step 6: Commit**
```bash
git add packages/khala/
git commit -m "chore(khala): scaffold CF Worker package"
```
---
## Phase 1: D1 Schema & Data Layer
### Task 1.1: Create D1 migration — core tables
**Objective:** Define D1 schema for agents, threads, messages, and task queue.
**Files:**
- Create: `packages/khala/migrations/0001_initial.sql`
**SQL:**
```sql
-- Agent registry
CREATE TABLE agents (
id TEXT PRIMARY KEY, -- agent name (e.g. "tuanzi")
token_hash TEXT NOT NULL, -- bcrypt/sha256 hash of agent token
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Workflow threads
CREATE TABLE threads (
id TEXT PRIMARY KEY, -- ulid
workflow TEXT NOT NULL, -- workflow name (e.g. "code-review")
status TEXT NOT NULL DEFAULT 'active', -- active | completed | failed
initiator TEXT NOT NULL, -- agent id or external caller
result TEXT, -- final result JSON (set on completion)
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
-- Thread messages (append-only)
CREATE TABLE messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
thread_id TEXT NOT NULL REFERENCES threads(id),
role TEXT NOT NULL, -- role name or "__moderator__"
content TEXT NOT NULL,
meta TEXT, -- JSON
step INTEGER NOT NULL, -- 0-indexed step number
agent_id TEXT, -- which agent executed this turn
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_messages_thread ON messages(thread_id, step);
-- Task queue
CREATE TABLE tasks (
id TEXT PRIMARY KEY, -- ulid
thread_id TEXT NOT NULL REFERENCES threads(id),
role TEXT NOT NULL,
instruction TEXT NOT NULL, -- turn instruction from moderator
status TEXT NOT NULL DEFAULT 'open', -- open | claimed | completed | expired
claim_id TEXT, -- set when claimed
claimed_by TEXT, -- agent id
claimed_at TEXT,
timeout_seconds INTEGER NOT NULL DEFAULT 300,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX idx_tasks_status ON tasks(status, created_at);
```
**Step 1: Write the migration file**
**Step 2: Apply locally**
```bash
cd packages/khala
pnpm exec wrangler d1 migrations apply khala --local
```
**Step 3: Commit**
```bash
git add packages/khala/migrations/
git commit -m "feat(khala): D1 schema — agents, threads, messages, tasks"
```
### Task 1.2: Create data access functions
**Objective:** Type-safe D1 query functions for all tables.
**Files:**
- Create: `packages/khala/src/db.ts`
- Create: `packages/khala/src/types.ts`
**types.ts:**
```typescript
export type Agent = {
id: string;
token_hash: string;
created_at: string;
};
export type Thread = {
id: string;
workflow: string;
status: "active" | "completed" | "failed";
initiator: string;
result: string | null;
created_at: string;
updated_at: string;
};
export type Message = {
id: number;
thread_id: string;
role: string;
content: string;
meta: string | null;
step: number;
agent_id: string | null;
created_at: string;
};
export type Task = {
id: string;
thread_id: string;
role: string;
instruction: string;
status: "open" | "claimed" | "completed" | "expired";
claim_id: string | null;
claimed_by: string | null;
claimed_at: string | null;
timeout_seconds: number;
created_at: string;
};
```
**db.ts:** Query functions — `createThread`, `appendMessage`, `createTask`, `claimTask`, `completeTask`, `getOpenTasks`, `getThreadMessages`, etc. Each is a plain function taking `D1Database` as first arg.
**Step 1: Write types.ts**
**Step 2: Write db.ts with all query functions**
Key functions:
- `createThread(db, workflow, initiator) → Thread`
- `appendMessage(db, threadId, role, content, meta, step, agentId) → Message`
- `createTask(db, threadId, role, instruction, timeoutSeconds) → Task`
- `claimTask(db, taskId, agentId) → { ok: true, claimId } | { ok: false }`
- `completeTask(db, taskId, claimId) → boolean`
- `expireTimedOutTasks(db) → number` (count expired)
- `getOpenTasks(db, limit) → Task[]`
- `getThreadMessages(db, threadId, opts?) → Message[]` (opts: role, since, step, last)
Use `ulid()` for IDs (add `ulidx` dependency).
**Step 3: Commit**
```bash
git add packages/khala/src/
git commit -m "feat(khala): data access layer — types and D1 queries"
```
---
## Phase 2: Auth & Agent Registry
### Task 2.1: Agent auth middleware
**Objective:** Bearer token auth for agents. Hash-based token verification.
**Files:**
- Create: `packages/khala/src/auth.ts`
- Modify: `packages/khala/src/index.ts`
**auth.ts:**
```typescript
import { createMiddleware } from "hono/factory";
import type { Env } from "./index.ts";
export const agentAuth = createMiddleware<{ Bindings: Env }>(async (c, next) => {
const header = c.req.header("Authorization");
if (!header?.startsWith("Bearer ")) {
return c.json({ error: "missing token" }, 401);
}
const token = header.slice(7);
const hash = await sha256(token);
const agent = await c.env.DB.prepare(
"SELECT id FROM agents WHERE token_hash = ?"
).bind(hash).first<{ id: string }>();
if (!agent) {
return c.json({ error: "invalid token" }, 401);
}
c.set("agentId", agent.id);
await next();
});
async function sha256(input: string): Promise<string> {
const data = new TextEncoder().encode(input);
const buf = await crypto.subtle.digest("SHA-256", data);
return [...new Uint8Array(buf)].map(b => b.toString(16).padStart(2, "0")).join("");
}
```
### Task 2.2: Admin routes for agent management
**Objective:** Admin API to register/remove agents (protected by admin secret in env).
**Files:**
- Create: `packages/khala/src/routes/admin.ts`
**Routes:**
- `POST /admin/agents` — body `{ id, token }` → hash token, insert agent
- `DELETE /admin/agents/:id` — remove agent
- `GET /admin/agents` — list agents (no tokens)
Protected by `ADMIN_SECRET` env var check.
**Commit:**
```bash
git commit -m "feat(khala): agent auth middleware and admin API"
```
---
## Phase 3: Workflow Engine (Durable Object)
### Task 3.1: Workflow registry
**Objective:** Load workflow definitions from a simple in-memory registry (hardcoded for MVP, later from D1 or KV).
**Files:**
- Create: `packages/khala/src/workflows.ts`
**Structure:**
```typescript
export type CloudRole = {
prompt: string;
};
export type CloudWorkflowDef = {
name: string;
roles: Record<string, CloudRole>;
moderator: string; // JSONata expression
};
// For MVP: hardcoded registry
const registry = new Map<string, CloudWorkflowDef>();
export function registerWorkflow(def: CloudWorkflowDef): void {
registry.set(def.name, def);
}
export function getWorkflow(name: string): CloudWorkflowDef | null {
return registry.get(name) ?? null;
}
```
### Task 3.2: ThreadDO — Durable Object
**Objective:** Each workflow thread runs as a Durable Object. The DO manages the moderator state machine, creates tasks, and processes responses.
**Files:**
- Create: `packages/khala/src/thread-do.ts`
**Key behavior:**
1. `POST /start` — Initialize thread: save workflow def, run moderator to get first turn, create task
2. `POST /response` — Agent posts turn result: validate claim_id, append message, run moderator for next turn or END
3. `GET /messages` — Query thread messages with filters
**Moderator execution:**
- Build context from messages (start frame + role steps)
- Evaluate JSONata expression → returns `{ role: "reviewer" }` or `{ role: "__end__" }`
- If not END, create new task in queue
- If END, mark thread completed, set result
**Important:** The DO holds workflow state in memory during the request but persists everything to D1. The DO itself uses `ctx.storage` only for the thread ID mapping.
### Task 3.3: Wire ThreadDO into worker
**Objective:** Export the DO class, add routes that proxy to the DO.
**Files:**
- Modify: `packages/khala/src/index.ts`
**Routes:**
- `POST /workflows/:name/threads` — Create thread → instantiate DO → start
- `POST /threads/:id/response` — Forward to DO
- `GET /threads/:id/messages` — Forward to DO (or query D1 directly)
- `GET /threads/:id` — Thread status
**Commit:**
```bash
git commit -m "feat(khala): ThreadDO workflow engine with JSONata moderator"
```
---
## Phase 4: Task Queue API
### Task 4.1: Task queue endpoints
**Objective:** Agents poll for work and claim tasks.
**Files:**
- Create: `packages/khala/src/routes/tasks.ts`
**Routes (all require agentAuth):**
- `GET /tasks` — List open tasks (optionally filter by workflow)
- `POST /tasks/:id/claim` — Claim a task → returns `{ claimId, role, instruction, threadId }`
- `POST /tasks/:id/release` — Release a claimed task back to queue
**Claim logic:**
- Atomic: UPDATE ... WHERE status = 'open' → if rowsWritten = 0, already claimed
- Returns claim_id (ulid) for optimistic lock on response
### Task 4.2: Task timeout sweep
**Objective:** Periodically expire timed-out tasks back to open.
**Implementation:** CF Worker Cron Trigger (every 1 minute) that calls `expireTimedOutTasks(db)`.
**Files:**
- Modify: `packages/khala/src/index.ts` (add scheduled handler)
- Modify: `packages/khala/wrangler.toml` (add cron trigger)
```toml
[triggers]
crons = ["* * * * *"]
```
**Commit:**
```bash
git commit -m "feat(khala): task queue API with claim/release and timeout sweep"
```
---
## Phase 5: Agent-Side Integration (KhalaSense)
### Task 5.1: Khala client library
**Objective:** A small client that agents use to interact with Khala.
**Files:**
- Create: `packages/core/src/khala-client.ts`
**API:**
```typescript
export type KhalaClientConfig = {
baseUrl: string;
token: string;
};
export function createKhalaClient(config: KhalaClientConfig) {
return {
pollTasks: () => GET /tasks,
claimTask: (taskId) => POST /tasks/:id/claim,
submitResponse: (threadId, content, meta, claimId) => POST /threads/:id/response,
getMessages: (threadId, opts) => GET /threads/:id/messages,
};
}
```
### Task 5.2: KhalaSense
**Objective:** A Sense that polls Khala for open tasks and emits signals.
**Files:**
- Create: `packages/daemon/src/senses/khala-sense.ts`
**Behavior:**
- `compute()`: poll `/tasks`, if tasks available → return task info as signal value
- Reflex picks up signal → triggers a workflow that executes the turn locally
- After local execution → POST response back to Khala
**This task depends on understanding the existing Sense pattern in daemon. Check `packages/daemon/src/` for examples.**
**Commit:**
```bash
git commit -m "feat(khala): KhalaSense — agent-side polling and integration"
```
---
## Phase 6: End-to-End Demo
### Task 6.1: Register a demo workflow
**Objective:** Register a simple 2-role "ping-pong" workflow for testing.
**Files:**
- Create: `packages/khala/src/workflows/ping-pong.ts`
**Workflow:**
- Roles: `pinger` (says ping), `ponger` (says pong)
- Moderator: alternate pinger/ponger for 3 rounds then END
- JSONata: `steps.length >= 6 ? { "role": "__end__" } : steps.length % 2 = 0 ? { "role": "pinger" } : { "role": "ponger" }`
### Task 6.2: Integration test
**Objective:** Test the full flow with miniflare.
**Files:**
- Create: `packages/khala/src/__tests__/e2e.test.ts`
**Test:**
1. Create thread via API
2. Poll tasks → get first task
3. Claim task
4. POST response
5. Poll again → get next task
6. Repeat until workflow completes
7. Verify thread status = completed and all messages present
**Commit:**
```bash
git commit -m "feat(khala): ping-pong demo workflow and e2e test"
```
---
## Summary
| Phase | Tasks | Description |
|-------|-------|-------------|
| 0 | 0.1 | Project scaffolding |
| 1 | 1.1-1.2 | D1 schema & data layer |
| 2 | 2.1-2.2 | Auth & agent registry |
| 3 | 3.1-3.3 | Workflow engine (DO + moderator) |
| 4 | 4.1-4.2 | Task queue API |
| 5 | 5.1-5.2 | Agent-side integration |
| 6 | 6.1-6.2 | End-to-end demo |
**Deployment:** `khala.shazhou.workers.dev`
**First milestone:** Phase 0-4 (cloud side complete), testable with curl.
**Second milestone:** Phase 5-6 (agent integration + demo).