ElasticJob 分布式调度原理

候选人小刘在面试某电商公司时,面试官看了他的项目经验中"负责数据同步任务",问道:

"你们数据同步用了什么方案?定时任务怎么分片的?"

小刘说:"我们用的 XXL-JOB 做定时任务,分片...就是按 ID 取模分的。"

面试官追问:"那 ElasticJob 了解吗?它的分片是怎么实现的?"

小刘说:"好像用到了 ZooKeeper..."

面试官追问:"ZooKeeper 在里面具体干什么?重新分片的时候发生了什么?"

小刘彻底卡住了。

【面试官心理】 这道题我用来区分"用过调度框架"和"理解过分布式协调"的候选人。ElasticJob 的设计比 XXL-JOB 更轻量,但 ZooKeeper 的引入让它在分布式协调方面更强大。能讲清楚 ZooKeeper 临时节点和主节点选举的,至少对分布式协调有基本认知。

一、为什么需要 ElasticJob 🔴

1.1 XXL-JOB 和 Quartz 的瓶颈

在分布式场景下,XXL-JOB 和 Quartz 都有各自的局限:

// XXL-JOB 的问题:
// 1. 调度中心是单点(社区版),挂了就没有新任务触发了
// 2. 路由策略虽然是"分片广播",但分片参数是由调度中心计算的
//    执行器只是被动接收,没有真正的"自主分片"能力
// 3. 没有 ZooKeeper,所有协调都依赖数据库

// Quartz 集群的问题:
// 1. 通过数据库锁抢任务,节点多了锁竞争严重
// 2. 没有分片能力,只能通过业务代码自己写分片逻辑
// 3. 数据库成为性能瓶颈

1.2 ElasticJob 的设计哲学

ElasticJob 的核心理念:让每个分片任务尽可能均匀地分布到各个节点,并且能够动态扩缩容。

graph TD
    A["ZooKeeper\n集群"] -->|"注册临时节点\n选举主节点"| B["ElasticJob Lite\n无中心化架构"]
    B --> B1["节点1\n分片0,1"]
    B --> B2["节点2\n分片2,3"]
    B --> B3["节点3\n分片4"]
    B -->|"扩缩容时\n重新分片"| B1
    B -->|"重新分片"| B2
    B -->|"重新分片"| B3

ElasticJob 分为两个版本:

  • ElasticJob-Lite:轻量级,无中心化架构,嵌入应用进程
  • ElasticJob-Cloud:云原生版本,支持更复杂的资源调度(Mesos)

这里重点讲 ElasticJob-Lite

二、分片机制核心原理 🔴

2.1 分片键的定义

// ElasticJob 的分片通过 shardingItemParameters 配置
// 每个分片用逗号分隔,格式为:分片序号=描述

@JobSharding("${myjob.shardingItems}")
public class MyJob implements SimpleJob {

    @Override
    public void execute(ShardingContext shardingContext) {
        // shardingContext.getShardingItemList() 返回当前节点负责的分片序号
        // 比如 [0, 1] 表示当前节点负责分片0和分片1
        List<Integer> shards = shardingContext.getShardingItemList();
        for (Integer shard : shards) {
            // 根据分片序号处理不同的数据范围
            processByShard(shard);
        }
    }
}

// 配置文件
myjob:
  shardingItemParameters: |
    0=北京,天津,河北
    1=上海,江苏,浙江
    2=广东,福建,海南

2.2 默认分片策略:AverageAllocationJobShardingStrategy

// ElasticJob 默认的分片策略
// 核心逻辑:将 n 个分片均匀分配给 m 个节点

// 示例:3 个分片,2 个节点
// 节点1:分片 0
// 节点2:分片 1, 2

// 示例:4 个分片,3 个节点
// 节点1:分片 0, 3
// 节点2:分片 1
// 节点3:分片 2

