news 2026/4/20 20:31:17

zmq源码分析之pipe attach时机

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
zmq源码分析之pipe attach时机

文章目录

      • **1. 调用层次结构**
      • **2. 完整调用链**
        • **场景 1:zmq_connect()**
        • **场景 2:zmq_bind()**
      • **3. attach_pipe 的实现**
      • **4. 不同 Socket 类型的 xattach_pipe**
        • **4.1 DEALER**
        • **4.2 PUSH**
        • **4.3 PULL**
        • **4.4 REQ**
        • **4.5 ROUTER**
        • **4.6 PUB**
        • **4.7 SUB**
      • **5. attach 的完整流程图**
      • **6. attach 的时机**
      • **7. 实际示例**
        • **示例 1:DEALER 连接多个服务端**
        • **示例 2:PUSH-PULL 模式**
      • **8. attach 时发生了什么?**
      • **9. 总结**

ZeroMQ 创建两个 pipe 来实现双向通信:

  • new_pipes[0] :本地端 pipe,附加到当前 socket
  • new_pipes[1] :远程端 pipe,附加到 session 对象
    这种设计确保了消息可以在两个方向上独立流动,提高了通信效率和可靠性。

1. 调用层次结构

应用层 ↓ zmq_connect() / zmq_bind() ↓ socket_base_t::connect() / bind() ↓ 创建 Pipe (pipepair) ↓ socket_base_t::attach_pipe() ← 第 1 层 ↓ xattach_pipe() ← 第 2 层(虚函数,子类实现) ↓ dealer_t::xattach_pipe() ← 第 3 层 ↓ _lb.attach() / _fq.attach() ← 第 4 层

2. 完整调用链

场景 1:zmq_connect()
// socket_base.cpp - connect 流程intzmq::socket_base_t::connect(constchar*endpoint_uri_){// 1. 创建 Sessionsession_base_t*session=session_base_t::create(...);// 2. 创建双向 Pipepipe_t*new_pipes[2]={NULL,NULL};rc=pipepair(parents,new_pipes,hwms,conflates);// 3. ★★★ 调用 attach_pipe ★★★attach_pipe(new_pipes[0],subscribe_to_all,true);// new_pipes[0] 连接到 Socket// new_pipes[1] 连接到 Session// 4. Session 也附加 Pipesession->attach_pipe(new_pipes[1]);// 5. 保存端点add_endpoint(...);return0;}

场景 2:zmq_bind()
// socket_base.cpp - bind 流程(以 inproc 为例)intzmq::socket_base_t::bind(constchar*endpoint_uri_){if(protocol==protocol_name::inproc){// 1. 创建 Pipepipe_t*new_pipes[2]={NULL,NULL};rc=pipepair(parents,new_pipes,hwms,conflates);// 2. ★★★ 调用 attach_pipe ★★★attach_pipe(new_pipes[0],true,true);// 3. 保存端点add_endpoint(...);return0;}// TCP 等其他协议:创建监听器,等待连接// 当有连接时,也会调用 attach_pipe}

3. attach_pipe 的实现

// socket_base.cpp 第 408 行voidzmq::socket_base_t::attach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){// 1. 设置事件接收者pipe_->set_event_sink(this);// 2. 添加到 Socket 的 Pipe 列表_pipes.push_back(pipe_);// 3. ★★★ 调用子类的 xattach_pipe ★★★xattach_pipe(pipe_,subscribe_to_all_,locally_initiated_);// 4. 如果 Socket 正在终止,立即终止 Pipeif(is_terminating()){register_term_acks(1);pipe_->terminate(false);}}

关键点

  • pipe_->set_event_sink(this):让 Pipe 知道事件发给谁
  • _pipes.push_back():Socket 管理所有 Pipe
  • xattach_pipe()虚函数,不同 Socket 类型有不同行为

4. 不同 Socket 类型的 xattach_pipe

4.1 DEALER
// dealer.cpp 第 48 行voidzmq::dealer_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// 1. 如果启用了探测路由,发送探测消息if(_probe_router){msg_t probe_msg;probe_msg.init();pipe_->write(&probe_msg);pipe_->flush();probe_msg.close();}// 2. ★★★ 附加到入站队列和出站队列 ★★★_fq.attach(pipe_);// Fair Queuing(接收)_lb.attach(pipe_);// Load Balancing(发送)}

用途:DEALER 需要双向负载均衡


4.2 PUSH
// push.cpp 第 47 行voidzmq::push_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){LIBZMQ_UNUSED(subscribe_to_all_);LIBZMQ_UNUSED(locally_initiated_);// 1. 不延迟 Pipe 终止(没有接收者)pipe_->set_nodelay();zmq_assert(pipe_);// 2. ★★★ 只附加到出站队列 ★★★_lb.attach(pipe_);}

