news 2026/4/17 13:03:43

MongoDB实现发布订阅机制

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
MongoDB实现发布订阅机制

一、MongoDB Pub/Sub 的实现原理
MongoDB 的发布订阅不是像 Redis 那样的原生 “频道式” Pub/Sub,而是基于变更流(Change Streams)(MongoDB 3.6+ 推荐)或早期的tailable cursor(可尾游标)实现:
Change Streams:监听集合 / 数据库 / 集群的实时数据变更(插入、更新、删除等),订阅者通过监听这些变更事件来接收 “消息”,发布者则通过向集合插入文档来 “发布消息”。
核心逻辑:把 “消息” 封装成文档插入 MongoDB 集合,订阅者监听该集合的插入事件,从而实现消息的发布与订阅。

const path = require('path'); const paths = require('../paths'); const dbManager = require(path.join(paths.manager, 'dbManager')) const COLLECTION_NAME = 'messages'; // 存储消息的集合名 // 并发 / 吞吐量能力(参考值,普通服务器硬件) // 指标 MongoDB(tailable cursor) Redis Pub/Sub MongoDB(Change Streams) // 单频道订阅者数量 支持 100+(无明显卡顿) 支持 1000+ 支持 50+ // 消息发布 QPS 1 万 - 5 万 / 秒 10 万 + / 秒 5000 - 2 万 / 秒 // 消息延迟(发布→接收) 1-10ms <1ms 5-20ms /** * 初始化:创建固定集合(仅需执行一次) * 固定集合是tailable cursor的前提,大小限制100MB,自动覆盖旧文档 */ async function initCappedCollection() { try { // 检查集合是否存在,不存在则创建固定集合 const collections = await dbManager.getDB().listCollections({ name: COLLECTION_NAME }).toArray(); if (collections.length === 0) { await dbManager.getDB().createCollection(COLLECTION_NAME, { capped: true, // 启用固定集合 size: 100 * 1024 * 1024, // 集合最大大小100MB max: 10000 // 最多存储10000条文档(二选一,满足其一即触发覆盖) }); console.log('固定集合创建成功'); } } catch (err) { console.error('初始化集合失败:', err); } } /** * 发布者:向集合插入消息文档(模拟发布) * @param {string} channel 频道名(区分不同类型的消息) * @param {any} data 要发布的消息内容 */ async function publish(channel, data) { try { const collection = dbManager.getDB().collection(COLLECTION_NAME); // 插入消息文档(包含频道、内容、时间戳) await collection.insertOne({ channel: channel, data: data, timestamp: new Date(), }); console.log(`[发布] 频道${channel}:`, data); } catch (err) { console.error('发布失败:', err); } } /** * 订阅者:监听指定频道的消息(模拟订阅) * @param {string} channel 要订阅的频道名 */ async function subscribe(channel) { try { const collection = dbManager.getDB().collection(COLLECTION_NAME); console.log(`[订阅] 开始监听频道${channel}...`); let lastId = null; const lastDoc = await collection.findOne( { channel: channel }, { sort: { $natural: -1 } } // 仅查询最后一条时用倒序,tailable cursor本身不用 ); if (lastDoc) { lastId = lastDoc._id; } // 创建tailable cursor(持续监听最新文档) const cursor = collection.find( lastId ? { channel: channel, _id: { $gt: lastId } } : { channel: channel }, // 只读新文档 { tailable: true, // 启用可尾游标 awaitData: true, // 等待新数据(阻塞查询) noCursorTimeout: true, // 禁用游标超时 sort: { $natural: 1 } // 仅支持正序 } ); // 持续遍历游标,接收新消息 while (true) { if (await cursor.hasNext()) { // 有新文档时触发 const message = await cursor.next(); console.log(`[接收] 频道${channel}:`, message.data); } else { // 无新数据时短暂等待,避免CPU空转 await new Promise(resolve => setTimeout(resolve, 100)); } } } catch (err) { console.error('订阅失败:', err); } } // -------------------------- 示例调用 -------------------------- async function runDemo() { await initCappedCollection() // 启动订阅者(监听"chat"频道) subscribe('chat'); // 延迟1秒后发布3条测试消息 setTimeout(() => publish('chat', { user: '张三', msg: '你好!' }), 1000); setTimeout(() => publish('chat', { user: '李四', msg: 'MongoDB Pub/Sub测试' }), 2000); setTimeout(() => publish('notice', { content: '这是通知频道,订阅者收不到' }), 3000); // 其他频道不接收 } module.exports = { runDemo, };

这个方案的性能消耗属于轻到中等级别,远低于 Redis Pub/Sub(Redis 是纯内存轻量级),但高于 MongoDB Change Streams(副本集模式),适合中小规模的消息场景(如单服务 / 低并发通知),不适合高并发、低延迟的核心业务场景。

1. CPU 消耗
客户端(Node.js 服务):
核心消耗点:cursor.hasNext() 循环 + 消息处理回调。
关键优化:开启 awaitData: true(代码中已配置)时,MongoDB 驱动会采用阻塞式 IO 等待新数据,而非空轮询,此时 CPU 占用极低(单订阅频道 CPU 使用率 < 1%);若关闭 awaitData 改用主动轮询(如每 100ms 查一次),CPU 会飙升至 10%+(空转消耗)。
多频道订阅:每增加一个频道,CPU 消耗线性增加,但单进程支持 50+ 频道订阅时,CPU 仍可控制在 5% 以内。
服务端(MongoDB):
固定集合(capped collection)的插入 / 查询都是顺序 IO(而非普通集合的随机 IO),CPU 消耗极低;
维护 tailable cursor 的开销可忽略(单节点下,100 个 cursor 仅占用 < 0.5 核 CPU)。
2. 内存消耗
客户端(Node.js 服务):
每个订阅频道对应一个 cursor 对象,内存占用仅几 KB;即使 100 个频道订阅,总内存增加也不足 1MB。
仅需缓存少量消息元数据(如最后一条消息的 _id),无额外内存压力。
服务端(MongoDB):
固定集合的元数据(索引、游标信息)占用内存约几十 MB;
固定集合本身会按 size 配置(如 100MB)加载部分数据到内存(MongoDB 缓存机制),但可通过 wiredTiger 缓存配置限制,避免内存溢出。
3. 磁盘 IO 消耗
固定集合是 MongoDB 中磁盘 IO 效率最高的集合类型:
插入消息时是顺序写(磁盘磁头无需频繁移动),比普通集合的随机写快 5-10 倍;
tailable cursor 读取消息是顺序读,几乎无随机 IO,磁盘 IO 利用率 > 90%;
对比普通集合:普通集合的随机写 IO 消耗是固定集合的 3-5 倍。
4. 网络消耗
连接层面:tailable cursor 会保持长连接(无频繁重连),无额外心跳包开销;
数据层面:仅传输新插入的消息数据(含 _id/channel/data/timestamp 等元数据),单条消息的网络包比 Redis Pub/Sub 大(Redis 仅传输纯消息内容,MongoDB 多文档元数据),但整体网络消耗仍属低级别;
并发订阅:多订阅者监听同一频道时,MongoDB 会为每个订阅者推送消息,网络消耗随订阅者数量线性增加(Redis 同理)。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/7 13:40:13

特殊符号绕过-ctfshow-web40

一、打开环境看源码if(isset($_GET[c])){$c $_GET[c];if(!preg_match("/[0-9]|\~|\|\|\#|\\$|\%|\^|\&|\*|\&#xff08;|\&#xff09;|\-|\|\|\{|\[|\]|\}|\:|\|\"|\,|\<|\.|\>|\/|\?|\\\\/i", $c)){eval($c);}}else{highlight_file(__FILE__); }…

作者头像 李华
网站建设 2026/4/12 20:05:18

Java基于Spring Boot+Vue的学生宿舍管理系统的设计于实现

所需该项目可以在最下面查看联系方式&#xff0c;为防止迷路可以收藏文章&#xff0c;以防后期找不到 这里写目录标题项目介绍系统实现截图技术栈介绍Spring Boot与Vue结合使用的优势Spring Boot的优点Vue的优点Spring Boot 框架结构解析Vue介绍系统执行流程Java语言介绍系统测…

作者头像 李华
网站建设 2026/4/16 11:04:00

2026必备!MBA论文写作痛点全解析:TOP9一键生成论文工具深度测评

2026必备&#xff01;MBA论文写作痛点全解析&#xff1a;TOP9一键生成论文工具深度测评 2026年MBA论文写作工具测评&#xff1a;为何需要这份榜单&#xff1f; 随着MBA课程的日益深入&#xff0c;论文写作已成为每位学生必须面对的重要环节。然而&#xff0c;从选题构思到资料收…

作者头像 李华
网站建设 2026/4/18 3:27:25

震惊!用大模型开发零售AI引擎竟然如此简单?手把手教你从零构建企业级RAG系统,小白也能秒变AI架构师!

在零售行业数字化转型的浪潮中&#xff0c;大型语言模型&#xff08;LLM&#xff09;的应用正从概念验证走向生产部署。然而&#xff0c;直接将通用大模型应用于零售业务&#xff0c;往往面临准确性、安全性和可扩展性的三重挑战。本文将从工程实践角度&#xff0c;深入解析如何…

作者头像 李华
网站建设 2026/4/18 3:26:44

【AI大模型必看】从“内存墙“到“算力瓶颈“:大模型推理技术演进全攻略,小白也能秒懂的保姆级教程!

MLNLP社区是国内外知名的机器学习与自然语言处理社区&#xff0c;受众覆盖国内外NLP硕博生、高校老师以及企业研究人员。 社区的愿景是促进国内外自然语言处理&#xff0c;机器学习学术界、产业界和广大爱好者之间的交流和进步&#xff0c;特别是初学者同学们的进步。 来源 |…

作者头像 李华