news 2026/4/18 10:41:13

Spring Boot4.0整合RabbitMQ死信队列详解

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Spring Boot4.0整合RabbitMQ死信队列详解

Spring Boot整合RabbitMQ死信队列详解

为啥那么讲解死信队列,因为好多人不会使用,不知道什么场景下使用,此案例是我在公司实现的一种方式,让大家都可以学习到

一、死信队列的好处

1.提高系统可靠性

  • 避免消息丢失,确保处理失败的消息有备份
  • 防止因消息处理异常导致的消息无限重试

2.异常消息管理

  • 将异常消息与正常消息分离
  • 便于监控和排查问题消息

3.灵活的重试机制

  • 支持延迟重试
  • 可设置不同的重试策略

4.系统解耦

  • 业务逻辑与异常处理逻辑分离
  • 提高代码的可维护性

二、注解式配置说明

1.主配置注解

@ConfigurationpublicclassRabbitMQConfig{// 主队列@BeanpublicQueueorderQueue(){returnQueueBuilder.durable("order.queue").deadLetterExchange("dlx.exchange")// 死信交换器.deadLetterRoutingKey("dlx.routing.key")// 死信路由键.ttl(10000)// 消息10秒未消费进入死信.maxLength(1000)// 队列最大长度.build();}// 死信队列@BeanpublicQueuedeadLetterQueue(){returnQueueBuilder.durable("dl.queue").build();}// 死信交换器@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange("dlx.exchange");}// 绑定死信交换器和队列@BeanpublicBindingdeadLetterBinding(){returnBindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("dlx.routing.key");}}

2.监听器注解

@ComponentpublicclassOrderMessageListener{// 监听正常队列@RabbitListener(queues="order.queue")publicvoidprocessOrderMessage(OrderDTOorder,Channelchannel,@Header(AmqpHeaders.DELIVERY_TAG)longtag){try{// 业务处理逻辑if(processOrder(order)){// 手动确认channel.basicAck(tag,false);}else{// 拒绝消息,进入死信队列channel.basicNack(tag,false,false);}}catch(Exceptione){// 异常时拒绝channel.basicNack(tag,false,false);}}// 监听死信队列@RabbitListener(queues="dl.queue")publicvoidprocessDeadLetter(OrderDTOorder){log.error("收到死信消息: {}",order);// 死信消息处理逻辑handleDeadLetter(order);}}

三、详细整合步骤

1.添加依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2.配置属性

spring:rabbitmq:host:localhostport:5672username:guestpassword:guest# 开启消息返回机制publisher-returns:true# 开启确认机制publisher-confirm-type:correlatedlistener:simple:# 手动确认acknowledge-mode:manual# 重试配置retry:enabled:truemax-attempts:3initial-interval:1000

3.完整配置类

@Configuration@Slf4jpublicclassRabbitMQFullConfig{// ========== 正常业务队列配置 ==========@BeanpublicDirectExchangeorderExchange(){returnnewDirectExchange("order.exchange",true,false);}@BeanpublicQueueorderQueue(){Map<String,Object>args=newHashMap<>();// 死信交换器args.put("x-dead-letter-exchange","order.dlx.exchange");// 死信路由键args.put("x-dead-letter-routing-key","order.dlx.key");// 消息TTL(毫秒)args.put("x-message-ttl",30000);// 队列最大长度args.put("x-max-length",10000);returnQueueBuilder.durable("order.queue").withArguments(args).build();}@BeanpublicBindingorderBinding(){returnBindingBuilder.bind(orderQueue()).to(orderExchange()).with("order.key");}// ========== 死信队列配置 ==========@BeanpublicDirectExchangedeadLetterExchange(){returnnewDirectExchange("order.dlx.exchange",true,false);}@BeanpublicQueuedeadLetterQueue(){returnQueueBuilder.durable("order.dl.queue").build();}@BeanpublicBindingdeadLetterBinding(){returnBindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with("order.dlx.key");}// ========== 重试队列(延时队列替代方案)==========@BeanpublicCustomExchangedelayExchange(){Map<String,Object>args=newHashMap<>();args.put("x-delayed-type","direct");returnnewCustomExchange("delay.exchange","x-delayed-message",true,false,args);}@BeanpublicQueuedelayQueue(){returnQueueBuilder.durable("delay.queue").build();}@BeanpublicBindingdelayBinding(){returnBindingBuilder.bind(delayQueue()).to(delayExchange()).with("delay.key").noargs();}}

4.消息生产者

@Component@Slf4jpublicclassMessageProducer{@AutowiredprivateRabbitTemplaterabbitTemplate;// 发送普通消息publicvoidsendOrderMessage(OrderDTOorder){CorrelationDatacorrelationData=newCorrelationData(order.getId());rabbitTemplate.convertAndSend("order.exchange","order.key",order,message->{// 设置消息属性message.getMessageProperties().setExpiration("30000")// 消息TTL.setDeliveryMode(MessageDeliveryMode.PERSISTENT);returnmessage;},correlationData);// 确认回调correlationData.getFuture().addCallback(result->{if(result.isAck()){log.info("消息发送成功: {}",order.getId());}},ex->log.error("消息发送失败: {}",ex.getMessage()));}// 发送延迟消息publicvoidsendDelayMessage(OrderDTOorder,intdelayTime){rabbitTemplate.convertAndSend("delay.exchange","delay.key",order,message->{message.getMessageProperties().setHeader("x-delay",delayTime);returnmessage;});}}

5.消息消费者(完整版)

@Component@Slf4jpublicclassOrderMessageConsumer{privatestaticfinalintMAX_RETRY_COUNT=3;@AutowiredprivateMessageProducermessageProducer;/** * 监听订单队列 */@RabbitListener(queues="order.queue")publicvoidhandleOrderMessage(@PayloadOrderDTOorder,@HeadersMap<String,Object>headers,Channelchannel,@Header(AmqpHeaders.DELIVERY_TAG)longdeliveryTag){try{log.info("收到订单消息: {}",order);// 模拟业务处理booleansuccess=processOrderBusiness(order);if(success){// 业务成功,确认消息channel.basicAck(deliveryTag,false);log.info("订单处理成功: {}",order.getId());}else{// 获取重试次数IntegerretryCount=(Integer)headers.get("x-retry-count");retryCount=(retryCount==null)?1:retryCount+1;if(retryCount<=MAX_RETRY_COUNT){// 重试次数未超限,重新入队log.warn("订单处理失败,第{}次重试: {}",retryCount,order.getId());// 设置重试计数headers.put("x-retry-count",retryCount);// 延迟重试messageProducer.sendDelayMessage(order,5000);// 确认消息,避免重新投递channel.basicAck(deliveryTag,false);}else{// 超过重试次数,进入死信队列log.error("订单处理失败次数超过上限,进入死信队列: {}",order.getId());channel.basicNack(deliveryTag,false,false);}}}catch(Exceptione){log.error("处理订单消息异常: {}",e.getMessage());try{// 拒绝消息,进入死信队列channel.basicNack(deliveryTag,false,false);}catch(IOExceptionex){log.error("拒绝消息失败: {}",ex.getMessage());}}}/** * 监听死信队列 */@RabbitListener(queues="order.dl.queue")publicvoidhandleDeadLetterMessage(@PayloadOrderDTOorder,@HeadersMap<String,Object>headers){log.error("收到死信消息: {}",order);// 记录死信消息logDeadLetter(order,headers);// 发送告警sendAlert(order);// 人工处理或其他补偿措施manualProcess(order);}/** * 监听延迟队列 */@RabbitListener(queues="delay.queue")publicvoidhandleDelayMessage(@PayloadOrderDTOorder){log.info("收到延迟消息,开始重试: {}",order);// 重新发送到订单队列messageProducer.sendOrderMessage(order);}privatebooleanprocessOrderBusiness(OrderDTOorder){// 业务处理逻辑// 返回true表示成功,false表示失败returnnewRandom().nextBoolean();}privatevoidlogDeadLetter(OrderDTOorder,Map<String,Object>headers){// 记录死信日志log.info("记录死信: {}, headers: {}",order,headers);}privatevoidsendAlert(OrderDTOorder){// 发送告警通知log.warn("发送告警: 订单{}处理失败",order.getId());}privatevoidmanualProcess(OrderDTOorder){// 人工处理逻辑log.info("等待人工处理订单: {}",order.getId());}}

四、使用场景

1.订单超时取消

// 订单创建时发送延迟消息publicvoidcreateOrder(OrderDTOorder){// 保存订单orderService.save(order);// 发送30分钟过期的消息rabbitTemplate.convertAndSend("order.exchange","order.key",order,message->{message.getMessageProperties().setExpiration("1800000");// 30分钟returnmessage;});}

2.支付回调重试

// 支付回调失败时进入死信队列,人工处理@RabbitListener(queues="payment.callback.queue")publicvoidhandlePaymentCallback(PaymentDTOpayment){if(!paymentService.processCallback(payment)){thrownewRuntimeException("支付回调处理失败");}}

3.库存锁定与释放

// 库存锁定15分钟后自动释放publicvoidlockInventory(StringorderId){inventoryService.lock(orderId);// 发送15分钟后到期的消息rabbitTemplate.convertAndSend("inventory.exchange","inventory.lock.key",orderId,message->{message.getMessageProperties().setExpiration("900000");// 15分钟returnmessage;});}

4.消息重试机制

// 分级重试策略publicclassRetryStrategy{// 第一次重试:5秒后// 第二次重试:30秒后// 第三次重试:5分钟后// 超过3次进入死信队列}

五、优点总结

  1. 可靠性:确保消息不丢失,即使处理失败也有备份
  2. 灵活性:支持多种死信策略(超时、长度限制、拒绝等)
  3. 可维护性:异常处理与正常业务逻辑分离
  4. 监控性:死信队列便于监控和统计异常消息
  5. 可扩展性:支持多种重试和补偿机制

六、最佳实践建议

  1. 合理设置TTL:根据业务需求设置合适的过期时间
  2. 监控死信队列:设置告警,及时处理死信消息
  3. 限制队列大小:防止消息积压
  4. 记录详细日志:便于问题排查
  5. 死信消息分析:定期分析死信原因,优化系统

通过Spring Boot4.0整合RabbitMQ死信队列,可以构建更加健壮、可靠的消息驱动系统,有效处理各种异常场景,提高系统的整体稳定性。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 7:52:40

4.2 AI辅助技术文档撰写:将代码理解转化为专业文档

4.2 AI辅助技术文档撰写:将代码理解转化为专业文档 在深入理解代码库之后,下一步就是将这些理解转化为清晰、准确的技术文档。技术文档不仅是团队协作的重要工具,也是项目可持续发展的关键。本节将介绍如何利用AI工具将代码理解转化为高质量的技术文档。 技术文档的重要性…

作者头像 李华
网站建设 2026/4/18 8:44:33

如何让你的GitHub项目快速涨星(Star),具备知名度

GitHub 开源项目为何容易被埋没&#xff1f; 在当今的软件开发领域&#xff0c;GitHub 早已成为开发者日常工作和学习中不可或缺的平台。我们几乎每天都会接触到大量的开源项目&#xff0c;而衡量一个项目是否值得使用&#xff0c;Star 和 Fork 数量往往是最直观、也最常见的参…

作者头像 李华
网站建设 2026/4/18 8:46:53

5.1 攻克LLM致命痛点:深入理解MCP协议核心机制

5.1 攻克LLM致命痛点:深入理解MCP协议核心机制 大型语言模型(LLM)在代码生成和理解方面展现出了惊人的能力,但在实际应用中仍然存在一些致命的痛点。本节将深入探讨这些痛点,并介绍Model Context Protocol(MCP)协议如何解决这些问题,为AI编程工具提供更强大、更准确的…

作者头像 李华
网站建设 2026/4/18 5:41:07

Higress云原生网关部署与优化配置指南

Higress云原生网关部署与优化配置指南 【免费下载链接】higress Next-generation Cloud Native Gateway | 下一代云原生网关 项目地址: https://gitcode.com/GitHub_Trending/hi/higress 在当今云原生技术架构中&#xff0c;高效可靠的Kubernetes应用网关部署已成为企业…

作者头像 李华