news 2026/6/10 19:56:36

ROS2话题、服务、动作通讯

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ROS2话题、服务、动作通讯

前面一篇博客 ,介绍了如何在工作空间中,创建包,并在包中创建一个或多个可执行程序,程序里定义了单个或多个节点类对象,以便可执行程序运行起来的时候,类对象能够执行动作,干一些事情。那多个可执行程序运行起来时,他们之间靠什么来通讯呢,常规是走tcp/ip通讯、共享内存等其它,但在ROS2中,已定定义了一些现成的方式,封装好了三种标准通信方式:话题,服务,动作通讯。ROS2 把底层的 TCP/UDP/DDS/ 共享内存 全部封装起来,你只需要用话题、服务、动作 这三种高级方式通信,不需要关心网络底层。不用自己写了,简化了很多操作。

注:话题,服务,动作通讯其实是在填充节点对象的功能,后面的节点对象就不再像前面博客演示那样,只打印几句话而已。

一. 话题通讯

为了方便,博主这边就不再下命令去创建包了,直接复用上一篇博客的包,通过新增一个可执行程序方式去拓展。

1.新建一个topic_demo1_publisher_side.py文件,在里面实现话题的发布,代码如下:

# 导入rclpy库 import rclpy from rclpy.node import Node # 导入String字符串消息 from std_msgs.msg import String # 创建一个继承于Node基类的子类 class Topic_Pub(Node): def __init__(self, name): super().__init__(name) # 参数含义: 消息类型, "话题名", 消息队列长度 self.pub = self.create_publisher(String, "/topic_demo1", 1) #self.create_timer(1.0, self.timer_callback) #意思:每隔1秒执行一次timer_callback self.timer = self.create_timer(1, self.pub_msg) def pub_msg(self): msg = String() # 创建一个String类型的变量msg msg.data = "Hi,I publish a message." # 给msg里边的data赋值 self.pub.publish(msg) # 发布话题数据 # 主函数 def main(): rclpy.init() # 初始化 pub_demo = Topic_Pub("topic_publisher_node") rclpy.spin(pub_demo) pub_demo.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口

2. 再创建一个名为topic_demo1_subscriber_side.py的文件,在里面实现话题的订阅

#导入相关的库 import rclpy from rclpy.node import Node from std_msgs.msg import String class Topic_Sub(Node): def __init__(self,name): super().__init__(name) #self.create_subscription( # msg_type, # 1. 消息类型(必须) # topic, # 2. 话题名称(必须) # callback, # 3. 收到消息后的回调函数(必须) # qos_profile, # 4. 队列长度(必须) # callback_group=None, # 5. 回调组(可选) #) # 4个必须参数(你写代码必写) # msg_type 消息类型,如String、Image、LaserScan # topic 话题名字字符串,如"my_topic" # callback 收到消息后自动调用的函数 # qos_profile队列长度,一般写10 self.sub = self.create_subscription(String,"/topic_demo1",self.sub_callback,1) def sub_callback(self,msg): # print(msg.data,flush=True) self.get_logger().info(msg.data) def main(): rclpy.init() sub_demo = Topic_Sub("topic_subscriber_node") # 创建对象并进行初始化 rclpy.spin(sub_demo) sub_demo.destroy_node() #销毁节点对象 rclpy.shutdown() #关闭ROS2 Python接口

3. 再继续编辑配置文件setup.py, 如下

4. 完毕后,编译功能包

5. 接下来我们验证下是否设置成功,先运行含有发布话题的节点的可执行程序

ros2 run pythonpackagedemo1 topic_demo1_publisher_side

注1: 如下语句可查下当前在可执行程序里,运行起来的节点(节点类对象)的名字

ros2 node list

注2: 我们还可以通过命令查看节点的信息

ros2 node info /topic_publisher_node

注3:若想看下当前发布的话题,可使用如下命令

ros2 topic list

注4: 如果想看下话题发布的数据,可执行如下语句

ros2 topic echo /topic_demo1

注5:如果想看话题相关的信息,可执行如下语句

ros2 topic info /topic_demo1

6. 再运行含有订阅话题的节点的可执行程序