// 关键代码逻辑:
public static Map<JobInstance, List<Integer>> sharding(
        List<JobInstance> jobInstances, // 在线节点列表
        int shardingTotalCount,         // 总分片数
        String shardingItemParameters) {// 分片参数

    Map<JobInstance, List<Integer>> result = new LinkedHashMap<>();

    // 先将每个分片平均分配
    for (int i = 0; i < shardingTotalCount; i++) {
        int jobInstanceIndex = i % jobInstances.size();  // 取模分配
        List<Integer> shardingItems = result.getOrDefault(
            jobInstances.get(jobInstanceIndex), new ArrayList<>());
        shardingItems.add(i);
        result.put(jobInstances.get(jobInstanceIndex), shardingItems);
    }

    return result;
}

平均分配的缺陷:

// ❌ 问题:按平均分配,如果分片0是热点数据
// 节点1只拿到1个分片,节点2拿到2个分片
// 但节点1的分片0数据量是节点2的10倍
// 负载严重不均

// ✅ 解决方案:自定义分片策略
public class HotRegionShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(
            List<JobInstance> jobInstances,
            JobConfiguration jobConfiguration) {

        // 根据数据量动态调整分片分配
        // 比如:热点地区(北上广)分配更多分片
        // 非热点地区(西藏/新疆)分配更少分片

        Map<JobInstance, List<Integer>> result = new HashMap<>();
        // 自定义逻辑...
        return result;
    }
}

2.3 分片广播 vs 分片执行

ElasticJob 的分片广播和其他调度框架不同:

// ElasticJob 的分片是"每个节点只执行自己分配到的分片"
// 不存在"所有节点都执行所有分片"的模式

// 场景:10个分片,3个节点
// 节点1:分片 0, 3, 6, 9
// 节点2:分片 1, 4, 7
// 节点3:分片 2, 5, 8

// 每个节点只查询并处理自己分片范围内的数据
// 这就天然避免了数据重复处理的问题

// 典型的分片查询:
@Bean
public DataSource dataSource() {
    // ...
}

public class OrderSyncJob implements SimpleJob {
    @Override
    public void execute(ShardingContext shardingContext) {
        List<Integer> shards = shardingContext.getShardingItemList();
        int shardingTotal = shardingContext.getShardingTotalCount();

        for (Integer shard : shards) {
            // 按分片分页查询
            // SELECT * FROM orders
            // WHERE status = 0
            // AND mod(id, #{shardingTotal}) = #{sharding}
            // LIMIT #{pageSize} OFFSET #{offset}

            List<Order> orders = orderMapper.selectBySharding(shard, shardingTotal);
            for (Order order : orders) {
                syncOrder(order);
            }
        }
    }
}

三、ZooKeeper 协调原理 🔴

3.1 ZooKeeper 在 ElasticJob 中的角色

这是 ElasticJob 和 XXL-JOB 最大的架构差异。

// ElasticJob 使用 ZooKeeper 做三件事:
// 1. 注册节点:每个执行器启动时在 ZooKeeper 创建临时节点
// 2. 主节点选举:多个执行器竞争创建同一个临时节点,成功者为主节点
// 3. 分片信息存储:分片分配结果存储在 ZooKeeper 的持久化节点中
graph TD
    ZK["ZooKeeper\n集群"] --> N1["/elasticjob/\njobs/order-job"]
    N1 --> N2["/instances/\n192.168.1.1@-@12345"]
    N1 --> N3["/instances/\n192.168.1.2@-@12346"]
    N1 --> N4["/instances/\n192.168.1.3@-@12347"]
    N1 --> N5["/leader/\nelection/instance"]
    N5 --> N6["选举为主节点"]
    N1 --> N7["/sharding/\n0/instance"]
    N1 --> N8["/sharding/\n1/instance"]
    N1 --> N9["/sharding/\n2/instance"]

3.2 临时节点与主节点选举

// ElasticJob 的主节点选举使用 ZooKeeper 的临时顺序节点
// 核心逻辑:谁先创建节点,谁就是主节点

// 执行器启动时
public class JobInstance {
    private final String instanceId; // 格式:IP@JobName@随机数

