1 背景与目标

1.1 原有架构瓶颈

原系统基于 SQLite + APScheduler 构建。在该架构中,周期性任务的调度状态(Next Run Time)存储于本地 SQLite 文件中,任务执行器(Executor)与 Web 服务进程耦合。

这种架构存在以下核心痛点:

  1. 并发能力受限:SQLite 的写锁机制导致在高频更新任务状态时极易发生 database is locked 异常,限制了系统的任务吞吐量(QPS)。

  2. 扩展性差:APScheduler 默认运行在应用进程内。当我们要增加任务执行器(Worker)时,无法简单地水平扩展,因为多个进程同时操作 SQLite 会加剧锁竞争,且容易导致任务重复触发或漏触发。

  3. 缺乏任务缓冲:任务一旦触发立即执行,若业务逻辑(IO 密集型)阻塞,会直接卡死调度线程,导致后续任务积压。

  4. 运维困难:缺乏独立的队列积压视图、重试队列和死信队列,难以排查“某任务为何没执行”或“某任务为何失败”。

1.2 目标架构需求

为了支撑 百万级周期性任务 的高并发调度需求,新系统需满足:

  • 高吞吐调度:支持每秒触发数千个任务。

  • 水平扩展:Executors(消费者)可部署在多台机器/多个容器中,无状态运行。

  • 可靠性:任务不丢、不漏、即使消费者崩溃也能恢复(At-Least-Once)。

  • 故障隔离:某类任务的慢执行不应阻塞其他轻量级任务。

  • 可观测性:清晰的任务状态流转日志(Created -> Scheduled -> Queued -> Running -> Finished/Retried)。

1.3 技术选型:为何选择 Redis Streams

在去 SQLite 化的过程中,我们选择了 Pure Redis 方案,而非引入 Kafka/RabbitMQ。

  • 轻量级统一:Redis 既作数据库(Hash/ZSet 存任务元数据),又作消息队列(Streams),无需维护两个组件。

  • Streams 优势:相比 Redis List,Streams 支持 In-Flight 状态管理(PEL)和 Consumer Groups,能完美实现“崩溃恢复”和“消息确认”机制。

  • ZSet 延迟调度:Redis ZSet (Sorted Set) 天然适合做延迟队列(Score=NextRunTimestamp),实现精准的秒级调度。

Redis Streams:Consumer Groups 与 PEL(In-Flight 状态管理)

Redis Streams 是 Redis 5.0 引入的持久化消息队列,核心优势在于通过 Consumer Groups(消费者组)PEL(Pending Entries List,待处理条目列表) 实现了可靠的消息消费与故障恢复,完美匹配“任务不丢、不漏、可水平扩展”的需求。

1)Consumer Groups(消费者组)

  • 核心作用:将多个消费者(Executor)组织为一个“组”,共同消费 Stream 中的消息,实现负载均衡(每条消息仅发送给组内一个消费者)和状态共享

  • 关键机制

    • 组内消费者通过 XREADGROUP 命令抢占式获取消息,避免重复消费;

    • 支持动态添加/删除消费者,轻松实现 Executor 的水平扩展。

2)PEL(Pending Entries List)

  • 核心作用:管理“已发送但未确认”的消息(In-Flight 状态),是实现“At-Least-Once(至少一次送达)”的关键。

  • 工作流程

    1. 消费者通过 XREADGROUP 获取消息后,消息会被暂存到 PEL 中;

    2. 消费者成功执行任务后,调用 XACK 命令确认,消息从 PEL 中移除;

    3. 若消费者崩溃,未确认的消息会一直留在 PEL 中;其他消费者可通过 XCLAIM 命令“认领”这些超时未确认的消息,继续执行,避免任务丢失。

具体使用:

  • 任务触发后写入 Stream,Consumer Group 分配给不同 Executor 消费;

  • Executor 执行完任务调用 XACK 确认,崩溃则通过 XCLAIM 转移任务,既保证不丢任务,又避免重复触发。

Redis ZSet(Sorted Set):延迟调度的核心

Redis ZSet 是一种有序集合,每个元素(Member)都关联一个“分数(Score)”,Redis 会自动按 Score 对元素进行排序。这种特性让它天然适合做延迟队列周期性任务调度

