news 2026/6/10 16:16:18

供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

1、框架设计逻辑

核心组件

1、服务注册与发现(Redis)

  • 使用Redis作为服务注册中心
  • 以服务名称ApplicationName为key存储节点集群
  • 基于时间戳的心跳机制(10秒间隔)

2、任务调度系统

  • 数据库作为任务持久化存储
  • 守护线程轮询获取新任务
  • 基于负载的调度算法(选择负载最小节点)

3、双守护线程模型

  • 节点监控守护线程:维护节点健康状态
  • 任务发现守护线程:分配计算任务

具体简单时序图如下图所示

2、核心代码实现

2.1、框架核心实现

2.1.1、监听Spring应用启动事件

  • 事件驱动:利用 Spring 应用启动事件,在合适时机启动监控功能
  • 条件控制:通过配置控制功能是否启用,提高灵活性
  • 功能整合:同时启动监控线程和执行类型注册,完成进程监控的初始化

1、启动守护线程

    • ProcessDaemonServiceImpl 实现了 ApplicationListener 接口,监听 Spring 应用启动事件;
    • 在应用启动完成后启动守护线程,监控节点存活状态和进程状态;

2、注册中心注册

    • 获取应用上下文:从事件中获取 ApplicationContext
    • 执行注册服务:获取 ProcessTypeRegisterService 并调用 doRegister()
@Slf4j @Component public class ProcessDaemonServiceImpl implements ApplicationListener<ApplicationStartedEvent> { @Autowired ProcessProperties processProperties; @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { if(processProperties.isEnable() && processProperties.getBusinessKeys().size()!=0){ startDaemonThread(); ApplicationContext applicationContext = applicationStartedEvent.getApplicationContext(); applicationContext.getBean(ProcessTypeRegisterService.class).doRegister(); } } // 启动守护线程 线程优先级设置为10(最高优先级) private void startDaemonThread() { Thread daemonThread2 = new Thread(processUtilServiceImpl.nodeAliveWatcher, "nodeAliveWatcher"); daemonThread2.setDaemon(true); daemonThread2.setPriority(10); // 启动线程 daemonThread2.start(); log.info("{},守护线程启动",daemonThread2.getName()); Thread daemonThread = new Thread(processUtilServiceImpl.processStatusWatcher, "processStatusWatcher"); // 设置为守护线程 daemonThread.setDaemon(true); daemonThread.setPriority(10); // 启动线程 daemonThread.start(); log.info("{},守护线程启动",daemonThread.getName()); } }

2.1.2、监控器

2.1.2.1、节点保活监控器

无限循环运行的守护线程,负责监控节点的状态信息,分布式锁:使用 ALL_NODE_PROCESS_LOCK_KEY 确保集群中只有一个节点执行监控

策略:

    • 定期更新:每 8 秒更新一次节点状态
    • 分布式协调:通过分布式锁确保集群节点状态的一致性
    • 负载信息维护:更新当前节点的负载信息

重启检测

    • 重启标识:初始化时设置 isRestarted 为 true
    • 状态同步:向集群其他节点通知当前节点重启状态
@Slf4j @Component public class ProcessUtilServiceImpl implements ProcessUtilService { /*** * 节点保活监视器 **/ public final Runnable nodeAliveWatcher = () -> { StatusDTO statusDTO = new StatusDTO(); statusDTO.setIsRestarted(true); statusDTO.setWeight(null); while (true) { try { ThreadSleepUtil.parkSeconds(8); String timeSlot = MyDateUtils.getTimeSlot(); ALL_NODE_PROCESS_LOCK_KEY = String.format(ALL_NODE_PROCESS_LOCK_KEY, applicationName,timeSlot); redissonDistributeLock.dealWithLock(ALL_NODE_PROCESS_LOCK_KEY, null, nodeProcessLoadServiceImpl.updateThisNodeInfoFunc, (param) -> { log.warn("节点保活监视器无法正常获取锁,无法更新节点状态"); return null; }, statusDTO); } catch (Exception e) { log.error("节点保活监视器异常"); log.error(e.getMessage(), e); } } }; }

节点状态

@Data public class StatusDTO { private Random random = new Random(); private Boolean isRestarted = true; Long weight ; }

节点状态更新机制

基于redis缓存去更新

