news 2026/6/22 8:37:08

我工作中用MQ的10种场景

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
我工作中用MQ的10种场景

近有球友问我:MQ的使用场景有哪些?工作中一定要使用MQ吗?

记得刚工作那会儿,我总是想不明白:为什么明明直接调用接口就能完成的功能,非要引入MQ这么个"中间商"?

直到经历了系统崩溃、数据丢失、性能瓶颈等一系列问题后,我才真正理解了MQ的价值。

今天我想和大家分享我在实际工作中使用消息队列(MQ)的10种典型场景,希望对你会有所帮助。

加苏三的工作内推群

一、为什么需要消息队列(MQ)?

在深入具体场景之前,我们先来思考一个基本问题:为什么要使用消息队列?

系统间的直接调用:

image

引入消息队列后:

image

接下来我们将通过10个具体场景,带大家来深入理解MQ的价值。

场景一:系统解耦

背景描述

在我早期参与的一个电商项目中,订单创建后需要通知多个系统:

// 早期的紧耦合设计

public class OrderService {

private InventoryService inventoryService;

private PointsService pointsService;

private EmailService emailService;

private AnalyticsService analyticsService;

public void createOrder(Order order) {

// 1. 保存订单

orderDao.save(order);

// 2. 调用库存服务

inventoryService.updateInventory(order);

// 3. 调用积分服务

pointsService.addPoints(order.getUserId(), order.getAmount());

// 4. 发送邮件通知

emailService.sendOrderConfirmation(order);

// 5. 记录分析数据

analyticsService.trackOrderCreated(order);

// 更多服务...

}

}

这种架构存在严重问题:

紧耦合:订单服务需要知道所有下游服务

单点故障:任何一个下游服务挂掉都会导致订单创建失败

性能瓶颈:同步调用导致响应时间慢

MQ解决方案

引入MQ后,架构变为:

image

代码实现:

// 订单服务 - 生产者

@Service

public class OrderService {

@Autowired

private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {

// 1. 保存订单

orderDao.save(order);

// 2. 发送消息到MQ

rabbitTemplate.convertAndSend(

"order.exchange",

"order.created",

new OrderCreatedEvent(order.getId(), order.getUserId(), order.getAmount())

);

}

}

// 库存服务 - 消费者

@Component

@RabbitListener(queues = "inventory.queue")

public class InventoryConsumer {

@Autowired

private InventoryService inventoryService;

@RabbitHandler

public void handleOrderCreated(OrderCreatedEvent event) {

inventoryService.updateInventory(event.getOrderId());

}

}

技术要点

消息协议选择:根据业务需求选择RabbitMQ、Kafka或RocketMQ

消息格式:使用JSON或Protobuf等跨语言格式

错误处理:实现重试机制和死信队列

场景二:异步处理

背景描述

用户上传视频后需要执行转码、生成缩略图、内容审核等耗时操作,如果同步处理,用户需要等待很长时间。

MQ解决方案

// 视频服务 - 生产者

@Service

public class VideoService {

@Autowired

private KafkaTemplate<String, Object> kafkaTemplate;

public UploadResponse uploadVideo(MultipartFile file, String userId) {

// 1. 保存原始视频

String videoId = saveOriginalVideo(file);

// 2. 发送处理消息

kafkaTemplate.send("video-processing", new VideoProcessingEvent(videoId, userId));

// 3. 立即返回响应

return new UploadResponse(videoId, "upload_success");

}

}

// 视频处理服务 - 消费者

@Service

public class VideoProcessingConsumer {

@KafkaListener(topics = "video-processing")

public void processVideo(VideoProcessingEvent event) {

// 异步执行耗时操作

videoProcessor.transcode(event.getVideoId());

videoProcessor.generateThumbnails(event.getVideoId());

contentModerationService.checkContent(event.getVideoId());

// 发送处理完成通知

notificationService.notifyUser(event.getUserId(), event.getVideoId());

}

}

架构优势

快速响应:用户上传后立即得到响应

弹性扩展:可以根据处理压力动态调整消费者数量

故障隔离:处理服务故障不会影响上传功能

场景三:流量削峰

背景描述

电商秒杀活动时,瞬时流量可能是平时的百倍以上,直接冲击数据库和服务。

MQ解决方案

image

代码实现:

// 秒杀服务

@Service