1)核心结构

  • Member:存储任务的唯一标识(如 Task ID);

  • Score:存储任务的“下一次执行时间戳”(Next Run Timestamp)。

2)延迟调度实现流程

  1. 添加任务:通过 ZADD 命令将任务写入 ZSet,Score 设为任务的下一次执行时间戳;

  2. 扫描到期任务:调度器通过 ZRANGEBYSCORE 命令,获取 Score ≤ 当前时间戳的所有任务(即已到期的任务);

  3. 触发与更新

    • 到期任务被取出后,写入 Redis Streams 等待消费;

    • 若是周期性任务,计算下一次执行时间戳,通过 ZADD 更新该任务的 Score,实现循环调度。

具体使用:

  • 替代原 SQLite 存储“Next Run Time”,利用 ZSet 的有序性实现秒级精准调度

  • Redis 的高性能读写避免了 SQLite 写锁问题,支撑每秒数千次的任务扫描与触发。

2 系统架构概览 (System Architecture Overview)

2.1 整体拓扑图

系统分为三层架构:API 层调度层执行层

graph TD User[用户/上游系统] -->|HTTP POST| API["API (Gunicorn)"] subgraph "Redis Cluster 数据中心" TaskData[Hash: 任务元数据] ScheduleIdx[ZSet: 调度索引] RetryIdx[ZSet: 重试索引] QueueStream[Stream: 待执行队列] DLQ[Stream: 死信队列] AuthNonce[String: 防重放Nonce] end subgraph "Worker Nodes 执行集群" Producer[Producer 线程] Consumer[Consumer 线程池] RetryMgr[Retry Manager] end API -->|Write| TaskData API -->|Write| ScheduleIdx Producer -- (1) Poll (每秒) --> ScheduleIdx Producer -- (2) Push --> QueueStream Producer -- (3) Update NextRun --> TaskData Consumer -- (1) Pop (Group Read) --> QueueStream Consumer -- (2) Execute Tasks --> TasksLogic[业务逻辑] TasksLogic -->|Logs| DiskLog[磁盘日志] Consumer -- Success --> Ack[XACK + XDEL] Consumer -- Fail --> RetryMgr RetryMgr -->|Delay < Max| RetryIdx RetryMgr -->|Delay > Max| DLQ

2.2 核心组件角色

  1. API Server:

    • 负责接收用户 HTTP 请求(创建/停止/查询/清空)。

    • 无状态:不运行任何后台调度任务。

    • 逻辑:校验参数 -> 写入 task:* Hash -> 写入/移除 schedule:zset

  2. Scheduler / Producer:

    • 系统中的“节拍器”。

    • 逻辑:ZREVRANGEBYSCORE 扫描 schedule:zsetAccessTime <= Now 的任务 -> 封装消息 -> XADD 投递到 queue:{type} -> 计算下一次运行时间 -> 更新 task:*schedule:zset

    • 特性:单线程(或多实例抢占),负责将“静止的任务数据”转化为“流动的队列消息”。

  3. Consumers (Workers):

    • 系统中的“工人”。

    • 按任务类型分组,每组有独立的 StreamConsumer Group

    • 逻辑:XREADGROUP 获取任务 -> 调用任务特定的业务逻辑 -> 输出结果 -> XACK 确认。

  4. Retry Manager:

    • 负责容错。

    • 逻辑:当消费者捕获异常时,计算重试次数。若未达上限,计算退避时间放入 retry:zset;若达上限,移入 dlq:*(死信队列)。

2.3 数据流向

一个典型的任务生命周期:

  1. Created: API 收到请求,创建 task:uuid,写入 schedule:zset (Score=Now)。

  2. Scheduled: Producer 扫描到该任务,将其从 Scheduler 搬运到 Stream Queue。

  3. Queued: 消息在 Redis Stream 中排队。

  4. Processing: Consumer 通过 Group 读走消息,消息进入 PEL (Pending Entries List)。

  5. Completed:

    • Success: 业务日志落盘,执行 XACK + XDEL

    • Failed: 进入 Retry 流程,重新进入 retry:zset 等待下一次调度。

3 Redis 数据结构设计 (Redis Data Structures)

本系统完全摒弃了关系型数据库,所有状态在 Redis 中闭环。