用途:PUSH 只发送,不需要入站队列


4.3 PULL
// pull.cppvoidzmq::pull_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){LIBZMQ_UNUSED(subscribe_to_all_);LIBZMQ_UNUSED(locally_initiated_);zmq_assert(pipe_);// ★★★ 只附加到入站队列 ★★★_fq.attach(pipe_);}

用途:PULL 只接收,不需要出站队列


4.4 REQ
// req.cpp - REQ 继承自 DEALER// 没有重写 xattach_pipe,使用 DEALER 的实现voidzmq::req_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){// 调用父类 DEALER 的实现dealer_t::xattach_pipe(pipe_,subscribe_to_all_,locally_initiated_);}

用途:REQ 和 DEALER 一样,但有自己的状态机


4.5 ROUTER
// router.cpp 第 72 行voidzmq::router_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// 1. 如果启用了探测,发送空消息if(_probe_router){msg_t probe_msg;probe_msg.init();pipe_->write(&probe_msg);pipe_->flush();probe_msg.close();}// 2. ★★★ 附加到入站队列和路由表 ★★★_fq.attach(pipe_);// 3. 为 Pipe 分配路由 IDif(!locally_initiated_&&options.router_mandatory){// 为远程对端生成路由 IDblob_t routing_id=generate_routing_id();pipe_->set_router_socket_routing_id(routing_id);}}

用途:ROUTER 需要维护路由表,知道哪个 ID 对应哪个 Pipe


4.6 PUB
// pub.cppvoidzmq::pub_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// ★★★ 附加到分发器 ★★★_dist.attach(pipe_);// 如果订阅了所有消息,立即发送订阅if(subscribe_to_all_){msg_t subscribe_msg;subscribe_msg.init();subscribe_msg.set_flags(msg_t::subscribe);pipe_->write(&subscribe_msg);pipe_->flush();subscribe_msg.close();}}

用途:PUB 使用dist_t(分发器)而不是lb_t


4.7 SUB
// sub.cppvoidzmq::sub_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// ★★★ 附加到入站队列 ★★★_fq.attach(pipe_);// 发送订阅消息if(!subscribe_to_all_){// 发送用户设置的订阅for(auto&subscription:_subscriptions){msg_t sub_msg;sub_msg.init_size(subscription.size());memcpy(sub_msg.data(),subscription.data(),subscription.size());sub_msg.set_flags(msg_t::subscribe);pipe_->write(&sub_msg);pipe_->flush();sub_msg.close();}}}

用途:SUB 在连接时自动发送订阅消息


5. attach 的完整流程图

┌─────────────────────────────────────────────────────────┐ │ 应用层:zmq_connect(socket, "tcp://localhost:5555") │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────────┐ │ socket_base_t::connect() │ │ - 创建 Session │ │ - 创建 Pipe (pipepair) │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────────┐ │ socket_base_t::attach_pipe(pipe_) │ │ - pipe_->set_event_sink(this) │ │ - _pipes.push_back(pipe_) │ │ - xattach_pipe(pipe_) ← 虚函数 │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────┴────────────┐ │ │ ┌───────▼───────┐ ┌──────▼──────┐ │ DEALER │ │ PUSH │ │ xattach_pipe │ │ xattach_pipe│ │ _fq.attach() │ │ _lb.attach()│ │ _lb.attach() │ │ │ └───────────────┘ └─────────────┘

6. attach 的时机