ros2 run pythonpackagedemo1 topic_demo1_subscriber_side

可看到订阅者能够正常收到发布者发布的信息了,且收到了发布者传过来的String类型参数

注:订阅者中有对应的收到发布的话题数据后,要执行的函数

二. 服务通讯

同样,直接复用之前创建的包,通过新增一个可执行程序方式去拓展。

1.新建一个service_demo1_server_side.py文件,在里面创建服务端,代码如下:

#导入相关的库文件 import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts class Service_Server(Node): def __init__(self,name): super().__init__(name) #作用是:注册一个服务,监听客户端的请求,并调用回调函数处理请求。 self.srv = self.create_service(AddTwoInts, '/add_two_ints', self.Add2Ints_callback) #参数 作用 #AddTwoInts 服务接口类型(ROS2预定义的求和服务),常见的还有Setbool, Trigger, Empty, 后面讲自定义服务 #'/add_two_ints' 服务名称(客户端必须用这个名字才能找到服务),可自定义名字,必须以 / 开头,只能用 字母、数字、下划线 _,大小写敏感(/Add 和 /add 是两个不同服务) #self.Add2Ints_callback 回调函数(收到请求后自动执行的处理逻辑) def Add2Ints_callback(self,request,response): response.sum = request.a + request.b print("response.sum = ",response.sum) return response # ROS2规定服务回调函数必须有两个参数: # request:客户端发来的请求数据(这里是两个整数a、b) # response:服务端返回给客户端的响应数据(这里是求和结果sum) # 函数最后必须return response。 # 话题 = 聊天、广播、持续发消息(单向) # 服务 = 打电话、请求 - 响应、干完就挂(双向) # 话题(Topic)= 大喇叭广播 # 老师拿着喇叭一直喊:“现在温度 25 度!现在速度 1m/s!” # 学生们随便听,想听就听,不想听就不听 # 不用回复,老师只管喊 # 服务(Service)= 打电话 # 你打给朋友:“帮我算一下 3+5 等于几?”(请求) # 朋友算完:“等于 8”(响应) # 必须一问一答,打完电话就结束 # 特性 话题 (Topic) 服务 (Service) # 通信模式 发布 → 订阅(单向) 请求 → 响应(双向) # 是否需要回复 不需要 必须等待回复 # 使用场景 传感器数据、持续状态 执行任务、计算、开关 # 频率 高频(几十~几百 Hz) 低频(偶尔调用) # 连接方式 多对多 一对一 # 阻塞 不阻塞 会阻塞(等结果) # 只要是持续不断、实时传输的数据,都用话题: # 摄像头图像 # 激光雷达数据 # 机器人位置、速度 # 温度、气压 # 遥控器指令 # 特点:只管发,不管对方收没收到。 # 只要是需要执行一个动作、需要结果、需要确认的,都用服务: # 让机器人移动到某个点 # 计算两个数的和 # 打开 / 关闭传感器 # 保存一张图片 # 重置系统 # 查询机器人当前状态 # 特点:我叫你做,你必须做完告诉我结果。 # 最关键的区别:有没有响应 # 数据流用话题,任务调用用服务! def main(): rclpy.init() server_demo = Service_Server("service_server_node") rclpy.spin(server_demo) server_demo.destroy_node() # 销毁节点对象 rclpy.shutdown() # 关闭ROS2 Python接口

2. 再创建一个名为service_demo1_client_side.py的文件,在里面实现客户端

