FastAPI 异步任务服务的并发设计演进:从单进程轮询到多 Worker 协程直处理

本文记录了一个 FastAPI 异步任务服务在并发架构上的思考和演进过程。这个服务的本质很简单:接收客户端请求,转发给下游 AI API,把结果存起来供客户端轮询。它不做复杂的业务计算,不做数据聚合,就是一个纯转发层——接活、派活、存结果。正因为场景足够简单,我们才有机会做一次化繁为简的架构妥协,把原本"看起来该用任务队列"的设计砍到只剩三行核心配置。

一、先说清楚场景:我们到底在干什么

这个服务做的事情可以用一句话概括:

客户端提交参数 → 服务转发给下游 AI API → 等结果 → 存 Redis → 客户端来取。

关键特征:

  • 纯 IO 转发:服务本身不做任何 CPU 密集计算,所有耗时都花在等下游 API 返回。一次调用几秒到几十秒不等,全是网络等待。
  • 异步模式:客户端提交任务后立即拿到 task_id,然后轮询结果。不是同步等响应。
  • 无状态转发:每个任务独立,互不依赖,没有顺序要求,没有事务性。
  • 容忍丢失:偶尔丢一个任务(比如进程挂了),客户端重新提交就行。不是支付、不是订单,丢了不会造成业务损失。

理解这个场景非常重要,因为后面所有的架构决策都建立在这个前提上。如果你的场景涉及 CPU 密集计算、任务间有依赖、或者不能容忍丢失,结论会完全不同。


二、最初的架构:单进程 + BackgroundWorker 轮询

项目最初的并发模型非常经典:

用户请求 → FastAPI 接口 → 创建任务写入 Redis(pending)→ 立即返回 task_id
                                        ↓
                            BackgroundWorker(进程内)
                            - 每秒轮询 Redis,找 pending 状态的任务
                            - 取出来丢给 asyncio.create_task() 处理
                            - max_concurrent_tasks = 5 控制并发上限
                                        ↓
                            调用下游 AI API(async/await,纯 IO 等待)
                                        ↓
                            任务标记 completed,结果写回 Redis
                                        ↓
用户轮询 → GET /tasks/{id} → 从 Redis 读结果

启动命令是:

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]

没有 --workers 参数,只有一个 Uvicorn 进程。BackgroundWorker 在 FastAPI 的 lifespan 里启动,和 API 服务绑在同一个进程里。

这个架构的问题

并发天花板很低。 只有一个进程,max_concurrent_tasks 默认是 5,意味着同一时刻最多只有 5 个下游 API 请求在飞。下游调用是纯网络 IO 等待,asyncio 事件循环完全有能力处理更多并发,但被这个数字卡死了。

轮询模式有固有开销。 不管有没有新任务,BackgroundWorker 每秒都要去 Redis 扫一遍 pending 任务。任务少的时候这是无意义的开销,任务多的时候又可能因为轮询间隔导致延迟。

水平扩展有坑。 如果简单地给 Uvicorn 加 --workers 4,会 fork 出 4 个进程,每个进程都会跑一个 BackgroundWorker,4 个 Worker 同时轮询 Redis 抢 pending 任务。没有分布式锁的话,同一个任务会被多个 Worker 重复处理。

回过头看,这个架构对于一个"纯转发"的服务来说,设计过重了。BackgroundWorker、轮询机制、容量控制——这些是为复杂任务队列场景准备的武器,但我们的场景根本用不上。


三、我们讨论过的几种并发方案

方案 A:调大 max_concurrent_tasks

最简单的做法——把 max_concurrent_tasks 从 5 调到 50 甚至 100。下游调用是纯 IO,asyncio 完全扛得住。

优点: 零代码改动,改个环境变量就行。

缺点: 单进程的天花板还在。Python 的 GIL 虽然对 IO 密集型影响不大,但单进程的事件循环、内存、连接数都有上限。而且 BackgroundWorker 的轮询模式本身就不够高效。

结论: 适合任务量不大、单机够用的过渡方案,但不是长期解。

方案 B:拆独立 Worker 进程 + Redis 原子抢任务

