news 2026/5/3 5:05:59

自己写一个分布式定时任务框架+负载均衡+OpenAPI异步调用!

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
自己写一个分布式定时任务框架+负载均衡+OpenAPI异步调用!

项目背景

目前的定时任务框架已经很成熟,从QuartZ到xxl-job,再到近几年出现的PowerJob,既然有这么多的好的实现,为什么还是选择重写一个定时任务框架呢?

开发中遇到这样的场景,业务层面需要频繁的创建修改定时任务,在考虑分布式的架构下,对于目前可以实现该功能的框架中:

  • MQ的延时队列无法动态调整任务参数;

  • redis的过期策略需要保存太久的key且可能会有BigKey

  • xxljob没有原生的openAPI,其基于数据库锁的调度只是实现server的高可用而不是高性能;

  • powerjob的openAPI又是基于http的同步阻塞调度,并且对于server的负载均衡,由于其分组隔离设计,需要开发者手动配置,在高并发下的定时任务操作下,并不能很好的调度server集群。

主流框架往往为了适配更多的场景,支持足够多的功能,往往体积大,且不易动态扩展,为了对项目有最大的控制,在解决以上业务场景的前提下,进行部分功能的修剪,也希望能更好的从中学习主流框架的设计思想,于是决定重写一个定时任务框架。

本文章主要介绍该项目相对于目前主流定时任务框架的特性,对于定时任务调度和发现的详细可以见源码,文章末尾也给出了流程图方便理解(关于这部分作者对于PowerJob的原先设计也做了部分剪枝,相对于原来的框架更易理解和学习,后面可能会推出相关讲解)

项目地址:

https://github.com/karatttt/k-job

定位

这是一个基于 PowerJob 的重写和重构版本,修改和扩展了原始项目的功能,以更好地适配业务需求。

  • 支持定时任务频繁创建和任务参数频繁动态变动的场景(提供轻量API,并使用内置消息队列异步处理)

  • 支持大量定时任务并发执行的场景,实现负载均衡(分组隔离+应用级别的锁实现)

  • 主要针对小型任务 ,无需过多配置,不对任务实例进行操作

技术选型

通信 : gRPC(基于netty的nio) 序列化 : Protobuf编码格式编解码 负载均衡 :自己实现的注册中心NameServer |___ 策略 : 服务端最小调度次数策略 |___ 交互 :pull+push 消息队列 : 自己实现的简易消息队列 |___ 消息发送 : 异步+超时重试 |___ 持久化 :mmap+同步刷盘策略 |___ 消息重试 :多级延时队列+死信队列 定时调度 : 时间轮算法

项目结构

├── LICENSE ├── k-job-common // 各组件的公共依赖,开发者无需感知 ├── k-job-nameServer // server和worker的注册中心,提供负载均衡 ├── k-job-producer //普通Jar包,提供 OpenAPI,内置消息队列的异步发送 ├── k-job-server // 基于SpringBoot实现的调度服务器 ├── k-job-worker-boot-starter // kjob-worker 的 spring-boot-starter ,spring boot 应用可以通用引入该依赖一键接入 kjob-server ├── k-job-worker // 普通Jar包,接入kjob-server的应用需要依赖该Jar包 └── pom.xml

特性

负载均衡(解决大量定时任务并发执行场景)

对于worker的负载均衡策略有许多且已经由较好的解决(轮询,健康值等),但是,我们目前的系统存在大量的定时任务,考虑server层面,可能会存在以下情况:

  • server一次调度从DB中获取太多任务,可能会OOM

  • 发起调度请求是由线程池负责,可能会有性能瓶颈,我们的系统对时间是敏感的,对时间精度高要求

  • 我们的OpenAPI同样也不希望大量请求落在同一个server上

在分布式系统下,解决定时任务并发执行往往考虑server集群的负载均衡(这里的负载均衡特指server集群能够根据自身负载,动态调度worker集群),但是对于定时任务框架,需要关注集群下的任务重复调度问题,目前的定时任务框架大都为了解决该问题而不能较好实现负载均衡