# 导入相关的库 import rclpy from rclpy.node import Node from example_interfaces.srv import AddTwoInts # ROS2 天生支持: # 一个服务端(Server) + 多个客户端(Client) 同时通信。 class Service_Client(Node): def __init__(self, name): super().__init__(name) # 创建客户端:绑定服务名必须和服务端一模一样! self.client = self.create_client(AddTwoInts, '/add_two_ints') # 等待服务上线(防止服务没启动就调用) while not self.client.wait_for_service(timeout_sec=1.0): print("service not available, waiting again...") # 创建请求对象 self.request = AddTwoInts.Request() def send_request(self): self.request.a = 10 self.request.b = 90 # 异步发送请求(非阻塞) self.future = self.client.call_async(self.request) print("等待服务端为我解惑....") def main(): rclpy.init() # 节点初始化 service_client = Service_Client("service_client_node") # 创建对象 service_client.send_request() # 发送服务请求 while rclpy.ok(): rclpy.spin_once(service_client) # 判断数据是否处理完成 if service_client.future.done(): try: # 获得服务反馈的信息并且打印 response = service_client.future.result() print("service_client.request.a = ", service_client.request.a) print("service_client.request.b = ", service_client.request.b) print("Result = ", response.sum) except Exception as e: service_client.get_logger().info('Service call failed %r' % (e,)) break service_client.destroy_node() rclpy.shutdown()

3. 再继续编辑配置文件setup.py, 如下

4. 完毕后,编译功能包

colcon build

5. 接下来我们验证下是否设置成功,先运行含有服务服务器端的节点的可执行程序

ros2 run pythonpackagedemo1 service_demo1_server_side

注1:查看当前所有运行的服务

ros2 service list

注2:查看某个运行服务,所对应的服务类型

ros2 service type /add_two_ints

注意:service没有info指令

注3:查看服务类型的详细结构(请求 / 响应有哪些字段),如下是查看ROS2自带的服务类型,也可以查看自定义的服务类型,后面再说

ros2 interface show example_interfaces/srv/AddTwoInts

注4:直接用命令行调用服务(超级常用)

6. 再运行客户端程序

ros2 run pythonpackagedemo1 service_demo1_client_side

运行之后,可看到结果如下:

三. 动作通讯

开始之前,我们先回答下为什么要有动作,它和话题,服务的区别是什么?如下是三者的核心区别。

特性话题 (Topic)服务 (Service)动作 (Action)
通信方式单向发布双向请求 / 响应双向:目标 + 反馈 + 结果
耗时长短一直持续极快(毫秒级)很慢(秒级 / 分钟级)
能否取消不能不能能中途取消
有无进度有实时进度反馈
阻塞吗?不阻塞会阻塞等待不阻塞(后台运行)
典型场景摄像头、雷达、速度开关、查询、计算导航、机械臂移动、充电

可看到,针对不同的场景需要使用不同的方式。

1. 我们首先要去自定义接口,这个要自己手动创建,必须先定义接口(Goal/Feedback/Result 三部分),这是最关键的一步。这个要如创建C++类型包

ros2 pkg create --build-type ament_cmake action_interfaces_demo

2. 接着在这个文件夹下新建Progress.action的文件(记得第一个字母大写),文件内容如下:

# 目标 (Goal):客户端发送 int64 mytarget --- # 结果 (Result):服务端最后返回: 1~target中每个数值*2的值 int64[] myresult --- # 反馈 (Feedback):服务端实时发送:比如target值为100,当前算到45,那么返回1~45各值*2的值 int64[] myfeedback

3. 在package.xml中需要添加一些依赖包,具体内容如下:

<buildtool_depend>rosidl_default_generators</buildtool_depend> <exec_depend>rosidl_default_runtime</exec_depend> <depend>action_msgs</depend> <member_of_group>rosidl_interface_packages</member_of_group>

4. 在CMakeLists.txt中添加如下配置

find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} "action/Progress.action" )

5. 编译下包

colcon build

输入如下命令,可查看文件定义及编译是否正常

ros2 interface show action_interfaces_demo/action/Progress

6. 有了动作接口后,接下来编写动作服务端(使用python)。这边还是复用之前创建的包pythonpackagedemo1,新建一个action_demo1_server_side.py文件,在里面创建服务端,代码如下:

import rclpy from rclpy.action import ActionServer from rclpy.node import Node import time # 导入我们定义的动作接口 from action_interfaces_demo.action import Progress class Action_Server(Node): def __init__(self): super().__init__('action_server') # 创建动作服务端 self._action_server = ActionServer( self, #当前节点 Progress, #动作接口类型,也就是前面你定义的action下的数据格式 'multicase', # 动作名称(客户端要对应),客户端必须用相同名字才能连上 self.execute_callback) #收到目标后自动调用这个函数 self.get_logger().info('动作服务端已启动,等待客户端请求...') # 执行任务的回调函数(核心) def execute_callback(self, goal_handle): self.get_logger().info('开始执行任务...') # 获取客户端发送的目标 target = goal_handle.request.mytarget #来自你定义的action中的变量名 sequence = [] # 循环生成数列(模拟长时间任务) feedback_msg = Progress.Feedback() for i in range(1, target): sequence.append(i * 2) self.get_logger().info(f'反馈:{sequence}') feedback_msg.myfeedback = sequence goal_handle.publish_feedback(feedback_msg) # 模拟任务耗时 time.sleep(1) # 任务完成 goal_handle.succeed() result = Progress.Result() result.myresult = sequence self.get_logger().info(f'任务完成!结果:{sequence}') return result def main(args=None): rclpy.init(args=args) server = Action_Server() rclpy.spin(server) if __name__ == '__main__': main()

7. 再创建一个名为action_demo1_client_side.py的文件,在里面实现客户端,代码如下

import rclpy from rclpy.action import ActionClient from rclpy.node import Node from action_interfaces_demo.action import Progress class Action_Client(Node): def __init__(self): super().__init__('action_client') # 创建动作客户端 #参数如下: # 当前节点 # 动作接口类型,也就是前面你定义的action下的数据格式 # 动作名称(和服务器端要对应),客户端必须用相同名字才能连上 self._action_client = ActionClient(self, Progress, 'multicase') def send_goal(self, tatget): # 发送目标 # 等待服务端上线 self._action_client.wait_for_server() # 构造目标 goal_msg = Progress.Goal() goal_msg.mytarget = tatget # 异步发送目标 self._send_goal_future = self._action_client.send_goal_async( goal_msg, feedback_callback=self.feedback_callback # 绑定反馈回调 ) self._send_goal_future.add_done_callback(self.goal_response_callback)#当一个异步任务做完了,自动调用你指定的函数。 def goal_response_callback(self, future): # 处理目标发送后的反馈; # 收到服务端响应(接受/拒绝目标) goal_handle = future.result() if not goal_handle.accepted: self.get_logger().info('目标被拒绝!') return self.get_logger().info('目标已接受!') self._goal_handle = goal_handle # 等待最终结果 self._get_result_future = goal_handle.get_result_async() self._get_result_future.add_done_callback(self.get_result_callback) def feedback_callback(self, feedback_msg): # 处理连续反馈; # 接收实时反馈 feedback = feedback_msg.feedback self.get_logger().info(f'收到反馈:{feedback.myfeedback}') def get_result_callback(self, future): # 处理最终响应。 # 接收最终结果 result = future.result().result self.get_logger().info(f'最终结果:{result.myresult}') rclpy.shutdown() def main(args=None): rclpy.init(args=args) client = Action_Client() client.send_goal(10) # rclpy.spin(client) if __name__ == '__main__': main()

8. 再继续编辑配置文件setup.py, 如下

9. 然后编译

10. 接下来我们验证下是否设置成功,先运行动作服务器端程序

ros2 run pythonpackagedemo1 action_demo1_server_side

注1:如下是一些常用的查询命令和命令行调用语句,这边就不再像前面话题,服务那样一一文字阐述了,直接截图,action有info指令

注2:如果想用命令行方式调用,则可使用如下格式语句

ros2 action send_goal 动作名 动作类型 "{参数}"

ros2 action send_goal /multicase action_interfaces_demo/action/Progress "{mytarget: 5}"

11. 再运行客户端程序

ros2 run pythonpackagedemo1 action_demo1_client_side

运行之后,可看到结果如下:

可看到如效果如自己所希望的,由于遍历每一个元素,且将元素值*2,(循环中加了睡眠时间),这个完整过程较为耗时,需要闭环,就像人一样事事有回响,中间不断地再反馈结果,直到结束。

注1:好多话题/服务/动作全都是路径格式,用/分层,比如

/turtle1/cmd_vel中,turtle1是全局命名空间,cmd_vel是话题名