    public void register() {
        // 在 ZooKeeper 中创建临时节点
        // 路径:/elasticjob/{jobName}/leader/election/{instanceId}
        // ZooKeeper 保证同一时刻只有一个节点能创建成功

        // 成功创建 -> 当前节点为主节点
        // 创建失败 -> 当前节点为从节点,注册 watcher 监听主节点变化
    }
}

// 主节点的职责:
// 1. 触发重新分片
// 2. 转移分片(节点下线时)
// 3. 清理过期的任务执行记录
// 4. 处理 Misfire

3.3 分片信息的存储与监听

// 分片信息存储在 ZooKeeper 的持久化节点中
// 路径:/elasticjob/{jobName}/sharding/{分片序号}/instance
// 内容:{instanceId}

// 重新分片流程(主节点触发):
// 1. 先将所有分片标记为 DISABLED(禁用)
// 2. 等待正在执行的任务完成(最多等待 10 分钟)
// 3. 根据新的节点数量,重新计算分片分配
// 4. 更新 ZooKeeper 中的分片节点
// 5. 各节点收到变更通知,自动切换到新的分片

// 关键:先禁后启,确保数据一致性
⚠️

重新分片期间,所有任务执行会被暂停。如果你的业务要求7x24小时不间断,这个"先禁后启"的过程可能导致数据处理延迟。需要评估重新分片的频率和业务容忍度。

3.4 ZooKeeper 节点下线与故障转移

// 节点主动下线:
// 1. 执行器调用 scheduler.shutdown()
// 2. ZooKeeper 临时节点自动删除
// 3. 主节点检测到节点消失,重新分配分片

// 节点异常下线(断网/进程崩溃):
// 1. ZooKeeper 临时节点在 session 超时后自动删除(默认 30 秒)
// 2. 主节点通过心跳检测发现节点消失
// 3. 重新分配该节点的分片

// 分片转移时,丢失中的任务怎么办?
// ElasticJob 不保证任务不丢失
// 建议:
// 1. 任务本身要做幂等
// 2. 使用带事务的数据库更新(状态机)
// 3. 或者接受"最多处理一次"的语义

四、弹性扩缩容 🟡

4.1 动态增加节点

// 场景:双十一零点,从 3 个节点扩展到 10 个节点

// 步骤:
// 1. 启动新的执行器进程
// 2. 新节点在 ZooKeeper 注册临时节点
// 3. 现有节点通过 watcher 感知到新节点
// 4. 主节点触发重新分片
// 5. 每个节点重新分配到更少的分片
// 6. 处理压力大幅降低

// 扩缩容的时机由主节点控制
// 不是每次节点变化都触发重新分片
// 有以下配置可以控制:

// shardingStrategyType: 分片策略类型
// reconcileIntervalMinutes: 重新分片的检查间隔(默认 10 分钟)
// maxTimeDiffSeconds: 最大时间误差(超过这个误差会触发重新分片,默认 -1 表示不检查)

4.2 数据倾斜问题

// ❌ 问题:静态分片参数无法应对数据热点
// 固定的分片参数:
// 0=北京,天津,河北
// 1=上海,江苏,浙江
// 2=广东,福建,海南

// 但广东的数据量是其他省份的10倍!
// 分片2永远是最慢的那个

// ✅ 解决方案1:按数据量动态分片
// 在 Job 中实时计算每个分片的数据量
// 通过 ZooKeeper 协调动态调整

// ✅ 解决方案2:自定义分片参数
// 使用时间范围或 ID 范围作为分片键
// 比如:按日期分片 0=2024-01, 1=2024-02, 2=2024-03

五、作业监控与事件追踪 🟡

5.1 JobEventConfiguration

// ElasticJob 支持作业执行事件的追踪
// 通过 JobEventConfiguration 配置事件总线

@Configuration
public class ElasticJobConfiguration {

    @Autowired
    private DataSource dataSource;

    @Bean
    public ElasticJobConfiguration elasticJobConfiguration() {
        return reg -> reg
            .jobScheduler("myJob",
                new SpringJobScheduler(
                    new MyJob(),
                    createDataflowJobConfiguration(),
                    createJobEventConfig()))
            ..globalConfiguration(
                new RegistryCenterConfiguration(
                    new ZooKeeperRegistryCenter(zkConfig)));
    }