public class SecKillService {

@Autowired

private RedisTemplate<String, Object> redisTemplate;

@Autowired

private RabbitTemplate rabbitTemplate;

public SecKillResponse secKill(SecKillRequest request) {

// 1. 校验用户资格

if (!checkUserQualification(request.getUserId())) {

return SecKillResponse.failed("用户无资格");

}

// 2. 预减库存(Redis原子操作)

Long remaining = redisTemplate.opsForValue().decrement(

"sec_kill_stock:" + request.getItemId());

if (remaining == null || remaining < 0) {

// 库存不足,恢复库存

redisTemplate.opsForValue().increment("sec_kill_stock:" + request.getItemId());

return SecKillResponse.failed("库存不足");

}

// 3. 发送秒杀成功消息到MQ

rabbitTemplate.convertAndSend(

"sec_kill.exchange",

"sec_kill.success",

new SecKillSuccessEvent(request.getUserId(), request.getItemId())

);

return SecKillResponse.success("秒杀成功");

}

}

// 订单处理消费者

@Component

@RabbitListener(queues = "sec_kill.order.queue")

public class SecKillOrderConsumer {

@RabbitHandler

public void handleSecKillSuccess(SecKillSuccessEvent event) {

// 异步创建订单

orderService.createSecKillOrder(event.getUserId(), event.getItemId());

}

}

技术要点

库存预扣:使用Redis原子操作避免超卖

队列缓冲:MQ缓冲请求,避免直接冲击数据库

限流控制:在网关层进行限流,拒绝过多请求

场景四:数据同步

背景描述

在微服务架构中,不同服务有自己的数据库,需要保证数据一致性。

MQ解决方案

// 用户服务 - 数据变更时发送消息

@Service

public class UserService {

@Transactional

public User updateUser(User user) {

// 1. 更新数据库

userDao.update(user);

// 2. 发送消息(在事务内)

rocketMQTemplate.sendMessageInTransaction(

"user-update-topic",

MessageBuilder.withPayload(new UserUpdateEvent(user.getId(), user.getStatus()))

.build(),

null

);

return user;

}

}

// 其他服务 - 消费用户更新消息

@Service

@RocketMQMessageListener(topic = "user-update-topic", consumerGroup = "order-group")

public class UserUpdateConsumer implements RocketMQListener<UserUpdateEvent> {

@Override

public void onMessage(UserUpdateEvent event) {

// 更新本地用户信息缓存

orderService.updateUserCache(event.getUserId(), event.getStatus());

}

}

一致性保证

本地事务表:将消息和业务数据放在同一个数据库事务中

事务消息:使用RocketMQ的事务消息机制

幂等消费:消费者实现幂等性,避免重复处理

场景五:日志收集

背景描述

分布式系统中,日志分散在各个节点,需要集中收集和分析。

MQ解决方案

image

代码实现:

// 日志收集组件

@Component

public class LogCollector {

@Autowired

private KafkaTemplate<String, String> kafkaTemplate;

public void collectLog(String appId, String level, String message, Map<String, Object> context) {

LogEvent logEvent = new LogEvent(appId, level, message, context, System.currentTimeMillis());

// 发送到Kafka

kafkaTemplate.send("app-logs", appId, JsonUtils.toJson(logEvent));

}

}

// 日志消费者

@Service

public class LogConsumer {

@KafkaListener(topics = "app-logs", groupId = "log-es")

public void consumeLog(String message) {

LogEvent logEvent = JsonUtils.fromJson(message, LogEvent.class);

// 存储到Elasticsearch

elasticsearchService.indexLog(logEvent);

// 实时监控检查

if ("ERROR".equals(logEvent.getLevel())) {

alertService.checkAndAlert(logEvent);

}

}

}

技术优势

解耦:应用节点无需关心日志如何处理

缓冲:应对日志产生速率波动

多消费:同一份日志可以被多个消费者处理

场景六:消息广播

背景描述

系统配置更新后,需要通知所有服务节点更新本地配置。

MQ解决方案

// 配置服务 - 广播配置更新

@Service

public class ConfigService {

@Autowired

private RedisTemplate<String, Object> redisTemplate;

public void updateConfig(String configKey, String configValue) {

// 1. 更新配置存储

configDao.updateConfig(configKey, configValue);

// 2. 广播配置更新消息

redisTemplate.convertAndSend("config-update-channel",

new ConfigUpdateEvent(configKey, configValue));

}

}

// 服务节点 - 订阅配置更新

@Component

public class ConfigUpdateListener {

@Autowired

private LocalConfigCache localConfigCache;

@RedisListener(channel = "config-update-channel")

public void handleConfigUpdate(ConfigUpdateEvent event) {

// 更新本地配置缓存

localConfigCache.updateConfig(event.getKey(), event.getValue());

}

}

应用场景

功能开关:动态开启或关闭功能

参数调整:调整超时时间、限流阈值等

黑白名单:更新黑白名单配置

场景七:顺序消息

背景描述

在某些业务场景中,消息的处理顺序很重要,如订单状态变更。

MQ解决方案

