news 2026/4/17 3:20:45

【ROS/ROS2与实时Linux系列】第八篇 ROS 2实时执行器:单线程、多线程与专用调度

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【ROS/ROS2与实时Linux系列】第八篇 ROS 2实时执行器:单线程、多线程与专用调度

一、简介:为什么执行器优化对 ROS 2 至关重要?

在 ROS 2 应用中,无论是机器人导航、工业自动化控制还是自动驾驶,实时性都是关键需求。例如,自动驾驶车辆需要在极短时间内对传感器数据进行处理并做出决策,以确保安全。ROS 2 执行器负责管理节点的回调函数,其配置直接影响实时任务的执行效率和响应速度。掌握执行器优化,可以帮助开发者确保关键任务及时执行,提升系统的整体性能和可靠性。


二、核心概念:执行器的工作机制与术语

2.1 执行器(Executor)

  • 定义:执行器是 ROS 2 中负责管理节点回调函数的组件。它决定何时调用回调函数,以及如何分配线程来执行这些回调。

  • 类型

    • 单线程执行器(SingleThreadedExecutor):所有回调都在同一个线程中执行,适合简单的应用。

    • 多线程执行器(MultiThreadedExecutor):使用多个线程来执行回调,适合需要高并发的应用。

    • 自定义执行器:可以自定义线程池和调度策略,适合实时性要求高的应用。

2.2 回调函数(Callback)

  • 定义:回调函数是节点中需要执行的函数,例如订阅者回调、定时器回调等。

  • 特性:回调函数的执行顺序和时间由执行器决定。

2.3 调度策略(Scheduling Policy)

  • 定义:调度策略决定回调函数的执行顺序和时间。

  • 类型

    • 默认调度策略:由执行器默认配置。

    • 实时调度策略:通过自定义执行器配置,确保关键任务及时执行。


三、环境准备:搭建实时 Linux 环境

3.1 硬件需求

  • CPU:多核处理器(建议至少 4 核)

  • 内存:至少 4 GB RAM

  • 存储:SSD 硬盘

3.2 软件需求

  • 操作系统:Ubuntu 20.04 或更高版本(推荐使用实时内核)

  • 开发工具:GCC、CMake、Git

  • ROS 2:ROS 2 Foxy 或更高版本

3.3 安装实时内核

  1. 安装实时内核(推荐使用 PREEMPT_RT 内核):

sudo apt update sudo apt install linux-headers-$(uname -r) linux-image-$(uname -r) sudo apt install linux-headers-$(uname -r)-realtime linux-image-$(uname -r)-realtime
  1. 重启并选择实时内核

sudo reboot

重启后,选择实时内核启动。

3.4 安装 ROS 2

  1. 安装 ROS 2 Foxy

sudo apt update && sudo apt install -y curl gnupg2 lsb-release curl -s https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add - echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -sc) main" | sudo tee /etc/apt/sources.list.d/ros2.list sudo apt update && sudo apt install -y ros-foxy-desktop source /opt/ros/foxy/setup.bash

四、应用场景:机器人实时控制

在机器人实时控制场景中,关键任务如传感器数据处理、运动控制算法需要在极短时间内完成,以确保机器人的响应速度和精度。通过优化 ROS 2 执行器,可以确保这些任务及时获得 CPU 资源,避免因调度延迟导致的控制误差。例如,在一个自动驾驶机器人项目中,传感器数据处理和路径规划算法需要在 10 毫秒内完成,以确保机器人的安全和效率。通过配置实时执行器,可以显著提升系统的实时性和稳定性。


五、实际案例与步骤:配置实时执行器

5.1 查看默认执行器

  1. 创建一个简单的 ROS 2 节点

# file: talker.py import rclpy from rclpy.node import Node from std_msgs.msg import String class Talker(Node): def __init__(self): super().__init__('talker') self.publisher = self.create_publisher(String, 'topic', 10) timer_period = 1.0 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = f'Hello World: {self.i}' self.publisher.publish(msg) self.get_logger().info(f'Published: {msg.data}') self.i += 1 def main(args=None): rclpy.init(args=args) talker = Talker() rclpy.spin(talker) talker.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
  1. 运行节点

