You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

274 lines
8.6 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import time
import datetime
import threading
import cv2
import cvlib
import communicate
from communicate import Subscribe, Pubilsh
from communicate import host, topic,data
# Shared module-level state read and written by the classes in this file.

# One slot per approach of the intersection.
vehicle_count = [0] * 4
total_state = [0] * 4  # 0 = red, 1 = yellow, 2 = green

# Running vehicle tallies.
nonmotor_vehicle = motor_vehicle = vehicle_sum = 0

direction = 4  # number of approaches iterated by the scheduler
delta_t = 1  # scheduler step (presumably seconds -- confirm)

# Cycle lengths: T is the active value, T_day the daytime value and
# T_nightday the daytime value scaled down by `scale`.
T = T_day = 400
scale = 3 / 4
T_nightday = scale * T_day

# Clock times at which the day / night cycle length is applied.
# The leading space is required: the string is appended to a date.
day_time = ' 04:00:00'
night_time = ' 01:00:00'

emergency_vehicle = False
class Sub(Subscribe):
    """Subscriber that applies incoming messages to the module-level state.

    Weather messages switch the cycle-length globals (``T``, ``T_day``,
    ``T_nightday``); count messages are added to the global vehicle tallies.
    """

    def __init__(self, host, post=1883):
        """Create the subscriber.

        :param host: broker host, forwarded to ``Subscribe.__init__``.
        :param post: forwarded as the second argument of
            ``Subscribe.__init__`` (presumably the broker port -- confirm).
        """
        super().__init__(host, post)
        self.T_sunny = 400    # cycle length applied on a "sunny" message
        self.T_extreme = 500  # cycle length applied on any other weather

    # def adjustment(self, time, mode):
    #     """
    #     Adjust all cycle lengths at once.
    #     :param time: amount of time to adjust by
    #     :param mode: 1 to add the time, -1 to subtract it
    #     """
    #     global T, T_day, T_nightday
    #     time1 = time * mode
    #     T = T + time1
    #     T_day = T_day + time1
    #     T_nightday = T_nightday + time1

    def on_message(self, client, userdata, msg):
        """Handle one received message.

        The payload is decoded as UTF-8. A payload of ``'close'`` is ignored.
        On the weather topic, ``"sunny"`` selects ``self.T_sunny`` and anything
        else selects ``self.T_extreme``; ``T_nightday`` is then rescaled.
        On the count topic the payload is parsed and added to the global
        ``motor_vehicle`` / ``nonmotor_vehicle`` tallies.

        :param client: unused.
        :param userdata: unused.
        :param msg: message object exposing ``topic`` and ``payload``.
        """
        global T, T_day, T_nightday, nonmotor_vehicle, motor_vehicle
        import json  # local import keeps this block self-contained

        print('--' * 8)
        print(msg.topic)
        string = str(msg.payload, 'utf-8')
        if string == 'close':
            return

        if msg.topic == topic['weather']:
            if string == "sunny":
                T = T_day = self.T_sunny
            else:
                T = T_day = self.T_extreme
            T_nightday = T_day * scale
        elif msg.topic == topic['cont']:
            # The previous code did `+= ()`, which raises TypeError (int + tuple).
            # Calculate.pubilsh_num publishes [motor, nonmotor] on this topic.
            # NOTE(review): the wire encoding is decided by Pubilsh.pub_message,
            # which is not visible here; this assumes the list arrives as
            # JSON-compatible text such as "[3, 2]" -- confirm.
            try:
                counts = json.loads(string)
                motor, nonmotor = int(counts[0]), int(counts[1])
            except (ValueError, TypeError, IndexError, KeyError):
                # Malformed payload: skip it rather than kill the subscriber.
                print('ignored malformed count payload:', string)
                return
            motor_vehicle += motor
            nonmotor_vehicle += nonmotor
class Time():
    """Schedules a recurring change of the global cycle length ``T``."""

    def __init__(self, day_time=86400):
        # Repeat period in seconds (default: one day).
        self.day_time = day_time

    def loop(self, time_line, time):
        """Arm a timer that first fires at the next occurrence of *time_line*.

        :param time_line: clock time formatted like ``' 11:32:00'``
            (hour:minute:second); the leading space is required because the
            string is appended directly to tomorrow's date.
        :param time: value handed to :meth:`func` when the timer fires.
        """
        now_time = datetime.datetime.now()
        # Build tomorrow's date and attach the requested clock time to it.
        tomorrow = (now_time + datetime.timedelta(days=+1)).date()
        stamp = "-".join(
            (str(tomorrow.year), str(tomorrow.month), str(tomorrow.day))
        ) + time_line
        target = datetime.datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S")

        delay = (target - now_time).total_seconds()
        # More than one period away means the clock time is still ahead
        # today, so fire one period earlier.
        if delay > self.day_time:
            delay -= self.day_time
        print('timer_start_time', delay)

        # Timer(seconds until the call, callable to run)
        threading.Timer(delay, self.func, args=(time,)).start()

    def func(self, time):
        """Set the global ``T`` to *time* and re-arm for the next period."""
        global T
        print('改变T')
        T = time
        # Re-arming here is what makes the change repeat every period.
        threading.Timer(self.day_time, self.func, args=(time,)).start()
class Calculate():
    """Vehicle counting backed by object detection on an image.

    Maintains the module-level globals ``nonmotor_vehicle``,
    ``motor_vehicle`` and ``vehicle_sum``.
    """

    # Image analysed when no explicit path is given.
    # TODO: replace with video detection.
    IMAGE_PATH = r'D:\workspace\Traffic-Signal-Scheduling\Trial1\13.jpg'

    def init(self):
        """Reset the global motor / non-motor tallies to zero."""
        global nonmotor_vehicle, motor_vehicle
        nonmotor_vehicle = motor_vehicle = 0

    def _detect(self, image_path=None):
        """Run detection on one image without touching any global.

        :param image_path: image to analyse; defaults to ``IMAGE_PATH``.
        :return: ``(motor, nonmotor)`` where motor counts 'car' + 'truck'
            labels and nonmotor counts 'person' + 'motorcycle' labels.
        :raises FileNotFoundError: if the image cannot be read.
        """
        path = self.IMAGE_PATH if image_path is None else image_path
        frame = cv2.imread(path)
        if frame is None:
            # cv2.imread signals an unreadable file by returning None; fail
            # here with a clear message instead of deep inside the detector.
            raise FileNotFoundError('cannot read image: {}'.format(path))
        _bbox, labels, _conf = cvlib.detect_common_objects(frame)
        nonmotor = labels.count('person') + labels.count('motorcycle')
        motor = labels.count('car') + labels.count('truck')
        return motor, nonmotor

    def count(self, image_path=None):
        """Detect vehicles and store the result in the global tallies.

        Overwrites the globals ``motor_vehicle`` and ``nonmotor_vehicle``.

        :param image_path: image to analyse; defaults to ``IMAGE_PATH``.
        :return: ``(motor_vehicle, nonmotor_vehicle)``.
        """
        global nonmotor_vehicle, motor_vehicle
        motor_vehicle, nonmotor_vehicle = self._detect(image_path)
        return motor_vehicle, nonmotor_vehicle

    def pubilsh_num(self):
        """Publish ``[motor, nonmotor]`` from :meth:`count` on ``topic['cont']``."""
        pulish = Pubilsh(host)
        motor_vehicle, nomotor_vehicle = self.count()
        data = [motor_vehicle, nomotor_vehicle]
        pulish.pub_message(data, topic['cont'])

    def total_count(self):
        """Fold the local detection into the tallies and refresh ``vehicle_sum``.

        The local counts are added to whatever the globals already hold
        (counts received from other detectors via the subscriber thread),
        ``vehicle_sum`` is set to the combined total, and the per-type
        tallies are reset for the next round.
        """
        global vehicle_sum, nonmotor_vehicle, motor_vehicle
        # Use _detect(), not count(): count() overwrites the globals, which
        # previously discarded received counts and then added the local
        # detection a second time.
        motor, nonmotor = self._detect()
        nonmotor_vehicle += nonmotor
        motor_vehicle += motor
        vehicle_sum = nonmotor_vehicle + motor_vehicle
        self.init()
