news 2026/6/10 16:16:18

供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
供应链计划系统架构实战(七):轻量级分布式计算框架设计与实现

1、框架设计逻辑

核心组件

1、服务注册与发现(Redis)

  • 使用Redis作为服务注册中心
  • 以服务名称ApplicationName为key存储节点集群
  • 基于时间戳的心跳机制(10秒间隔)

2、任务调度系统

  • 数据库作为任务持久化存储
  • 守护线程轮询获取新任务
  • 基于负载的调度算法(选择负载最小节点)

3、双守护线程模型

  • 节点监控守护线程:维护节点健康状态
  • 任务发现守护线程:分配计算任务

具体简单时序图如下图所示

2、核心代码实现

2.1、框架核心实现

2.1.1、监听Spring应用启动事件

  • 事件驱动:利用 Spring 应用启动事件,在合适时机启动监控功能
  • 条件控制:通过配置控制功能是否启用,提高灵活性
  • 功能整合:同时启动监控线程和执行类型注册,完成进程监控的初始化

1、启动守护线程

    • ProcessDaemonServiceImpl 实现了 ApplicationListener 接口,监听 Spring 应用启动事件;
    • 在应用启动完成后启动守护线程,监控节点存活状态和进程状态;

2、注册中心注册

    • 获取应用上下文:从事件中获取 ApplicationContext
    • 执行注册服务:获取 ProcessTypeRegisterService 并调用 doRegister()
@Slf4j @Component public class ProcessDaemonServiceImpl implements ApplicationListener<ApplicationStartedEvent> { @Autowired ProcessProperties processProperties; @Override public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) { if(processProperties.isEnable() && processProperties.getBusinessKeys().size()!=0){ startDaemonThread(); ApplicationContext applicationContext = applicationStartedEvent.getApplicationContext(); applicationContext.getBean(ProcessTypeRegisterService.class).doRegister(); } } // 启动守护线程 线程优先级设置为10(最高优先级) private void startDaemonThread() { Thread daemonThread2 = new Thread(processUtilServiceImpl.nodeAliveWatcher, "nodeAliveWatcher"); daemonThread2.setDaemon(true); daemonThread2.setPriority(10); // 启动线程 daemonThread2.start(); log.info("{},守护线程启动",daemonThread2.getName()); Thread daemonThread = new Thread(processUtilServiceImpl.processStatusWatcher, "processStatusWatcher"); // 设置为守护线程 daemonThread.setDaemon(true); daemonThread.setPriority(10); // 启动线程 daemonThread.start(); log.info("{},守护线程启动",daemonThread.getName()); } }

2.1.2、监控器

2.1.2.1、节点保活监控器

无限循环运行的守护线程,负责监控节点的状态信息,分布式锁:使用 ALL_NODE_PROCESS_LOCK_KEY 确保集群中只有一个节点执行监控

策略:

    • 定期更新:每 8 秒更新一次节点状态
    • 分布式协调:通过分布式锁确保集群节点状态的一致性
    • 负载信息维护:更新当前节点的负载信息

重启检测

    • 重启标识:初始化时设置 isRestarted 为 true
    • 状态同步:向集群其他节点通知当前节点重启状态
@Slf4j @Component public class ProcessUtilServiceImpl implements ProcessUtilService { /*** * 节点保活监视器 **/ public final Runnable nodeAliveWatcher = () -> { StatusDTO statusDTO = new StatusDTO(); statusDTO.setIsRestarted(true); statusDTO.setWeight(null); while (true) { try { ThreadSleepUtil.parkSeconds(8); String timeSlot = MyDateUtils.getTimeSlot(); ALL_NODE_PROCESS_LOCK_KEY = String.format(ALL_NODE_PROCESS_LOCK_KEY, applicationName,timeSlot); redissonDistributeLock.dealWithLock(ALL_NODE_PROCESS_LOCK_KEY, null, nodeProcessLoadServiceImpl.updateThisNodeInfoFunc, (param) -> { log.warn("节点保活监视器无法正常获取锁,无法更新节点状态"); return null; }, statusDTO); } catch (Exception e) { log.error("节点保活监视器异常"); log.error(e.getMessage(), e); } } }; }

节点状态

@Data public class StatusDTO { private Random random = new Random(); private Boolean isRestarted = true; Long weight ; }

节点状态更新机制

基于redis缓存去更新

  • 维护节点状态:更新当前节点的存活状态和负载信息
  • 权重管理:根据 StatusDTO 中的权重值调整节点负载
  • 节点清理:移除长时间未更新的节点信息
public Function<StatusDTO, Void> updateThisNodeInfoFunc = (statusDTO) -> { try { Long dealingWeight = statusDTO.getWeight(); RMap<String, NodeProcessStastic> nodeDatas = redissonClient.getMap(ALL_NODE_PROCESS_KEY); String localHostIp = IpAddressUtil.getHostIp(); NodeProcessStastic nodeProcessStastic = nodeDatas.getOrDefault(localHostIp, new NodeProcessStastic()); nodeProcessStastic.setTimestamp(DateUtil.now()); nodeProcessStastic.setIpAddress(localHostIp.replaceAll("\\.", "-")); nodeProcessStastic.setSupportBusinessKeys(processProperties.getBusinessKeys()); if (dealingWeight != null) { if ((dealingWeight < 0 && nodeProcessStastic.getDealingProcessWeightSum()>0)|| dealingWeight > 0) { log.info("该机器:{},增加权重得分:{}", localHostIp, dealingWeight); nodeProcessStastic.setDealingProcessWeightSum(nodeProcessStastic.getDealingProcessWeightSum() + dealingWeight); nodeProcessStastic.setLastWeightChangeTimestamp(DateUtil.now()); } }else{ if(statusDTO.getIsRestarted()){ nodeProcessStastic.setDealingProcessWeightSum(0); nodeProcessStastic.setLastWeightChangeTimestamp(null); } } nodeDatas.put(localHostIp, nodeProcessStastic); // 移除可能已经重启了的pod int oriSize = nodeDatas.size(); nodeDatas.values().removeIf(ele -> { long btwTime = DateUtil.between(DateUtil.parse(ele.g
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 14:58:43

PyTorch最新版v2.8发布,CUDA加速性能提升30%

PyTorch v2.8 发布&#xff1a;CUDA 加速性能提升 30%&#xff0c;开发效率再上新台阶 在深度学习研发日益依赖大规模算力的今天&#xff0c;一个看似微小的性能优化&#xff0c;往往能为团队节省成百上千小时的训练时间。就在最近&#xff0c;PyTorch 官方发布了 v2.8 版本&am…

作者头像 李华
网站建设 2026/6/10 14:26:33

清华镜像源加速PyTorch相关库安装,速度提升5倍以上

清华镜像源加速PyTorch安装&#xff1a;让深度学习环境搭建快如闪电 在人工智能实验室的深夜&#xff0c;你是否也经历过这样的场景&#xff1f;——刚配置好服务器&#xff0c;兴致勃勃地运行 pip install torch&#xff0c;结果看着终端里龟速爬行的进度条&#xff0c;一杯咖…

作者头像 李华
网站建设 2026/6/10 12:39:26

早上八点的电费涨到一块二了,家里的洗衣机还在嗡嗡转。我瘫在沙发上刷着电费账单,突然意识到这年头连用个电都得玩策略游戏——分时电价这玩意儿简直比股票K线图还刺激

分时电价下用户需求侧响应优化调度 摘要&#xff1a;为研究需求侧响应随着分时电价的响应策略&#xff0c;构建了含有可中断负荷、可转移负荷在内的需求侧优化调度模型&#xff0c;研究分时电价下可中断、可转移负荷的具体调度策略&#xff0c;并通过图展示其结果&#xff0c;具…

作者头像 李华
网站建设 2026/6/10 14:26:06

markdown插入代码块:正确格式化PyTorch-CUDA-v2.8 Python代码

PyTorch-CUDA-v2.8 镜像中的 Markdown 代码块规范与开发实践 在深度学习项目中&#xff0c;环境配置往往比写模型代码更耗时。你是否曾遇到过这样的场景&#xff1a;同事发来一份“完美运行”的训练脚本&#xff0c;但在你的机器上却因 CUDA 版本不兼容、cuDNN 缺失或 PyTorch …

作者头像 李华