#定时任务调度系统设计
#一个定时任务的灾难
2022年,我们团队遇到了一个诡异的问题:
每天凌晨 3 点,定时任务开始执行数据清理。起初只需要 10 分钟,后来慢慢变成 30 分钟、1 小时、3 小时。
原因:数据量越来越大,但任务还是单线程执行。
更严重的是,有一次任务超时,运维手动 kill 了进程。恢复后,任务没有从断点继续,而是从头开始执行,导致数据清理重复。
定时任务系统的核心问题是:如何保证任务可靠执行、可扩展、可监控?
#二、定时任务分类🔴
#2.1 任务类型
| 类型 | 说明 | 示例 |
|---|---|---|
| 定时执行 | 固定时间点执行 | 每日对账、凌晨统计 |
| 周期执行 | 固定周期执行 | 每分钟检查、心跳 |
| 延迟执行 | 延迟一段时间后执行 | 订单超时取消 |
| 异步任务 | 提交后立即执行 | 发送通知、数据同步 |
#2.2 单机 vs 分布式
单机调度:
- 简单,但有单点故障
- 无法水平扩展
分布式调度:
- 高可用
- 支持分片
- 复杂度高#三、分布式调度架构🔴
#3.1 整体架构
┌─────────────────────────────────────────────────────────┐
│ 调度中心 Server │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 任务注册 │ │ 执行器管理 │ │ 调度引擎 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└──────────────────────────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ 执行器集群 Executor │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Executor-1 │ │ Executor-2 │ │ Executor-N │ │
│ │ (分片1,2) │ │ (分片3,4) │ │ (分片5,6) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────┘#3.2 任务注册
CREATE TABLE sched_job (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_name VARCHAR(100) NOT NULL,
job_group VARCHAR(100) NOT NULL,
cron_expr VARCHAR(50), -- Cron 表达式
sharding_total INT DEFAULT 1, -- 分片总数
sharding_item VARCHAR(200), -- 分片参数
executor_handler VARCHAR(200), -- 执行器 Bean 名
executor_param VARCHAR(500), -- 执行参数
status TINYINT DEFAULT 0, -- 0=停止, 1=运行
created_at TIMESTAMP,
updated_at TIMESTAMP
);#3.3 调度执行
@Service
class JobScheduler {
@Autowired
private JobDao jobDao;
@Autowired
private ExecutorServiceRegistry executorRegistry;
/**
* 调度任务
*/
@Scheduled(cron = "0/5 * * * * ?") // 每 5 秒扫描
public void schedule() {
// 1. 获取待调度的任务
List<SchedJob> jobs = jobDao.findRunnableJobs();
for (SchedJob job : jobs) {
// 2. 检查是否到达执行时间
if (shouldExecute(job)) {
// 3. 触发执行
trigger(job);
}
}
}
/**
* 触发执行
*/
private void trigger(SchedJob job) {
// 分片广播:所有执行器都执行
if (job.getShardingTotal() > 1) {
for (int i = 0; i < job.getShardingTotal(); i++) {
Executor executor = executorRegistry.getExecutor(i);
executor.execute(job, i);
}
} else {
// 随机选择一个执行器
Executor executor = executorRegistry.getAnyExecutor();
executor.execute(job, 0);
}
}
}#四、分片任务🟡
#4.1 分片原理
场景:处理 1000 万订单
不分片:
单机处理:1台机器 × 1000万 = 极慢
分片执行:
10台机器 × 每台100万 = 快 10 倍#4.2 分片实现
// 执行器接口
interface JobExecutor {
void execute(SchedJob job, int shardingIndex);
}
// 分片任务执行
@Service
class OrderCleanupExecutor implements JobExecutor {
@Autowired
private OrderDao orderDao;
@Override
public void execute(SchedJob job, int shardingIndex) {
int shardingTotal = job.getShardingTotal();
int offset = shardingIndex;
int limit = 1000;
while (true) {
// 分片查询
List<Order> orders = orderDao.findExpiredOrders(
offset, limit, shardingTotal, shardingIndex
);
if (orders.isEmpty()) {
break;
}
// 处理
for (Order order : orders) {
cleanupOrder(order);
}
offset += limit * shardingTotal;
}
}
}-- 分片查询 SQL
SELECT * FROM orders
WHERE status = 'EXPIRED'
AND id % #{shardingTotal} = #{shardingIndex}
LIMIT #{limit} OFFSET #{offset}#五、任务监控与告警🟡
#5.1 执行记录
CREATE TABLE sched_job_log (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
job_id BIGINT NOT NULL,
sharding_index INT,
executor_host VARCHAR(50),
status TINYINT, -- 0=失败, 1=成功, 2=运行中
start_time TIMESTAMP,
end_time TIMESTAMP,
duration BIGINT, -- 执行耗时 ms
error_msg TEXT,
created_at TIMESTAMP,
INDEX idx_job_status (job_id, status),
INDEX idx_start_time (start_time)
);#5.2 告警机制
@Service
class JobAlertService {
@Autowired
private AlertTemplate alertTemplate;
/**
* 任务失败告警
*/
public void alertFailure(JobLog log) {
alertTemplate.send(
AlertChannel.DINGTALK,
"任务执行失败",
String.format("任务 [%s] 执行失败\n" +
"执行器: %s\n" +
"耗时: %dms\n" +
"错误: %s",
log.getJobName(),
log.getExecutorHost(),
log.getDuration(),
log.getErrorMsg())
);
}
/**
* 任务超时告警
*/
public void alertTimeout(JobLog log, long thresholdMs) {
if (log.getDuration() > thresholdMs) {
alertTemplate.send(
AlertChannel.SMS,
"任务执行超时",
String.format("任务 [%s] 执行超时: %dms > %dms",
log.getJobName(), log.getDuration(), thresholdMs)
);
}
}
}#六、XXL-Job 使用🟡
#6.1 执行器注册
@Component
class MyJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 任务逻辑
return SUCCESS;
}
}
// 使用 @XxlJob 注解
@Component
class OrderCleanupJob {
@XxlJob("orderCleanupJob")
public ReturnT<String> cleanup() {
// 分片参数
ShardingVO shardingVO = ShardingUtil.getSharding();
int index = shardingVO.getIndex();
int total = shardingVO.getTotal();
// 业务逻辑
orderService.cleanup(index, total);
return SUCCESS;
}
}#6.2 任务配置
XXL-Job 管理后台配置:
- Cron 表达式:0 0 3 * * ?(每天凌晨3点)
- 分片总数:10
- 超时时间:30分钟
- 告警邮箱:ops@company.com【架构权衡】 定时任务系统的核心是可靠性和可扩展性。单机 Cron 适合简单场景;分布式调度适合复杂任务、需要分片执行的场景。
#七、面试总结
| 级别 | 期望回答 |
|---|---|
| P5 | 能说出分布式调度的基本架构 |
| P6 | 能说出分片任务的实现原理 |
| P7 | 有实际调度系统设计经验 |