init route and TL setting func

main
alexha 19 hours ago
parent 3c02dcde54
commit fcd2b1095e

@ -1,2 +1,100 @@
# route_TLset
以下是为两个 Python 脚本编写的 `README.md` 文件,包含红绿灯控制脚本 (`cyber_trafficlight.py`) 和车道跟随命令脚本 (`cyber_routing.py`) 的详细说明。
```markdown
# Apollo 外部命令控制脚本
本仓库提供两个用于 Apollo 9.0 仿真环境的 Python 脚本,分别用于手动控制红绿灯状态和发送车道跟随命令,以测试规划与控制模块的响应。
## 环境要求
- Apollo 9.0 Docker 容器环境(已安装 Cyber RT 及相关模块)
- Python 3.6+
- 已启动 Dreamview 或相关 Cyber 节点(`Planning`、`Control`、`external_command_processor` 等)
## 脚本说明
### 1. 红绿灯控制脚本 (`cyber_trafficlight.py`)
**功能**
`/apollo/perception/traffic_light` 话题周期性发送红绿灯检测消息,手动切换红绿灯状态,用于仿真中测试车辆在红灯前的停车及绿灯后的起步行为。
**消息协议**
- 使用 `modules/common_msgs/perception_msgs/traffic_light_detection.proto`
- 每个 `TrafficLight` 消息包含 `id`(地图中的信号灯 ID、`color`、`tracking_time` 等字段
- 支持同时控制多个红绿灯 ID
**使用方法**
```bash
python cyber_trafficlight.py
```
运行后按提示输入数字:
- `1` → 将所有指定红绿灯设为红灯,车辆应停车
- `2` → 设为绿灯,车辆应通行
- `3` → 退出程序
**配置**
在脚本开头的 `SIGNAL_IDS` 列表中,可修改需要控制的红绿灯 ID必须与地图 `base_map` 中的 `signal.id` 一致)。
---
### 2. 车道跟随命令脚本 (`cyber_routing.py`)
**功能**
通过 Cyber 客户端向 `/apollo/external_command/lane_follow` 服务发送 `LaneFollowCommand`命令车辆从指定起点行驶至终点。不手动设置车辆朝向heading由 Apollo 自动匹配最近车道的切线方向。
**消息协议**
- 使用 `modules/common_msgs/external_command_msgs/lane_follow_command.proto`
- 必需的字段:`command_id`、`is_start_pose_set`、`way_point`(起点)、`end_pose`(终点)
**使用方法**
1. 确认 `external_command_processor` 已运行(通常跟随 Planning 模块启动)。
2. 执行脚本:
```bash
python cyber_routing.py
```
3. 脚本会输出发送的命令及服务响应状态,若响应 `status=1`RUNNING车辆即开始沿车道行驶。
**坐标输入**
脚本中使用 `parse_coordinate()` 函数解析形如 `"(x,y)"` 的字符串,可方便地更换起点/终点坐标。示例:
```python
start_x, start_y = parse_coordinate("(587107.2299980448,4141578.018806449)")
end_x, end_y = parse_coordinate("(587010.5463351277,4141603.8543796046)")
```
---
## 常见问题
### 红绿灯脚本发送后车辆无响应
- 检查话题名称是否正确:`/apollo/perception/traffic_light`
- 确认 Planning/Control 模块已启动,且红绿灯 ID 与地图中的 `signal.id` 完全匹配(注意前缀 `signal_`
- 查看 Planning 日志中是否有 `stop by TL_xxx` 的决策输出
### 车道跟随命令返回 ERROR
- 最常见原因:缺少 `routing_map` 文件。执行以下命令生成(替换为实际地图路径):
```bash
./scripts/generate_routing_topo_graph.sh --map_dir /path/to/map/directory
```
- 检查起点/终点坐标是否落在可行驶车道上(可使用 `query_position.py` 工具验证)
- 确认 `external_command_processor` 模块已运行:
```bash
cyber_channel list | grep external_command
```
### 响应状态码含义
- `0` → STATUS_UNKNOWN未知
- `1` → STATUS_RUNNING执行中
- `2` → STATUS_FINISHED已完成
- `3` → STATUS_ERROR失败查看日志
## 日志查看
- 红绿灯相关日志:`tail -f /apollo/data/log/planning.INFO`
- 外部命令日志:`tail -f /apollo/data/log/external_command.INFO`
## 许可证
本脚本仅用于 Apollo 仿真学习与测试,遵循 Apollo 项目所使用的 Apache 2.0 许可证。
```
该 README 文件可直接保存为 `README.md`,与两个 Python 脚本放在同一目录下。