class Algorithm():
    """Signal scheduler that cycles the green phase over the approaches.

    NOTE(review): the indentation of the original file was lost; the nesting
    in this class (notably in ``distribution_time``) is reconstructed from
    the syntax and should be confirmed against the upstream source.
    """

    def __init__(self, T=400, delta_t=3):
        """
        :param T: full cycle length; every approach starts with ``T / 4``.
        :param delta_t: scheduler step, also used as the wait time between
            recounts and as the yellow-phase duration.
        """
        self.delta_t = delta_t
        self.sleep_time = delta_t
        self.time1 = 0              # start timestamp of the current step
        self.time_consumption = 0   # time elapsed since self.time1
        self.T = T
        self.T_init = T / 4
        self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4]
        self.T_statistical = [self.T / 4, self.T / 4, self.T / 4, self.T / 4]
        # NOTE(review): red/green are letters while yellow is the integer 1;
        # other comments in this file describe states as 0/1/2 -- confirm
        # which encoding the receiver expects.
        self.red = 'R'
        self.yellow = 1
        self.green = 'G'
        self.yellow_time = delta_t
        self.cal = Calculate()
        self.time_switch = True
        self.T_nightday = 300
        self.T_day = 400
        self.weather_switch = True
        self.extreme_weather_time = 100
        self.emergency_vehicle_switch = True

    def update_init(self):
        """Reload the cycle length from the global ``T`` and reset all shares."""
        self.T = T
        self.T_init = T / 4
        self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4]

    def sleep(self, time1):
        """Block until more than ``self.sleep_time`` has passed since *time1*.

        :param time1: reference timestamp from ``time.time()``.
        """
        print('进入{}s等待时间'.format(self.sleep_time))
        # Sleep for the remainder instead of spinning in a busy loop, which
        # pinned a CPU core for the whole wait.
        remaining = self.sleep_time - (time.time() - time1)
        if remaining > 0:
            time.sleep(remaining)

    def pub(self, host, topic, data):
        """Publish *data* on *topic* through a new ``Pubilsh`` for *host*."""
        p = Pubilsh(host)
        p.pub_message(data, topic)

    def sub(self, host, topic):
        """Subscribe to *topic* through a new ``Subscribe`` for *host*."""
        # `Subscribe` is imported as the class itself, so it is called
        # directly (as get_num does); `Subscribe.Subscribe` was wrong.
        s = Subscribe(host)
        s.sub(topic)

    def signaling(self, i, state, time=None, host=host, topic=topic['count']):
        """Publish a new light state for approach *i*.

        :param i: approach index, sent as ``CrossroadsMessageID``.
        :param state: light state, sent as ``TrafficLightStatus``.
        :param time: accepted for callers that pass a duration; not used.
        :param host: broker host.
        :param topic: topic to publish on.
        """
        # 0 = red, 1 = yellow, 2 = green
        print('{},{}个灯变成状态{}'.format(i, (i + 2) % 4, state))
        now_data = {
            "CrossroadsMessageID": i,  # intersection signal number
            "TrafficLightStatus": state,
        }
        # Updates the shared imported `data` dict in place before sending.
        data.update(now_data)
        self.pub(host, topic, data)
        # total_state[i] = state
        # total_state[(i+2)%4] = state

    def calculate_time(self, i):
        """Return the share of the remaining cycle for approach *i*.

        The share is the fraction of all counted vehicles that sit on
        approach *i* and on approach ``i + 2`` (index taken modulo 4),
        multiplied by the cycle time still left after the time consumed
        since ``self.time1``.

        When no vehicle has been counted (``vehicle_sum == 0``) the ratio is
        undefined, so the approach's current allotment
        ``self.T_direction[i]`` is returned.
        """
        self.time_consumption = time.time() - self.time1
        print('vehicle_count',vehicle_count)
        if not vehicle_sum:
            # Avoid ZeroDivisionError on an empty intersection.
            return self.T_direction[i]
        # Parenthesised sum: previously only the second count was divided.
        share = (vehicle_count[i] + vehicle_count[(i + 2) % 4]) / vehicle_sum
        return share * (self.T - self.time_consumption)

    def update(self):
        """Placeholder; the recalculation below is currently disabled."""
        pass
        # print('--' * 8)
        # print('light state of every direction before the update', self.T_direction)
        # for i in range(direction):
        #     self.T_direction[i] = self.calculate_time(i)
        # print('light state of every direction after the update', self.T_direction)
        # print('--' * 8)

    def transform_single(self, i):
        """Hand the green phase from approach *i* to the next approach.

        Updates all green times, then:
        signal i:     green -> yellow -> wait ``yellow_time`` -> red
        signal i + 1: red -> green
        """
        nxt = (i + 1) % direction
        self.update()
        self.signaling(i, self.yellow, self.yellow_time)
        time.sleep(self.yellow_time)
        self.signaling(i, self.red)
        self.signaling(nxt, self.green, self.T_direction[nxt])

    def weather(self):
        """Start a thread in which a ``Sub`` subscribes to "data/weather"."""
        subscribe = Sub(host)
        weather_threading = threading.Thread(target=subscribe.sub, args=("data/weather",))
        weather_threading.start()

    def time(self):
        """Arm the recurring day and night cycle-length switches."""
        day = Time()
        day.loop(day_time, T_day)
        night = Time()
        night.loop(night_time, T_nightday)

    def emergency_vehicle(self):
        """Placeholder for emergency-vehicle handling."""
        pass
        # TODO: control this asynchronously, directly in hardware???

    def get_num(self):
        """Start a thread in which a ``Subscribe`` listens on ``topic['count']``."""
        sub = Subscribe(host)
        num_sub = threading.Thread(target=sub.sub, args=(topic['count'],))
        num_sub.start()

    def distribution_time(self):
        """Run the scheduler forever.

        Starts the enabled background features, turns approach 0 green, then
        loops over the approaches. For each approach the remaining green
        time is decreased by the elapsed step time after every recount and
        lowered to the freshly calculated share when that share is smaller.
        Once less than ``delta_t`` remains, the green phase moves to the next
        approach. After a full round the shares are reset from the global
        ``T``.
        """
        if self.time_switch:
            self.time()
        if self.emergency_vehicle_switch:
            self.emergency_vehicle()
        if self.weather_switch:
            self.weather()
        self.get_num()
        self.signaling(0, self.green, self.T_init)
        while True:
            for i in range(direction):
                T_remain = self.T_direction[i]
                while True:
                    self.time1 = time.time()
                    self.sleep(self.time1)
                    # TODO: the time spent computing must be accounted for here
                    self.cal.total_count()
                    T_remain -= (time.time() - self.time1)
                    T_now_remain = self.calculate_time(i)
                    if T_remain >= self.delta_t:
                        if T_now_remain <= T_remain:
                            T_remain = T_now_remain
                            self.update()
                    else:
                        self.transform_single(i)
                        break
            self.update_init()
def main():
    """Build the scheduler from the module-level ``T`` and ``delta_t`` and run it."""
    Algorithm(T, delta_t).distribution_time()


# Run the scheduler only when executed as a script.
if __name__ == '__main__':
    main()