从 Redis 到 PostgreSQL:一次需求驱动的任务系统架构升级

最近我对一个 LLM 任务服务做了一次比较完整的架构升级。

这个服务本身并不复杂:上游提交任务,我返回一个 task_id,后续上游再通过 task_id 查询任务状态和结果。任务的执行内容主要是调用 LLM 或相关多模态接口,单个任务耗时通常在几十秒以内。

最开始的实现是比较常见的组合:

FastAPI + Redis + Docker

API 接到请求后生成任务 ID,把状态写入 Redis,然后由后台逻辑继续执行任务。这个方案在早期足够轻量,开发速度也很快。

但随着需求继续演进,我开始更明确地追求几个能力:

1. 上游可以瞬间提交大量任务
2. API 必须快速返回 task_id
3. 真正执行的任务数量必须全局可控
4. 任务状态要可查询、可恢复、可追踪
5. worker 崩溃后任务不能永久卡死
6. 未来希望减少组件数量,尽量 all in PostgreSQL

也就是说,这次不是简单地“把 Redis 换成 PostgreSQL”。

更准确地说,这是一次从:

轻量状态存储

升级到:

PostgreSQL 中心化任务队列 + 全局并发控制 + 独立 worker 执行

的架构演进。

最后形成的结构是:

API + PostgreSQL Queue + Worker

也就是:

FastAPI API 层:只负责接单,快速返回 task_id
PostgreSQL:负责任务表、状态、队列、锁、并发控制
Worker:独立进程/容器,从 PostgreSQL 领取任务并执行

这套结构跑起来以后,我感觉它非常清楚,也很适合 LLM 任务这类“提交快、执行慢、需要排队和状态追踪”的场景。


一、需求背景:API 要快,执行要可控

这个服务对外提供的接口保持不变:

POST /tasks
POST /tasks/veo
GET /tasks/{task_id}

上游提交任务时,服务要立刻返回:

task_id
status

后续上游通过:

GET /tasks/{task_id}

查询任务状态:

pending
running
completed
failed
not_found

这类系统最重要的边界是:

提交任务

和:

执行任务

不能绑死在一起。

因为提交任务是高频、短耗时操作;执行任务是低频、长耗时操作。

提交任务只需要:

接收请求
生成 task_id
写入任务记录
返回 task_id

而执行任务可能要:

下载图片
构造 prompt
调用 LLM
等待外部 API 返回
处理结果
写入数据库

这两部分的耗时完全不在一个量级。

因此这次改造的目标非常明确:

API 层只接单,不干活。
Worker 层负责真正执行任务。
PostgreSQL 负责在两者之间做任务队列和全局协调。

这样一来,上游瞬间提交大量任务时,API 仍然可以快速返回 task_id;真正向外部 LLM 打出去的请求数量,则由 worker 和 PostgreSQL 控制。


二、为什么主动选择 PostgreSQL

这次选择 PostgreSQL,不只是为了替代 Redis,而是看中了 PostgreSQL 的 all in one 能力。

在这个系统里,任务队列并不只是一个队列。

它同时需要承载:

任务状态
任务请求参数
任务执行结果
错误信息
尝试次数
锁定时间
超时回收
任务查询
历史记录
并发控制

如果继续把状态、队列、锁、结果分散到不同组件里,系统会变得越来越复杂。

而 PostgreSQL 可以把这些能力收敛到一张任务表和一组事务逻辑里:

JSONB:存请求和结果
普通索引:加速 pending/running 查询
事务:保证领取任务原子性
行锁:防止重复领取
advisory lock:保护全局并发计数
timestamp 字段:实现超时回收
SQL 查询:天然支持统计和排查

这就是我看中 PostgreSQL 的地方。

它不是单纯的关系型数据库,而是可以承担很多后端基础设施职责:

状态存储
任务队列
轻量文档存储
锁服务
审计日志
查询分析

对于这个阶段的项目来说,减少组件数量、增强可控性,比单纯追求某一个局部组件的极限性能更重要。

