ADD file via upload

master
ponfi9ftq 3 years ago
parent 49a914d0b7
commit 7077c1ba6e

@ -0,0 +1,184 @@
import time
import datetime
import threading
vehicle_count = [0, 0, 0, 0]
nonmotor_vehicle = 0
motor_vehicle = 0
vehicle_sum = 0
total_state = [0, 0, 0, 0] # 0红灯1黄灯2绿灯
direction = 4
delta_t = 1
T = 400
T_day = 400
scale = 3 / 4
T_nightday = T_day * scale
day_time = ' 04:00:00'
night_time = ' 01:00:00'
emergency_vehicle = False
class Time():
def __init__(self, day_time=86400):
self.day_time = day_time
def loop(self, time_line, time):
now_time = datetime.datetime.now()
# 获取明天时间
next_time = now_time + datetime.timedelta(days=+1)
"""
请注意输入如格式\' 11:32:00','分-秒-毫秒','且前面要放一个空格
"""
next_time = datetime.datetime.strptime(
str(next_time.date().year) + "-" + str(next_time.date().month) + "-" + str(
next_time.date().day) + time_line, "%Y-%m-%d %H:%M:%S")
timer_start_time = (next_time - now_time).total_seconds()
if timer_start_time > self.day_time:
timer_start_time -= self.day_time
print('timer_start_time', timer_start_time)
# 定时器,参数为(多少时间后执行,单位为秒,执行的方法)
timer = threading.Timer(timer_start_time, self.func, args=(time,))
timer.start()
def func(self, time):
# 如果需要循环调用,就要添加以下方法
print('改变T')
global T
T = time
timer = threading.Timer(self.day_time, self.func, args=(time,))
timer.start()
class Calculate():
"""
得到全局变量
nonmotor_vehicle, motor_vehiclevehicle_sum
"""
def init(self):
global nonmotor_vehicle, motor_vehicle
nonmotor_vehicle = motor_vehicle = 0
def count(self):
pass
def total_count(self):
motor, nonmotor = self.count()
global vehicle_sum, nonmotor_vehicle, motor_vehicle
# get_num 以开启用于接收其他检测数据的线程
nonmotor_vehicle += nonmotor
motor_vehicle += motor
vehicle_sum = nonmotor_vehicle + motor_vehicle
self.init()
class Algorithm():
def __init__(self, T=400, delta_t=3):
self.delta_t = delta_t
self.sleep_time = delta_t
self.time1 = 0
self.time_consumption = 0
self.T = T
self.T_init = T / 4
self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4]
self.T_statistical = [self.T / 4, self.T / 4, self.T / 4, self.T / 4]
self.red = 'R'
self.yellow = 1
self.green = 'G'
self.yellow_time = delta_t
self.cal = Calculate()
self.time_switch = True
self.T_nightday = 300
self.T_day = 400
self.weather_switch = True
self.extreme_weather_time = 100
self.emergency_vehicle_switch = True
def update_init(self):
self.T = T
self.T_init = T / 4
self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4]
def sleep(self, time1):
print('进入{}s等待时间'.format(self.sleep_time))
while (time.time() - time1) <= self.sleep_time:
pass
def calculate_time(self, i):
self.time_consumption = time.time() - self.time1
print('vehicle_count',vehicle_count)
return (vehicle_count[i] + vehicle_count[(i + 2) % 4] / vehicle_sum) * (self.T - self.time_consumption)
def update(self):
pass
def transform_single(self, i):
"""
更新所有绿灯时间
信号灯i:绿->->3s->
信号灯i+1->绿
"""
self.update()
self.signaling(i, self.yellow, self.yellow_time)
time.sleep(self.yellow_time)
self.signaling(i, self.red)
self.signaling((i + 1) % direction, self.green, self.T_direction[(i + 1) % direction])
def time(self):
day = Time()
day.loop(day_time, T_day)
night = Time()
night.loop(night_time, T_nightday)
def emergency_vehicle(self):
pass
# TODO 在硬件处直接异步控制???
def distribution_time(self):
if self.time_switch:
self.time()
if self.emergency_vehicle_switch:
self.emergency_vehicle()
if self.weather_switch:
self.weather()
self.get_num()
self.signaling(0, self.green, self.T_init)
while True:
for i in range(direction):
T_remain = self.T_direction[i]
while True:
self.time1 = time.time()
self.sleep(self.time1)
# TODO此处需要考虑计算时间
self.cal.total_count()
T_remain -= (time.time() - self.time1)
T_now_remain = self.calculate_time(i)
if T_remain >= self.delta_t:
if T_now_remain <= T_remain:
T_remain = T_now_remain
self.update()
else:
self.transform_single(i)
break
self.update_init()
def main():
a = Algorithm(T, delta_t)
a.distribution_time()
if __name__ == '__main__':
main()
Loading…
Cancel
Save