docs(rfc-001): archival watermark + workflow_runs materialized table
- Cold archival: meta table with archived_up_to watermark for crash-safe recovery - Workflow state: workflow_runs materialized table (UPSERT in same txn as log write) - O(active) queries instead of full table scan - Derivable from logs if lost 小橘 <xiaoju@shazhou.work>
This commit is contained in:
@@ -488,9 +488,9 @@ CREATE INDEX idx_logs_ref_id ON logs(ref_id);
|
||||
- **统一一张表**,通过 `source + type` 区分 log 来源和类型
|
||||
- Reflex 可以查询 logs 表(只读),但 log 不能触发 reflex(见 §2.4)
|
||||
|
||||
#### Workflow 状态:事件溯源
|
||||
#### Workflow 状态:事件溯源 + 物化表
|
||||
|
||||
Workflow Thread 的状态不用 mutable 表,而是 append-only 的事件流:
|
||||
Workflow Thread 的状态以 append-only 事件流为 source of truth:
|
||||
|
||||
```sql
|
||||
-- 也在 logs 表中,source = "workflow"
|
||||
@@ -505,20 +505,37 @@ source=workflow, type=started, ref_id=run-7, ts=1001
|
||||
source=workflow, type=completed, ref_id=run-7, ts=1005
|
||||
```
|
||||
|
||||
查询当前活跃 workflow:
|
||||
为避免每次查活跃 workflow 都扫描全表,引擎维护一张 **物化表**,在写 log 的同一事务中 UPSERT:
|
||||
|
||||
```sql
|
||||
SELECT ref_id, type FROM logs
|
||||
WHERE source = 'workflow'
|
||||
AND (ref_id, ts) IN (
|
||||
SELECT ref_id, MAX(ts) FROM logs
|
||||
WHERE source = 'workflow'
|
||||
GROUP BY ref_id
|
||||
)
|
||||
AND type IN ('queued', 'started')
|
||||
CREATE TABLE workflow_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
workflow TEXT NOT NULL, -- workflow 名
|
||||
status TEXT NOT NULL, -- 最新状态:queued, started, completed, failed, crashed
|
||||
ts INTEGER NOT NULL -- 最新状态的时间戳
|
||||
);
|
||||
|
||||
CREATE INDEX idx_workflow_runs_status ON workflow_runs(status);
|
||||
```
|
||||
|
||||
进程重启时从 log 重建内存状态。运行时用内存 materialized view 加速查询。
|
||||
写入流程(同一事务):
|
||||
|
||||
```sql
|
||||
BEGIN;
|
||||
INSERT INTO logs (source, type, ref_id, payload, ts) VALUES ('workflow', 'started', 'run-7', '{}', 1001);
|
||||
INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts) VALUES ('run-7', 'alert', 'started', 1001);
|
||||
COMMIT;
|
||||
```
|
||||
|
||||
查询当前活跃 workflow 变为 O(活跃数):
|
||||
|
||||
```sql
|
||||
SELECT * FROM workflow_runs WHERE status IN ('queued', 'started')
|
||||
```
|
||||
|
||||
物化表是 logs 的派生数据——数据丢失时可从 logs 重建。logs 表仍是 source of truth。
|
||||
|
||||
进程重启时从 log 重建内存状态。运行时用内存 materialized view 进一步加速。
|
||||
|
||||
#### 冷归档
|
||||
|
||||
@@ -538,6 +555,29 @@ data/
|
||||
|
||||
导出后从主库 DELETE + VACUUM。冷数据用 grep/jq 即可查询,不需要 SQL。
|
||||
|
||||
**水位标记**:归档进度记录在 meta 表中,确保任一步崩溃都能安全恢复:
|
||||
|
||||
```sql
|
||||
CREATE TABLE meta (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL
|
||||
);
|
||||
|
||||
-- 归档水位:已成功归档到哪一天
|
||||
-- key = "archived_up_to", value = "2026-03-22"
|
||||
```
|
||||
|
||||
归档流程:
|
||||
|
||||
```
|
||||
1. 读 meta.archived_up_to,确定从哪天开始
|
||||
2. 导出该天数据到 JSONL(幂等:同一天重复导出会覆盖文件)
|
||||
3. 同一事务:DELETE 该天数据 + UPDATE meta.archived_up_to
|
||||
4. VACUUM(可选,非事务内)
|
||||
```
|
||||
|
||||
任何一步崩溃,重启后从水位标记处继续,不会丢数据也不会重复删除。
|
||||
|
||||
### 5.5 热更新
|
||||
|
||||
主进程 watch `~/.uncaged-nerve/` 文件变化,按类型处理:
|
||||
|
||||
Reference in New Issue
Block a user