所以这次我主动选择:

All in PostgreSQL

把任务系统的核心状态全部收敛进去。


三、任务表:系统的中心协议

这次改造后,任务表成为整个系统的中心。

表结构核心如下:

CREATE TABLE IF NOT EXISTS tasks (
    task_id TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    task_type TEXT,
    request JSONB,
    result JSONB,
    error TEXT,
    attempts INTEGER NOT NULL DEFAULT 0,
    locked_until TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    expires_at TIMESTAMPTZ
);

配套索引:

CREATE INDEX IF NOT EXISTS idx_tasks_pending_created_at
ON tasks (created_at)
WHERE status = 'pending'
  AND task_type IS NOT NULL
  AND request IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_tasks_running_locked_until
ON tasks (locked_until)
WHERE status = 'running'
  AND request IS NOT NULL;

这张表承担的不只是存储职责。

它实际上变成了 API 和 worker 之间的协议。

API 只需要写入:

task_id
status = pending
task_type
request

Worker 只需要读取 pending 任务,领取后改成:

running

执行完成后再改成:

completed

或者:

failed

查询接口只需要根据 task_id 从这张表里读状态和结果。

这样系统边界非常清楚:

API 不需要知道 worker 是 Python 还是 Go。
Worker 不需要知道上游是谁。
双方只需要遵守 tasks 表的状态协议。

这一点是这次架构升级里很关键的收获。


四、API 层:只接单,立刻返回

改造后的 API 层逻辑非常简单。

收到:

POST /tasks
POST /tasks/veo

以后,只做几件事:

1. 生成 task_id
2. 把任务写入 PostgreSQL
3. 返回 task_id 和 pending 状态

也就是:

enqueue_task(...)
return task_id

API 层不再负责真正执行任务。

这样做的好处非常直接:

上游提交速度和任务执行速度解耦

上游可以很快提交 100 个、1000 个甚至更多任务。

这些请求进入 API 后,只是变成 PostgreSQL 里的任务行。真正执行的节奏由 worker 控制。

这也是这个架构最核心的设计:

API 负责吞吐。
Worker 负责执行。
PostgreSQL 负责协调。

五、Worker:独立执行单元

任务执行被移动到了独立 worker。

Worker 的职责是:

1. 循环尝试领取任务
2. 把任务从 pending 改为 running
3. 执行任务逻辑
4. 成功后写 completed + result
5. 失败后写 failed + error
6. 定期处理超时 running 任务

最开始 worker 可以是 Python:

python -m app.worker

后来我进一步把 worker 改造成 Go。

因为 worker 这类服务本质上很适合 Go:

循环取任务
HTTP 调用
超时控制
数据库更新
日志
优雅退出

Go 在这类场景下内存占用低、启动快、部署简单。

而且由于 API 和 worker 之间通过 PostgreSQL tasks 表通信,所以 worker 的语言并不影响外部接口。

这也是架构边界清楚后的好处:

API 可以继续用 FastAPI。
Worker 可以独立换成 Go。
上游完全无感知。

六、任务领取:FOR UPDATE SKIP LOCKED

多个 worker 同时运行时,最重要的问题是:

不能让多个 worker 领取到同一个任务。

PostgreSQL 里可以用:

FOR UPDATE SKIP LOCKED

来解决这个问题。

领取任务的核心 SQL 是:

WITH next_task AS (
    SELECT task_id
    FROM tasks
    WHERE status = 'pending'
      AND task_type IS NOT NULL
      AND request IS NOT NULL
    ORDER BY created_at
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE tasks AS t
SET status = 'running',
    attempts = t.attempts + 1,
    locked_until = now() + ($1::int * interval '1 second'),
    updated_at = now(),
    expires_at = NULL
FROM next_task
WHERE t.task_id = next_task.task_id
RETURNING t.task_id, t.task_type, t.request::text AS request;

它的效果可以理解成:

多个 worker 同时来抢 pending 任务。
某个 worker 锁住一行后,其他 worker 会跳过这行。
每个 worker 最终拿到不同的任务。

这解决了任务重复领取问题。


七、全局并发控制:advisory lock + running count

FOR UPDATE SKIP LOCKED 解决的是“不要重复领取同一个任务”。

但我还需要另一个能力:

全局最多同时执行 N 个任务。

比如设置:

MAX_CONCURRENT_TASKS=20

那不管启动多少 worker,真正处于 running 的任务都不应该超过 20。

为了做到这一点,领取任务时使用了 PostgreSQL 的事务级 advisory lock:

SELECT pg_advisory_xact_lock(...);

完整逻辑是:

开启事务
  ↓
获取 pg_advisory_xact_lock
  ↓
回收超时 running 任务
  ↓
查询当前 running 数量
  ↓
如果 running >= MAX_CONCURRENT_TASKS,直接不领取
  ↓
如果 running < MAX_CONCURRENT_TASKS,领取一个 pending 任务
  ↓
同一个事务里更新成 running
  ↓
提交事务
  ↓
worker 开始调用 LLM

这里的设计重点是两个锁分工明确:

pg_advisory_xact_lock:
    负责把“统计 running 数 + 领取任务”这个临界区串行化

FOR UPDATE SKIP LOCKED:
    负责具体任务行的抢占

这样就能避免多个 worker 同时看到 running 数未满,然后一起领取导致短暂超额。

也就是说,PostgreSQL 不只是存任务,还承担了全局并发控制器的角色。


八、事务边界:不能拖着 LLM 调用

这个设计里还有一个非常重要的边界:

数据库事务只包住领取任务,不能包住 LLM 调用。

正确流程是:

开启事务
  ↓
领取任务
  ↓
标记 running
  ↓
提交事务
  ↓
调用 LLM
  ↓
写入 completed / failed

这样数据库锁只持有很短时间。

不能这样做:

开启事务
  ↓
领取任务
  ↓
调用 LLM 等几十秒
  ↓
写结果
  ↓
提交事务

因为这样会形成长事务,把外部 API 的不确定性带进数据库事务里。

最终我采用的是短事务领取,事务外执行任务。

这也是这类 PostgreSQL queue 设计里非常关键的一点。


九、worker 崩溃恢复:locked_until

如果 worker 领取任务后进程崩溃,任务会停留在:

status = running

为了避免任务永久卡住,任务表里设计了:

locked_until
attempts

worker 领取任务前,会先回收超时任务:

UPDATE tasks
SET status = CASE
        WHEN attempts >= $1::int THEN 'failed'
        ELSE 'pending'
    END,
    error = CASE
        WHEN attempts >= $1::int THEN 'worker timeout'
        ELSE error
    END,
    locked_until = NULL,
    updated_at = now(),
    expires_at = CASE
        WHEN attempts >= $1::int THEN now() + ($2::int * interval '1 second')
        ELSE NULL
    END
WHERE status = 'running'
  AND request IS NOT NULL
  AND locked_until IS NOT NULL
  AND locked_until < now();

这样 worker 崩溃后:

任务先保持 running
  ↓
locked_until 过期
  ↓
其他 worker 领取任务前发现它超时
  ↓
根据 attempts 决定重新 pending 或最终 failed

这让任务系统具备了自恢复能力。


十、Docker Compose:用 scale 控制 worker 数量

worker 独立以后,可以用 Docker Compose 很直观地控制执行单元数量。

启动 20 个 worker:

docker compose up -d --scale worker=20

停止整个项目:

docker compose down

把 worker 降到 0,但保留 API 和 PostgreSQL:

docker compose up -d --scale worker=0

恢复 worker:

docker compose up -d --scale worker=20

这个模型非常清楚:

worker=0:系统可以接单,但暂停执行
worker=3:低并发执行
worker=20:高并发执行

这比把并发藏在代码内部更直观。

当然,真正的全局并发仍然由 PostgreSQL 的 MAX_CONCURRENT_TASKS 兜底控制。

所以即使 worker 数量多于上限,也不会无限制向外打请求。


十一、Python worker 与 Go worker

一开始 worker 是 Python 版本。

测试下来,Python worker 空闲大概 30MiB,执行任务时大概 40MiB。

这个内存其实并不夸张:

40MiB × 20 = 800MiB

完全可接受。

后来我尝试把 worker 改成 Go,效果更轻:

Go worker 待机:3MiB ~ 6MiB

如果启动 20 个 Go worker:

6MiB × 20 = 120MiB

这个资源占用非常漂亮。

这也让我更加确认:

FastAPI 做 API
Go 做 worker
PostgreSQL 做中心任务系统

是一个非常舒服的组合。

FastAPI 保留了 Python 在接口开发上的效率,Go worker 则提供了更轻的运行时和更稳定的执行层。


十二、为什么没有使用 Celery

这次没有引入 Celery。

不是 Celery 不好,而是这次需求更适合一个直接、透明、PostgreSQL 中心化的方案。

Celery 通常会带来:

broker
result backend
worker
task decorator
routing
retry
beat
monitoring

它适合更复杂的任务生态。

而这个系统当前的核心诉求很明确:

任务入库
快速返回 task_id
worker 领取任务
控制全局执行并发
写入结果
支持查询
支持超时恢复

这些能力用 PostgreSQL tasks 表 + worker 就可以覆盖。

更重要的是,当前系统的 GET /tasks/{task_id} 本来就是核心接口。

任务状态天然就应该落在业务数据库里。

如果额外引入 Celery,很可能会出现:

Celery 管一份任务状态
PostgreSQL 管一份业务状态
API 查询时还要做映射

这会让系统边界变复杂。

现在的设计更直接:

tasks 表就是事实来源。

十三、关于 goroutine 的思考

Go worker 跑起来后,我也考虑过是否要用 goroutine 进一步优化。

现在的模式是:

20 个 worker 容器
每个容器 1 个执行槽

Go 当然可以改成:

1 个 worker 容器
内部 20 个 goroutine

或者折中:

4 个 worker 容器
每个容器 5 个 goroutine

goroutine 的优势是空闲成本很低,一个 Go 进程里开多个 goroutine,比启动多个进程和多个容器更省资源。

但我最后暂时没有这么做。

原因是当前 Go worker 已经足够轻:

单个 worker 只有 3MiB ~ 6MiB

20 个容器也只是 60MiB ~ 120MiB 的量级。

而一个容器一个执行槽的好处也很明显:

结构直观
故障隔离好
排查简单
一个 worker 挂了只损失一个执行槽
不会过早引入进程内并发复杂度

所以当前阶段,我选择保持:

一个 worker 容器 = 一个执行单元

后续如果确实需要优化,可以再演进到:

少量容器 × 每个容器多个 goroutine

但不是现在。


十四、资源占用情况

实际观察下来,资源占用比预期更好。

大概数据是:

FastAPI API:约 160MiB
PostgreSQL:约 127MiB
Go worker:单个 3MiB ~ 6MiB

如果启动 20 个 Go worker:

Go worker 总内存:约 60MiB ~ 120MiB

整体结构非常轻:

FastAPI:160MiB
PostgreSQL:127MiB
20 个 Go worker:60~120MiB

即使加上运行时波动,总体也很可控。

PostgreSQL 肯定不会像 Redis 那样极致省内存,但它提供的是完全不同的能力:

任务状态
事务
锁
JSONB
查询
索引
恢复
统计

用 100 多 MiB 的常驻内存换来这些能力,在这个系统里非常值。


十五、提交压测结果

我跑了一次提交压测:

python scripts/load_submit.py --total 100 --concurrency 50 --docker-stats

结果:

submitted=100/100
ok=100
failed=0
elapsed=0.17s
avg_submit_rate=594.4/s
latency_seconds min=0.004 p50=0.008 p95=0.012 p99=0.015 max=0.015

这说明 API 入队链路很轻:

HTTP 请求
FastAPI 接收
写 PostgreSQL
返回 task_id

在这个测试下,p95 只有 12ms 左右。

这正是“API 只接单,不执行任务”的效果。

后续还可以继续扩大测试:

python scripts/load_submit.py --total 1000 --concurrency 100 --docker-stats
python scripts/load_submit.py --total 5000 --concurrency 200 --docker-stats
python scripts/load_submit.py --total 10000 --concurrency 200 --docker-stats

主要观察:

ok 是否等于 total
failed 是否为 0
p95 / p99 是否稳定
PostgreSQL CPU / IO 是否稳定
API 内存是否稳定
running 是否始终 <= MAX_CONCURRENT_TASKS

提交速度只是第一部分。

更重要的是确认:

任务可以大量提交
但执行并发始终受控

十六、这套架构的优点

1. API 完全兼容

对外接口不变:

POST /tasks
POST /tasks/veo
GET /tasks/{task_id}

返回结构不变:

task_id
status
result
error

状态值不变:

pending
running
completed
failed
not_found

这意味着上游调用方不需要改。

2. 提交和执行解耦

API 负责快速入队,worker 负责慢速执行。

上游提交压力不会直接变成 LLM 请求压力。

3. 全局并发可控

真正执行任务的数量由:

MAX_CONCURRENT_TASKS
worker 数量
PostgreSQL 锁

共同控制。

即使上游瞬间提交很多任务,也只会按设定并发慢慢执行。

4. 状态统一

任务状态全部在 PostgreSQL 中。

查询、排查、统计都很直接。

5. 崩溃可恢复

通过:

locked_until
attempts
timeout recycle

worker 挂掉后任务可以重新被处理或最终失败。

6. worker 可替换

Python worker 可以换成 Go worker。

未来 API 也可以从 FastAPI 换成 Gin。

核心协议是 PostgreSQL tasks 表。


十七、当前方案的取舍

这套架构也有取舍。

1. worker 是常驻的

启动 20 个 worker 后,即使没有任务,它们也会占用基础内存。

不过 Go worker 单个只有 3MiB ~ 6MiB,目前这个成本可以接受。

2. Docker Compose 不自动扩缩容

普通 Docker Compose 不会根据 pending 任务数量自动扩缩 worker。

如果未来需要,可以写一个轻量 autoscaler:

pending > 100:scale worker=20
pending > 20:scale worker=10
pending = 0 持续一段时间:scale worker=1

但当前阶段没必要过早引入。

3. PostgreSQL 成为核心依赖

PostgreSQL 现在是任务系统中心。

未来生产化需要关注:

volume
备份
连接数
索引
vacuum
慢查询
历史任务清理

这是 all in PostgreSQL 必须接受的责任。


十八、最终理解

这次架构升级让我进一步确认了一件事:

任务系统的核心不是“异步执行”,而是“边界清晰”。

一个健康的任务系统应该把这几件事拆开:

谁接收请求?
谁记录任务?
谁控制并发?
谁执行任务?
谁负责失败恢复?
谁提供查询?

在这次方案里,答案很清楚:

FastAPI 接收请求
PostgreSQL 记录任务、控制并发、提供状态查询
Go worker 执行任务
Docker Compose 管理 worker 数量

这套结构不是最复杂的,也不是最重的,但它非常清楚。

我最满意的是:

tasks 表成为了系统协议。

API、worker、未来其他服务,都围绕这张表协作。

这让系统从一个普通的异步接口,升级成了一个可控、可恢复、可扩展的任务系统。


结语

这次从 Redis 到 PostgreSQL 的迁移,本质上不是“换一个存储组件”。

它是一次需求驱动的架构升级:

从轻量状态存储
升级到
PostgreSQL 中心化任务系统

最终架构是:

Python FastAPI:负责接单
PostgreSQL:负责任务队列、状态、锁、并发控制
Go worker:负责执行任务
Docker Compose:负责 worker scale

对我来说,这个方案最优雅的地方在于:

上游可以快速提交任务,
API 可以立即返回 task_id,
worker 可以按固定并发慢慢执行,
所有状态都能在 PostgreSQL 中被查询和恢复。

这就是我这次想要的效果。

Read more

三台机器部署 ClickHouse 高可用集群实战记录

本文是一份可发布版部署记录。真实 IP、域名、账号、密码、下载链接、业务目录名、机器唯一标识等敏感信息已经替换为占位符。命令中的 <...> 需要按自己的环境替换。 目标与拓扑 这次目标是用三台数据节点部署一套 ClickHouse 高可用集群,拓扑采用: 1 shard x 3 replicas 含义是:集群只有一个逻辑分片,三台机器都保存同一份数据的完整副本。任意一台数据节点宕机时,只要 ClickHouse Keeper 仍然有多数派,剩余节点仍可继续提供读写服务。 规划节点如下: 主机名示例地址角色ch-01<ch-01-ip>ClickHouse Server + ClickHouse Keeperch-02<ch-02-ip>ClickHouse Server + ClickHouse Keeperch-03<ch-03-ip&

By ladydd

折腾记(二):接入火山引擎实时语音 API,家庭语音助手体验直接拉满

接上篇 上一篇用全开源组件(Whisper + Hermes + Edge-TTS)搭了个语音助手,能跑,但体验就是"能用"二字: * 中文识别只有 70 分,方言基本歇菜 * 英文唤醒词"Alexa"喊着别扭 * 说完到回复要等 4-8 秒 * 它说话的时候你插不了嘴 这些问题靠堆开源组件很难根治。于是我去试了火山引擎(字节跳动)的语音服务,结果直接换了条路。 这篇分两段:先讲怎么用火山引擎的 ASR/TTS 替换掉开源组件(小改),再讲怎么上端到端实时语音模型(大改)。 第一段:先把 ASR 和 TTS 换成火山引擎 为什么换 我用豆包输入法的时候发现它语音识别准得离谱。一查,豆包用的就是字节自家的火山引擎 Seed-ASR。开通后有免费额度(

By ladydd

折腾记(一):用全开源组件给家里搭一个语音助手,对接自己的 Hermes Agent

起因 事情是从一块 ESP32-S3 开发板开始的。 我手上有一块 Seeed Studio XIAO ESP32-S3 Sense,带摄像头和麦克风。最初的想法很美好:用这块板子做一个无线语音终端,对着它说话,连到我服务器上跑的 Hermes Agent(一个自托管的 AI agent),让它回答我。 但折腾到一半我突然意识到一件事:我的麦克风、音响、服务器全在家里,为什么要绕一圈用 ESP32?直接把麦克风和音响插到服务器上不就行了? ESP32 那条路(做无线拾音终端)当然也有价值,但那是"为了学嵌入式而学",不是解决问题的最短路径。于是这个项目就从"嵌入式项目"变成了"在服务器上拼一个语音助手"。这篇就记录后者。 教训零:先想清楚你要解决的是什么问题。很多时候最优解比你最初设想的简单得多。 目标

By ladydd

Kiro 的三种代理设置方法:本地、服务端、Remote

作为kiro的骨灰级用户,这篇是我自己折腾 Kiro / Kiro Remote / Ubuntu Server 代理问题后的复盘。 核心不是“怎么配一个代理”,而是先判断:到底是谁在访问外网? 谁访问外网,代理就要配给谁。 0. 先说结论 Kiro 相关代理大概分三类: 场景真正访问外网的进程在哪里代理应该配在哪里本地 KiroWindows / Mac 本机本机 Clash / Proxifier / 系统代理服务端 Kiro / CLIUbuntu Server 上的 shell、CLI、node、kiro 进程Ubuntu 的环境变量,比如 HTTP_PROXY / HTTPS_PROXYKiro Remote远程 Ubuntu 上的 ~/.kiro-server 和 extensionHost远程 Ubuntu 的 Kiro Server

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号