基于 ROS 与 MAVROS 的无人机定点飞行控制脚本
在无人机开发中,利用 ROS(Robot Operating System)配合 MAVROS 节点进行地面站控制是非常经典的路径。这篇脚本演示了如何通过 Python 客户端库,让无人机进入 OFFBOARD 模式并按预设坐标点依次飞行。
核心逻辑概览
整个流程其实就几个关键步骤:初始化节点、等待飞控连接、发送初始心跳、切换模式、解锁电机、循环发布位置指令。这里有个容易踩坑的地方:OFFBOARD 模式下必须持续发布 setpoint,一旦停止发送,飞控会认为失去控制权并自动退出该模式。
下面直接看代码实现,我会结合注释说明每一步的作用。
#!/usr/bin/env python
# 指定使用系统中的 python 解释器来运行该脚本(Linux 下 ROS 必须)
import rospy
# ROS 的 Python 客户端库,用于节点、话题、服务等操作
import mavros
# MAVROS 的 Python 接口库,用于和飞控(PX4/ArduPilot)通信
from geometry_msgs.msg import PoseStamped
# 引入带时间戳的位姿消息,用于发布位置控制指令
from mavros_msgs.msg import State
# 引入飞控状态消息(连接状态、模式、解锁状态等)
from mavros_msgs.srv import CommandBool, SetMode
# 引入解锁服务(CommandBool)和模式切换服务(SetMode)
# ---------------- 全局变量 ----------------
current_state = State()
# 保存当前飞控状态的全局变量
# ---------------- 回调函数 ----------------
def state_callback(state):
global current_state
# 声明使用全局变量 current_state
current_state = state
# 将订阅到的飞控状态保存到全局变量中
# ---------------- MAVROS 命名空间 ----------------
mavros.set_namespace()
# 设置 MAVROS 命名空间(一般为 /mavros)
des_pos = PoseStamped()
# 期望位置(目标点),用于 offboard 位置控制
# ---------------- 订阅 & 发布 ----------------
state_sub = rospy.Subscriber(
'mavros/state',
State,
callback=state_callback
)
# 订阅飞控状态话题 /mavros/state
position_pub = rospy.Publisher(
'mavros/setpoint_position/local',
PoseStamped,
queue_size=
)
arming_drone = rospy.ServiceProxy(
,
CommandBool
)
setting_mode = rospy.ServiceProxy(
,
SetMode
)
():
()
rospy.init_node(, anonymous=)
prev_state = current_state
rate = rospy.Rate()
des_pos.pose.position.x =
des_pos.pose.position.y =
des_pos.pose.position.z =
( rospy.is_shutdown() current_state.connected):
()
rate.sleep()
()
i ():
(rospy.is_shutdown()):
position_pub.publish(des_pos)
rate.sleep()
()
setting_mode(base_mode=, custom_mode=)
last_request = rospy.get_rostime()
((rospy.get_rostime() - last_request) < rospy.Duration()):
position_pub.publish(des_pos)
rate.sleep()
()
current_state.mode == :
arming_drone()
()
last_request = rospy.get_rostime()
((rospy.get_rostime() - last_request) < rospy.Duration()):
position_pub.publish(des_pos)
rate.sleep()
()
des_pos.pose.position.x =
des_pos.pose.position.y =
des_pos.pose.position.z =
last_request = rospy.get_rostime()
((rospy.get_rostime() - last_request) < rospy.Duration()):
position_pub.publish(des_pos)
rate.sleep()
()
des_pos.pose.position.x =
des_pos.pose.position.y =
des_pos.pose.position.z =
last_request = rospy.get_rostime()
((rospy.get_rostime() - last_request) < rospy.Duration()):
position_pub.publish(des_pos)
rate.sleep()
()
des_pos.pose.position.x =
des_pos.pose.position.y =
des_pos.pose.position.z =
last_request = rospy.get_rostime()
((rospy.get_rostime() - last_request) < rospy.Duration()):
position_pub.publish(des_pos)
rate.sleep()
()
des_pos.pose.position.x =
des_pos.pose.position.y =
des_pos.pose.position.z =
last_request = rospy.get_rostime()
((rospy.get_rostime() - last_request) < rospy.Duration()):
position_pub.publish(des_pos)
rate.sleep()
()
()
__name__ == :
:
offb_control_sample()
rospy.ROSInterruptException:

