查券返利机器人的异步任务调度:Java XXL-Job+Redis实现海量查券请求的分布式任务分发
大家好,我是 微赚淘客系统3.0 的研发者省赚客!
在高并发场景下,用户通过查券返利机器人发起的优惠券查询请求可能瞬时达到数十万量级。为避免直接冲击核心接口并保障系统稳定性,我们采用“请求入队 + 异步消费”模式,基于 XXL-JOB 作为分布式调度中心,结合 Redis Stream 实现任务的可靠分发与削峰填谷。
整体架构设计
用户请求首先写入 Redis Stream,由多个消费者实例监听流数据;XXL-JOB 定时触发任务拉取器,动态调整消费速率,并支持失败重试、积压告警与人工干预。该方案解耦了请求入口与处理逻辑,提升系统弹性。
Redis Stream 任务队列定义
每个查券请求封装为一个结构化消息,写入名为coupon:query:stream的 Stream:
packagejuwatech.cn.task.model;publicclassCouponQueryTask{privateStringtaskId;// 全局唯一IDprivateStringuserId;// 用户IDprivateStringitemId;// 商品IDprivatelongtimestamp;// 请求时间戳privateintretryCount;// 重试次数// getters and setterspublicStringgetTaskId(){returntaskId;}publicvoidsetTaskId(StringtaskId){this.taskId=taskId;}publicStringgetUserId(){returnuserId;}publicvoidsetUserId(StringuserId){this.userId=userId;}publicStringgetItemId(){returnitemId;}publicvoidsetItemId(StringitemId){this.itemId=itemId;}publiclonggetTimestamp(){returntimestamp;}publicvoidsetTimestamp(longtimestamp){this.timestamp=timestamp;}publicintgetRetryCount(){returnretryCount;}publicvoidsetRetryCount(intretryCount){this.retryCount=retryCount;}}生产者将任务推入 Redis:
packagejuwatech.cn.task.producer;importcom.fasterxml.jackson.databind.ObjectMapper;importorg.springframework.data.redis.connection.stream.StreamRecords;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Component;importjava.util.HashMap;importjava.util.Map;@ComponentpublicclassCouponTaskProducer{privatefinalRedisTemplate<String,Object>redisTemplate;privatefinalObjectMapperobjectMapper=newObjectMapper();publicCouponTaskProducer(RedisTemplate<String,Object>redisTemplate){this.redisTemplate=redisTemplate;}publicvoidsubmitTask(juwatech.cn.task.model.CouponQueryTasktask)throwsException{Map<String,String>payload=newHashMap<>();payload.put("task",objectMapper.writeValueAsString(task));redisTemplate.opsForStream().add(StreamRecords.newRecord().ofObject(payload).withStreamKey("coupon:query:stream"));}}XXL-JOB 调度任务配置
在 XXL-JOB 控制台注册执行器juwatech-coupon-job,并创建任务coupon-query-consumer,Cron 表达式设为0/5 * * * * ?(每5秒触发一次)。
对应的 JobHandler 实现如下:
packagejuwatech.cn.job;importcom.xxl.job.core.context.XxlJobHelper;importcom.xxl.job.core.handler.annotation.XxlJob;importorg.springframework.stereotype.Component;@ComponentpublicclassCouponQueryConsumerJob{privatefinaljuwatech.cn.task.consumer.CouponTaskConsumerconsumer;publicCouponQueryConsumerJob(juwatech.cn.task.consumer.CouponTaskConsumerconsumer){this.consumer=consumer;}@XxlJob("couponQueryConsumer")publicvoidexecute(){try{intconsumed=consumer.consumeBatch(100);// 每次最多消费100条XxlJobHelper.handleSuccess("Consumed "+consumed+" tasks");}catch(Exceptione){XxlJobHelper.handleFail(e.getMessage());}}}Redis Stream 消费逻辑
消费者从 Stream 读取待处理任务,并调用查券服务:
packagejuwatech.cn.task.consumer;importcom.fasterxml.jackson.databind.ObjectMapper;importorg.springframework.data.redis.connection.stream.*;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Service;importjava.time.Duration;importjava.util.Collections;importjava.util.List;importjava.util.Map;@ServicepublicclassCouponTaskConsumer{privatefinalRedisTemplate<String,Object>redisTemplate;privatefinaljuwatech.cn.service.CouponQueryServicecouponQueryService;privatefinalObjectMapperobjectMapper=newObjectMapper();publicCouponTaskConsumer(RedisTemplate<String,Object>redisTemplate,juwatech.cn.service.CouponQueryServicecouponQueryService){this.redisTemplate=redisTemplate;this.couponQueryService=couponQueryService;}publicintconsumeBatch(intbatchSize)throwsException{StreamReadOptionsoptions=StreamReadOptions.empty().count(batchSize).block(Duration.ofSeconds(2));// 阻塞等待新消息List<MapRecord<String,Object,Object>>records=redisTemplate.opsForStream().read(options,StreamOffset.create("coupon:query:stream",ReadOffset.lastConsumed()));if(records==null||records.isEmpty()){return0;}for(MapRecord<String,Object,Object>record:records){try{Map<Object,Object>value=record.getValue();StringtaskJson=(String)value.get("task");juwatech.cn.task.model.CouponQueryTasktask=objectMapper.readValue(taskJson,juwatech.cn.task.model.CouponQueryTask.class);// 执行查券couponQueryService.queryAndNotify(task.getUserId(),task.getItemId());// 确认消费(删除消息)redisTemplate.opsForStream().acknowledge("coupon:query:stream","coupon-group",record.getId());}catch(Exceptione){handleConsumeFailure(record,e);}}returnrecords.size();}privatevoidhandleConsumeFailure(MapRecord<String,Object,Object>record,Exceptione){// 可选:将失败任务写入重试队列或 DLQjuwatech.cn.util.AsyncLogger.logAsync("Failed to process task "+record.getId()+": "+e.getMessage());}}消费者组与消息可靠性
初始化消费者组以支持多实例并行消费:
XGROUP CREATE coupon:query:stream coupon-group $ MKSTREAM在应用启动时自动创建(可选):
packagejuwatech.cn.config;importorg.springframework.data.redis.core.RedisTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassRedisStreamInit{privatefinalRedisTemplate<String,String>redisTemplate;publicRedisStreamInit(RedisTemplate<String,String>redisTemplate){this.redisTemplate=redisTemplate;}@PostConstructpublicvoidinitConsumerGroup(){try{redisTemplate.execute((connection)->{connection.streamCommands().xGroupCreate("coupon:query:stream".getBytes(),"coupon-group".getBytes(),org.springframework.data.redis.connection.stream.ReadOffset.from("0-0").getOffset().getBytes(),true);returnnull;});}catch(Exceptionignored){// Group already exists}}}积压监控与弹性扩缩容
通过 Redis 命令XINFO GROUPS coupon:query:stream获取 pending 消息数,在 XXL-JOB 中上报指标,结合 Prometheus + Alertmanager 实现积压告警。当 pending > 10000 时,自动扩容消费者实例。
本文著作权归 微赚淘客系统3.0 研发团队,转载请注明出处!