把 BackgroundWorker 从 FastAPI 进程里拆出来,做成独立的 Worker 进程。API 进程只负责接收请求和写 Redis,Worker 进程只负责消费任务。

API 进程 → 写任务到 Redis
Worker 进程 1 ─┐
Worker 进程 2 ─┤── 轮询 Redis,原子抢任务(SETNX / Lua 脚本)
Worker 进程 3 ─┘

优点: API 和 Worker 解耦,可以独立扩缩容。docker-compose scale worker=N 一行命令水平扩。

缺点: 需要实现分布式锁来防止重复处理。多了一个进程类型要维护。轮询模式的固有开销还在。

结论: 比方案 A 好很多,但对于一个纯转发服务来说,引入分布式锁的复杂度不值得。

方案 C:Redis 队列替代轮询

不再轮询 Redis 扫 pending 任务,改用 Redis List(BRPOP)或 Stream 做消息队列。创建任务时 LPUSH 到队列,Worker 用 BRPOP 阻塞等待。

优点: 天然支持多消费者,不需要额外加锁。没有轮询开销,任务来了立刻消费。

缺点: 需要改造任务分发逻辑。Redis Stream 的消费者组管理有一定学习成本。

结论: 如果要走独立 Worker 路线,这是比方案 B 更优的选择。但我们的场景真的需要消息队列吗?

方案 D:引入专业任务队列(Celery / arq)

用 Celery + Redis/RabbitMQ,或者 arq(轻量级 asyncio 任务队列)。自带重试、超时、优先级、监控、结果后端。

优点: 生产级方案,功能全面,社区成熟。

缺点: 重依赖。Celery 本身的配置和运维成本不低。对于一个纯转发服务来说,杀鸡用牛刀。

结论: 如果项目规模继续增长,这是终极方案。但当前阶段过度设计了。

方案 E:去掉 BackgroundWorker,请求直接起协程(最终采用)

回到场景本身思考:我们做的是纯转发,每个请求进来就是"调一次下游 API,存个结果"。那为什么要绕一圈——先写 Redis,再轮询取出来,再处理?

直接在收到请求的时候起协程处理不就完了?

客户端 → Uvicorn (worker 2 接到请求)
            │
            ├─ 创建任务写 Redis(pending)
            ├─ asyncio.create_task()  ← 本进程内直接起协程
            └─ return task_id         ← 立即响应

            协程在 worker 2 内执行:
            ├─ 更新状态为 processing
            ├─ 调用下游 AI API(async/await,纯 IO)
            └─ 更新状态为 completed / failed

客户端 → GET /tasks/{id}(可能落到任意 worker)
            └─ 从 Redis 读结果

配合 Uvicorn --workers 4,就有了 4 个进程级并发。每个进程内 asyncio 再提供协程级并发。

这就是我们最终采用的方案。


四、为什么选方案 E:一次化繁为简的妥协

说"妥协",是因为方案 E 确实有明显的缺陷:

  • 进程挂了任务就丢了。 正在处理的任务会永远卡在 processing 状态,不会被重试。
  • 没有持久化的任务队列。 进程重启后,所有正在处理的任务全部丢失。
  • 没有任务优先级。 所有任务先来先服务,无法插队。

但回到我们的场景:

  • 纯转发,不怕丢。 丢了一个任务,客户端重新提交就行。不是支付,不是订单。
  • 任务有 TTL。 卡在 processing 的僵尸任务,2 小时后 Redis 自动清理。查询接口还有 10 分钟超时检测,主动标记 failed。
  • 不需要优先级。 所有请求平等,没有 VIP 通道的需求。

所以那些"缺陷",在我们的场景下根本不是问题。

而方案 E 带来的好处是实实在在的:

  • 删掉了 BackgroundWorker 整个模块。 轮询逻辑、容量控制、活跃任务追踪、优雅关闭——全部不需要了。
  • 删掉了 get_pending_tasks 方法。 没人轮询了,不需要按状态查 pending 任务。
  • 不需要分布式锁。 每个任务只在收到请求的那个 Worker 里处理,天然不冲突。
  • 不需要额外的进程类型。 没有独立的 Worker 进程要部署和维护。
  • 水平扩展极其简单。 改 --workers 数字就行。