通过查看源码,xxljob的调度,在每次查询数据库获取任务前,通过数据库行锁进行了全局加锁,保证同一时刻只有一个server在进行调度来避免重复调度,但是无法发挥集群server的调度能力

对于powerjob的调度,通过分组隔离机制(详细可以看官方文档)避免了重复调度,但是同样带来了问题:同一app下的worker集群只能被一台server调度,如果该server的任务太多了呢?如果只有一个业务对应的app,如何用server集群来负载均衡呢?

基于以上问题,增加了一个注册中心nameServer模块来负责负载均衡:

最小调度次数策略:NameServer记录server集群状态并维护各个server的分配任务次数,由于server是否调度某个worker由表中数据决定,worker会在每次pull判断是否发起请求更新server中的调度关系表,并将目前分组交由最小调度次数的server来调度,当且仅当以下发生:

  • 同一app分组下的workerNum > threshold

  • 该分组对应的server的scheduleTimes > minServerScheduleTime x 2

考虑到server的地理位置,通信效率等因素,后续可以考虑增加每个server的权重来更优分配

关键代码如下:

public ReBalanceInfo getServerAddressReBalanceList(String serverAddress, String appName) { // first req, serverAddress is empty if(serverAddress.isEmpty()){ ReBalanceInfo reBalanceInfo = new ReBalanceInfo(); reBalanceInfo.setSplit(false); reBalanceInfo.setServerIpList(new ArrayList<String>(serverAddressSet)); reBalanceInfo.setSubAppName(""); return reBalanceInfo; } ReBalanceInfo reBalanceInfo = new ReBalanceInfo(); // get sorted scheduleTimes serverList List<String> newServerIpList = serverAddress2ScheduleTimesMap.keySet().stream().sorted(new Comparator<String>() { @Override public int compare(String o1, String o2) { return (int) (serverAddress2ScheduleTimesMap.get(o1) - serverAddress2ScheduleTimesMap.get(o2)); } }).collect(Collectors.toList()); // see if split if(!appName2WorkerNumMap.isEmpty() && appName2WorkerNumMap.get(appName) > maxWorkerNum && appName2WorkerNumMap.get(appName) % maxWorkerNum == 1){ // return new serverIpList reBalanceInfo.setSplit(true); reBalanceInfo.setChangeServer(false); reBalanceInfo.setServerIpList(newServerIpList); reBalanceInfo.setSubAppName(appName + ":" + appName2WorkerNumMap.size()); return reBalanceInfo; } // see if need change server Long lestScheduleTimes = serverAddress2ScheduleTimesMap.get(newServerIpList.get(newServerIpList.size() - 1)); Long comparedScheduleTimes = lestScheduleTimes == 0 ? 1 : lestScheduleTimes; if(serverAddress2ScheduleTimesMap.get(serverAddress) / comparedScheduleTimes > 2){ reBalanceInfo.setSplit(false); reBalanceInfo.setChangeServer(true); // first server is target lest scheduleTimes server reBalanceInfo.setServerIpList(newServerIpList); reBalanceInfo.setSubAppName(""); return reBalanceInfo; } // return default list reBalanceInfo.setSplit(false); reBalanceInfo.setServerIpList(new ArrayList<String>(serverAddressSet)); reBalanceInfo.setSubAppName(""); return reBalanceInfo; }
实现功能:
  • app组自动拆分:可以为app设置组内worker数量阈值,超过阈值自动拆分subApp并分配负载均衡后的server

  • worker动态分配:对于每一个subApp,当触发pull时,根据最小调度次数策略,可以分配至负载均衡后的server,开发者无需感知subApp

以上,解决PowerJob中同一worker分组只能被一个server调度问题,且subApp分组可以根据server的负载,实现动态依附至不同server,对于可能的重复调度问题,我们只需加上App级别的锁,相对于xxl-job的全局加锁性能更好。

消息队列(解决任务大量创建和频繁更改场景)

其实一开始用powerjob作为项目中的中间件,业务中的任务操作使用其openAPI。过程中感受最大的就是,我的业务只是根据任务id修改了任务参数,并不需要server的响应,为什么要同步阻塞?可靠性应由server保证而不是客户端的大量重试及等待。对于业务中频繁创建定时任务和改动,更应是异步操作。

一开始的想法是,使用grpc的futureStub进行异步发送,请求由Reactor线程监听事件,当事件可读时分配给业务线程池进行处理(gRPC内部已经实现)。所以需要做的似乎只是做一个Producer服务,并把stub全换成Future类型,对于jobId,我们用雪花算法拿到一个全局id就可以,无需server分配。

但是以上设计有一个致命的问题------阻塞在BlockingQueue的请求无法ack,且server宕机存在消息丢失的可能!这违背了消息队列的设计(入队--ack--持久化--消费),意味着只有被分配到线程(消费者)消费时,才能被ack,而活跃的线程数并不多。故不能仅仅依赖gRPC的内部实现,需要自己实现消息队列

可靠消息

以rocketMQ为例,producer的消息会先到达broker中的队列后返回ack,consumer再轮询从broker中pull重平衡处理后的消息消费。

考虑到本项目的设计无需路由,所有的server都可以接受消息,于是不再设计broker,将server和broker结合,每个server维护自己的队列,且消费自己队列的消息,这样还能减少一次通信。

这样可靠消息的解决就变成了:

  • producer到server的消息丢失------失败或者超时则依次遍历所有的server,一定能保证消息抵达,不再阐述

  • server的队列消息丢失(机器宕机)------持久化,采用同步刷盘策略,百分之百的可靠

持久化:同步刷盘机制借鉴了rocketMQ的mmap和commitLog/consumerQueue设计,将磁盘的文件映射到内存进行读写,每次消息进来先存到buffer后触发刷盘,成功后执行写响应的回调;用consumerQueue文件作为队列,server定时pull消费消息,详细见k-job-server.consumer.DefaultMessageStore,有详细注释

// 和rocketMQ一样,读写都是用mmap,因为内存buffer就是文件的映射,只是有刷盘机制 private MappedByteBuffer commitLogBuffer; // 映射到内存的commitlog文件 private MappedByteBuffer consumerQueueBuffer; // 映射到内存的consumerQueue文件 private final AtomicLong commitLogBufferPosition = new AtomicLong(0);// consumerLog的buffer的位置,同步刷盘的情况下与consumerLog文件的位置保持一致 private final AtomicLong commitLogCurPosition = new AtomicLong(0);// consumerLog文件的目前位置,每次刷盘后就等于buffer位置 private final AtomicLong lastProcessedOffset = new AtomicLong(0);// consumerQueue的buffer拉取commitLog的位置,与commitLog相比,重启时就是consumerQueue文件最后一条消息的索引位置 private final AtomicLong currentConsumerQueuePosition = new AtomicLong(0); // consumerQueue文件的目前位置 private final AtomicLong consumerPosition = new AtomicLong(0); // 记录消费者在consumerQueue中的消费位置,这个只在目前的系统中有,类似于rocketMQ通过pull远程拉取
消息重试

对于producer,前面提到,为了应对大量定时任务的场景,对于任务的操作,应全部是异步的,我们引入超时机制即可,当超过一定的时间未收到ack,或者返回错误响应,选择下一个server发起重试

对于consumer(server),使用多级延时队列,当某个消息消费失败后,投递至下一级延迟更久的延时队列,若全都消费失败则进入死信队列,需要人工干预

private staticfinal Deque<MqCausa.Message> deadMessageQueue = new ArrayDeque<>(); privatestaticfinal List<DelayQueue<DelayedMessage>> delayQueueList = new ArrayList<>(2); /** * 逆序排序,因为重试次数到0则不再重试 */ privatestatic List<Long> delayTimes = Lists.newArrayList(10000L, 5000L); public static void init(Consumer consumer) { delayQueueList.add(new DelayQueue<>()); delayQueueList.add(new DelayQueue<>()); Thread consumerThread1 = new Thread(() -> { try { while (true) { // 从延时队列中取出消息(会等待直到消息到期) DelayQueue<DelayedMessage> delayQueue = delayQueueList.get(0); if(!delayQueue.isEmpty()) { DelayedMessage message = delayQueue.take(); consumer.consume(message.message); delayQueue.remove(message); System.out.println("Consumed: " + message.getMessage() + " at " + System.currentTimeMillis()); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); System.out.println("Consumer thread interrupted"); } }); // 其他等级的延时队列 consumerThread1.start(); } public static void reConsume(MqCausa.Message msg) { if (msg.getRetryTime() == 0) { log.error("msg : {} is dead", msg); deadMessageQueue.add(msg); return; } MqCausa.Message build = msg.toBuilder().setRetryTime(msg.getRetryTime() - 1).build(); DelayedMessage delayedMessage = new DelayedMessage(build, delayTimes.get(build.getRetryTime())); delayQueueList.get(msg.getRetryTime() - 1).add(delayedMessage); } // 定义一个延时消息类,实现 Delayed 接口 staticclass DelayedMessage implements Delayed { privatefinal MqCausa.Message message; privatefinallong triggerTime; // 到期时间 public DelayedMessage(MqCausa.Message message, long delayTime) { this.message = message; // 当前时间加上延时时间,设置消息的触发时间 this.triggerTime = System.currentTimeMillis() + delayTime; } // 获取剩余的延时时间 @Override public long getDelay(TimeUnit unit) { return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } // 比较方法,用于确定消息的顺序 @Override public int compareTo(Delayed other) { if (this.triggerTime < ((DelayedMessage) other).triggerTime) { return -1; } elseif (this.triggerTime > ((DelayedMessage) other).triggerTime) { return1; } return0; } public MqCausa.Message getMessage() { return message; } }

最终实现如图所示:

实现功能:

  • 对于任务操作请求的异步发送

  • 轮询策略实现消费的负载均衡

其他

附上个人总结的对于worker和server之间服务发现以及调度的流程图

服务发现

调度

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 8:30:14

即插即用系列(代码实践) | AMD核心模块:自适应多尺度分解框架——纯MLP架构吊打Transformer,时间序列预测新SOTA

论文题目:Adaptive Multi-Scale Decomposition Framework for Time Series Forecasting 中文题目:用于时间序列预测的自适应多尺度分解框架 论文出处:arXiv 2024 (清华大学深圳国际研究生院 & 同济大学) 应用任务:时间序列预测 (Time Series Forecasting)、长期预测 核…

作者头像 李华
网站建设 2026/4/18 5:16:35

邦芒解析:最难升职的六种职场人员

在职场中&#xff0c;升职往往不仅取决于工作能力&#xff0c;还与个人的沟通方式、心态和人际关系密切相关。综合来看&#xff0c;以下六类职场人员常面临升职困难&#xff1a;1‌、不懂汇报与邀功的员工‌&#xff1a;即使工作认真、能力突出&#xff0c;但习惯埋头苦干&…

作者头像 李华
网站建设 2026/4/24 8:32:29

内存池详解和实现

内存池 1. 定长内存池 1.0 预先分配内存池大小 // 定义内存页的大小为 4096 字节 #define MEM_PAGE_SIZE 0x10001.1 内存池结构体typedef struct mempool_s{//每个内存块大小int blocksize;//空闲内存块数量int freecount;//指向空闲内存块链表的头指针char *free_ptr;//内存池…

作者头像 李华
网站建设 2026/4/27 0:49:34

多渠道简历管理不再难,智能化招聘工具的一键去重与智能归集技巧

在企业招聘过程中&#xff0c;HR 常常面临多渠道简历分散存储、重复投递频发的问题&#xff0c;手动整理不仅耗时耗力&#xff0c;还易遗漏优质候选人。而智能化招聘工具的多渠道简历一键去重与智能归集功能&#xff0c;正是解决这一痛点的关键。 本文将从功能实现逻辑、操作方…

作者头像 李华