3.1 任务元数据 (Task Hash)

  • Key: task:{task_id} (e.g., task:58b56a49...)

  • Structure: Hash

  • Fields:

    • task_id: 唯一标识

    • type: 任务类型

    • 自定义字段: JSON list

    • interval_sec: 周期 (e.g., 60)

    • status: 状态 (active / stopped)

    • retry_count: 当前连续重试次数

    • next_run_at_ms: 下一次理论调度时间戳

    • updated_at_ms: 最后更新时间

3.2 调度索引 (Schedule ZSet)

  • Key: schedule:zset

  • Structure: Sorted Set

  • Member: task_id

  • Score: NextRunTimestamp (毫秒)

  • 作用: 按时间顺序存储所有活跃任务。Producer 每次只需查询 Score <= Now 的成员即可,效率极高 (O(logN))。

3.3 消息队列 (Stream Queues)

  • Key: `queue:{task_type}

  • Structure: Stream

  • Fields (Message Body):

    • task_id, schedule_id, retry_attempt

    • 一些必要字段 (冗余存储,避免消费者反查 Hash 导致竞态或性能损耗)

    • enqueue_at_ms

  • 特点: 利用 Redis Streams 持久化特性,支持多消费者组并行消费。

3.4 重试与死信

  • Retry ZSet: retry:zset

    • 结构同 schedule:zset,用于存放处于“等待重试冷却中”的任务。

  • Task Retry State: retry:task:{task_id} (Hash)

    • 临时存储重试过程中的上下文(如上次错误信息)。

  • Dead Letter Queue: dlq:{task_type} (Stream)

    • 存放超过最大重试次数的任务,供人工审计。

4 可靠性与一致性 (Reliability & Consistency)

4.1 至少一次投递 (At-Least-Once Delivery)

系统设计目标是不丢消息。

  • Producer: 只有在 XADD 成功后才更新调度时间。若 XADD 失败,Producer 会抛异常,任务保留在 schedule:zset 中(Score 不变),下一轮 Poll 会再次尝试投递。

  • Consumer: 只有在业务逻辑执行完毕后才 XACK。若 Crash,消息保留在 Stream PEL 中。

4.2 幂等性保障 (Idempotency)

由于“至少一次”可能导致重复执行(这也是分布式系统的权衡),我们需要幂等性设计。

  • 业务幂等: 虽然任务执行操作本身是幂等的(在本项目场景下,任务重复执行无副作用),但我们仍然需要防止重复执行

  • 去重策略:

    • 在 Stream 层面,Consumer Group 保证了同一消息不被并发消费。

    • 在应用层面,业务逻辑执行结果包含精确的 timestamp,下游分析系统可根据 task_id + timestamp 去重。

4.3 重试策略与指数退避 (Exponential Backoff)

4.3.1 触发条件

  • 消费者捕获到业务逻辑抛出的异常。

4.3.2 重试流程

  1. On Failure: Consumer catch Exception -> 调用 retry_manager.on_failure

  2. 计算退避:

    • Delay = BaseDelay * 2^(Attempt-1)

    • e.g., 10s -> 20s -> 40s -> … (Max 300s).

  3. 入池:

    • 写入 retry:task:{id} (Hash) 保存现场(必要字段+error message)。

    • 写入 retry:zset (ZSet) Score = Now + Delay

    • 注意:必须先 ACK 原消息。因为重试是生成新消息并在未来执行,原消息已完成“失败处理”使命。

  4. 再调度:

    • Producer (或 RetryDispatcher) 扫描 retry:zset

    • 到期后,将任务重新封装特定消息(含 retry_attempt 计数)投递回 queue:{type}

4.4 死信队列处理 (Dead Letter Queue)

retry_count > MAX_ATTEMPTS (默认 3 次) 时:

  1. 不再写入 retry:zset

  2. 将任务上下文写入 dlq:{type} (Stream)。

  3. 记录 retry_to_dlq 日志。

5 结语 (Conclusion)

本次架构升级成功将单体 SQL 系统演进为基于 Redis Streams 的分布式高并发系统。

  • 性能: 消除数据库锁瓶颈。

  • 可靠性: 引入 ACK、Retry、DLQ 机制,确保任务执行的确定性。

  • 可视性: 元数据、队列、日志三位一体,状态透明。

  • 安全: 修复了攻击风险。

声明:本文使用 AI 辅助撰写