在Ubuntu 22.04上快速搭建Fast DDS通信Demo:从零到实时数据流
如果你正在探索现代分布式系统的通信方案,DDS(数据分发服务)绝对值得深入了解。作为一个专为实时系统设计的中间件协议,DDS的发布/订阅模型和丰富的QoS策略使其在自动驾驶、工业物联网等领域大放异彩。本文将带你用Fast DDS(前身为Fast RTPS)这个开源实现,在Ubuntu 22.04上快速搭建一个可运行的Demo,直观感受DDS的核心机制。
1. 环境准备与Fast DDS安装
在开始之前,确保你的Ubuntu 22.04系统已经更新到最新状态。打开终端,执行以下命令:
sudo apt update && sudo apt upgrade -yFast DDS的安装可以通过apt包管理器轻松完成:
sudo apt install ros-humble-rmw-fastrtps-cpp为什么选择Fast DDS?作为Eclipse Cyclone DDS之外的另一主流开源实现,Fast DDS:
- 完全兼容DDSI-RTPS标准
- 支持多种平台包括Linux、Windows和macOS
- 提供C++和Python API
- 被ROS 2默认采用作为中间件
验证安装是否成功:
fastdds --version如果看到版本信息输出(如2.6.x),说明安装正确。接下来我们需要准备一个工作目录:
mkdir ~/dds_demo && cd ~/dds_demo2. 定义数据模型与Topic
DDS通信的核心是Topic——它是数据发布者和订阅者之间的抽象通道。首先我们需要定义一个IDL(接口定义语言)文件来描述数据结构。
创建HelloWorld.idl文件:
module demo { struct HelloWorld { unsigned long index; string message; }; };这个结构体包含一个计数器index和一个消息内容message。使用Fast DDS提供的工具生成对应的C++代码:
fastddsgen HelloWorld.idl这会生成多个文件,其中最重要的是:
HelloWorld.h:数据结构定义HelloWorldPubSubTypes.h:序列化相关代码HelloWorldPublisher.cpp&HelloWorldSubscriber.cpp:示例代码
3. 编写发布者程序
让我们修改生成的发布者代码,使其周期性发送数据。创建publisher.cpp:
#include <chrono> #include <thread> #include "HelloWorldPublisher.h" using namespace demo; int main(int argc, char *argv[]) { std::cout << "Starting publisher..." << std::endl; HelloWorldPublisher mypub; if (mypub.init()) { HelloWorld hello; hello.message("Hello from Fast DDS!"); for (uint32_t i = 0; ; ++i) { hello.index(i); mypub.publish(&hello); std::cout << "Publishing: " << i << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); } } return 0; }关键点解析:
mypub.init()初始化发布者mypub.publish(&hello)发布数据- 每秒发布一次,附带递增的计数器
编译命令:
g++ -std=c++11 publisher.cpp HelloWorld.cxx \ -I/usr/include/fastrtps -I/usr/include/fastcdr \ -lfastcdr -lfastrtps -o publisher4. 编写订阅者程序
订阅者需要监听相同Topic的数据。创建subscriber.cpp:
#include "HelloWorldSubscriber.h" using namespace demo; class SubListener : public HelloWorldSubscriberListener { public: void on_data_available(HelloWorldSubscriber* sub) override { HelloWorld hello; if (sub->takeNextData(&hello)) { std::cout << "Received: [" << hello.index() << "] " << hello.message() << std::endl; } } }; int main(int argc, char *argv[]) { std::cout << "Starting subscriber..." << std::endl; SubListener listener; HelloWorldSubscriber mysub(&listener); if (mysub.init()) { while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } } return 0; }关键组件:
- 自定义
SubListener处理到达的数据 takeNextData获取新数据- 主循环保持程序运行
编译订阅者:
g++ -std=c++11 subscriber.cpp HelloWorld.cxx \ -I/usr/include/fastrtps -I/usr/include/fastcdr \ -lfastcdr -lfastrtps -o subscriber5. 运行与调试
打开两个终端窗口,先启动订阅者:
./subscriber再启动发布者:
./publisher正常情况下的输出示例:
发布者终端:
Starting publisher... Publishing: 0 Publishing: 1 Publishing: 2订阅者终端:
Starting subscriber... Received: [0] Hello from Fast DDS! Received: [1] Hello from Fast DDS! Received: [2] Hello from Fast DDS!如果遇到问题,检查以下几点:
- 确保两个程序在同一个Domain(默认domain ID=0)
- 检查Topic名称是否一致
- 使用
netstat -tulnp | grep 7410查看DDS发现端口是否正常
6. 扩展:QoS策略实战
DDS的强大之处在于其丰富的QoS策略。让我们修改发布者,添加可靠性保证:
// 在HelloWorldPublisher.h的init()中添加 eprosima::fastdds::dds::ReliabilityQosPolicy reliability; reliability.kind = eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS; writer_qos.reliability(reliability);对应的订阅者也需做相同修改。这样即使在网络不稳定时,也能确保数据最终送达。
另一个实用QoS是历史深度,控制缓存的数据量:
eprosima::fastdds::dds::HistoryQosPolicy history; history.kind = eprosima::fastdds::dds::KEEP_LAST_HISTORY_QOS; history.depth = 10; // 保留最后10条消息 writer_qos.history(history);7. 性能监控与优化
Fast DDS提供内置的统计模块。启用方法:
export FASTDDS_STATISTICS="HISTORY_LATENCY,NETWORK_LATENCY" ./subscriber监控数据可通过DDS的MONITORTopic获取,或使用Fast DDS提供的统计工具可视化。
对于高性能场景,考虑调整:
- 发送缓冲区大小
- 心跳间隔
- 多线程配置
<!-- XML配置示例 --> <participant profile_name="high_perf_participant"> <rtps> <sendBuffers> <physicalPort size="8192"/> </sendBuffers> <builtin> <metatrafficUnicastLocatorList> <locator> <udpv4 port="7411"/> </locator> </metatrafficUnicastLocatorList> </builtin> </rtps> </participant>8. 容器化部署
为了便于移植,我们可以将Demo打包为Docker容器。创建Dockerfile:
FROM ubuntu:22.04 RUN apt update && apt install -y \ ros-humble-rmw-fastrtps-cpp \ g++ \ && rm -rf /var/lib/apt/lists/* WORKDIR /app COPY . . RUN g++ -std=c++11 publisher.cpp HelloWorld.cxx \ -I/usr/include/fastrtps -I/usr/include/fastcdr \ -lfastcdr -lfastrtps -o publisher && \ g++ -std=c++11 subscriber.cpp HelloWorld.cxx \ -I/usr/include/fastrtps -I/usr/include/fastcdr \ -lfastcdr -lfastrtps -o subscriber构建并运行:
docker build -t dds_demo . docker run -it --net=host dds_demo ./publisher--net=host确保容器使用主机网络,这对DDS的自动发现机制很重要。