核心逻辑从"写 Redis → 轮询 → 取任务 → 处理"变成了"收到请求 → 起协程 → 处理"。 中间环节全部砍掉。

这不是偷懒,这是对场景的准确判断。一个纯转发服务,不需要任务队列的可靠性保证,那就不要引入任务队列的复杂度。


五、最终方案的实现细节

5.1 整体架构

                    ┌─────────────────────────────────┐
                    │         Docker Compose           │
                    │                                  │
                    │  ┌───────────┐  ┌─────────────┐ │
客户端 ──9527:8000──│──│  Uvicorn  │  │    Redis     │ │
                    │  │ 4 workers │──│  (结果存储)   │ │
                    │  └───────────┘  └─────────────┘ │
                    └─────────────────────────────────┘

每个 Worker 进程内部:
┌──────────────────────────────────────────────┐
│  asyncio 事件循环                              │
│                                                │
│  POST /tasks 请求进来                          │
│    ├─ 写 Redis                                 │
│    ├─ asyncio.create_task(_process_task)       │
│    └─ return task_id                           │
│                                                │
│  _process_task 协程                            │
│    ├─ async with semaphore:  ← 下游并发限流    │
│    │     └─ await call_downstream_api()        │
│    └─ 写结果到 Redis                           │
│                                                │
│  GET /tasks/{id} 请求进来                      │
│    └─ 从 Redis 读结果                          │
└──────────────────────────────────────────────┘

5.2 并发控制:Semaphore 限流

去掉 BackgroundWorker 后,没有了 max_concurrent_tasks 的概念。如果短时间涌入大量请求,每个都直接起协程调下游 API,可能把下游打挂。

解决方案是用 asyncio.Semaphore 限制每个 Worker 进程内的下游 API 并发数:

# dependencies.py
vlm_semaphore: Optional[asyncio.Semaphore] = None

# main.py lifespan
dependencies.vlm_semaphore = asyncio.Semaphore(settings.max_concurrent_vlm)

# routes.py _process_task
async with semaphore:
    result = await client.call_downstream_api(...)

默认每个 Worker 最多 10 个并发下游调用,4 个 Worker 就是最多 40 个。通过环境变量 MAX_CONCURRENT_VLM 可调。

Semaphore 只限制下游调用这一步,不限制任务创建。 100 个请求同时进来,100 个都会立即返回 task_id(对调用方完全无感),然后 100 个协程在后台排队等 Semaphore。排在后面的任务只是完成得慢一些,不会报错,不会拒绝。

这对调用方来说是透明的:提交任务秒回,轮询结果的时候排在后面的任务等久一点而已。

5.3 processing 超时检测

Worker 进程挂了,正在处理的任务会卡在 processing 状态。我们在查询接口里做了被动检测:

PROCESSING_TIMEOUT_MINUTES = 10

if task.status == TaskStatus.PROCESSING:
    elapsed = datetime.now(UTC) - task.updated_at.replace(tzinfo=UTC)
    if elapsed > timedelta(minutes=PROCESSING_TIMEOUT_MINUTES):
        task.mark_failed("Processing timed out after 10 minutes")
        await task_manager.update_task(task)

不需要额外的定时任务。调用方轮询结果的时候,如果发现 processing 超过 10 分钟,直接标记 failed 返回。调用方看到 failed 可以选择重新提交。

这是一个"懒检测"策略——只有被查询到的任务才会触发超时判断。没人查的任务会一直卡着,直到 Redis TTL 过期自动清除。对我们的场景来说完全够用,因为调用方一定会轮询结果。

5.4 Uvicorn 多 Worker 模式

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Uvicorn 的多 Worker 模式基于 multiprocessing,会 fork 出 4 个子进程。每个子进程独立运行一个 FastAPI 应用实例,各自有自己的:

  • asyncio 事件循环
  • Redis 连接(在 lifespan 里各自初始化,Redis 连接不能跨进程共享)
  • Semaphore 实例(进程级隔离,互不影响)

请求由 Uvicorn 主进程分发到子进程,天然负载均衡。

一个关键点: 每个 Worker 的 Semaphore 是独立的。设置 MAX_CONCURRENT_VLM=10,4 个 Worker 的实际上限是 4×10=40,不是 10。

5.5 任务 TTL

从 24 小时缩短到 2 小时。理由:

  • 这是一个即时性服务,2 小时足够调用方拿到结果
  • 缩短 TTL 意味着卡在 processing 的僵尸任务更快被清理
  • Redis 内存占用更可控

TTL 通过环境变量 TASK_TTL_HOURS 可调。Redis key 的 TTL 和应用层的 expires_at 字段双重保障。


六、方案对比总结

维度A:调大并发数B:独立 Worker + 锁C:Redis 队列D:Celery/arqE:请求直接起协程
改动量
架构复杂度最低
水平扩展不支持支持支持支持支持(改 workers 数)
任务可靠性最高中(进程挂了任务丢)
运维成本最低
适用场景临时过渡需要可靠投递需要可靠投递大规模生产纯转发、容忍偶尔丢任务

七、什么时候需要升级

当前方案的适用边界:

  • 场景是纯转发:服务本身不做重计算,只是把请求转给下游、等结果、存起来
  • 可以容忍偶尔丢任务:进程挂了正在处理的任务会丢,调用方重试即可
  • 不需要任务优先级:所有任务先来先服务
  • 单机部署:多 Worker 在同一台机器上
  • 任务量在几百 QPS 以内:4 Worker × 10 并发 = 40 个下游请求同时飞,加上排队机制,几百 QPS 没问题

如果未来出现以下情况,就该考虑升级到方案 C 或 D:

  • 需要跨机器部署多个实例
  • 需要任务持久化和可靠投递(不能丢任务)
  • 需要任务优先级、延迟执行、定时任务
  • 需要完善的任务监控和管理后台
  • 服务不再是纯转发,开始有 CPU 密集的处理逻辑

但在那之前,当前方案就是最好的方案——因为它最简单。


八、关键代码片段

创建任务 + 起协程

@router.post("/tasks", response_model=CreateTaskResponse)
async def create_task(body: CreateTaskRequest):
    # ... 参数校验 ...

    task = await task_manager.create_task(...)

    # 直接在当前 worker 进程内起协程处理任务
    asyncio.create_task(_process_task(task))

    return CreateTaskResponse(id=task.id, status=task.status)

后台处理协程 + Semaphore 限流

async def _process_task(task):
    task_manager = dependencies.task_manager
    semaphore = dependencies.vlm_semaphore

    try:
        task.mark_processing()
        await task_manager.update_task(task)

        async with semaphore:  # 限制下游 API 并发
            result = await client.call_downstream_api(...)

        task.mark_completed(image_prompt=result["image_prompt"], ...)
        await task_manager.update_task(task)
    except Exception as exc:
        task.mark_failed(f"Processing error: {exc}")
        await task_manager.update_task(task)

processing 超时检测

@router.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str):
    task = await task_manager.get_task(task_id)

    if task.status == TaskStatus.PROCESSING:
        elapsed = datetime.now(UTC) - task.updated_at.replace(tzinfo=UTC)
        if elapsed > timedelta(minutes=10):
            task.mark_failed("Processing timed out after 10 minutes")
            await task_manager.update_task(task)

    return TaskResponse(...)

Dockerfile

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

环境变量

变量默认值说明
MAX_CONCURRENT_VLM10每个 Worker 进程的下游 API 最大并发数
TASK_TTL_HOURS2任务过期时间(小时)
REDIS_URLredis://localhost:6379/0Redis 连接地址

九、写在最后

架构设计最容易犯的错误是过度设计。

看到"异步任务"就想上 Celery,看到"多进程"就想上分布式锁,看到"消息传递"就想上 RabbitMQ——这些都是好工具,但不是每个场景都需要。

我们的服务就是一个转发层。请求进来,转给下游,等结果,存起来。没有复杂的业务逻辑,没有事务性要求,没有不能丢的数据。

对于这样的场景,asyncio.create_task() + asyncio.Semaphore + Uvicorn --workers 4 就是最优解。不是因为它功能最强,而是因为它刚好够用,同时复杂度最低