@ -0,0 +1,64 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from cyber.python.cyber_py3 import cyber
from cyber.python.cyber_py3 import cyber_time
from modules.common_msgs.external_command_msgs import lane_follow_command_pb2
from modules.common_msgs.external_command_msgs import command_status_pb2
import time
import random
def parse_coordinate(coord_str):
cleaned = coord_str.strip().strip('()')
parts = cleaned.split(',')
if len(parts) != 2:
raise ValueError(f"Invalid coordinate format: {coord_str}")
x = float(parts[0].strip())
y = float(parts[1].strip())
return x, y
def main():
cyber.init()
node = cyber.Node("lane_follow_requester")
cmd = lane_follow_command_pb2.LaneFollowCommand()
cmd.header.timestamp_sec = cyber_time.Time.now().to_sec()
cmd.header.module_name = "manual_command"
cmd.header.sequence_num = 1
cmd.command_id = random.randint(1, 2**31 - 1)
cmd.is_start_pose_set = True
# 使用解析函数设置起点和终点坐标
start_x, start_y = parse_coordinate("(587107.2299980448,4141578.018806449)")
end_x, end_y = parse_coordinate("(587010.5463351277,4141603.8543796046)")
start_pose = cmd.way_point.add()
start_pose.x = start_x
start_pose.y = start_y
cmd.end_pose.x = end_x
cmd.end_pose.y = end_y
client = node.create_client(
'/apollo/external_command/lane_follow',
lane_follow_command_pb2.LaneFollowCommand,
command_status_pb2.CommandStatus
)
time.sleep(1.0)
print("Sending LaneFollowCommand:")
print(cmd)
response = client.send_request(cmd)
if response is not None:
print("Response status:")
print(f" - command_id: {response.command_id}")
print(f" - status: {response.status}")
print(f" - message: {response.message}")
else:
print("No response received. Make sure external_command_processor is running.")
cyber.shutdown()
if __name__ == '__main__':
main()

@ -0,0 +1,66 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from cyber.python.cyber_py3 import cyber
from cyber.python.cyber_py3 import cyber_time
import modules.common_msgs.perception_msgs.traffic_light_detection_pb2 as traffic_light_detection_pb2
import threading
import time
# 需要控制的红绿灯ID列表必须与地图中的signal.id一致
SIGNAL_IDS = ["signal_14", "signal_13", "signal_0","signal_9"]
def add_forward_lights(color, traffic_light_pb, signal_ids):
"""为指定的多个红绿灯ID添加相同颜色的消息"""
for sid in signal_ids:
light = traffic_light_pb.traffic_light.add()
light.id = sid
light.color = color
light.tracking_time = 10.0
seq_num = 0
def add_header(msg):
global seq_num
msg.header.sequence_num = seq_num
msg.header.timestamp_sec = cyber_time.Time.now().to_sec()
msg.header.module_name = "manual_traffic_light"
seq_num = seq_num + 1
def pub_func(writer):
while not cyber.is_shutdown():
global traffic_light_msg
add_header(traffic_light_msg)
writer.write(traffic_light_msg)
time.sleep(0.1)
traffic_light_msg = None
if __name__ == '__main__':
traffic_light_msg = traffic_light_detection_pb2.TrafficLightDetection()
cyber.init()
node = cyber.Node("traffic_light_command")
writer = node.create_writer(
"/apollo/perception/traffic_light",
traffic_light_detection_pb2.TrafficLightDetection)
thread = threading.Thread(target=pub_func, args=(writer,))
thread.start()
while not cyber.is_shutdown():
m = input("1: forward red 2: forward green 3: exit\n")
if m == '1':
traffic_light_msg.ClearField('traffic_light')
add_forward_lights(traffic_light_detection_pb2.TrafficLight.RED,
traffic_light_msg, SIGNAL_IDS)
print("Set all signals to RED")
elif m == '2':
traffic_light_msg.ClearField('traffic_light')
add_forward_lights(traffic_light_detection_pb2.TrafficLight.GREEN,
traffic_light_msg, SIGNAL_IDS)
print("Set all signals to GREEN")
elif m == '3':
break
else:
print("Invalid input")
cyber.shutdown()
Loading…
Cancel
Save