// 订单状态变更服务

@Service

public class OrderStateService {

@Autowired

private RocketMQTemplate rocketMQTemplate;

public void changeOrderState(String orderId, String oldState, String newState) {

OrderStateEvent event = new OrderStateEvent(orderId, oldState, newState);

// 发送顺序消息,使用orderId作为sharding key

rocketMQTemplate.syncSendOrderly(

"order-state-topic",

event,

orderId // 保证同一订单的消息按顺序处理

);

}

}

// 订单状态消费者

@Service

@RocketMQMessageListener(

topic = "order-state-topic",

consumerGroup = "order-state-group",

consumeMode = ConsumeMode.ORDERLY // 顺序消费

)

public class OrderStateConsumer implements RocketMQListener<OrderStateEvent> {

@Override

public void onMessage(OrderStateEvent event) {

// 按顺序处理订单状态变更

orderService.processStateChange(event);

}

}

顺序保证机制

分区顺序:同一分区内的消息保证顺序

顺序投递:MQ保证消息按发送顺序投递

顺序处理:消费者顺序处理消息

场景八:延迟消息

背景描述

需要实现定时任务,如订单超时未支付自动取消。

MQ解决方案

// 订单服务 - 发送延迟消息

@Service

public class OrderService {

@Autowired

private RabbitTemplate rabbitTemplate;

public void createOrder(Order order) {

// 保存订单

orderDao.save(order);

// 发送延迟消息,30分钟后检查支付状态

rabbitTemplate.convertAndSend(

"order.delay.exchange",

"order.create",

new OrderCreateEvent(order.getId()),

message -> {

message.getMessageProperties().setDelay(30 * 60 * 1000); // 30分钟

return message;

}

);

}

}

// 订单超时检查消费者

@Component

@RabbitListener(queues = "order.delay.queue")

public class OrderTimeoutConsumer {

@RabbitHandler

public void checkOrderPayment(OrderCreateEvent event) {

Order order = orderDao.findById(event.getOrderId());

if ("UNPAID".equals(order.getStatus())) {

// 超时未支付,取消订单

orderService.cancelOrder(order.getId(), "超时未支付");

}

}

}

替代方案对比

方案 优点 缺点

数据库轮询 实现简单 实时性差,数据库压力大

延时队列 实时性好 实现复杂,消息堆积问题

定时任务 可控性强 分布式协调复杂

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/13 5:11:51

程序员兼职:高效拓展收入与技术能力的现实路径

随着远程办公、灵活用工逐渐成为趋势&#xff0c;程序员兼职正在成为许多开发者提高收入、积累项目经验、探索更多职业路线的现实选择。不同于传统的固定工作&#xff0c;兼职项目更自由&#xff0c;但同时也带来了更高的信息不对称与执行压力。 为了帮助想进入程序员兼职市场的…

作者头像 李华
网站建设 2026/6/18 5:28:14

一生一芯学习:PA2:输入输出

入输出是计算机与外界交互的基本手段&#xff0c;只需要向设备发送一些有意义的数字信号&#xff0c;设备就会按照这些信号来工作。设备有自己的专属寄存器&#xff08;如CPU的通用寄存器&#xff09;&#xff0c;也有自己的功能部件&#xff08;如CPU的ALU&#xff09;。以键盘…

作者头像 李华
网站建设 2026/6/20 14:54:45

littlefs版本升级深度解析:从架构演进到实战应用

littlefs版本升级深度解析&#xff1a;从架构演进到实战应用 【免费下载链接】littlefs A little fail-safe filesystem designed for microcontrollers 项目地址: https://gitcode.com/GitHub_Trending/li/littlefs littlefs文件系统作为嵌入式领域的明星项目&#xff…

作者头像 李华
网站建设 2026/6/17 15:34:46

收藏!RAG技术从入门到落地:大模型时代程序员必学的增强秘籍

对于刚接触大模型的程序员小白来说&#xff0c;是不是常遇到这些头疼问题&#xff1a;调用GPT回答专业问题时频频“一本正经地胡说八道”&#xff1f;想让模型掌握2025年最新技术动态却无从下手&#xff1f;微调大模型的高昂成本让人望而却步&#xff1f;别慌&#xff0c;RAG&a…

作者头像 李华
网站建设 2026/6/22 7:31:10

50岁再去旅行,不是流浪,是回家

“到了50岁还背着包到处跑&#xff0c;不是老不正经&#xff0c;是终于把人生调成了自己的频道。”01 把"流浪"翻译成"漫游"&#xff0c;世界就安静了 有人说&#xff1a; “50岁去旅行&#xff0c;像无家可归的流浪。” 我却觉得&#xff1a; 20岁的旅行才…

作者头像 李华