diff --git a/signal_center_1.py b/signal_center_1.py new file mode 100644 index 0000000..61211a9 --- /dev/null +++ b/signal_center_1.py @@ -0,0 +1,184 @@ +import time +import datetime +import threading + + +vehicle_count = [0, 0, 0, 0] +nonmotor_vehicle = 0 +motor_vehicle = 0 +vehicle_sum = 0 +total_state = [0, 0, 0, 0] # 0红灯,1黄灯,2绿灯 +direction = 4 + +delta_t = 1 +T = 400 +T_day = 400 +scale = 3 / 4 +T_nightday = T_day * scale + +day_time = ' 04:00:00' +night_time = ' 01:00:00' + +emergency_vehicle = False + + +class Time(): + def __init__(self, day_time=86400): + self.day_time = day_time + + def loop(self, time_line, time): + now_time = datetime.datetime.now() + # 获取明天时间 + next_time = now_time + datetime.timedelta(days=+1) + """ + 请注意输入如格式\' 11:32:00','分-秒-毫秒','且前面要放一个空格 + """ + next_time = datetime.datetime.strptime( + str(next_time.date().year) + "-" + str(next_time.date().month) + "-" + str( + next_time.date().day) + time_line, "%Y-%m-%d %H:%M:%S") + + timer_start_time = (next_time - now_time).total_seconds() + if timer_start_time > self.day_time: + timer_start_time -= self.day_time + print('timer_start_time', timer_start_time) + # 定时器,参数为(多少时间后执行,单位为秒,执行的方法) + timer = threading.Timer(timer_start_time, self.func, args=(time,)) + timer.start() + + def func(self, time): + # 如果需要循环调用,就要添加以下方法 + print('改变T') + global T + T = time + timer = threading.Timer(self.day_time, self.func, args=(time,)) + timer.start() + + +class Calculate(): + """ + 得到全局变量: + nonmotor_vehicle, motor_vehicle,vehicle_sum + """ + + def init(self): + global nonmotor_vehicle, motor_vehicle + nonmotor_vehicle = motor_vehicle = 0 + + def count(self): + pass + + + def total_count(self): + motor, nonmotor = self.count() + global vehicle_sum, nonmotor_vehicle, motor_vehicle + # get_num 以开启用于接收其他检测数据的线程 + nonmotor_vehicle += nonmotor + motor_vehicle += motor + vehicle_sum = nonmotor_vehicle + motor_vehicle + self.init() + + +class Algorithm(): + def __init__(self, T=400, delta_t=3): + self.delta_t = delta_t + self.sleep_time = delta_t + self.time1 = 0 + self.time_consumption = 0 + self.T = T + self.T_init = T / 4 + self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4] + self.T_statistical = [self.T / 4, self.T / 4, self.T / 4, self.T / 4] + self.red = 'R' + self.yellow = 1 + self.green = 'G' + self.yellow_time = delta_t + self.cal = Calculate() + + self.time_switch = True + self.T_nightday = 300 + self.T_day = 400 + + self.weather_switch = True + self.extreme_weather_time = 100 + + self.emergency_vehicle_switch = True + + def update_init(self): + self.T = T + self.T_init = T / 4 + self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4] + + def sleep(self, time1): + print('进入{}s等待时间'.format(self.sleep_time)) + while (time.time() - time1) <= self.sleep_time: + pass + + + def calculate_time(self, i): + self.time_consumption = time.time() - self.time1 + print('vehicle_count',vehicle_count) + return (vehicle_count[i] + vehicle_count[(i + 2) % 4] / vehicle_sum) * (self.T - self.time_consumption) + + def update(self): + pass + + def transform_single(self, i): + """ + 更新所有绿灯时间 + 信号灯i:绿->黄->3s->红 + 信号灯i+1:红->绿 + """ + self.update() + self.signaling(i, self.yellow, self.yellow_time) + time.sleep(self.yellow_time) + self.signaling(i, self.red) + self.signaling((i + 1) % direction, self.green, self.T_direction[(i + 1) % direction]) + + + def time(self): + day = Time() + day.loop(day_time, T_day) + night = Time() + night.loop(night_time, T_nightday) + + def emergency_vehicle(self): + pass + # TODO 在硬件处直接异步控制??? + + def distribution_time(self): + if self.time_switch: + self.time() + if self.emergency_vehicle_switch: + self.emergency_vehicle() + if self.weather_switch: + self.weather() + self.get_num() + self.signaling(0, self.green, self.T_init) + while True: + for i in range(direction): + T_remain = self.T_direction[i] + while True: + self.time1 = time.time() + self.sleep(self.time1) + # TODO:此处需要考虑计算时间 + self.cal.total_count() + T_remain -= (time.time() - self.time1) + T_now_remain = self.calculate_time(i) + + if T_remain >= self.delta_t: + if T_now_remain <= T_remain: + T_remain = T_now_remain + self.update() + else: + self.transform_single(i) + break + self.update_init() + + +def main(): + a = Algorithm(T, delta_t) + a.distribution_time() + + +if __name__ == '__main__': + main()