/= 根(所有节点、话题都从这里开始)

/turtle1= 命名空间(区分第 1 只乌龟)

/cmd_vel= 话题名(速度控制)

而ros2 interface show也有/符号,则不是路径分层,是固定格式

【包】 / 【msg/srv/action】 / 【接口名】

注意节点、话题/服务/动作、接口的区别。

节点可以认为是在可执行程序里,定义的节点对象名

话题/服务/动作可认为是在节点中实现的通讯方式名

而接口则是通讯的数据格式

注2:如下是一些常用命令(在上文博客内也都有用过)

ros2 pkg create pythonpackagedemo1 --build-type ament_python --dependencies rclpy --node-name pythonexecute1 #创建python包 ros2 pkg create cpp_packagedemo1 --build-type ament_cmake --dependencies rclcpp --node-name cppexecute1 #创建c++包 colcon build #编译包 ros2 pkg list #例举ROS2已安装或编译的包 ros2 pkg executables 包名 #查指定包里的可执行程序 ros2 pkg executables #查所有可执行程序 # 先启动小乌龟 ros2 run turtlesim turtlesim_node #run后面跟 <包名> <可执行文件> ros2 run turtlesim turtle_teleop_key #run后面跟 <包名> <可执行文件> # 查看节点列表 ros2 node list # 输出大概是:/turtlesim /teleop_turtle # 查看某个节点详细 info ros2 node info /turtlesim # 查看所有话题 ros2 topic list # 查看某个话题详情(类型、发布者、订阅者) ros2 topic info /turtle1/cmd_vel #注意也可以直接是ros2 topic info /topic_demo1这种单分层形式,不用全局命名空间 # 查看话题发布的数据 ros2 topic echo /turtle1/cmd_vel # 查看所有服务 ros2 service list # 查看服务类型 ros2 service type /spawn # 查看服务接口定义(等价于 ros2 interface show) ros2 interface show turtlesim/srv/Spawn # 查看所有动作 ros2 action list # 查看动作详情(类型、服务器、客户端) ros2 action info /turtle1/rotate_absolute # 查看动作接口定义 ros2 interface show turtlesim/action/RotateAbsolute

下一篇博客我们再继续深入,此篇暂就到这儿。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 19:54:05

鸿蒙原生应用实战(五):构建调试、异常处理与HAP发布

鸿蒙原生应用实战&#xff08;五&#xff09;&#xff1a;构建调试、异常处理与HAP发布系列目录&#xff1a; 第一篇&#xff1a;项目搭建与页面架构设计第二篇&#xff1a;首页开发与全局数据流设计第三篇&#xff1a;笔记详情与编辑页面的路由与CRUD第四篇&#xff1a;分类浏…

作者头像 李华
网站建设 2026/6/10 19:43:25

后端技术栈实战指南:打造高性能、高可用系统

在当今数字化浪潮中&#xff0c;构建高性能、高可用的后端系统已成为企业竞争的核心。无论是电商平台的秒杀活动&#xff0c;还是社交网络的实时消息推送&#xff0c;系统稳定性与响应速度直接决定了用户体验与业务成败。因此&#xff0c;掌握一套成熟的技术栈并将其有效整合&a…

作者头像 李华
网站建设 2026/6/10 19:19:42

Redis 分布式锁进阶第六十篇

Redis 分布式锁进阶与生产级优化&#xff1a;从原理到高可用落地 在微服务与分布式架构中&#xff0c;Redis 分布式锁是解决跨进程资源竞争、防止重复提交、保证接口幂等性的核心方案。基础版 SETNX EXPIRE 仅能满足简单场景&#xff0c;在高并发、长事务、集群部署等生产环境…

作者头像 李华
网站建设 2026/6/10 19:10:07

Java Swing学生信息管理系统(带MySQL连接与完整CRUD功能)

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;一个开箱即用的Java桌面程序&#xff0c;用Swing搭建图形界面&#xff0c;通过JDBC连接本地MySQL数据库管理学生信息。主窗口StudentTestWindow提供入口&#xff0c;点击按钮可打开新增&#xff08;StudentAddW…

作者头像 李华