文章目录
- **1. 调用层次结构**
- **2. 完整调用链**
- **场景 1:zmq_connect()**
- **场景 2:zmq_bind()**
- **3. attach_pipe 的实现**
- **4. 不同 Socket 类型的 xattach_pipe**
- **4.1 DEALER**
- **4.2 PUSH**
- **4.3 PULL**
- **4.4 REQ**
- **4.5 ROUTER**
- **4.6 PUB**
- **4.7 SUB**
- **5. attach 的完整流程图**
- **6. attach 的时机**
- **7. 实际示例**
- **示例 1:DEALER 连接多个服务端**
- **示例 2:PUSH-PULL 模式**
- **8. attach 时发生了什么?**
- **9. 总结**
ZeroMQ 创建两个 pipe 来实现双向通信:
- new_pipes[0] :本地端 pipe,附加到当前 socket
- new_pipes[1] :远程端 pipe,附加到 session 对象
这种设计确保了消息可以在两个方向上独立流动,提高了通信效率和可靠性。
1. 调用层次结构
应用层 ↓ zmq_connect() / zmq_bind() ↓ socket_base_t::connect() / bind() ↓ 创建 Pipe (pipepair) ↓ socket_base_t::attach_pipe() ← 第 1 层 ↓ xattach_pipe() ← 第 2 层(虚函数,子类实现) ↓ dealer_t::xattach_pipe() ← 第 3 层 ↓ _lb.attach() / _fq.attach() ← 第 4 层2. 完整调用链
场景 1:zmq_connect()
// socket_base.cpp - connect 流程intzmq::socket_base_t::connect(constchar*endpoint_uri_){// 1. 创建 Sessionsession_base_t*session=session_base_t::create(...);// 2. 创建双向 Pipepipe_t*new_pipes[2]={NULL,NULL};rc=pipepair(parents,new_pipes,hwms,conflates);// 3. ★★★ 调用 attach_pipe ★★★attach_pipe(new_pipes[0],subscribe_to_all,true);// new_pipes[0] 连接到 Socket// new_pipes[1] 连接到 Session// 4. Session 也附加 Pipesession->attach_pipe(new_pipes[1]);// 5. 保存端点add_endpoint(...);return0;}场景 2:zmq_bind()
// socket_base.cpp - bind 流程(以 inproc 为例)intzmq::socket_base_t::bind(constchar*endpoint_uri_){if(protocol==protocol_name::inproc){// 1. 创建 Pipepipe_t*new_pipes[2]={NULL,NULL};rc=pipepair(parents,new_pipes,hwms,conflates);// 2. ★★★ 调用 attach_pipe ★★★attach_pipe(new_pipes[0],true,true);// 3. 保存端点add_endpoint(...);return0;}// TCP 等其他协议:创建监听器,等待连接// 当有连接时,也会调用 attach_pipe}3. attach_pipe 的实现
// socket_base.cpp 第 408 行voidzmq::socket_base_t::attach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){// 1. 设置事件接收者pipe_->set_event_sink(this);// 2. 添加到 Socket 的 Pipe 列表_pipes.push_back(pipe_);// 3. ★★★ 调用子类的 xattach_pipe ★★★xattach_pipe(pipe_,subscribe_to_all_,locally_initiated_);// 4. 如果 Socket 正在终止,立即终止 Pipeif(is_terminating()){register_term_acks(1);pipe_->terminate(false);}}关键点:
- ✅
pipe_->set_event_sink(this):让 Pipe 知道事件发给谁 - ✅
_pipes.push_back():Socket 管理所有 Pipe - ✅
xattach_pipe():虚函数,不同 Socket 类型有不同行为
4. 不同 Socket 类型的 xattach_pipe
4.1 DEALER
// dealer.cpp 第 48 行voidzmq::dealer_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// 1. 如果启用了探测路由,发送探测消息if(_probe_router){msg_t probe_msg;probe_msg.init();pipe_->write(&probe_msg);pipe_->flush();probe_msg.close();}// 2. ★★★ 附加到入站队列和出站队列 ★★★_fq.attach(pipe_);// Fair Queuing(接收)_lb.attach(pipe_);// Load Balancing(发送)}用途:DEALER 需要双向负载均衡
4.2 PUSH
// push.cpp 第 47 行voidzmq::push_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){LIBZMQ_UNUSED(subscribe_to_all_);LIBZMQ_UNUSED(locally_initiated_);// 1. 不延迟 Pipe 终止(没有接收者)pipe_->set_nodelay();zmq_assert(pipe_);// 2. ★★★ 只附加到出站队列 ★★★_lb.attach(pipe_);}用途:PUSH 只发送,不需要入站队列
4.3 PULL
// pull.cppvoidzmq::pull_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){LIBZMQ_UNUSED(subscribe_to_all_);LIBZMQ_UNUSED(locally_initiated_);zmq_assert(pipe_);// ★★★ 只附加到入站队列 ★★★_fq.attach(pipe_);}用途:PULL 只接收,不需要出站队列
4.4 REQ
// req.cpp - REQ 继承自 DEALER// 没有重写 xattach_pipe,使用 DEALER 的实现voidzmq::req_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){// 调用父类 DEALER 的实现dealer_t::xattach_pipe(pipe_,subscribe_to_all_,locally_initiated_);}用途:REQ 和 DEALER 一样,但有自己的状态机
4.5 ROUTER
// router.cpp 第 72 行voidzmq::router_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// 1. 如果启用了探测,发送空消息if(_probe_router){msg_t probe_msg;probe_msg.init();pipe_->write(&probe_msg);pipe_->flush();probe_msg.close();}// 2. ★★★ 附加到入站队列和路由表 ★★★_fq.attach(pipe_);// 3. 为 Pipe 分配路由 IDif(!locally_initiated_&&options.router_mandatory){// 为远程对端生成路由 IDblob_t routing_id=generate_routing_id();pipe_->set_router_socket_routing_id(routing_id);}}用途:ROUTER 需要维护路由表,知道哪个 ID 对应哪个 Pipe
4.6 PUB
// pub.cppvoidzmq::pub_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// ★★★ 附加到分发器 ★★★_dist.attach(pipe_);// 如果订阅了所有消息,立即发送订阅if(subscribe_to_all_){msg_t subscribe_msg;subscribe_msg.init();subscribe_msg.set_flags(msg_t::subscribe);pipe_->write(&subscribe_msg);pipe_->flush();subscribe_msg.close();}}用途:PUB 使用dist_t(分发器)而不是lb_t
4.7 SUB
// sub.cppvoidzmq::sub_t::xattach_pipe(pipe_t*pipe_,boolsubscribe_to_all_,boollocally_initiated_){zmq_assert(pipe_);// ★★★ 附加到入站队列 ★★★_fq.attach(pipe_);// 发送订阅消息if(!subscribe_to_all_){// 发送用户设置的订阅for(auto&subscription:_subscriptions){msg_t sub_msg;sub_msg.init_size(subscription.size());memcpy(sub_msg.data(),subscription.data(),subscription.size());sub_msg.set_flags(msg_t::subscribe);pipe_->write(&sub_msg);pipe_->flush();sub_msg.close();}}}用途:SUB 在连接时自动发送订阅消息
5. attach 的完整流程图
┌─────────────────────────────────────────────────────────┐ │ 应用层:zmq_connect(socket, "tcp://localhost:5555") │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────────┐ │ socket_base_t::connect() │ │ - 创建 Session │ │ - 创建 Pipe (pipepair) │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────────────▼────────────────────────────────────┐ │ socket_base_t::attach_pipe(pipe_) │ │ - pipe_->set_event_sink(this) │ │ - _pipes.push_back(pipe_) │ │ - xattach_pipe(pipe_) ← 虚函数 │ └────────────────────┬────────────────────────────────────┘ │ ┌────────────┴────────────┐ │ │ ┌───────▼───────┐ ┌──────▼──────┐ │ DEALER │ │ PUSH │ │ xattach_pipe │ │ xattach_pipe│ │ _fq.attach() │ │ _lb.attach()│ │ _lb.attach() │ │ │ └───────────────┘ └─────────────┘6. attach 的时机
| 时机 | 触发操作 | 调用位置 |
|---|---|---|
| connect() | 主动连接 | [socket_base.cpp:628](file:///home/victory/test/zmq-build/libzmq/src/socket_base.cpp#L628) |
| bind()(inproc) | 绑定 inproc | [socket_base.cpp:1108](file:///home/victory/test/zmq-build/libzmq/src/socket_base.cpp#L1108) |
| accept() | 接受连接 | 监听器触发 |
| pipe 配对 | inproc 配对 | [session_base.cpp:375](file:///home/victory/test/zmq-build/libzmq/src/session_base.cpp#L375) |
7. 实际示例
示例 1:DEALER 连接多个服务端
// 应用代码void*dealer=zmq_socket(ctx,ZMQ_DEALER);zmq_connect(dealer,"tcp://server1:5555");zmq_connect(dealer,"tcp://server2:5555");zmq_connect(dealer,"tcp://server3:5555");// 内部流程:// 第 1 次 connect:// pipe1 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe1)// _lb.attach(pipe1)// _lb._pipes: [pipe1]// 第 2 次 connect:// pipe2 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe2)// _lb.attach(pipe2)// _lb._pipes: [pipe1, pipe2]// 第 3 次 connect:// pipe3 → attach_pipe() → dealer_t::xattach_pipe()// _fq.attach(pipe3)// _lb.attach(pipe3)// _lb._pipes: [pipe1, pipe2, pipe3]// 发送时 Round-Robin:// 消息 1 → pipe1// 消息 2 → pipe2// 消息 3 → pipe3// 消息 4 → pipe1 (循环)示例 2:PUSH-PULL 模式
// PUSH 端void*push=zmq_socket(ctx,ZMQ_PUSH);zmq_bind(push,"tcp://*:5555");// 第 1 个 PULL 连接// pipe1 → attach_pipe() → push_t::xattach_pipe()// _lb.attach(pipe1)// _lb._pipes: [pipe1]// 第 2 个 PULL 连接// pipe2 → attach_pipe() → push_t::xattach_pipe()// _lb.attach(pipe2)// _lb._pipes: [pipe1, pipe2]// PULL 端void*pull=zmq_socket(ctx,ZMQ_PULL);zmq_connect(pull,"tcp://push:5555");// pipe → attach_pipe() → pull_t::xattach_pipe()// _fq.attach(pipe)8. attach 时发生了什么?
voidzmq::lb_t::attach(pipe_t*pipe_){// 1. 添加到列表_pipes.push_back(pipe_);// 2. 立即激活(移到活跃区)activated(pipe_);}voidzmq::lb_t::activated(pipe_t*pipe_){// 将 Pipe 移到活跃区(前 _active 个位置)_pipes.swap(_pipes.index(pipe_),_active);_active++;}效果:
attach 前: _pipes: [] _active=0 attach pipe1: _pipes: [pipe1] _active=1 attach pipe2: _pipes: [pipe1, pipe2] _active=2 attach pipe3: _pipes: [pipe1, pipe2, pipe3] _active=39. 总结
attach的调用场景:
- ✅zmq_connect():每次连接时
- ✅zmq_bind():inproc 绑定时
- ✅接受连接:TCP 服务端接受客户端
- ✅inproc 配对:inproc bind/connect 配对时
调用链:
zmq_connect/bind ↓ socket_base_t::attach_pipe() ↓ xattach_pipe() (虚函数) ↓ _lb.attach() / _fq.attach() / _dist.attach()不同 Socket 的行为:
- DEALER/REQ/CLIENT:
_fq + _lb(双向) - PUSH:
_lb(只发送) - PULL:
_fq(只接收) - ROUTER:
_fq + 路由表 - PUB:
_dist(广播) - SUB:
_fq + 自动订阅
一句话:attach在每个连接建立时调用,让 Socket 知道有哪些 Pipe 可用!🎯