You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
teamwk123/carsrun/base_ctrl.py

286 lines
7.6 KiB

import serial
import json
import queue
import threading
import yaml
import os
import time
import glob
import numpy as np
curpath = os.path.realpath(__file__)
thisPath = os.path.dirname(curpath)
with open(thisPath + '/config.yaml', 'r') as yaml_file:
f = yaml.safe_load(yaml_file)
class ReadLine:
def __init__(self, s):
self.buf = bytearray()
self.s = s
self.sensor_data = []
self.sensor_list = []
try:
self.sensor_data_ser = serial.Serial(glob.glob('/dev/ttyUSB*')[0], 115200)
print("/dev/ttyUSB* connected succeed")
except:
self.sensor_data_ser = None
self.sensor_data_max_len = 51
try:
self.lidar_ser = serial.Serial(glob.glob('/dev/ttyACM*')[0], 230400, timeout=1)
print("/dev/ttyACM* connected succeed")
except:
self.lidar_ser = None
self.ANGLE_PER_FRAME = 12
self.HEADER = 0x54
self.lidar_angles = []
self.lidar_distances = []
self.lidar_angles_show = []
self.lidar_distances_show = []
self.last_start_angle = 0
def readline(self):
i = self.buf.find(b"\n")
if i >= 0:
r = self.buf[:i+1]
self.buf = self.buf[i+1:]
return r
while True:
i = max(1, min(512, self.s.in_waiting))
data = self.s.read(i)
i = data.find(b"\n")
if i >= 0:
r = self.buf + data[:i+1]
self.buf[0:] = data[i+1:]
return r
else:
self.buf.extend(data)
def clear_buffer(self):
self.s.reset_input_buffer()
def read_sensor_data(self):
if self.sensor_data_ser == None:
return
try:
buffer_clear = False
while self.sensor_data_ser.in_waiting > 0:
buffer_clear = True
sensor_readline = self.sensor_data_ser.readline()
if len(sensor_readline) <= self.sensor_data_max_len:
self.sensor_list.append(sensor_readline.decode('utf-8')[:-2])
else:
self.sensor_list.append(sensor_readline.decode('utf-8')[:self.sensor_data_max_len])
self.sensor_list.append(sensor_readline.decode('utf-8')[self.sensor_data_max_len:-2])
if buffer_clear:
self.sensor_data = self.sensor_list.copy()
self.sensor_list.clear()
self.sensor_data_ser.reset_input_buffer()
except Exception as e:
print(f"[base_ctrl.read_sensor_data] error: {e}")
def parse_lidar_frame(self, data):
# header = data[0]
# verlen = data[1]
# speed = data[3] << 8 | data[2]
start_angle = (data[5] << 8 | data[4]) * 0.01
# print(start)
# end_angle = (data[43] << 8 | data[42]) * 0.01
for i in range(0, self.ANGLE_PER_FRAME):
offset = 6 + i * 3
distance = data[offset+1] << 8 | data[offset]
confidence = data[offset+2]
# lidar_angles.append(np.radians(start_angle + i * 0.167))
self.lidar_angles.append(np.radians(start_angle + i * 0.83333 + 180))
# lidar_angles.append(np.radians(start_angle + end_angle))
self.lidar_distances.append(distance)
# end_angle = (data[43] << 8 | data[42]) * 0.01
# timestamp = data[45] << 8 | data[44]
# crc = data[46]
return start_angle
def lidar_data_recv(self):
if self.lidar_ser == None:
return
try:
while True:
self.header = self.lidar_ser.read(1)
if self.header == b'\x54':
# Read the rest of the data
data = self.header + self.lidar_ser.read(46)
hex_data = [int(hex(byte), 16) for byte in data]
start_angle = self.parse_lidar_frame(hex_data)
if self.last_start_angle > start_angle:
break
self.last_start_angle = start_angle
else:
self.lidar_ser.flushInput()
self.last_start_angle = start_angle
self.lidar_angles_show = self.lidar_angles.copy()
self.lidar_distances_show = self.lidar_distances.copy()
self.lidar_angles.clear()
self.lidar_distances.clear()
except Exception as e:
print(f"[base_ctrl.lidar_data_recv] error: {e}")
self.lidar_ser = serial.Serial(glob.glob('/dev/ttyACM*')[0], 230400, timeout=1)
class BaseController:
def __init__(self, uart_dev_set, buad_set):
self.ser = serial.Serial(uart_dev_set, buad_set, timeout=1)
self.rl = ReadLine(self.ser)
self.command_queue = queue.Queue()
self.command_thread = threading.Thread(target=self.process_commands, daemon=True)
self.command_thread.start()
self.base_light_status = 0
self.head_light_status = 0
self.data_buffer = None
self.base_data = None
self.use_lidar = f['base_config']['use_lidar']
self.extra_sensor = f['base_config']['extra_sensor']
def feedback_data(self):
try:
while self.rl.s.in_waiting > 0:
self.data_buffer = json.loads(self.rl.readline().decode('utf-8'))
if 'T' in self.data_buffer:
self.base_data = self.data_buffer
self.data_buffer = None
if self.base_data["T"] == 1003:
print(self.base_data)
return self.base_data
self.rl.clear_buffer()
self.data_buffer = json.loads(self.rl.readline().decode('utf-8'))
self.base_data = self.data_buffer
return self.base_data
except Exception as e:
self.rl.clear_buffer()
print(f"[base_ctrl.feedback_data] error: {e}")
def on_data_received(self):
self.ser.reset_input_buffer()
data_read = json.loads(self.rl.readline().decode('utf-8'))
return data_read
def send_command(self, data):
self.command_queue.put(data)
def process_commands(self):
while True:
data = self.command_queue.get()
self.ser.write((json.dumps(data) + '\n').encode("utf-8"))
def base_json_ctrl(self, input_json):
self.send_command(input_json)
def gimbal_emergency_stop(self):
data = {"T":0}
self.send_command(data)
def base_speed_ctrl(self, input_left, input_right):
data = {"T":1,"L":input_left,"R":input_right}
self.send_command(data)
def gimbal_ctrl(self, input_x, input_y, input_speed, input_acceleration):
data = {"T":133,"X":input_x,"Y":input_y,"SPD":input_speed,"ACC":input_acceleration}
self.send_command(data)
def gimbal_base_ctrl(self, input_x, input_y, input_speed):
data = {"T":141,"X":input_x,"Y":input_y,"SPD":input_speed}
self.send_command(data)
def base_oled(self, input_line, input_text):
data = {"T":3,"lineNum":input_line,"Text":input_text}
self.send_command(data)
def base_default_oled(self):
data = {"T":-3}
self.send_command(data)
def bus_servo_id_set(self, old_id, new_id):
# data = {"T":54,"old":old_id,"new":new_id}
data = {"T":f['cmd_config']['cmd_set_servo_id'],"raw":old_id,"new":new_id}
self.send_command(data)
def bus_servo_torque_lock(self, input_id, input_status):
# data = {"T":55,"id":input_id,"status":input_status}
data = {"T":f['cmd_config']['cmd_servo_torque'],"id":input_id,"cmd":input_status}
self.send_command(data)
def bus_servo_mid_set(self, input_id):
# data = {"T":58,"id":input_id}
data = {"T":f['cmd_config']['cmd_set_servo_mid'],"id":input_id}
self.send_command(data)
def lights_ctrl(self, pwmA, pwmB):
data = {"T":132,"IO4":pwmA,"IO5":pwmB}
self.send_command(data)
self.base_light_status = pwmA
self.head_light_status = pwmB
def base_lights_ctrl(self):
if self.base_light_status != 0:
self.base_light_status = 0
else:
self.base_light_status = 255
self.lights_ctrl(self.base_light_status, self.head_light_status)
def gimbal_dev_close(self):
self.ser.close()
def breath_light(self, input_time):
breath_start_time = time.time()
while time.time() - breath_start_time < input_time:
for i in range(0, 128, 10):
self.lights_ctrl(i, 128-i)
time.sleep(0.1)
for i in range(0, 128, 10):
self.lights_ctrl(128-i, i)
time.sleep(0.1)
self.lights_ctrl(0, 0)
if __name__ == '__main__':
# RPi5
base = BaseController('/dev/ttyAMA0', 115200)
# RPi4B
# base = BaseController('/dev/serial0', 115200)
# breath light for 15s
base.breath_light(15)
# gimble ctrl, look forward
# x y spd acc
base.gimbal_ctrl(0, 0, 10, 0)
# x(-180 ~ 180)
# x- look left
# x+ look right
# y(-30 ~ 90)
# y- look down
# y+ look up