python3 talker.py

5.2 使用单线程执行器

  1. 修改节点代码,使用单线程执行器

# file: talker_single_threaded.py import rclpy from rclpy.node import Node from std_msgs.msg import String from rclpy.executors import SingleThreadedExecutor class Talker(Node): def __init__(self): super().__init__('talker') self.publisher = self.create_publisher(String, 'topic', 10) timer_period = 1.0 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = f'Hello World: {self.i}' self.publisher.publish(msg) self.get_logger().info(f'Published: {msg.data}') self.i += 1 def main(args=None): rclpy.init(args=args) talker = Talker() executor = SingleThreadedExecutor() executor.add_node(talker) executor.spin() talker.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
  1. 运行节点

python3 talker_single_threaded.py

5.3 使用多线程执行器

  1. 修改节点代码,使用多线程执行器

# file: talker_multi_threaded.py import rclpy from rclpy.node import Node from std_msgs.msg import String from rclpy.executors import MultiThreadedExecutor class Talker(Node): def __init__(self): super().__init__('talker') self.publisher = self.create_publisher(String, 'topic', 10) timer_period = 1.0 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = f'Hello World: {self.i}' self.publisher.publish(msg) self.get_logger().info(f'Published: {msg.data}') self.i += 1 def main(args=None): rclpy.init(args=args) talker = Talker() executor = MultiThreadedExecutor() executor.add_node(talker) executor.spin() talker.destroy_node() rclpy.shutdown() if __name__ == '__main__': main()
  1. 运行节点

python3 talker_multi_threaded.py

5.4 配置实时调度策略

  1. 创建自定义执行器

# file: talker_realtime.py import rclpy from rclpy.node import Node from std_msgs.msg import String from rclpy.executors import ExternalExecutor import threading class Talker(Node): def __init__(self): super().__init__('talker') self.publisher = self.create_publisher(String, 'topic', 10) timer_period = 1.0 # seconds self.timer = self.create_timer(timer_period, self.timer_callback) self.i = 0 def timer_callback(self): msg = String() msg.data = f'Hello World: {self.i}' self.publisher.publish(msg) self.get_logger().info(f'Published: {msg.data}') self.i += 1 class RealtimeExecutor(ExternalExecutor): def __init__(self): super().__init__() self.thread = threading.Thread(target

=self.spin)

def spin(self): while rclpy.ok(): self.wait_for_work() self.do_work()

def main(args=None): rclpy.init(args=args) talker = Talker() executor = RealtimeExecutor() executor.add_node(talker) executor.thread.start() executor.spin() talker.destroy_node() rclpy.shutdown()

ifname== 'main': main()