    @Bean
    public JobEventConfiguration createJobEventConfig() {
        // 使用数据库存储作业事件
        return new JobEventRdbConfiguration(dataSource);
    }
}

// JobEvent 包含的事件类型:
// - CLOUD_JOB_EXECUTION_STATISTICS:作业执行统计
// - CLOUD_JOB_STATUS_TRACE:作业状态追踪
// - CLOUD_JOB_MISFIRE:Misfire 记录
// - CLOUD_JOB_STATUS_TRACE_LISTENER:状态变更监听

5.2 监控数据的使用

// 作业执行完成后的统计数据:
// - 作业名称、执行节点、分片序号
// - 开始时间、结束时间、执行时长
// - 执行结果(成功/失败)、错误信息
// - Misfire 次数、错过触发次数

// 可以用于:
// 1. 告警:执行失败超过阈值自动告警
// 2. 告警:执行时间过长(比如超过预期5倍)自动告警
// 3. 监控:实时查看作业执行状态和分布
// 4. 审计:记录每个作业的执行历史

六、ElasticJob vs XXL-JOB 🟡

维度ElasticJob LiteXXL-JOB
架构无中心化(依赖 ZooKeeper)中心化(调度中心 + 执行器)
分片协调ZooKeeper 实时协调调度中心计算,HTTP 触发
节点注册ZooKeeper 临时节点调度中心数据库注册
主节点ZooKeeper 选举调度中心本身
依赖需要 ZooKeeper 集群需要 MySQL
运维复杂度高(ZooKeeper 集群)低(MySQL + Web 服务)
分片粒度节点级别(每个节点知道自己的分片)任务级别(调度中心分配)
适用规模中大型(已有 ZooKeeper)中小型(运维简单)

【面试官心理】 问到 ElasticJob vs XXL-JOB 的候选人,说明他有技术选型的经验。我会继续追问:"如果让你们在 ElasticJob 和 XXL-JOB 之间选,你们选哪个?为什么?"能权衡出"运维复杂度"和"功能完整性"的候选人,通常有实际的架构决策经验。

七、常见翻车现场 🔴

❌ 翻车点一:重新分片导致数据重复

// ❌ 错误:没有考虑重新分片时的状态一致性
@Bean
public SimpleJob myJob() {
    return shardingContext -> {
        List<Integer> shards = shardingContext.getShardingItemList();
        // 每次分片变化后,都从头开始处理
        // 没有记录处理位置,导致重新分片后数据重复
        List<Order> orders = orderMapper.selectAll();
        for (Order order : orders) {
            syncOrder(order);
        }
    };
}

// ✅ 正确:使用分片上下文中的偏移量,或者数据库状态标记
@Bean
public SimpleJob myJob() {
    return shardingContext -> {
        List<Integer> shards = shardingContext.getShardingItemList();
        // 使用分片参数中的范围,避免重复
        // 或者通过数据库状态(status=1 表示已处理)保证幂等
        long offset = getOffsetFromRedis(shardingContext.getShardingParameter());
        List<Order> orders = orderMapper.selectByShardingWithOffset(shards, offset);
        for (Order order : orders) {
            syncOrder(order);
            updateOffset(order.getId());
        }
    };
}

❌ 翻车点二:ZooKeeper 集群故障导致调度不可用

// ❌ 错误:单 ZooKeeper 集群,没有高可用
// 如果 ZooKeeper 集群不可用
// 所有 ElasticJob 节点都无法获取分片信息
// 整个调度系统瘫痪

// ✅ 正确:ZooKeeper 集群高可用
// ZooKeeper 集群至少 3 个节点,部署在不同的机架/机房
// 配置重试策略
RegistryCenterConfiguration config = new RegistryCenterConfiguration();
config.setConnectionTimeoutMilliseconds(3000);
config.setSessionTimeoutMilliseconds(60000);
config.setMaxRetries(3);