少一个组件,就少一个故障点。少一层抽象,就少一个需要理解的概念。

这是一次化繁为简的妥协。我们清楚地知道放弃了什么(任务可靠性),也清楚地知道换来了什么(架构简单性)。在当前场景下,这个交换是值得的。

最好的架构不是功能最全的,而是对当前场景做出了正确取舍的。

Read more

三台机器部署 ClickHouse 高可用集群实战记录

本文是一份可发布版部署记录。真实 IP、域名、账号、密码、下载链接、业务目录名、机器唯一标识等敏感信息已经替换为占位符。命令中的 <...> 需要按自己的环境替换。 目标与拓扑 这次目标是用三台数据节点部署一套 ClickHouse 高可用集群,拓扑采用: 1 shard x 3 replicas 含义是:集群只有一个逻辑分片,三台机器都保存同一份数据的完整副本。任意一台数据节点宕机时,只要 ClickHouse Keeper 仍然有多数派,剩余节点仍可继续提供读写服务。 规划节点如下: 主机名示例地址角色ch-01<ch-01-ip>ClickHouse Server + ClickHouse Keeperch-02<ch-02-ip>ClickHouse Server + ClickHouse Keeperch-03<ch-03-ip&

By ladydd

折腾记(二):接入火山引擎实时语音 API,家庭语音助手体验直接拉满

接上篇 上一篇用全开源组件(Whisper + Hermes + Edge-TTS)搭了个语音助手,能跑,但体验就是"能用"二字: * 中文识别只有 70 分,方言基本歇菜 * 英文唤醒词"Alexa"喊着别扭 * 说完到回复要等 4-8 秒 * 它说话的时候你插不了嘴 这些问题靠堆开源组件很难根治。于是我去试了火山引擎(字节跳动)的语音服务,结果直接换了条路。 这篇分两段:先讲怎么用火山引擎的 ASR/TTS 替换掉开源组件(小改),再讲怎么上端到端实时语音模型(大改)。 第一段:先把 ASR 和 TTS 换成火山引擎 为什么换 我用豆包输入法的时候发现它语音识别准得离谱。一查,豆包用的就是字节自家的火山引擎 Seed-ASR。开通后有免费额度(

By ladydd

折腾记(一):用全开源组件给家里搭一个语音助手,对接自己的 Hermes Agent

起因 事情是从一块 ESP32-S3 开发板开始的。 我手上有一块 Seeed Studio XIAO ESP32-S3 Sense,带摄像头和麦克风。最初的想法很美好:用这块板子做一个无线语音终端,对着它说话,连到我服务器上跑的 Hermes Agent(一个自托管的 AI agent),让它回答我。 但折腾到一半我突然意识到一件事:我的麦克风、音响、服务器全在家里,为什么要绕一圈用 ESP32?直接把麦克风和音响插到服务器上不就行了? ESP32 那条路(做无线拾音终端)当然也有价值,但那是"为了学嵌入式而学",不是解决问题的最短路径。于是这个项目就从"嵌入式项目"变成了"在服务器上拼一个语音助手"。这篇就记录后者。 教训零:先想清楚你要解决的是什么问题。很多时候最优解比你最初设想的简单得多。 目标

By ladydd

Kiro 的三种代理设置方法:本地、服务端、Remote

作为kiro的骨灰级用户,这篇是我自己折腾 Kiro / Kiro Remote / Ubuntu Server 代理问题后的复盘。 核心不是“怎么配一个代理”,而是先判断:到底是谁在访问外网? 谁访问外网,代理就要配给谁。 0. 先说结论 Kiro 相关代理大概分三类: 场景真正访问外网的进程在哪里代理应该配在哪里本地 KiroWindows / Mac 本机本机 Clash / Proxifier / 系统代理服务端 Kiro / CLIUbuntu Server 上的 shell、CLI、node、kiro 进程Ubuntu 的环境变量,比如 HTTP_PROXY / HTTPS_PROXYKiro Remote远程 Ubuntu 上的 ~/.kiro-server 和 extensionHost远程 Ubuntu 的 Kiro Server

By ladydd
陕公网安备61011302002223号 | 陕ICP备2025083092号