2. **运行节点**: ```bash python3 talker_realtime.py

六、常见问题与解答

6.1 如何确定任务是否需要实时调度?

  • 问题:如何判断一个任务是否需要实时调度?

  • 解答:实时任务通常需要在极短时间内完成,对延迟敏感。例如,传感器数据处理、运动控制算法等。可以通过任务的响应时间要求来判断是否需要实时调度。

6.2 如何设置实时优先级?

  • 问题:如何设置实时任务的优先级?

  • 解答:可以使用chrt命令或在代码中调用sched_setscheduler函数。例如,chrt -f -p 99 <pid>将进程设置为 SCHED_FIFO,优先级为 99。

6.3 如何验证调度策略是否生效?

  • 问题:如何验证调度策略是否生效?

  • 解答:可以使用top -H -p <pid>查看任务的调度情况。如果任务的优先级和调度策略正确设置,任务将按预期运行。

6.4 如何调整 CFS 调度器参数?

  • 问题:如何调整 CFS 调度器参数?

  • 解答:可以通过写入/sys/kernel/mm/cfs/rt_runtime_us/sys/kernel/mm/cfs/rt_period_us文件来调整 CFS 的时间片和调度周期。


七、实践建议与最佳实践

7.1 调试技巧

  • 使用strace跟踪系统调用

strace -p <pid>
  • 使用perf分析性能

perf record -g -p <pid> perf report

7.2 性能优化

  • 减少上下文切换:尽量减少实时任务的上下文切换,提高任务的连续运行时间。

  • 合理分配 CPU 核心:使用taskset命令将实时任务固定在特定的 CPU 核心上,减少 CPU 亲和性切换带来的延迟。

7.3 常见错误的解决方案

  • 实时任务被挂起:检查任务的优先级是否过高,导致其他任务无法运行。适当调整优先级。

  • 任务响应时间过长:检查任务是否被其他高优先级任务抢占,调整任务的调度策略。


八、总结与应用场景

通过本文的介绍,我们深入讲解了 ROS 2 执行器的工作机制,包括默认单线程、多线程执行器,以及如何构建或配置具有实时调度策略的定制化执行器。掌握这些技能,可以帮助开发者确保关键任务及时执行,提升系统的整体性能和可靠性。

在实际应用中,例如机器人实时控制、自动驾驶、工业自动化等场景,通过优化 ROS 2 执行器,可以显著提升系统的实时性和稳定性。希望本文能够帮助读者在实际项目中应用所学知识,优化系统性能,确保任务的高效执行。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 7:37:10

振动器-Android studio软件源代码-java语言

振动器 App 简介与使用说明 一、软件简介 &#x1f4f1; 软件定位 本软件是一款基于 Android 系统的多功能振动控制应用&#xff0c;通过调用设备振动马达&#xff0c;提供多种预设振动模式和高度自定义的振动编辑功能&#xff0c;满足用户在提醒、反馈、娱乐等场景下的多样化…

作者头像 李华
网站建设 2026/4/16 7:35:52

人工智能应用- 人机对战:04. 蒙特卡洛树搜索

蒙特卡洛树搜索&#xff08;MCTS&#xff09;是 AlphaGo 成功的核心技术之一。围棋中的最大挑战在于如何评估当前局势&#xff0c;而MCTS 通过模拟走棋到终局来进行评估。为了提高模拟走棋的效率&#xff0c;MCTS 并不会扩展所有可能的路径&#xff0c;而是通过随机采样的…

作者头像 李华
网站建设 2026/4/15 15:54:58

雨量监测站 雨量实时监测系统

问&#xff1a;这款翻斗式自动雨量站的核心定位是什么&#xff1f;为什么能被称为防汛抗旱的“千里眼”&#xff1f;答&#xff1a;核心定位是全自动高精度雨量监测终端&#xff0c;主打“精准采集、稳定运行、免维护、低功耗”&#xff0c;专为防汛抗旱、水资源监测设计&#…

作者头像 李华
网站建设 2026/4/15 20:19:24

拖延症福音 10个AI论文网站测评:MBA毕业论文写作+格式规范全攻略

在当前学术研究日益数字化的背景下&#xff0c;MBA学员在撰写毕业论文时面临诸多挑战&#xff0c;如选题构思困难、文献资料繁杂、格式规范不熟悉以及写作效率低下等问题。为帮助更多学生高效完成论文任务&#xff0c;本次测评基于2026年的实测数据与用户真实反馈&#xff0c;全…

作者头像 李华
网站建设 2026/4/16 7:38:21

JumpServer API 使用指南

JumpServer 是开源的堡垒机系统,本文档介绍如何通过 API 进行自动化管理。 一、API 认证方式 JumpServer 支持多种认证方式: 认证方式 说明 适用场景 Session 登录后使用 session_id Web 页面 Token 一次性 Token,有有效期 临时调用 Private Token 永久 Token 脚本调用 Acc…

作者头像 李华
网站建设 2026/4/16 9:06:33

互联网大厂Java求职面试实战:从Spring Boot到AI技术的全栈考察

互联网大厂Java求职面试实战&#xff1a;从Spring Boot到AI技术的全栈考察 本文通过模拟互联网大厂Java岗位的求职面试&#xff0c;展现了严肃的面试官与搞笑的水货程序员谢飞机之间的问答。场景涵盖电商与内容社区的业务背景&#xff0c;问题围绕Java核心技术栈、微服务架构、…

作者头像 李华