ROS 2 DDS 中间件通信优化与 QoS 策略
引言
ROS 2 最大的改进之一就是采用了标准的 DDS(Data Distribution Service) 中间件。与 ROS 1 的自研通信机制不同,DDS 是一个成熟的工业级标准,提供了灵活的通信配置和可靠性保证。
本文将深入讲解 DDS 的核心概念和 QoS 策略,帮助你构建高可靠、高效率的 ROS 2 系统。
ROS 2 采用标准 DDS 中间件替代 ROS 1 自研机制。介绍 DDS 核心概念、常见实现(FastDDS、CycloneDDS、RTI Connext),详细解析 QoS 五大策略(可靠性、持久性、历史、截止期、生存时间)。提供 Python 和 C++ 配置示例,涵盖预定义与自定义 QoS 设置。包含网络传输层配置、性能监控及常见问题排查指南,帮助构建高可靠、低延迟的 ROS 2 系统。
ROS 2 最大的改进之一就是采用了标准的 DDS(Data Distribution Service) 中间件。与 ROS 1 的自研通信机制不同,DDS 是一个成熟的工业级标准,提供了灵活的通信配置和可靠性保证。
本文将深入讲解 DDS 的核心概念和 QoS 策略,帮助你构建高可靠、高效率的 ROS 2 系统。
DDS 是一个实时中间件标准,定义了分布式系统中如何进行数据的发布和订阅。
传输层:DDS 中间件层 -> 用户空间 -> 应用层 (ROS2 节点)
| 特性 | 优势 | 实际应用 |
|---|---|---|
| 实时性 | 确定性消息传输 | 工业控制、机器人 |
| 可靠性 | 消息不丢失、不重复 | 关键任务系统 |
| 灵活性 | 灵活的 QoS 配置 | 适应多种场景 |
| 自动发现 | 无需手动配置连接 | 简化部署 |
| 跨平台 | 统一的标准实现 | 多个供应商支持 |
ROS 2 = ROS 应用 API + DDS 中间件 + 消息定义
ROS 2 架构栈:
| DDS 实现 | 特点 | 厂商 |
|---|---|---|
| FastDDS | 默认实现,功能最全 | eProsima |
| CycloneDDS | 轻量级,启动极快 | Eclipse |
| RTI Connext | 企业级商用,安全认证 | RTI |
# 查看当前默认实现
echo $RMW_IMPLEMENTATION
# 输出:rmw_fastrtps_cpp (默认)
# 临时切换到 CycloneDDS
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp ros2 run pkg_name node_name
# 临时切换回 FastRTPS
export RMW_IMPLEMENTATION=rmw_fastrtps_cpp
# 永久设置(添加到 ~/.bashrc)
echo "export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp" >> ~/.bashrc
source ~/.bashrc
# 安装 CycloneDDS
sudo apt install ros-humble-rmw-cyclonedds-cpp
# 安装 RTI Connext (需要许可证)
sudo apt install ros-humble-rmw-connext-cpp
# 验证安装
ros2 run your_package your_node --help | grep -i "implementation"
QoS 定义了消息传输的特性和保证:
QoS = { 可靠性,持久性,延迟,寿命,历史 }
时间模型对比:
| 等级 | 特性 | 使用场景 |
|---|---|---|
| VOLATILE | 消息只在发送时存在 | 实时数据流 |
| TRANSIENT_LOCAL | 新订阅者可收到最后发送的消息 | 配置更新、快照数据 |
| TRANSIENT | 消息在磁盘上持久化 | 重要数据、跨进程 |
| PERSISTENT | 消息在数据库中持久化 | 历史记录、审计 |
数据生命周期:
有效时间 = T_deadline 如果消息未在截止期内到达,则认为超时。
消息的有效期:
通信成功的概率模型: P_success = P_reliability × P_no_timeout × P_delivery
其中:
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSHistoryPolicy, QoSReliabilityPolicy, QoSDurabilityPolicy
from std_msgs.msg import String
class MyPublisher(Node):
def __init__(self):
super().__init__('my_publisher')
# 方法 1:使用预定义配置(推荐)
# 系统默认配置
qos = QoSProfile(depth=10)
# 传感器数据配置(低延迟,允许丢包)
qos_sensor = rclpy.qos.sensor_data_qos_profile
# 系统事件配置(可靠传输)
qos_reliable = rclpy.qos.qos_profile_system_default
# 创建发布者
self.pub = self.create_publisher(String, 'topic_name', qos_sensor) # 使用传感器 QoS 配置
self.get_logger().info('发布者已创建')
def main(args=None):
rclpy.init(args=args)
node = MyPublisher()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
from rclpy.qos import (
QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy, QoSDurabilityPolicy
)
from rclpy.duration import Duration
from rclpy.node import Node
from std_msgs.msg import String
class QoSCustomizer(Node):
def __init__(self):
super().__init__('qos_customizer')
# 自定义 QoS 配置
custom_qos = QoSProfile(
# 可靠性:RELIABLE = 可靠传输,BEST_EFFORT = 尽力而为
reliability=QoSReliabilityPolicy.RELIABLE,
# 持久性
durability=QoSDurabilityPolicy.VOLATILE,
# 历史:KEEP_LAST = 保留最后 N 条,KEEP_ALL = 保留所有
history=QoSHistoryPolicy.KEEP_LAST,
# 历史深度(KEEP_LAST 时的队列大小)
depth=10,
# 截止期(消息必须在此时间内到达)
deadline=Duration(seconds=1),
# 生存时间(消息的有效期)
lifespan=Duration(seconds=10),
# 策略中的缺失时间再发送间隔
period=Duration(milliseconds=100)
)
# 使用自定义 QoS 创建发布者
self.pub = self.create_publisher(String, 'custom_topic', custom_qos)
# 使用自定义 QoS 创建订阅者
self.sub = self.create_subscription(String, 'custom_topic', self.callback, custom_qos)
self.get_logger().info('自定义 QoS 配置已应用')
def callback(self, msg):
self.get_logger().info(f'接收:{msg.data}')
def main(args=None):
rclpy.init(args=args)
node = QoSCustomizer()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
# 不同 QoS 配置的对比
# 1. 传感器数据(高频,允许丢包)
sensor_qos = rclpy.qos.sensor_data_qos_profile
# 特点:BEST_EFFORT, VOLATILE, KEEP_LAST(depth=5)
# 2. 参数(可靠传输)
param_qos = rclpy.qos.qos_profile_parameter_events
# 特点:RELIABLE, TRANSIENT_LOCAL, KEEP_LAST(depth=1000)
# 3. 系统默认
default_qos = rclpy.qos.qos_profile_system_default
# 特点:RELIABLE, VOLATILE, KEEP_LAST(depth=10)
# 4. 服务
service_qos = rclpy.qos.qos_profile_services_default
# 特点:RELIABLE, VOLATILE, KEEP_LAST
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
#include "rclcpp/qos.hpp"
class QoSExample : public rclcpp::Node {
public:
QoSExample() : Node("qos_example") {
// 方法 1:使用预定义配置
auto sensor_qos = rclcpp::SensorDataQoS();
// 方法 2:自定义 QoS
rclcpp::QoS custom_qos(10); // 队列大小为 10
custom_qos.reliable(); // 启用可靠传输
custom_qos.transient_local(); // 启用本地持久性
// 创建发布者(使用自定义 QoS)
pub_ = this->create_publisher<std_msgs::msg::String>("my_topic", custom_qos);
// 创建订阅者
sub_ = this->create_subscription<std_msgs::msg::String>("my_topic", sensor_qos,
[this](const std_msgs::msg::String & msg){
RCLCPP_INFO(this->get_logger(), "收到:%s", msg.data.c_str());
});
RCLCPP_INFO(this->get_logger(), "QoS 配置完成");
}
private:
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr pub_;
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr sub_;
};
int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<QoSExample>());
rclcpp::shutdown();
return 0;
}
发布者和订阅者的 QoS 必须兼容(即发布者提供的 QoS 必须满足或优于订阅者的要求):
| 订阅者 Request | 发布者 Offer | 结果 |
|---|---|---|
| Best Effort | Reliable | ✅ 兼容 |
| Reliable | Best Effort | ❌ 不兼容 |
| Transient Local | Volatile | ❌ 不兼容 |
结论:
Reliable 发布者兼容所有订阅者;Best Effort 发布者只能兼容 Best Effort 订阅者。Transient Local 发布者兼容所有订阅者;Volatile 发布者只能兼容 Volatile 订阅者。场景:激光雷达每秒发送 100 帧扫描数据
from rclcpp.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
# 优化方案:优先保证吞吐量和低延迟
lidar_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT, # 允许丢包
history=QoSHistoryPolicy.KEEP_LAST, # 只保留最新数据
depth=1 # 队列只有 1 条
)
# 优势:
# 1. 消息处理快(BEST_EFFORT 无需重传等待)
# 2. 内存占用低(只保留最新数据)
# 3. 延迟最小(深度为 1)
场景:发送"停止"命令给机器人
# 优化方案:保证命令一定被执行
command_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE, # 可靠传输
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL, # 持久化
history=QoSHistoryPolicy.KEEP_LAST, depth=5 # 保留最后 5 条
)
# 优势:
# 1. 消息不会丢失(RELIABLE)
# 2. 新启动的接收方能收到历史消息(TRANSIENT_LOCAL)
# 3. 关键命令不会被遗漏
场景:每秒发送一次机器人状态
status_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=3 # 保留最后 3 秒的数据
)
# 平衡方案:
# 1. 新数据优先于历史数据(BEST_EFFORT)
# 2. 缓冲 3 条消息防止一瞬间的网络波动
# 3. 内存占用合理
| 传输层协议 | 特点 |
|---|---|
| UDP 单播 | 低延迟 / 局域网默认 |
| UDP 多播 | 高效一对多 / 需路由支持 |
| TCP | 可靠 / 跨广域网 |
| 共享内存 | 极低延迟 / 仅限本机进程间 |
# 启用 UDP 单播(默认)
export ROS_SUPER_CLIENT=1
# 启用多播(局域网推荐)
export CYCLONEDDS_URI="multicast"
# 配置网络接口
export CYCLONEDDS_URI="<CycloneDDS><Domain><General><Interfaces><NetworkInterface>eth0</NetworkInterface></Interfaces></General></Domain></CycloneDDS>"
# 设置 IGMP 版本(多播)
export CYCLONEDDS_URI="<CycloneDDS><Domain><General><MulticastRecvNetworkInterfaceAddrs>192.168.1.0/24</MulticastRecvNetworkInterfaceAddrs></General></Domain></CycloneDDS>"
# 监控网络带宽
ros2 topic bw /lidar_scan
# 监控消息延迟
ros2 topic delay /lidar_scan
# 监控消息频率
ros2 topic hz /lidar_scan
# 显示节点和话题的 QoS 配置
ros2 topic info /lidar_scan
创建 fastrtps_profile.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<dds>
<profiles>
<!-- 定义一个发布者配置 -->
<publisher profile_name="fast_pub_profile">
<rtps>
<historyMemoryPolicy>DYNAMIC</historyMemoryPolicy>
<publishMode>
<kind>ASYNCHRONOUS</kind>
</publishMode>
</rtps>
</publisher>
<!-- 定义一个订阅者配置 -->
<subscriber profile_name="fast_sub_profile">
<rtps>
<readerTimes>
<initialAcknackDelay>
<sec>0</sec>
<nanosec>100000000</nanosec>
</initialAcknackDelay>
</readerTimes>
</rtps>
</subscriber>
</profiles>
</dds>
创建 cyclonedds_config.xml:
<?xml version="1.0" encoding="UTF-8" ?>
<CycloneDDS>
<Domain>
<!-- 通用配置 -->
<General>
<NetworkInterfaceAddress>auto</NetworkInterfaceAddress>
<AllowMulticast>default</AllowMulticast>
</General>
<!-- 调度器配置 -->
<Internal>
<ScheduleKind>default</ScheduleKind>
</Internal>
</Domain>
</CycloneDDS>
使用配置文件:
export CYCLONEDDS_URI=file:///path/to/cyclonedds_config.xml
ros2 run pkg_name node_name
诊断:
# 查看话题信息
ros2 topic info /my_topic
# 检查 QoS 配置
ros2 topic info /my_topic --verbose
原因和解决方案:
| 原因 | 症状 | 解决方案 |
|---|---|---|
| Reliability=BEST_EFFORT | 偶尔丢包 | 改为 RELIABLE |
| 队列太小 | 高频数据丢包 | 增加 depth 值 |
| 网络拥塞 | 均匀丢包 | 优化消息大小或频率 |
| 订阅晚启动 | 错过早期消息 | 使用 TRANSIENT_LOCAL |
诊断:
# 测量延迟
ros2 topic delay /my_topic
# 结果示例:
# min: 15ms, max: 50ms, mean: 25ms
优化方案:
# 1. 减小队列深度
qos = QoSProfile(depth=1) # 从 10 改为 1
# 2. 使用 BEST_EFFORT
qos.reliability = QoSReliabilityPolicy.BEST_EFFORT
# 3. 使用共享内存传输(仅限本机)
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
export CYCLONEDDS_URI="<CycloneDDS><Domain><Transport><Shared_Memory/></Transport></Domain></CycloneDDS>"
诊断:
# 检查节点状态
ros2 node list
ros2 topic list
# 查看节点详细信息
ros2 node info /publisher_node
ros2 node info /subscriber_node
常见原因:
| 问题类型 | 详情 | 解决方案 |
|---|---|---|
| QoS 不兼容 | ❌ 发布者:Best Effort ❌ 订阅者:Reliable | ✅ 统一使用 Reliable |
| 话题名称 | ❌ 大小写/拼写错误 如 /topic vs /Topic | ✅ 复制粘贴确保一致 |
| 消息类型 | ❌ 类型不匹配 如 String vs Int32 | ✅ 检查接口定义 |
| DDS 配置 | ❌ Domain ID 不同 Docker/VPN | ✅ 检查 ROS_DOMAIN_ID |
# 传感器数据
qos_sensor = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
)
# 关键命令
qos_command = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
depth=5
)
# 系统默认
qos_default = QoSProfile(depth=10)
# 查看话题 QoS 信息
ros2 topic info /my_topic -v
# 切换 DDS 实现
export RMW_IMPLEMENTATION=rmw_cyclonedds_cpp
# 测试消息吞吐量
ros2 topic hz /my_topic
# 测试消息延迟
ros2 topic delay /my_topic

微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
使用加密算法(如AES、TripleDES、Rabbit或RC4)加密和解密文本明文。 在线工具,加密/解密文本在线工具,online
生成新的随机RSA私钥和公钥pem证书。 在线工具,RSA密钥对生成器在线工具,online
基于 Mermaid.js 实时预览流程图、时序图等图表,支持源码编辑与即时渲染。 在线工具,Mermaid 预览与可视化编辑在线工具,online
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML转Markdown 互为补充。 在线工具,Markdown转HTML在线工具,online