  • 维护节点状态:更新当前节点的存活状态和负载信息
  • 权重管理:根据 StatusDTO 中的权重值调整节点负载
  • 节点清理:移除长时间未更新的节点信息
public Function<StatusDTO, Void> updateThisNodeInfoFunc = (statusDTO) -> { try { Long dealingWeight = statusDTO.getWeight(); RMap<String, NodeProcessStastic> nodeDatas = redissonClient.getMap(ALL_NODE_PROCESS_KEY); String localHostIp = IpAddressUtil.getHostIp(); NodeProcessStastic nodeProcessStastic = nodeDatas.getOrDefault(localHostIp, new NodeProcessStastic()); nodeProcessStastic.setTimestamp(DateUtil.now()); nodeProcessStastic.setIpAddress(localHostIp.replaceAll("\\.", "-")); nodeProcessStastic.setSupportBusinessKeys(processProperties.getBusinessKeys()); if (dealingWeight != null) { if ((dealingWeight < 0 && nodeProcessStastic.getDealingProcessWeightSum()>0)|| dealingWeight > 0) { log.info("该机器:{},增加权重得分:{}", localHostIp, dealingWeight); nodeProcessStastic.setDealingProcessWeightSum(nodeProcessStastic.getDealingProcessWeightSum() + dealingWeight); nodeProcessStastic.setLastWeightChangeTimestamp(DateUtil.now()); } }else{ if(statusDTO.getIsRestarted()){ nodeProcessStastic.setDealingProcessWeightSum(0); nodeProcessStastic.setLastWeightChangeTimestamp(null); } } nodeDatas.put(localHostIp, nodeProcessStastic); // 移除可能已经重启了的pod int oriSize = nodeDatas.size(); nodeDatas.values().removeIf(ele -> { long btwTime = DateUtil.between(DateUtil.parse(ele.g
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 14:55:04

git merge vs rebase:选择合适方式整合PyTorch-CUDA-v2.8代码

git merge vs rebase&#xff1a;选择合适方式整合PyTorch-CUDA-v2.8代码 在深度学习项目的开发流程中&#xff0c;一个常见的场景是&#xff1a;团队基于统一的 PyTorch-CUDA 容器镜像&#xff08;如 pytorch-cuda:v2.8&#xff09;开展模型训练和实验。随着功能迭代推进&…

作者头像 李华
网站建设 2026/6/10 17:36:47

HuggingFace Transformers模型加载优化技巧(基于PyTorch镜像)

HuggingFace Transformers模型加载优化技巧&#xff08;基于PyTorch镜像&#xff09; 在深度学习项目中&#xff0c;尤其是涉及大语言模型的开发与部署时&#xff0c;一个常见的痛点是&#xff1a;明明代码写得没问题&#xff0c;但一运行就卡在“加载模型”这一步——显存爆了…

作者头像 李华
网站建设 2026/6/10 14:56:16

anaconda配置pytorch环境耗时太久?建议切换至容器化方案

告别Anaconda慢速配置&#xff1a;PyTorch开发为何该转向容器化&#xff1f; 在深度学习项目启动前&#xff0c;你是否经历过这样的场景&#xff1f; 打开终端&#xff0c;输入 conda create -n pytorch-env python3.10&#xff0c;然后一边刷新网页查CUDA版本兼容表&#xff…

作者头像 李华
网站建设 2026/6/10 14:56:39

开源项目部署利器:PyTorch-CUDA镜像一键复现SOTA模型

开源项目部署利器&#xff1a;PyTorch-CUDA镜像一键复现SOTA模型 在深度学习领域&#xff0c;你是否经历过这样的场景&#xff1f;刚从论文中找到一个令人兴奋的 SOTA 模型代码仓库&#xff0c;满心欢喜地克隆下来准备复现结果&#xff0c;却在 pip install -r requirements.tx…

作者头像 李华
网站建设 2026/6/10 16:02:56

PyTorch-CUDA-v2.8镜像支持Kubernetes部署吗?Yes,兼容k8s

PyTorch-CUDA-v2.8镜像支持Kubernetes部署吗&#xff1f;Yes&#xff0c;兼容k8s 在AI模型训练日益复杂、GPU资源成本高企的今天&#xff0c;如何快速、稳定地将深度学习环境部署到生产集群中&#xff0c;是每个MLOps团队面临的现实挑战。手动配置PyTorch环境&#xff1f;等待数…

作者头像 李华