main
zhaoxin 2 weeks ago
parent ad8b5c8def
commit 25039b044d

@ -1,140 +1,276 @@
#ros lib
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
颜色识别与HSV参数调整节点
Color recognition and HSV parameter adjustment node
功能说明 (Functionality):
- 基于HSV颜色空间的目标颜色识别
- 实时调整HSV颜色范围参数
- 鼠标选择ROI区域进行颜色学习
- 发布检测到的目标位置信息
- 支持实时颜色追踪模式切换
作者 (Author): UVPTCCS Team
日期 (Date): 2024
"""
# ROS2相关库 (ROS2 related libraries)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import CompressedImage, LaserScan, Image
from yahboomcar_msgs.msg import Position
#common lib
# 通用库 (Common libraries)
import os
import threading
import math
from yahboomcar_astra.astra_common import *
from yahboomcar_msgs.msg import Position
print("import finish")
cv_edition = cv.__version__
print("cv_edition: ",cv_edition)
class Color_Identify(Node):
def __init__(self,name):
"""
颜色识别节点类
Color identification node class
功能 (Functions):
- 实时颜色识别和追踪
- HSV参数动态调整
- 鼠标交互式颜色选择
- 目标位置信息发布
"""
def __init__(self, name):
"""
初始化颜色识别节点
Initialize color identification node
Args:
name (str): 节点名称 (Node name)
"""
super().__init__(name)
#create a publisher
self.pub_position = self.create_publisher(Position,"/Current_point", 10)
self.pub_cmdVel = self.create_publisher(Twist, '/cmd_vel', 10)
self.index = 2
self.Roi_init = ()
self.hsv_range = ()
self.circle = (0, 0, 0)
self.point_pose = (0, 0, 0)
self.dyn_update = True
self.Start_state = True
self.select_flags = False
self.gTracker_state = False
self.windows_name = 'frame'
self.Track_state = 'identify'
self.color = color_follow()
self.cols, self.rows = 0, 0
self.Mouse_XY = (0, 0)
# 创建发布者 (Create publishers)
self.pub_position = self.create_publisher(Position, "/Current_point", 10) # 发布目标位置 (Publish target position)
self.pub_cmdVel = self.create_publisher(Twist, '/cmd_vel', 10) # 发布速度控制命令 (Publish velocity commands)
# 初始化状态变量 (Initialize state variables)
self.index = 2 # 索引 (Index)
self.Roi_init = () # ROI初始化区域 (ROI initialization area)
self.hsv_range = () # HSV颜色范围 (HSV color range)
self.circle = (0, 0, 0) # 检测到的圆形目标 (x, y, radius)
self.point_pose = (0, 0, 0) # 目标点位置 (Target point position)
self.dyn_update = True # 动态更新标志 (Dynamic update flag)
self.Start_state = True # 启动状态 (Start state)
self.select_flags = False # 选择标志 (Selection flag)
self.gTracker_state = False # 追踪器状态 (Tracker state)
self.windows_name = 'frame' # 窗口名称 (Window name)
self.Track_state = 'identify' # 追踪状态 (Tracking state)
self.color = color_follow() # 颜色跟随对象 (Color following object)
self.cols, self.rows = 0, 0 # 图像尺寸 (Image dimensions)
self.Mouse_XY = (0, 0) # 鼠标坐标 (Mouse coordinates)
# 声明参数 (Declare parameters)
self.declare_param()
# HSV参数文件路径 (HSV parameter file path)
self.hsv_text = "/root/yahboomcar_ros2_ws/yahboomcar_ws/src/yahboomcar_astra/yahboomcar_astra/colorHSV.text"
self.capture = cv.VideoCapture(0)
if cv_edition[0]=='3': self.capture.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc(*'XVID'))
else: self.capture.set(cv.CAP_PROP_FOURCC, cv.VideoWriter.fourcc('M', 'J', 'P', 'G'))
self.timer = self.create_timer(0.001, self.on_timer)
# 初始化摄像头 (Initialize camera)
self.capture = cv.VideoCapture(0)
if cv_edition[0] == '3':
self.capture.set(cv.CAP_PROP_FOURCC, cv.VideoWriter_fourcc(*'XVID'))
else:
self.capture.set(cv.CAP_PROP_FOURCC, cv.VideoWriter.fourcc('M', 'J', 'P', 'G'))
# 创建定时器 (Create timer)
self.timer = self.create_timer(0.001, self.on_timer)
def declare_param(self):
#HSV
self.declare_parameter("Hmin",0)
"""
声明ROS参数
Declare ROS parameters
包含HSV颜色空间的最小值和最大值参数
Contains min and max parameters for HSV color space
"""
# HSV最小值参数 (HSV minimum parameters)
self.declare_parameter("Hmin", 0) # 色调最小值 (Hue minimum)
self.Hmin = self.get_parameter('Hmin').get_parameter_value().integer_value
self.declare_parameter("Smin",85)
self.declare_parameter("Smin", 85) # 饱和度最小值 (Saturation minimum)
self.Smin = self.get_parameter('Smin').get_parameter_value().integer_value
self.declare_parameter("Vmin",126)
self.declare_parameter("Vmin", 126) # 明度最小值 (Value minimum)
self.Vmin = self.get_parameter('Vmin').get_parameter_value().integer_value
self.declare_parameter("Hmax",9)
# HSV最大值参数 (HSV maximum parameters)
self.declare_parameter("Hmax", 9) # 色调最大值 (Hue maximum)
self.Hmax = self.get_parameter('Hmax').get_parameter_value().integer_value
self.declare_parameter("Smax",253)
self.declare_parameter("Smax", 253) # 饱和度最大值 (Saturation maximum)
self.Smax = self.get_parameter('Smax').get_parameter_value().integer_value
self.declare_parameter("Vmax",253)
self.declare_parameter("Vmax", 253) # 明度最大值 (Value maximum)
self.Vmax = self.get_parameter('Vmax').get_parameter_value().integer_value
self.declare_parameter('refresh',False)
# 刷新参数 (Refresh parameter)
self.declare_parameter('refresh', False) # 是否刷新HSV参数 (Whether to refresh HSV parameters)
self.refresh = self.get_parameter('refresh').get_parameter_value().bool_value
def on_timer(self):
"""
定时器回调函数
Timer callback function
功能 (Functions):
- 读取摄像头图像
- 处理图像并进行颜色识别
- 显示处理结果
- 处理键盘输入
"""
# 读取摄像头图像 (Read camera image)
ret, frame = self.capture.read()
action = cv.waitKey(10) & 0xFF
frame, binary =self.process(frame, action)
# 处理图像 (Process image)
frame, binary = self.process(frame, action)
# 计算并显示FPS (Calculate and display FPS)
start = time.time()
end = time.time()
fps = 1 / (end - start)
text = "FPS : " + str(int(fps))
cv.putText(frame, text, (30, 30), cv.FONT_HERSHEY_SIMPLEX, 0.6, (100, 200, 200), 1)
if len(binary) != 0: cv.imshow('frame', ManyImgs(1, ([frame, binary])))
else:cv.imshow('frame', frame)
# 显示图像 (Display image)
if len(binary) != 0:
cv.imshow('frame', ManyImgs(1, ([frame, binary])))
else:
cv.imshow('frame', frame)
# 处理退出命令 (Handle exit command)
if action == ord('q') or action == 113:
self.capture.release()
cv.destroyAllWindows()
def process(self, rgb_img, action):
"""
图像处理主函数
Main image processing function
Args:
rgb_img: 输入的RGB图像 (Input RGB image)
action: 键盘输入动作 (Keyboard input action)
Returns:
tuple: 处理后的图像和二值化图像 (Processed image and binary image)
"""
# 更新参数 (Update parameters)
self.get_param()
# 调整图像大小 (Resize image)
rgb_img = cv.resize(rgb_img, (640, 480))
binary = []
if action == 32: self.Track_state = 'tracking'
elif action == ord('i') or action == ord('I'): self.Track_state = "identify"
elif action == ord('r') or action == ord('R'): self.Reset()
elif action == ord('q') or action == ord('Q'): self.cancel()
# 处理键盘输入 (Handle keyboard input)
if action == 32: # 空格键 - 切换到追踪模式 (Space key - switch to tracking mode)
self.Track_state = 'tracking'
elif action == ord('i') or action == ord('I'): # I键 - 切换到识别模式 (I key - switch to identify mode)
self.Track_state = "identify"
elif action == ord('r') or action == ord('R'): # R键 - 重置 (R key - reset)
self.Reset()
elif action == ord('q') or action == ord('Q'): # Q键 - 退出 (Q key - quit)
self.cancel()
# 初始化模式 - 鼠标选择ROI (Initialization mode - mouse ROI selection)
if self.Track_state == 'init':
cv.namedWindow(self.windows_name, cv.WINDOW_AUTOSIZE)
cv.setMouseCallback(self.windows_name, self.onMouse, 0)
if self.select_flags == True:
# 绘制选择框 (Draw selection box)
cv.line(rgb_img, self.cols, self.rows, (255, 0, 0), 2)
cv.rectangle(rgb_img, self.cols, self.rows, (0, 255, 0), 2)
# 如果ROI有效提取HSV范围 (If ROI is valid, extract HSV range)
if self.Roi_init[0] != self.Roi_init[2] and self.Roi_init[1] != self.Roi_init[3]:
rgb_img, self.hsv_range = self.color.Roi_hsv(rgb_img, self.Roi_init)
self.gTracker_state = True
self.dyn_update = True
else: self.Track_state = 'init'
else:
self.Track_state = 'init'
# 识别模式 - 从文件读取HSV参数 (Identify mode - read HSV parameters from file)
elif self.Track_state == "identify":
if os.path.exists(self.hsv_text): self.hsv_range = read_HSV(self.hsv_text)
else: self.Track_state = 'init'
if os.path.exists(self.hsv_text):
self.hsv_range = read_HSV(self.hsv_text)
else:
self.Track_state = 'init'
# 执行颜色识别 (Execute color recognition)
if self.Track_state != 'init':
if len(self.hsv_range) != 0:
# 目标跟随处理 (Object following processing)
rgb_img, binary, self.circle = self.color.object_follow(rgb_img, self.hsv_range)
# 动态更新HSV参数 (Dynamic update HSV parameters)
if self.dyn_update == True:
write_HSV(self.hsv_text, self.hsv_range)
self.Hmin = rclpy.parameter.Parameter('Hmin',rclpy.Parameter.Type.INTEGER,self.hsv_range[0][0])
self.Smin = rclpy.parameter.Parameter('Smin',rclpy.Parameter.Type.INTEGER,self.hsv_range[0][1])
self.Vmin = rclpy.parameter.Parameter('Vmin',rclpy.Parameter.Type.INTEGER,self.hsv_range[0][2])
self.Hmax = rclpy.parameter.Parameter('Hmax',rclpy.Parameter.Type.INTEGER,self.hsv_range[1][0])
self.Smax = rclpy.parameter.Parameter('Smax',rclpy.Parameter.Type.INTEGER,self.hsv_range[1][1])
self.Vmax = rclpy.parameter.Parameter('Vmax',rclpy.Parameter.Type.INTEGER,self.hsv_range[1][2])
all_new_parameters = [self.Hmin,self.Smin,self.Vmin,self.Hmax,self.Smax,self.Vmax]
# 更新ROS参数 (Update ROS parameters)
self.Hmin = rclpy.parameter.Parameter('Hmin', rclpy.Parameter.Type.INTEGER, self.hsv_range[0][0])
self.Smin = rclpy.parameter.Parameter('Smin', rclpy.Parameter.Type.INTEGER, self.hsv_range[0][1])
self.Vmin = rclpy.parameter.Parameter('Vmin', rclpy.Parameter.Type.INTEGER, self.hsv_range[0][2])
self.Hmax = rclpy.parameter.Parameter('Hmax', rclpy.Parameter.Type.INTEGER, self.hsv_range[1][0])
self.Smax = rclpy.parameter.Parameter('Smax', rclpy.Parameter.Type.INTEGER, self.hsv_range[1][1])
self.Vmax = rclpy.parameter.Parameter('Vmax', rclpy.Parameter.Type.INTEGER, self.hsv_range[1][2])
all_new_parameters = [self.Hmin, self.Smin, self.Vmin, self.Hmax, self.Smax, self.Vmax]
self.set_parameters(all_new_parameters)
self.dyn_update = False
self.dyn_update = False
# 追踪模式 - 发布目标位置 (Tracking mode - publish target position)
if self.Track_state == 'tracking':
self.Start_state = True
if self.circle[2] != 0: threading.Thread(
target=self.execute, args=(self.circle[0], self.circle[1], self.circle[2])).start()
if self.point_pose[0] != 0 and self.point_pose[1] != 0: threading.Thread(
target=self.execute, args=(self.point_pose[0], self.point_pose[1], self.point_pose[2])).start()
if self.circle[2] != 0:
threading.Thread(target=self.execute, args=(self.circle[0], self.circle[1], self.circle[2])).start()
if self.point_pose[0] != 0 and self.point_pose[1] != 0:
threading.Thread(target=self.execute, args=(self.point_pose[0], self.point_pose[1], self.point_pose[2])).start()
else:
# 停止机器人运动 (Stop robot movement)
if self.Start_state == True:
self.pub_cmdVel.publish(Twist())
self.Start_state = False
return rgb_img, binary
def execute(self, x, y, z):
"""
执行目标位置发布
Execute target position publishing
Args:
x (float): 目标X坐标 (Target X coordinate)
y (float): 目标Y坐标 (Target Y coordinate)
z (float): 目标距离/半径 (Target distance/radius)
"""
position = Position()
position.anglex = x * 1.0
position.angley = y * 1.0
position.distance = z * 1.0
position.anglex = x * 1.0 # X轴角度 (X-axis angle)
position.angley = y * 1.0 # Y轴角度 (Y-axis angle)
position.distance = z * 1.0 # 距离 (Distance)
self.pub_position.publish(position)
def get_param(self):
#hsv
"""
获取当前HSV参数
Get current HSV parameters
从ROS参数服务器获取最新的HSV颜色范围参数
Get latest HSV color range parameters from ROS parameter server
"""
self.Hmin = self.get_parameter('Hmin').get_parameter_value().integer_value
self.Smin = self.get_parameter('Smin').get_parameter_value().integer_value
self.Vmin = self.get_parameter('Vmin').get_parameter_value().integer_value
@ -144,34 +280,65 @@ class Color_Identify(Node):
self.refresh = self.get_parameter('refresh').get_parameter_value().bool_value
def Reset(self):
"""
重置颜色识别系统
Reset color recognition system
清除HSV范围目标圆形鼠标坐标等状态
Clear HSV range, target circle, mouse coordinates and other states
"""
self.hsv_range = ()
self.circle = (0, 0, 0)
self.Mouse_XY = (0, 0)
self.Track_state = 'init'
for i in range(3): self.pub_position.publish(Position())
print("succes!!!")
# 发布空的位置信息 (Publish empty position information)
for i in range(3):
self.pub_position.publish(Position())
print("Reset success!!!")
def cancel(self):
"""
取消操作并关闭窗口
Cancel operation and close windows
"""
print("Shutting down this node.")
cv.destroyAllWindows()
def onMouse(self, event, x, y, flags, param):
if event == 1:
"""
鼠标事件回调函数
Mouse event callback function
Args:
event: 鼠标事件类型 (Mouse event type)
x, y: 鼠标坐标 (Mouse coordinates)
flags: 鼠标标志 (Mouse flags)
param: 参数 (Parameters)
"""
if event == 1: # 鼠标左键按下 (Left mouse button pressed)
self.Track_state = 'init'
self.select_flags = True
self.Mouse_XY = (x, y)
if event == 4:
if event == 4: # 鼠标左键释放 (Left mouse button released)
self.select_flags = False
self.Track_state = 'mouse'
if self.select_flags == True:
# 计算ROI区域 (Calculate ROI area)
self.cols = min(self.Mouse_XY[0], x), min(self.Mouse_XY[1], y)
self.rows = max(self.Mouse_XY[0], x), max(self.Mouse_XY[1], y)
self.Roi_init = (self.cols[0], self.cols[1], self.rows[0], self.rows[1])
def main():
"""
主函数
Main function
初始化ROS节点并启动颜色识别功能
Initialize ROS node and start color identification functionality
"""
rclpy.init()
color_identify = Color_Identify("ColorIdentify")
print("start it")
print("Color identification node started")
rclpy.spin(color_identify)

@ -1,68 +1,142 @@
#ros lib
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
颜色目标追踪节点
Color target tracking node
功能说明 (Functionality):
- 基于深度信息的颜色目标追踪
- PID控制器实现平滑跟随
- 深度图像处理和距离计算
- 机器人运动控制
- 手柄状态监控
作者 (Author): UVPTCCS Team
日期 (Date): 2024
"""
# ROS2相关库 (ROS2 related libraries)
import rclpy
from rclpy.node import Node
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import CompressedImage, LaserScan, Image
from yahboomcar_msgs.msg import Position
#common lib
# 通用库 (Common libraries)
import os
import threading
import math
from yahboomcar_astra.astra_common import *
from yahboomcar_msgs.msg import Position
from cv_bridge import CvBridge
print("import done")
class color_Tracker(Node):
def __init__(self,name):
"""
颜色追踪节点类
Color tracking node class
功能 (Functions):
- 接收目标位置信息并进行追踪
- 使用深度信息计算目标距离
- PID控制器实现平滑运动
- 手柄状态监控和安全控制
"""
def __init__(self, name):
"""
初始化颜色追踪节点
Initialize color tracking node
Args:
name (str): 节点名称 (Node name)
"""
super().__init__(name)
#create the publisher
self.pub_cmdVel = self.create_publisher(Twist,'/cmd_vel',10)
#create the subscriber
self.sub_depth = self.create_subscription(Image,"/camera/depth/image_raw", self.depth_img_Callback, 1)
self.sub_JoyState = self.create_subscription(Bool,'/JoyState', self.JoyStateCallback,1)
self.sub_position = self.create_subscription(Position,"/Current_point",self.positionCallback,1)
# 创建发布者 (Create publisher)
self.pub_cmdVel = self.create_publisher(Twist, '/cmd_vel', 10) # 发布速度控制命令 (Publish velocity commands)
# 创建订阅者 (Create subscribers)
self.sub_depth = self.create_subscription(Image, "/camera/depth/image_raw", self.depth_img_Callback, 1) # 订阅深度图像 (Subscribe to depth image)
self.sub_JoyState = self.create_subscription(Bool, '/JoyState', self.JoyStateCallback, 1) # 订阅手柄状态 (Subscribe to joystick state)
self.sub_position = self.create_subscription(Position, "/Current_point", self.positionCallback, 1) # 订阅目标位置 (Subscribe to target position)
# CV桥接器 (CV bridge)
self.bridge = CvBridge()
self.minDist = 1500
self.Center_x = 0
self.Center_y = 0
self.Center_r = 0
self.Center_prevx = 0
self.Center_prevr = 0
self.prev_time = 0
self.prev_dist = 0
self.prev_angular = 0
self.Joy_active = False
self.Robot_Run = False
self.dist = []
self.encoding = ['16UC1', '32FC1']
self.linear_PID = (3.0, 0.0, 1.0)
self.angular_PID = (0.5, 0.0, 2.0)
self.scale = 1000
# 追踪参数 (Tracking parameters)
self.minDist = 1500 # 最小距离阈值 (Minimum distance threshold)
self.Center_x = 0 # 目标中心X坐标 (Target center X coordinate)
self.Center_y = 0 # 目标中心Y坐标 (Target center Y coordinate)
self.Center_r = 0 # 目标半径 (Target radius)
self.Center_prevx = 0 # 上一帧目标X坐标 (Previous frame target X coordinate)
self.Center_prevr = 0 # 上一帧目标半径 (Previous frame target radius)
# 时间和状态变量 (Time and state variables)
self.prev_time = 0 # 上一次时间 (Previous time)
self.prev_dist = 0 # 上一次距离 (Previous distance)
self.prev_angular = 0 # 上一次角度 (Previous angle)
self.Joy_active = False # 手柄是否激活 (Joystick active flag)
self.Robot_Run = False # 机器人运行状态 (Robot running state)
# 深度处理参数 (Depth processing parameters)
self.dist = [] # 距离数组 (Distance array)
self.encoding = ['16UC1', '32FC1'] # 深度图像编码格式 (Depth image encoding formats)
# PID控制器参数 (PID controller parameters)
self.linear_PID = (3.0, 0.0, 1.0) # 线性PID参数 (Linear PID parameters)
self.angular_PID = (0.5, 0.0, 2.0) # 角度PID参数 (Angular PID parameters)
self.scale = 1000 # 比例因子 (Scale factor)
# 初始化PID控制器 (Initialize PID controllers)
self.PID_init()
# 声明参数 (Declare parameters)
self.declare_param()
print("init done")
print("Color tracker initialization done")
def declare_param(self):
self.declare_parameter("linear_Kp",3.0)
"""
声明ROS参数
Declare ROS parameters
包含PID控制器参数和距离阈值参数
Contains PID controller parameters and distance threshold parameters
"""
# 线性PID参数 (Linear PID parameters)
self.declare_parameter("linear_Kp", 3.0) # 线性比例增益 (Linear proportional gain)
self.linear_Kp = self.get_parameter('linear_Kp').get_parameter_value().double_value
self.declare_parameter("linear_Ki",0.0)
self.declare_parameter("linear_Ki", 0.0) # 线性积分增益 (Linear integral gain)
self.linear_Ki = self.get_parameter('linear_Ki').get_parameter_value().double_value
self.declare_parameter("linear_Kd",1.0)
self.declare_parameter("linear_Kd", 1.0) # 线性微分增益 (Linear derivative gain)
self.linear_Kd = self.get_parameter('linear_Kd').get_parameter_value().double_value
self.declare_parameter("angular_Kp",0.5)
# 角度PID参数 (Angular PID parameters)
self.declare_parameter("angular_Kp", 0.5) # 角度比例增益 (Angular proportional gain)
self.angular_Kp = self.get_parameter('angular_Kp').get_parameter_value().double_value
self.declare_parameter("angular_Ki",0.0)
self.declare_parameter("angular_Ki", 0.0) # 角度积分增益 (Angular integral gain)
self.angular_Ki = self.get_parameter('angular_Ki').get_parameter_value().double_value
self.declare_parameter("angular_Kd",2.0)
self.declare_parameter("angular_Kd", 2.0) # 角度微分增益 (Angular derivative gain)
self.angular_Kd = self.get_parameter('angular_Kd').get_parameter_value().double_value
self.declare_parameter("scale",1000)
# 其他参数 (Other parameters)
self.declare_parameter("scale", 1000) # 比例因子 (Scale factor)
self.scale = self.get_parameter('scale').get_parameter_value().integer_value
self.declare_parameter("minDistance",1.0)
self.declare_parameter("minDistance", 1.0) # 最小距离 (Minimum distance)
self.minDistance = self.get_parameter('minDistance').get_parameter_value().double_value
def get_param(self):
"""
获取当前参数值
Get current parameter values
从ROS参数服务器获取最新的PID参数
Get latest PID parameters from ROS parameter server
"""
# 获取PID参数 (Get PID parameters)
self.linear_Kp = self.get_parameter('linear_Kp').get_parameter_value().double_value
self.linear_Ki = self.get_parameter('linear_Ki').get_parameter_value().double_value
self.linear_Kd = self.get_parameter('linear_Kd').get_parameter_value().double_value
@ -72,101 +146,225 @@ class color_Tracker(Node):
self.scale = self.get_parameter('scale').get_parameter_value().integer_value
self.minDistance = self.get_parameter('minDistance').get_parameter_value().double_value
# 更新PID参数组 (Update PID parameter groups)
self.linear_PID = (self.linear_Kp, self.linear_Ki, self.linear_Kd)
self.angular_PID = (self.angular_Kp, self.angular_Ki, self.angular_Kd)
self.minDist = self.minDistance * 1000
def PID_init(self):
"""
初始化PID控制器
Initialize PID controllers
创建线性和角度PID控制器实例
Create linear and angular PID controller instances
"""
# 线性PID控制器 (Linear PID controller)
self.linear_pid = simplePID(self.linear_PID[0] / 1000.0, self.linear_PID[1] / 1000.0, self.linear_PID[2] / 1000.0)
# 角度PID控制器 (Angular PID controller)
self.angular_pid = simplePID(self.angular_PID[0] / 100.0, self.angular_PID[1] / 100.0, self.angular_PID[2] / 100.0)
def depth_img_Callback(self, msg):
"""
深度图像回调函数
Depth image callback function
Args:
msg: 深度图像消息 (Depth image message)
功能 (Functions):
- 处理深度图像数据
- 计算目标距离
- 执行追踪控制
- 处理超时检测
"""
if not isinstance(msg, Image): return
# 转换深度图像 (Convert depth image)
depthFrame = self.bridge.imgmsg_to_cv2(msg, desired_encoding=self.encoding[1])
self.action = cv.waitKey(1)
# 如果检测到目标 (If target is detected)
if self.Center_r != 0:
# 超时检测 - 防止目标丢失后继续追踪 (Timeout detection - prevent tracking after target loss)
now_time = time.time()
if now_time - self.prev_time > 5:
if self.Center_prevx == self.Center_x and self.Center_prevr == self.Center_r: self.Center_r = 0
if self.Center_prevx == self.Center_x and self.Center_prevr == self.Center_r:
self.Center_r = 0
self.prev_time = now_time
# 多点距离采样 (Multi-point distance sampling)
distance = [0, 0, 0, 0, 0]
if 0 < int(self.Center_y - 3) and int(self.Center_y + 3) < 480 and 0 < int(
self.Center_x - 3) and int(self.Center_x + 3) < 640:
# print("depthFrame: ", len(depthFrame), len(depthFrame[0]))
distance[0] = depthFrame[int(self.Center_y - 3)][int(self.Center_x - 3)]
distance[1] = depthFrame[int(self.Center_y + 3)][int(self.Center_x - 3)]
distance[2] = depthFrame[int(self.Center_y - 3)][int(self.Center_x + 3)]
distance[3] = depthFrame[int(self.Center_y + 3)][int(self.Center_x + 3)]
distance[4] = depthFrame[int(self.Center_y)][int(self.Center_x)]
# 检查边界条件 (Check boundary conditions)
if (0 < int(self.Center_y - 3) and int(self.Center_y + 3) < 480 and
0 < int(self.Center_x - 3) and int(self.Center_x + 3) < 640):
# 采样5个点的深度值 (Sample depth values from 5 points)
distance[0] = depthFrame[int(self.Center_y - 3)][int(self.Center_x - 3)] # 左上 (Top-left)
distance[1] = depthFrame[int(self.Center_y + 3)][int(self.Center_x - 3)] # 左下 (Bottom-left)
distance[2] = depthFrame[int(self.Center_y - 3)][int(self.Center_x + 3)] # 右上 (Top-right)
distance[3] = depthFrame[int(self.Center_y + 3)][int(self.Center_x + 3)] # 右下 (Bottom-right)
distance[4] = depthFrame[int(self.Center_y)][int(self.Center_x)] # 中心 (Center)
# 计算平均距离 (Calculate average distance)
distance_ = 1000.0
num_depth_points = 5
for i in range(5):
if 40 < distance[i] < 80000: distance_ += distance[i]
else: num_depth_points -= 1
if num_depth_points == 0: distance_ = self.minDist
else: distance_ /= num_depth_points
#print("Center_x: {}, Center_y: {}, distance_: {}".format(self.Center_x, self.Center_y, distance_))
# 过滤有效深度值 (Filter valid depth values)
if 40 < distance[i] < 80000:
distance_ += distance[i]
else:
num_depth_points -= 1
# 计算平均值或使用默认值 (Calculate average or use default)
if num_depth_points == 0:
distance_ = self.minDist
else:
distance_ /= num_depth_points
# 执行追踪控制 (Execute tracking control)
self.execute(self.Center_x, distance_)
# 更新历史值 (Update historical values)
self.Center_prevx = self.Center_x
self.Center_prevr = self.Center_r
else:
if self.Robot_Run ==True:
# 目标丢失,停止机器人 (Target lost, stop robot)
if self.Robot_Run == True:
self.pub_cmdVel.publish(Twist())
self.Robot_Run = False
if self.action == ord('q') or self.action == 113: self.cleanup()
# 处理退出命令 (Handle exit command)
if self.action == ord('q') or self.action == 113:
self.cleanup()
def JoyStateCallback(self, msg):
"""
手柄状态回调函数
Joystick state callback function
Args:
msg: 手柄状态消息 (Joystick state message)
功能 (Functions):
- 监控手柄状态
- 在手柄激活时停止自动追踪
"""
if not isinstance(msg, Bool): return
self.Joy_active = msg.data
# 手柄激活时停止机器人运动 (Stop robot movement when joystick is active)
self.pub_cmdVel.publish(Twist())
def positionCallback(self, msg):
"""
目标位置回调函数
Target position callback function
Args:
msg: 目标位置消息 (Target position message)
功能 (Functions):
- 接收目标位置信息
- 更新追踪目标的坐标和半径
"""
if not isinstance(msg, Position): return
self.Center_x = msg.anglex
self.Center_y = msg.angley
self.Center_r = msg.distance
# 更新目标位置信息 (Update target position information)
self.Center_x = msg.anglex # 目标X坐标 (Target X coordinate)
self.Center_y = msg.angley # 目标Y坐标 (Target Y coordinate)
self.Center_r = msg.distance # 目标半径/距离 (Target radius/distance)
def cleanup(self):
"""
清理函数
Cleanup function
功能 (Functions):
- 停止机器人运动
- 关闭OpenCV窗口
- 清理资源
"""
self.pub_cmdVel.publish(Twist())
print ("Shutting down this node.")
print("Shutting down this node.")
cv.destroyAllWindows()
def execute(self, point_x, dist):
"""
执行追踪控制
Execute tracking control
Args:
point_x (float): 目标X坐标 (Target X coordinate)
dist (float): 目标距离 (Target distance)
功能 (Functions):
- 使用PID控制器计算运动命令
- 实现平滑的目标跟随
- 应用速度和角速度限制
"""
# 获取最新参数 (Get latest parameters)
self.get_param()
# 距离变化过滤 (Distance change filtering)
if abs(self.prev_dist - dist) > 300:
self.prev_dist = dist
return
# 角度变化过滤 (Angular change filtering)
if abs(self.prev_angular - point_x) > 300:
self.prev_angular = point_x
return
# 手柄激活时不执行自动控制 (Don't execute auto control when joystick is active)
if self.Joy_active == True: return
linear_x = self.linear_pid.compute(dist, self.minDist)
angular_z = self.angular_pid.compute(320, point_x)
if abs(dist - self.minDist) < 30: linear_x = 0
if abs(point_x - 320.0) < 30: angular_z = 0
# PID控制计算 (PID control calculation)
linear_x = self.linear_pid.compute(dist, self.minDist) # 线性速度控制 (Linear velocity control)
angular_z = self.angular_pid.compute(320, point_x) # 角速度控制 (Angular velocity control)
# 死区处理 (Dead zone processing)
if abs(dist - self.minDist) < 30: linear_x = 0 # 距离死区 (Distance dead zone)
if abs(point_x - 320.0) < 30: angular_z = 0 # 角度死区 (Angular dead zone)
# 创建运动命令 (Create motion command)
twist = Twist()
if angular_z>2.0:
# 角速度限制 (Angular velocity limits)
if angular_z > 2.0:
angular_z = 2.0
if angular_z<-2.0:
if angular_z < -2.0:
angular_z = -2.0
# 线速度限制 (Linear velocity limits)
if linear_x > 1.0:
linear_x = 1.0
if linear_x <-1.0:
if linear_x < -1.0:
linear_x = -1.0
twist.angular.z = angular_z * 1.0
twist.linear.x = linear_x * 1.0
print("twist.linear.x: ",twist.linear.x)
print("twist.angular.z: ",twist.angular.z)
self.pub_cmdVel.publish(twist)
self.Robot_Run = True
# 设置运动命令 (Set motion command)
twist.angular.z = angular_z * 1.0 # 角速度 (Angular velocity)
twist.linear.x = linear_x * 1.0 # 线速度 (Linear velocity)
# 调试输出 (Debug output)
print("twist.linear.x: ", twist.linear.x)
print("twist.angular.z: ", twist.angular.z)
# 发布运动命令 (Publish motion command)
self.pub_cmdVel.publish(twist)
self.Robot_Run = True
def main():
"""
主函数
Main function
初始化ROS节点并启动颜色追踪功能
Initialize ROS node and start color tracking functionality
"""
rclpy.init()
color_tracker = color_Tracker("ColorTracker")
print("start it")
print("Color tracker node started")
rclpy.spin(color_tracker)

@ -1,7 +1,22 @@
#!/usr/bin/env python
# encoding: utf-8
# -*- coding: utf-8 -*-
"""
X3型机器人底盘驱动节点
X3 robot chassis driver node
#public lib
功能说明 (Functionality):
- X3型机器人底层运动控制
- IMU数据采集和发布
- 电池电压监控
- RGB灯光控制
- 蜂鸣器控制
- 关节状态发布
作者 (Author): UVPTCCS Team
日期 (Date): 2024
"""
# 通用库 (Common libraries)
import sys
import math
import random
@ -10,169 +25,282 @@ from math import pi
from time import sleep
from Rosmaster_Lib import Rosmaster
#ros lib
# ROS2相关库 (ROS2 related libraries)
import rclpy
from rclpy.node import Node
from std_msgs.msg import String,Float32,Int32,Bool
from std_msgs.msg import String, Float32, Int32, Bool
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Imu,MagneticField, JointState
from sensor_msgs.msg import Imu, MagneticField, JointState
from rclpy.clock import Clock
#from dynamic_reconfigure.server import Server
car_type_dic={
'R2':5,
'X3':1,
'NONE':-1
# 机器人类型字典 (Robot type dictionary)
car_type_dic = {
'R2': 5, # R2型机器人 (R2 robot type)
'X3': 1, # X3型机器人 (X3 robot type)
'NONE': -1 # 无类型 (No type)
}
class yahboomcar_driver(Node):
def __init__(self, name):
super().__init__(name)
global car_type_dic
self.RA2DE = 180 / pi
self.car = Rosmaster()
self.car.set_car_type(1)
#get parameter
self.declare_parameter('car_type', 'X3')
self.car_type = self.get_parameter('car_type').get_parameter_value().string_value
print (self.car_type)
self.declare_parameter('imu_link', 'imu_link')
self.imu_link = self.get_parameter('imu_link').get_parameter_value().string_value
print (self.imu_link)
self.declare_parameter('Prefix', "")
self.Prefix = self.get_parameter('Prefix').get_parameter_value().string_value
print (self.Prefix)
self.declare_parameter('xlinear_limit', 1.0)
self.xlinear_limit = self.get_parameter('xlinear_limit').get_parameter_value().double_value
print (self.xlinear_limit)
self.declare_parameter('ylinear_limit', 1.0)
self.ylinear_limit = self.get_parameter('ylinear_limit').get_parameter_value().double_value
print (self.ylinear_limit)
self.declare_parameter('angular_limit', 5.0)
self.angular_limit = self.get_parameter('angular_limit').get_parameter_value().double_value
print (self.angular_limit)
#create subcriber
self.sub_cmd_vel = self.create_subscription(Twist,"cmd_vel",self.cmd_vel_callback,1)
self.sub_RGBLight = self.create_subscription(Int32,"RGBLight",self.RGBLightcallback,100)
self.sub_BUzzer = self.create_subscription(Bool,"Buzzer",self.Buzzercallback,100)
#create publisher
self.EdiPublisher = self.create_publisher(Float32,"edition",100)
self.volPublisher = self.create_publisher(Float32,"voltage",100)
self.staPublisher = self.create_publisher(JointState,"joint_states",100)
self.velPublisher = self.create_publisher(Twist,"vel_raw",50)
self.imuPublisher = self.create_publisher(Imu,"/imu/data_raw",100)
self.magPublisher = self.create_publisher(MagneticField,"/imu/mag",100)
#create timer
self.timer = self.create_timer(0.1, self.pub_data)
#create and init variable
self.edition = Float32()
self.edition.data = 1.0
self.car.create_receive_threading()
#callback function
def cmd_vel_callback(self,msg):
# 小车运动控制,订阅者回调函数
# Car motion control, subscriber callback function
if not isinstance(msg, Twist): return
# 下发线速度和角速度
# Issue linear vel and angular vel
vx = msg.linear.x*1.0
#vy = msg.linear.y/1000.0*180.0/3.1416 #Radian system
vy = msg.linear.y*1.0
angular = msg.angular.z*1.0 # wait for chang
self.car.set_car_motion(vx, vy, angular)
'''print("cmd_vx: ",vx)
print("cmd_vy: ",vy)
print("cmd_angular: ",angular)'''
#rospy.loginfo("nav_use_rot:{}".format(self.nav_use_rotvel))
#print(self.nav_use_rotvel)
def RGBLightcallback(self,msg):
# 流水灯控制,服务端回调函数 RGBLight control
if not isinstance(msg, Int32): return
# print ("RGBLight: ", msg.data)
for i in range(3): self.car.set_colorful_effect(msg.data, 6, parm=1)
def Buzzercallback(self,msg):
if not isinstance(msg, Bool): return
if msg.data:
for i in range(3): self.car.set_beep(1)
else:
for i in range(3): self.car.set_beep(0)
#pub data
def pub_data(self):
time_stamp = Clock().now()
imu = Imu()
twist = Twist()
battery = Float32()
edition = Float32()
mag = MagneticField()
state = JointState()
state.header.stamp = time_stamp.to_msg()
state.header.frame_id = "joint_states"
if len(self.Prefix)==0:
state.name = ["back_right_joint", "back_left_joint","front_left_steer_joint","front_left_wheel_joint",
"front_right_steer_joint", "front_right_wheel_joint"]
else:
state.name = [self.Prefix+"back_right_joint",self.Prefix+ "back_left_joint",self.Prefix+"front_left_steer_joint",self.Prefix+"front_left_wheel_joint",
self.Prefix+"front_right_steer_joint", self.Prefix+"front_right_wheel_joint"]
#print ("mag: ",self.car.get_magnetometer_data())
edition.data = self.car.get_version()*1.0
battery.data = self.car.get_battery_voltage()*1.0
ax, ay, az = self.car.get_accelerometer_data()
gx, gy, gz = self.car.get_gyroscope_data()
mx, my, mz = self.car.get_magnetometer_data()
mx = mx * 1.0
my = my * 1.0
mz = mz * 1.0
vx, vy, angular = self.car.get_motion_data()
'''print("vx: ",vx)
print("vy: ",vy)
print("angular: ",angular)'''
# 发布陀螺仪的数据
# Publish gyroscope data
imu.header.stamp = time_stamp.to_msg()
imu.header.frame_id = self.imu_link
imu.linear_acceleration.x = ax*1.0
imu.linear_acceleration.y = ay*1.0
imu.linear_acceleration.z = az*1.0
imu.angular_velocity.x = gx*1.0
imu.angular_velocity.y = gy*1.0
imu.angular_velocity.z = gz*1.0
mag.header.stamp = time_stamp.to_msg()
mag.header.frame_id = self.imu_link
mag.magnetic_field.x = mx*1.0
mag.magnetic_field.y = my*1.0
mag.magnetic_field.z = mz*1.0
# 将小车当前的线速度和角速度发布出去
# Publish the current linear vel and angular vel of the car
twist.linear.x = vx *1.0
twist.linear.y = vy *1.0
twist.angular.z = angular*1.0
self.velPublisher.publish(twist)
# print("ax: %.5f, ay: %.5f, az: %.5f" % (ax, ay, az))
# print("gx: %.5f, gy: %.5f, gz: %.5f" % (gx, gy, gz))
# print("mx: %.5f, my: %.5f, mz: %.5f" % (mx, my, mz))
# rospy.loginfo("battery: {}".format(battery))
# rospy.loginfo("vx: {}, vy: {}, angular: {}".format(twist.linear.x, twist.linear.y, twist.angular.z))
self.imuPublisher.publish(imu)
self.magPublisher.publish(mag)
self.volPublisher.publish(battery)
self.EdiPublisher.publish(edition)
"""
X3型机器人驱动节点类
X3 robot driver node class
功能 (Functions):
- 机器人底层运动控制
- 传感器数据采集和发布
- 外设控制灯光蜂鸣器
- 实时状态监控
"""
def __init__(self, name):
"""
初始化X3机器人驱动节点
Initialize X3 robot driver node
Args:
name (str): 节点名称 (Node name)
"""
super().__init__(name)
global car_type_dic
# 角度转换常数 (Angle conversion constant)
self.RA2DE = 180 / pi # 弧度转角度 (Radians to degrees)
# 初始化机器人底层库 (Initialize robot low-level library)
self.car = Rosmaster()
self.car.set_car_type(1) # 设置为X3型机器人 (Set to X3 robot type)
# 声明和获取参数 (Declare and get parameters)
self.declare_parameter('car_type', 'X3') # 机器人类型 (Robot type)
self.car_type = self.get_parameter('car_type').get_parameter_value().string_value
print("Car type:", self.car_type)
self.declare_parameter('imu_link', 'imu_link') # IMU坐标系名称 (IMU frame name)
self.imu_link = self.get_parameter('imu_link').get_parameter_value().string_value
print("IMU link:", self.imu_link)
self.declare_parameter('Prefix', "") # 话题前缀 (Topic prefix)
self.Prefix = self.get_parameter('Prefix').get_parameter_value().string_value
print("Prefix:", self.Prefix)
self.declare_parameter('xlinear_limit', 1.0) # X轴线速度限制 (X-axis linear velocity limit)
self.xlinear_limit = self.get_parameter('xlinear_limit').get_parameter_value().double_value
print("X linear limit:", self.xlinear_limit)
self.declare_parameter('ylinear_limit', 1.0) # Y轴线速度限制 (Y-axis linear velocity limit)
self.ylinear_limit = self.get_parameter('ylinear_limit').get_parameter_value().double_value
print("Y linear limit:", self.ylinear_limit)
self.declare_parameter('angular_limit', 5.0) # 角速度限制 (Angular velocity limit)
self.angular_limit = self.get_parameter('angular_limit').get_parameter_value().double_value
print("Angular limit:", self.angular_limit)
# 创建订阅者 (Create subscribers)
self.sub_cmd_vel = self.create_subscription(Twist, "cmd_vel", self.cmd_vel_callback, 1) # 运动控制订阅 (Motion control subscription)
self.sub_RGBLight = self.create_subscription(Int32, "RGBLight", self.RGBLightcallback, 100) # RGB灯光控制订阅 (RGB light control subscription)
self.sub_BUzzer = self.create_subscription(Bool, "Buzzer", self.Buzzercallback, 100) # 蜂鸣器控制订阅 (Buzzer control subscription)
# 创建发布者 (Create publishers)
self.EdiPublisher = self.create_publisher(Float32, "edition", 100) # 版本信息发布 (Edition information publisher)
self.volPublisher = self.create_publisher(Float32, "voltage", 100) # 电池电压发布 (Battery voltage publisher)
self.staPublisher = self.create_publisher(JointState, "joint_states", 100) # 关节状态发布 (Joint state publisher)
self.velPublisher = self.create_publisher(Twist, "vel_raw", 50) # 原始速度发布 (Raw velocity publisher)
self.imuPublisher = self.create_publisher(Imu, "/imu/data_raw", 100) # IMU数据发布 (IMU data publisher)
self.magPublisher = self.create_publisher(MagneticField, "/imu/mag", 100) # 磁力计数据发布 (Magnetometer data publisher)
# 创建定时器 (Create timer)
self.timer = self.create_timer(0.1, self.pub_data) # 10Hz数据发布定时器 (10Hz data publishing timer)
# 创建和初始化变量 (Create and initialize variables)
self.edition = Float32()
self.edition.data = 1.0
# 创建接收线程 (Create receive thread)
self.car.create_receive_threading()
print("X3 robot driver initialized successfully")
def cmd_vel_callback(self, msg):
"""
运动控制回调函数
Motion control callback function
Args:
msg (Twist): 运动控制消息包含线速度和角速度 (Motion control message with linear and angular velocities)
功能 (Functions):
- 接收运动控制命令
- 解析线速度和角速度
- 下发控制指令到底层
"""
if not isinstance(msg, Twist): return
# 提取速度信息 (Extract velocity information)
vx = msg.linear.x * 1.0 # X轴线速度 (X-axis linear velocity)
vy = msg.linear.y * 1.0 # Y轴线速度 (Y-axis linear velocity)
angular = msg.angular.z * 1.0 # 角速度 (Angular velocity)
# 下发运动控制指令 (Issue motion control command)
self.car.set_car_motion(vx, vy, angular)
# 调试输出 (Debug output)
# print("cmd_vx: ", vx)
# print("cmd_vy: ", vy)
# print("cmd_angular: ", angular)
def RGBLightcallback(self, msg):
"""
RGB灯光控制回调函数
RGB light control callback function
Args:
msg (Int32): RGB灯光控制消息 (RGB light control message)
功能 (Functions):
- 接收RGB灯光控制指令
- 控制机器人RGB灯光效果
"""
if not isinstance(msg, Int32): return
# 设置RGB灯光效果 (Set RGB light effect)
# 重复3次确保指令执行 (Repeat 3 times to ensure command execution)
for i in range(3):
self.car.set_colorful_effect(msg.data, 6, parm=1)
def Buzzercallback(self, msg):
"""
蜂鸣器控制回调函数
Buzzer control callback function
Args:
msg (Bool): 蜂鸣器控制消息 (Buzzer control message)
功能 (Functions):
- 接收蜂鸣器控制指令
- 控制机器人蜂鸣器开关
"""
if not isinstance(msg, Bool): return
if msg.data:
# 开启蜂鸣器 (Turn on buzzer)
for i in range(3):
self.car.set_beep(1)
else:
# 关闭蜂鸣器 (Turn off buzzer)
for i in range(3):
self.car.set_beep(0)
def pub_data(self):
"""
数据发布函数
Data publishing function
功能 (Functions):
- 采集传感器数据
- 发布IMU数据
- 发布磁力计数据
- 发布电池电压
- 发布版本信息
- 发布关节状态
- 发布原始速度
"""
# 获取当前时间戳 (Get current timestamp)
time_stamp = Clock().now()
# 初始化消息结构 (Initialize message structures)
imu = Imu()
twist = Twist()
battery = Float32()
edition = Float32()
mag = MagneticField()
state = JointState()
# 设置关节状态消息头 (Set joint state message header)
state.header.stamp = time_stamp.to_msg()
state.header.frame_id = "joint_states"
# 设置关节名称 (Set joint names)
if len(self.Prefix) == 0:
state.name = ["back_right_joint", "back_left_joint", "front_left_steer_joint", "front_left_wheel_joint",
"front_right_steer_joint", "front_right_wheel_joint"]
else:
state.name = [self.Prefix + "back_right_joint", self.Prefix + "back_left_joint",
self.Prefix + "front_left_steer_joint", self.Prefix + "front_left_wheel_joint",
self.Prefix + "front_right_steer_joint", self.Prefix + "front_right_wheel_joint"]
# 获取传感器数据 (Get sensor data)
edition.data = self.car.get_version() * 1.0 # 版本信息 (Version information)
battery.data = self.car.get_battery_voltage() * 1.0 # 电池电压 (Battery voltage)
# 获取IMU数据 (Get IMU data)
ax, ay, az = self.car.get_accelerometer_data() # 加速度计数据 (Accelerometer data)
gx, gy, gz = self.car.get_gyroscope_data() # 陀螺仪数据 (Gyroscope data)
mx, my, mz = self.car.get_magnetometer_data() # 磁力计数据 (Magnetometer data)
# 转换磁力计数据 (Convert magnetometer data)
mx = mx * 1.0
my = my * 1.0
mz = mz * 1.0
# 获取运动数据 (Get motion data)
vx, vy, angular = self.car.get_motion_data()
# 调试输出 (Debug output)
# print("vx: ", vx)
# print("vy: ", vy)
# print("angular: ", angular)
# 设置IMU消息 (Set IMU message)
imu.header.stamp = time_stamp.to_msg()
imu.header.frame_id = self.imu_link
imu.linear_acceleration.x = ax * 1.0 # X轴加速度 (X-axis acceleration)
imu.linear_acceleration.y = ay * 1.0 # Y轴加速度 (Y-axis acceleration)
imu.linear_acceleration.z = az * 1.0 # Z轴加速度 (Z-axis acceleration)
imu.angular_velocity.x = gx * 1.0 # X轴角速度 (X-axis angular velocity)
imu.angular_velocity.y = gy * 1.0 # Y轴角速度 (Y-axis angular velocity)
imu.angular_velocity.z = gz * 1.0 # Z轴角速度 (Z-axis angular velocity)
# 设置磁力计消息 (Set magnetometer message)
mag.header.stamp = time_stamp.to_msg()
mag.header.frame_id = self.imu_link
mag.magnetic_field.x = mx * 1.0 # X轴磁场强度 (X-axis magnetic field)
mag.magnetic_field.y = my * 1.0 # Y轴磁场强度 (Y-axis magnetic field)
mag.magnetic_field.z = mz * 1.0 # Z轴磁场强度 (Z-axis magnetic field)
# 设置速度消息 (Set velocity message)
twist.linear.x = vx * 1.0 # X轴线速度 (X-axis linear velocity)
twist.linear.y = vy * 1.0 # Y轴线速度 (Y-axis linear velocity)
twist.angular.z = angular * 1.0 # 角速度 (Angular velocity)
# 发布所有消息 (Publish all messages)
self.velPublisher.publish(twist) # 发布速度信息 (Publish velocity information)
self.imuPublisher.publish(imu) # 发布IMU数据 (Publish IMU data)
self.magPublisher.publish(mag) # 发布磁力计数据 (Publish magnetometer data)
self.volPublisher.publish(battery) # 发布电池电压 (Publish battery voltage)
self.EdiPublisher.publish(edition) # 发布版本信息 (Publish edition information)
# 调试输出 (Debug output)
# print("ax: %.5f, ay: %.5f, az: %.5f" % (ax, ay, az))
# print("gx: %.5f, gy: %.5f, gz: %.5f" % (gx, gy, gz))
# print("mx: %.5f, my: %.5f, mz: %.5f" % (mx, my, mz))
# print("battery: {}".format(battery.data))
# print("vx: {}, vy: {}, angular: {}".format(twist.linear.x, twist.linear.y, twist.angular.z))
def main():
rclpy.init()
driver = yahboomcar_driver('driver_node')
rclpy.spin(driver)
"""
主函数
Main function
初始化ROS节点并启动X3机器人驱动
Initialize ROS node and start X3 robot driver
"""
rclpy.init()
driver = yahboomcar_driver('driver_node')
print("X3 robot driver node started")
rclpy.spin(driver)
'''if __name__ == '__main__':
main()'''
if __name__ == '__main__':
main()

@ -1,45 +1,119 @@
#!/usr/bin/env python
# coding:utf-8
# -*- coding: utf-8 -*-
"""
激光雷达数据滤波节点
Laser scan data filtering node
功能说明 (Functionality):
- 激光雷达数据降采样处理
- 减少激光雷达数据点数量
- 提高数据处理效率
- 保持激光雷达数据的基本特征
作者 (Author): UVPTCCS Team
日期 (Date): 2024
"""
# ROS2相关库 (ROS2 related libraries)
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
import numpy as np
from rclpy.clock import Clock
class scan_compression(Node):
def __init__(self,name):
"""
激光雷达数据压缩节点类
Laser scan data compression node class
功能 (Functions):
- 对激光雷达数据进行降采样
- 减少数据传输量
- 提高系统处理效率
- 保持激光雷达数据的空间分辨率
"""
def __init__(self, name):
"""
初始化激光雷达数据压缩节点
Initialize laser scan data compression node
Args:
name (str): 节点名称 (Node name)
"""
super().__init__(name)
self.multiple = 2
self.pub = self.create_publisher(LaserScan, "/downsampled_scan", 1000)
self.laserSub = self.create_subscription(LaserScan,"/scan", self.laserCallback, 1000)
# 降采样倍数 (Downsampling multiple)
self.multiple = 2 # 每隔2个点采样一次 (Sample every 2 points)
# 创建发布者 (Create publisher)
self.pub = self.create_publisher(LaserScan, "/downsampled_scan", 1000) # 发布降采样后的激光雷达数据 (Publish downsampled laser scan data)
# 创建订阅者 (Create subscriber)
self.laserSub = self.create_subscription(LaserScan, "/scan", self.laserCallback, 1000) # 订阅原始激光雷达数据 (Subscribe to raw laser scan data)
print("Laser scan compression node initialized")
def laserCallback(self, data):
# self.get_logger().info("laserCallback")
"""
激光雷达数据回调函数
Laser scan data callback function
Args:
data (LaserScan): 原始激光雷达数据 (Raw laser scan data)
功能 (Functions):
- 接收原始激光雷达数据
- 进行降采样处理
- 发布处理后的数据
- 保持数据的时间戳和坐标系信息
"""
# 数据类型检查 (Data type check)
if not isinstance(data, LaserScan): return
# 创建新的激光雷达消息 (Create new laser scan message)
laser_scan = LaserScan()
laser_scan.header.stamp = Clock().now().to_msg()
laser_scan.header.frame_id = data.header.frame_id
laser_scan.angle_increment = data.angle_increment * self.multiple
laser_scan.time_increment = data.time_increment
laser_scan.intensities = data.intensities
laser_scan.scan_time = data.scan_time
laser_scan.angle_min = data.angle_min
laser_scan.angle_max = data.angle_max
laser_scan.range_min = data.range_min
laser_scan.range_max = data.range_max
# 设置消息头 (Set message header)
laser_scan.header.stamp = Clock().now().to_msg() # 当前时间戳 (Current timestamp)
laser_scan.header.frame_id = data.header.frame_id # 保持原始坐标系 (Keep original frame)
# 设置激光雷达参数 (Set laser scan parameters)
laser_scan.angle_increment = data.angle_increment * self.multiple # 角度增量乘以降采样倍数 (Angle increment multiplied by downsampling factor)
laser_scan.time_increment = data.time_increment # 时间增量保持不变 (Time increment remains unchanged)
laser_scan.intensities = data.intensities # 强度数据保持不变 (Intensity data remains unchanged)
laser_scan.scan_time = data.scan_time # 扫描时间保持不变 (Scan time remains unchanged)
laser_scan.angle_min = data.angle_min # 最小角度保持不变 (Minimum angle remains unchanged)
laser_scan.angle_max = data.angle_max # 最大角度保持不变 (Maximum angle remains unchanged)
laser_scan.range_min = data.range_min # 最小距离保持不变 (Minimum range remains unchanged)
laser_scan.range_max = data.range_max # 最大距离保持不变 (Maximum range remains unchanged)
# 调试输出 (Debug output)
# self.get_logger().info("len(np.array(data.ranges)) = {}".format(len(np.array(data.ranges))))
# 降采样处理 (Downsampling processing)
for i in range(len(np.array(data.ranges))):
if i % self.multiple == 0: laser_scan.ranges.append(data.ranges[i])
# 每隔multiple个点采样一次 (Sample every 'multiple' points)
if i % self.multiple == 0:
laser_scan.ranges.append(data.ranges[i])
# 发布降采样后的数据 (Publish downsampled data)
self.pub.publish(laser_scan)
def main():
"""
主函数
Main function
初始化ROS节点并启动激光雷达数据压缩功能
Initialize ROS node and start laser scan data compression functionality
"""
rclpy.init()
scan_cp = scan_compression("scan_dilute")
print("Laser scan compression node started")
rclpy.spin(scan_cp)
scan_cp.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

@ -1,15 +1,54 @@
/**
*
* Point cloud mapping main program
*
* (Functionality):
* - ROS2
* -
* - 线
* -
*
* (Author): UVPTCCS Team
* (Date): 2024
*/
#include "point_cloud.h"
#include <thread>
using namespace std;
/**
*
* Main function
*
* Args:
* argc: (Command line argument count)
* argv: (Command line argument array)
*
* Returns:
* int: 退 (Program exit status code)
*/
int main(int argc, char **argv)
{
// 初始化ROS2系统 (Initialize ROS2 system)
rclcpp::init(argc, argv);
// 创建点云建图节点 (Create point cloud mapping node)
auto node = std::make_shared<PointCloudMapper>();
thread t(&PointCloudMapper::viewer, node); // 第二个参数必须填对象的this指针否则会拷贝对象。
// 启动点云可视化线程 (Start point cloud visualization thread)
// 第二个参数必须填对象的this指针否则会拷贝对象
// The second parameter must be the object's this pointer, otherwise the object will be copied
thread t(&PointCloudMapper::viewer, node);
// 运行ROS2主循环 (Run ROS2 main loop)
rclcpp::spin(node);
// 关闭点云建图器 (Shutdown point cloud mapper)
node->shutdown();
// 关闭ROS2系统 (Shutdown ROS2 system)
rclcpp::shutdown();
return 0;
}

@ -1,5 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
激光雷达数据转点云发布器
Laser scan to point cloud publisher
功能说明 (Functionality):
- 将激光雷达扫描数据转换为点云数据
- 进行坐标变换计算
- 发布路径消息格式的点云数据
- 实时处理激光雷达数据流
作者 (Author): UVPTCCS Team
日期 (Date): 2024
"""
# ROS2相关库 (ROS2 related libraries)
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped, TransformStamped
@ -9,55 +24,148 @@ import tf2_ros
import math
class laserscanToPointPublish(Node):
"""
激光雷达数据转点云发布器类
Laser scan to point cloud publisher class
功能 (Functions):
- 订阅激光雷达扫描数据
- 将极坐标数据转换为笛卡尔坐标
- 生成路径消息格式的点云数据
- 实时发布转换后的点云数据
"""
def __init__(self):
super().__init__('robot_pose_publisher')
"""
初始化激光雷达数据转点云发布器
Initialize laser scan to point cloud publisher
功能 (Functions):
- 创建激光雷达数据订阅者
- 创建点云数据发布者
- 设置消息队列大小
"""
super().__init__('laserscan_to_point_publisher')
# 创建激光雷达数据订阅者 (Create laser scan subscriber)
self.subscription = self.create_subscription(
LaserScan,
'/scan',
self.laserscan_callback,
10)
self.sacn_point_publisher = self.create_publisher(
Path,
'/scan_points',
10)
LaserScan, # 消息类型 (Message type)
'/scan', # 话题名称 (Topic name)
self.laserscan_callback, # 回调函数 (Callback function)
10) # 队列大小 (Queue size)
# 创建点云数据发布者 (Create point cloud publisher)
self.scan_point_publisher = self.create_publisher(
Path, # 消息类型 (Message type)
'/scan_points', # 话题名称 (Topic name)
10) # 队列大小 (Queue size)
print("Laser scan to point cloud publisher initialized")
def laserscan_callback(self, msg):
# print(msg)
angle_min = msg.angle_min
angle_increment = msg.angle_increment
laserscan = msg.ranges
# print(laserscan)
laser_points = self.laserscan_to_points(laserscan, angle_increment, angle_increment)
self.sacn_point_publisher.publish(laser_points)
"""
激光雷达数据回调函数
Laser scan data callback function
Args:
msg (LaserScan): 激光雷达扫描数据消息 (Laser scan data message)
功能 (Functions):
- 接收激光雷达扫描数据
- 提取扫描参数
- 转换为点云数据
- 发布转换后的数据
"""
# 提取激光雷达扫描参数 (Extract laser scan parameters)
angle_min = msg.angle_min # 最小扫描角度 (Minimum scan angle)
angle_increment = msg.angle_increment # 角度增量 (Angle increment)
laserscan = msg.ranges # 距离数据数组 (Range data array)
# 调试输出 (Debug output)
# print("Received laser scan data:", msg)
# print("Range data:", laserscan)
# 将激光雷达数据转换为点云 (Convert laser scan data to point cloud)
laser_points = self.laserscan_to_points(laserscan, angle_min, angle_increment)
# 发布转换后的点云数据 (Publish converted point cloud data)
self.scan_point_publisher.publish(laser_points)
def laserscan_to_points(self, laserscan, angle_min, angle_increment):
points = []
angle = angle_min
laser_points = Path()
"""
将激光雷达扫描数据转换为点云数据
Convert laser scan data to point cloud data
Args:
laserscan (list): 激光雷达距离数据数组 (Laser scan range data array)
angle_min (float): 最小扫描角度 (Minimum scan angle)
angle_increment (float): 角度增量 (Angle increment)
Returns:
Path: 包含点云数据的路径消息 (Path message containing point cloud data)
功能 (Functions):
- 遍历所有激光雷达数据点
- 进行极坐标到笛卡尔坐标的转换
- 生成PoseStamped消息
- 组装成Path消息格式
"""
points = [] # 点云数据列表 (Point cloud data list)
angle = angle_min # 当前角度 (Current angle)
laser_points = Path() # 路径消息 (Path message)
# 遍历所有距离数据点 (Iterate through all range data points)
for distance in laserscan:
x = distance * math.cos(angle)
y = distance * math.sin(angle)
# 极坐标转笛卡尔坐标 (Convert polar coordinates to Cartesian coordinates)
x = distance * math.cos(angle) # X坐标 (X coordinate)
y = distance * math.sin(angle) # Y坐标 (Y coordinate)
# 创建位姿消息 (Create pose message)
pose = PoseStamped()
pose.pose.position.x = x
pose.pose.position.y = y
pose.pose.position.x = x # 设置X坐标 (Set X coordinate)
pose.pose.position.y = y # 设置Y坐标 (Set Y coordinate)
pose.pose.position.z = 0.0 # Z坐标设为0 (Set Z coordinate to 0)
# 添加到点云列表 (Add to point cloud list)
points.append(pose)
# 更新角度 (Update angle)
angle += angle_increment
# 设置路径消息的位姿数组 (Set pose array of path message)
laser_points.poses = points
return laser_points
def main(args=None):
"""
主函数
Main function
Args:
args: 命令行参数 (Command line arguments)
功能 (Functions):
- 初始化ROS2系统
- 创建激光雷达数据转点云发布器节点
- 启动节点循环
- 清理资源
"""
# 初始化ROS2系统 (Initialize ROS2 system)
rclpy.init(args=args)
# 创建激光雷达数据转点云发布器节点 (Create laser scan to point cloud publisher node)
robot_laser_scan_publisher = laserscanToPointPublish()
print("Laser scan to point cloud publisher node started")
# 启动节点循环 (Start node loop)
rclpy.spin(robot_laser_scan_publisher)
robot_pose_publisher.destroy_node()
# 清理节点资源 (Clean up node resources)
robot_laser_scan_publisher.destroy_node()
# 关闭ROS2系统 (Shutdown ROS2 system)
rclpy.shutdown()
if __name__ == '__main__':

@ -1,12 +1,18 @@
/*!
* \file robot_pose_publisher.cpp
* \brief Publishes the robot's position in a geometry_msgs/Pose message.
* \brief 姿 - Publishes the robot's position in a geometry_msgs/Pose message.
*
* (Functionality):
* - TF姿
* - 姿
* - 姿
*
* Publishes the robot's position in a geometry_msgs/Pose message based on the TF
* difference between /map and /base_link.
*
* \author Milan - milan.madathiparambil@gmail.com
* \date April 20 1020
* \author UVPTCCS Team (Chinese comments)
* \date April 20 2020
*/
#include <chrono>
@ -20,79 +26,150 @@
using namespace std::chrono_literals;
/* This example creates a subclass of Node and uses std::bind() to register a
* member function as a callback from the timer. */
/**
* 姿
* Robot pose publisher class
*
* (Functions):
* - TF
* - 姿
* - 姿
* -
*/
class RobotPosePublisher : public rclcpp::Node
{
public:
RobotPosePublisher() : Node("robot_pose_publisher")
{
tf_buffer_ = std::make_shared<tf2_ros::Buffer>(this->get_clock());
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_);
/**
*
* Constructor
*
* (Functions):
* - TF
* -
* -
* -
*/
RobotPosePublisher() : Node("robot_pose_publisher")
{
// 初始化TF缓冲区和监听器 (Initialize TF buffer and listener)
tf_buffer_ = std::make_shared<tf2_ros::Buffer>(this->get_clock());
tf_listener_ = std::make_shared<tf2_ros::TransformListener>(*tf_buffer_);
this->declare_parameter<std::string>("map_frame","map");
this->declare_parameter<std::string>("base_frame","base_link");
this->declare_parameter<bool>("is_stamped",false);
// 声明参数 (Declare parameters)
this->declare_parameter<std::string>("map_frame", "map"); // 地图坐标系名称 (Map frame name)
this->declare_parameter<std::string>("base_frame", "base_link"); // 机器人基座坐标系名称 (Robot base frame name)
this->declare_parameter<bool>("is_stamped", false); // 是否发布带时间戳的消息 (Whether to publish stamped messages)
this->get_parameter("map_frame", map_frame);
this->get_parameter("base_frame", base_frame);
this->get_parameter("is_stamped", is_stamped);
// 获取参数值 (Get parameter values)
this->get_parameter("map_frame", map_frame);
this->get_parameter("base_frame", base_frame);
this->get_parameter("is_stamped", is_stamped);
if (is_stamped)
publisher_stamp = this->create_publisher<geometry_msgs::msg::PoseStamped>("robot_pose", 1);
else
publisher_ = this->create_publisher<geometry_msgs::msg::Pose>("robot_pose", 1);
timer_ = this->create_wall_timer(
50ms, std::bind(&RobotPosePublisher::timer_callback, this));
}
// 根据参数创建相应的发布者 (Create appropriate publisher based on parameters)
if (is_stamped)
publisher_stamp = this->create_publisher<geometry_msgs::msg::PoseStamped>("robot_pose", 1);
else
publisher_ = this->create_publisher<geometry_msgs::msg::Pose>("robot_pose", 1);
// 创建定时器50ms周期 (Create timer with 50ms period)
timer_ = this->create_wall_timer(
50ms, std::bind(&RobotPosePublisher::timer_callback, this));
RCLCPP_INFO(this->get_logger(), "Robot pose publisher initialized");
RCLCPP_INFO(this->get_logger(), "Map frame: %s", map_frame.c_str());
RCLCPP_INFO(this->get_logger(), "Base frame: %s", base_frame.c_str());
RCLCPP_INFO(this->get_logger(), "Is stamped: %s", is_stamped ? "true" : "false");
}
private:
void timer_callback()
{
geometry_msgs::msg::TransformStamped transformStamped;
try
{
transformStamped = tf_buffer_->lookupTransform(map_frame, base_frame,
this->now());
}
catch (tf2::TransformException &ex)
{
return;
}
geometry_msgs::msg::PoseStamped pose_stamped;
pose_stamped.header.frame_id = map_frame;
pose_stamped.header.stamp = this->now();
/**
*
* Timer callback function
*
* (Functions):
* - TF
* - 姿
* - 姿
* -
*/
void timer_callback()
{
geometry_msgs::msg::TransformStamped transformStamped;
try
{
// 查找从base_frame到map_frame的变换 (Look up transform from base_frame to map_frame)
transformStamped = tf_buffer_->lookupTransform(
map_frame, base_frame, tf2::TimePointZero);
}
catch (tf2::TransformException &ex)
{
// TF变换查找失败 (TF transform lookup failed)
RCLCPP_WARN(this->get_logger(), "Could not transform %s to %s: %s",
base_frame.c_str(), map_frame.c_str(), ex.what());
return;
}
pose_stamped.pose.orientation.x = transformStamped.transform.rotation.x;
pose_stamped.pose.orientation.y = transformStamped.transform.rotation.y;
pose_stamped.pose.orientation.z = transformStamped.transform.rotation.z;
pose_stamped.pose.orientation.w = transformStamped.transform.rotation.w;
// 根据消息类型发布位姿信息 (Publish pose information based on message type)
if (is_stamped)
{
// 发布带时间戳的位姿消息 (Publish stamped pose message)
geometry_msgs::msg::PoseStamped pose_stamped;
pose_stamped.header.stamp = this->get_clock()->now();
pose_stamped.header.frame_id = map_frame;
pose_stamped.pose.position.x = transformStamped.transform.translation.x;
pose_stamped.pose.position.y = transformStamped.transform.translation.y;
pose_stamped.pose.position.z = transformStamped.transform.translation.z;
pose_stamped.pose.orientation = transformStamped.transform.rotation;
publisher_stamp->publish(pose_stamped);
}
else
{
// 发布不带时间戳的位姿消息 (Publish non-stamped pose message)
geometry_msgs::msg::Pose pose;
pose.position.x = transformStamped.transform.translation.x;
pose.position.y = transformStamped.transform.translation.y;
pose.position.z = transformStamped.transform.translation.z;
pose.orientation = transformStamped.transform.rotation;
publisher_->publish(pose);
}
}
pose_stamped.pose.position.x = transformStamped.transform.translation.x;
pose_stamped.pose.position.y = transformStamped.transform.translation.y;
pose_stamped.pose.position.z = transformStamped.transform.translation.z;
if (is_stamped)
publisher_stamp->publish(pose_stamped);
else
publisher_->publish(pose_stamped.pose);
}
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<geometry_msgs::msg::PoseStamped>::SharedPtr publisher_stamp;
rclcpp::Publisher<geometry_msgs::msg::Pose>::SharedPtr publisher_;
size_t count_;
bool is_stamped = false;
std::string base_frame = "base_link";
std::string map_frame = "map";
std::shared_ptr<tf2_ros::TransformListener> tf_listener_;
std::shared_ptr<tf2_ros::Buffer> tf_buffer_;
// 成员变量 (Member variables)
rclcpp::TimerBase::SharedPtr timer_; // 定时器 (Timer)
rclcpp::Publisher<geometry_msgs::msg::PoseStamped>::SharedPtr publisher_stamp; // 带时间戳的发布者 (Stamped publisher)
rclcpp::Publisher<geometry_msgs::msg::Pose>::SharedPtr publisher_; // 不带时间戳的发布者 (Non-stamped publisher)
size_t count_; // 计数器 (Counter)
bool is_stamped = false; // 是否使用时间戳 (Whether to use timestamp)
std::string base_frame = "base_link"; // 机器人基座坐标系 (Robot base frame)
std::string map_frame = "map"; // 地图坐标系 (Map frame)
std::shared_ptr<tf2_ros::TransformListener> tf_listener_; // TF监听器 (TF listener)
std::shared_ptr<tf2_ros::Buffer> tf_buffer_; // TF缓冲区 (TF buffer)
};
/**
*
* Main function
*
* Args:
* argc: (Command line argument count)
* argv: (Command line argument array)
*
* Returns:
* int: 退 (Program exit status code)
*/
int main(int argc, char *argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<RobotPosePublisher>());
rclcpp::shutdown();
return 0;
// 初始化ROS2系统 (Initialize ROS2 system)
rclcpp::init(argc, argv);
// 创建并运行机器人位姿发布器节点 (Create and run robot pose publisher node)
rclcpp::spin(std::make_shared<RobotPosePublisher>());
// 关闭ROS2系统 (Shutdown ROS2 system)
rclcpp::shutdown();
return 0;
}

Loading…
Cancel
Save