You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
9.3 KiB

#!/usr/bin/env python
# encoding: utf-8
import sys
sys.path.append("/home/jetson/Transbot/transbot")
import rospy
import random
import threading
from math import pi
from time import sleep
from transbot_msgs.msg import *
from transbot_msgs.srv import *
from sensor_msgs.msg import Imu
from Transbot_Lib import Transbot
from geometry_msgs.msg import Twist
from arm_transbot import Transbot_ARM
from dynamic_reconfigure.server import Server
from transbot_bringup.cfg import PIDparamConfig
class transbot_driver:
def __init__(self):
rospy.on_shutdown(self.cancel)
self.bot_arm = Transbot_ARM()
bot_arm_offset = self.bot_arm.get_arm_offset()
self.bot = Transbot(arm_offset=bot_arm_offset)
# 弧度转角度
# Radians turn angle
self.RA2DE = 180 / pi
imu = rospy.get_param("imu", "/transbot/imu")
vel = rospy.get_param("vel", "/transbot/get_vel")
self.CameraDevice = rospy.get_param("CameraDevice", "astra")
self.linear_max = rospy.get_param('~linear_speed_limit', 0.4)
self.linear_min = rospy.get_param('~linear_speed_limit', 0.0)
self.angular_max = rospy.get_param('~angular_speed_limit', 2.0)
self.angular_min = rospy.get_param('~angular_speed_limit', 0.0)
self.sub_cmd_vel = rospy.Subscriber("/cmd_vel", Twist, self.cmd_vel_callback, queue_size=10)
self.sub_TargetAngle = rospy.Subscriber("/TargetAngle", Arm, self.sub_armcallback, queue_size=10)
self.sub_PWMServo = rospy.Subscriber("/PWMServo", PWMServo, self.sub_PWMServocallback, queue_size=10)
self.sub_Adjust = rospy.Subscriber("/Adjust", Adjust, self.sub_Adjustcallback, queue_size=10)
self.srv_CurrentAngle = rospy.Service("/CurrentAngle", RobotArm, self.RobotArmcallback)
self.srv_RGBLight = rospy.Service("/RGBLight", RGBLight, self.RGBLightcallback)
self.srv_Buzzer = rospy.Service("/Buzzer", Buzzer, self.Buzzercallback)
self.srv_Headlight = rospy.Service("/Headlight", Headlight, self.Headlightcallback)
self.ediPublisher = rospy.Publisher('/edition', Edition, queue_size=10)
self.volPublisher = rospy.Publisher("/voltage", Battery, queue_size=10)
self.velPublisher = rospy.Publisher(vel, Twist, queue_size=10)
self.imuPublisher = rospy.Publisher(imu, Imu, queue_size=10)
self.dyn_server = Server(PIDparamConfig, self.dynamic_reconfigure_callback)
self.bot.create_receive_threading()
self.bot.set_uart_servo_angle(9, 90)
def cancel(self):
self.srv_CurrentAngle.shutdown()
self.srv_RGBLight.shutdown()
self.srv_Buzzer.shutdown()
self.srv_Headlight.shutdown()
self.velPublisher.unregister()
self.imuPublisher.unregister()
self.volPublisher.unregister()
self.sub_cmd_vel.unregister()
self.sub_TargetAngle.unregister()
self.sub_PWMServo.unregister()
# Always stop the robot when shutting down the node
rospy.loginfo("Close the robot...")
rospy.sleep(1)
def RobotArmcallback(self, request):
# 服务端,机械臂当前关节角度
# Server, the current joint angle of the robotic arm
if not isinstance(request, RobotArmRequest): return
# print ("request: ",request)
response = RobotArmResponse()
joints = self.bot.get_uart_servo_angle_array()
for i in range(3):
joint = Joint()
joint.id = i + 7
joint.angle = joints[i]
joint.run_time = 500
if joints[i] < 0: continue
else: response.RobotArm.joint.append(joint)
# print ("id: {},angle: {}".format(joint.id, joint.angle))
return response
def pub_data(self):
# 发布小车运动速度、陀螺仪数据、电池电压
## Publish the speed of the car, gyroscope data, and battery voltage
while not rospy.is_shutdown():
sleep(0.05)
ax, ay, az = self.bot.get_accelerometer_data()
gx, gy, gz = self.bot.get_gyroscope_data()
velocity, angular = self.bot.get_motion_data()
voltage = self.bot.get_battery_voltage()
battery = Battery()
battery.Voltage = voltage
self.volPublisher.publish(battery)
robot_edition = self.bot.get_version()
edition = Edition()
edition.edition = robot_edition
self.ediPublisher.publish(edition)
# rospy.loginfo("battery: {}".format(battery))
# 发布陀螺仪的数据
# Publish gyroscope data
imu = Imu()
imu.linear_acceleration.x = ax
imu.linear_acceleration.y = ay
imu.linear_acceleration.z = az
imu.angular_velocity.x = gx
imu.angular_velocity.y = gy
imu.angular_velocity.z = gz
self.imuPublisher.publish(imu)
# 将小车当前的线速度和角速度发布出去
# Publish the current linear velocity and angular velocity of the car
twist = Twist()
twist.linear.x = velocity
twist.angular.z = angular
# print(ax, ay, az, gx, gy, gz)
# rospy.loginfo("velocity: {}, angular: {}".format(twist.linear.x, twist.angular.z))
self.velPublisher.publish(twist)
def sub_Adjustcallback(self, msg):
# 机械臂控制,订阅者回调函数
# Robotic arm control, subscriber callback function
if not isinstance(msg, Adjust): return
self.bot.set_imu_adjust(msg.adjust)
def sub_armcallback(self, msg):
# 机械臂控制,订阅者回调函数
# Robotic arm control, subscriber callback function
if not isinstance(msg, Arm): return
for joint in msg.joint:
if not isinstance(joint, Joint): continue
# print "joint: ", joint
if joint.run_time != 0: self.bot.set_uart_servo_angle(joint.id, joint.angle, joint.run_time)
def sub_PWMServocallback(self, msg):
# 云台控制,订阅者回调函数
# PWMServo control, subscriber callback function
PWMServo_angle = int(msg.angle)
if self.CameraDevice == "astra":
if msg.id == 1:
if PWMServo_angle <= 60: PWMServo_angle = 60
elif PWMServo_angle >= 120: PWMServo_angle = 120
if msg.id == 2:
if PWMServo_angle > 140: PWMServo_angle = 140
self.bot.set_pwm_servo(msg.id, PWMServo_angle)
def cmd_vel_callback(self, msg):
# 小车运动控制,订阅者回调函数
# Car motion control, subscriber callback function
if not isinstance(msg, Twist): return
# 下发线速度和角速度
# Issue linear velocity and angular velocity
velocity = msg.linear.x
angular = msg.angular.z
# 小车运动控制,velocity=[-0.45, 0.45], angular=[2, 2]
# Trolley motion control,velocity=[-0.45, 0.45], angular=[2, 2]
if velocity > self.linear_max:
velocity = self.linear_max
elif velocity < -self.linear_max:
velocity = -self.linear_max
elif -self.linear_min < velocity < 0:
velocity = -self.linear_min
elif 0 < velocity < self.linear_min:
velocity = self.linear_min
if angular > self.angular_max:
angular = self.angular_max
elif angular < -self.angular_max:
angular = -self.angular_max
elif -self.angular_min < angular < 0:
angular = -self.angular_min
elif 0 < angular < self.angular_min:
angular = self.angular_min
# rospy.loginfo("cmd_vel: {}, cmd_ang: {}".format(velocity, angular))
self.bot.set_car_motion(velocity, angular)
def RGBLightcallback(self, request):
# 流水灯控制,服务端回调函数
# RGBLight control, server callback function
if not isinstance(request, RGBLightRequest): return
# print (request.effect, request.speed)
self.bot.set_colorful_effect(request.effect, request.speed, parm=1)
response = RGBLightResponse()
response.result = True
return response
def Buzzercallback(self, request):
# 蜂鸣器控制,服务端回调函数
# Buzzer control, server callback function
if not isinstance(request, BuzzerRequest): return
self.bot.set_beep(request.buzzer)
response = BuzzerResponse()
response.result = True
return response
def Headlightcallback(self, request):
# 探照灯控制,服务端回调函数
# Searchlight control, server callback function
if not isinstance(request, HeadlightRequest): return
self.bot.set_floodlight(request.Headlight)
response = HeadlightResponse()
response.result = True
return response
def dynamic_reconfigure_callback(self, config, level):
# self.bot.set_pid_param(config['Kp'], config['Ki'], config['Kd'])
print(config['Kp'], config['Ki'], config['Kd'])
self.linear_max = config['linear_max']
self.linear_min = config['linear_min']
self.angular_max = config['angular_max']
self.angular_min = config['angular_min']
return config
if __name__ == '__main__':
rospy.init_node("driver_node", anonymous=False)
try:
driver = transbot_driver()
driver.pub_data()
rospy.spin()
except:
rospy.loginfo("Final!!!")