时机触发操作调用位置
connect()主动连接[socket_base.cpp:628](file:///home/victory/test/zmq-build/libzmq/src/socket_base.cpp#L628)
bind()(inproc)绑定 inproc[socket_base.cpp:1108](file:///home/victory/test/zmq-build/libzmq/src/socket_base.cpp#L1108)
accept()接受连接监听器触发
pipe 配对inproc 配对[session_base.cpp:375](file:///home/victory/test/zmq-build/libzmq/src/session_base.cpp#L375)

7. 实际示例

示例 1:DEALER 连接多个服务端
// 应用代码void*dealer=zmq_socket(ctx,ZMQ_DEALER);zmq_connect(dealer,"tcp://server1:5555");zmq_connect(dealer,"tcp://server2:5555");zmq_connect(dealer,"tcp://server3:5555");// 内部流程:// 第 1 次 connect:// pipe1 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe1)// _lb.attach(pipe1)// _lb._pipes: [pipe1]// 第 2 次 connect:// pipe2 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe2)// _lb.attach(pipe2)// _lb._pipes: [pipe1, pipe2]// 第 3 次 connect:// pipe3 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe3)// _lb.attach(pipe3)// _lb._pipes: [pipe1, pipe2, pipe3]// 发送时 Round-Robin:// 消息 1 → pipe1// 消息 2 → pipe2// 消息 3 → pipe3// 消息 4 → pipe1 (循环)

示例 2:PUSH-PULL 模式
// PUSH 端void*push=zmq_socket(ctx,ZMQ_PUSH);zmq_bind(push,"tcp://*:5555");// 第 1 个 PULL 连接// pipe1 → attach_pipe() → push_t::xattach_pipe()// _lb.attach(pipe1)// _lb._pipes: [pipe1]// 第 2 个 PULL 连接// pipe2 → attach_pipe() → push_t::xattach_pipe()// _lb.attach(pipe2)// _lb._pipes: [pipe1, pipe2]// PULL 端void*pull=zmq_socket(ctx,ZMQ_PULL);zmq_connect(pull,"tcp://push:5555");// pipe → attach_pipe() → pull_t::xattach_pipe()// _fq.attach(pipe)

8. attach 时发生了什么?

voidzmq::lb_t::attach(pipe_t*pipe_){// 1. 添加到列表_pipes.push_back(pipe_);// 2. 立即激活(移到活跃区)activated(pipe_);}voidzmq::lb_t::activated(pipe_t*pipe_){// 将 Pipe 移到活跃区(前 _active 个位置)_pipes.swap(_pipes.index(pipe_),_active);_active++;}

效果

attach 前: _pipes: [] _active=0 attach pipe1: _pipes: [pipe1] _active=1 attach pipe2: _pipes: [pipe1, pipe2] _active=2 attach pipe3: _pipes: [pipe1, pipe2, pipe3] _active=3

9. 总结

attach的调用场景

  1. zmq_connect():每次连接时
  2. zmq_bind():inproc 绑定时
  3. 接受连接:TCP 服务端接受客户端
  4. inproc 配对:inproc bind/connect 配对时

调用链

zmq_connect/bind ↓ socket_base_t::attach_pipe() ↓ xattach_pipe() (虚函数) ↓ _lb.attach() / _fq.attach() / _dist.attach()

不同 Socket 的行为

  • DEALER/REQ/CLIENT_fq + _lb(双向)
  • PUSH_lb(只发送)
  • PULL_fq(只接收)
  • ROUTER_fq + 路由表
  • PUB_dist(广播)
  • SUB_fq + 自动订阅

一句话attach每个连接建立时调用,让 Socket 知道有哪些 Pipe 可用!🎯

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/20 20:26:15

3步解锁百度网盘SVIP:macOS用户提升下载速度终极指南

3步解锁百度网盘SVIP:macOS用户提升下载速度终极指南 【免费下载链接】BaiduNetdiskPlugin-macOS For macOS.百度网盘 破解SVIP、下载速度限制~ 项目地址: https://gitcode.com/gh_mirrors/ba/BaiduNetdiskPlugin-macOS 还在为百度网盘Mac版的下载速度限制而…

作者头像 李华
网站建设 2026/4/20 20:25:39

【Dify日志审计黄金标准】:20年SRE亲授企业级审计配置、合规留痕与实时告警闭环实践

第一章:Dify日志审计的核心价值与架构全景日志审计是保障 Dify 平台安全、可追溯与合规运行的关键能力。在 LLM 应用快速迭代与多租户共享的场景下,原始请求、提示词工程、模型调用链路、响应内容及用户操作行为均需完整记录与结构化归档,为异…

作者头像 李华
网站建设 2026/4/20 20:24:26

Phi-3-mini-4k-instruct-gguf应用实践:技术团队用它批量处理PR描述与Issue摘要

Phi-3-mini-4k-instruct-gguf应用实践:技术团队用它批量处理PR描述与Issue摘要 1. 为什么选择Phi-3-mini处理技术文档 在技术团队日常工作中,编写清晰的PR描述和Issue摘要是一项耗时但又至关重要的工作。传统方式需要工程师花费大量时间反复修改文字&a…

作者头像 李华
网站建设 2026/4/20 20:24:24

金融级Dify部署必须做的7项合规配置,漏1项即触发监管问询!

第一章:金融级Dify部署的合规性底层逻辑金融行业对AI应用的部署并非仅关注功能实现,更核心的是构建可审计、可追溯、可隔离的合规基座。Dify作为低代码LLM应用开发平台,其金融级落地必须从基础设施层、数据流层与策略执行层同步满足等保三级、…

作者头像 李华
网站建设 2026/4/20 20:24:24

004、Git初始化:创建你的第一个本地仓库

004、Git初始化:创建你的第一个本地仓库 昨天帮实习生调试代码,发现他的项目目录里散落着十几个以“_final”“_new”“_backup”结尾的文件夹。问他为什么不用Git,他挠头说:“配置太麻烦,感觉用不上。”这场景让我想…

作者头像 李华