diff --git a/signal_center_1.py b/signal_center_1.py deleted file mode 100644 index 61211a9..0000000 --- a/signal_center_1.py +++ /dev/null @@ -1,184 +0,0 @@ -import time -import datetime -import threading - - -vehicle_count = [0, 0, 0, 0] -nonmotor_vehicle = 0 -motor_vehicle = 0 -vehicle_sum = 0 -total_state = [0, 0, 0, 0] # 0红灯,1黄灯,2绿灯 -direction = 4 - -delta_t = 1 -T = 400 -T_day = 400 -scale = 3 / 4 -T_nightday = T_day * scale - -day_time = ' 04:00:00' -night_time = ' 01:00:00' - -emergency_vehicle = False - - -class Time(): - def __init__(self, day_time=86400): - self.day_time = day_time - - def loop(self, time_line, time): - now_time = datetime.datetime.now() - # 获取明天时间 - next_time = now_time + datetime.timedelta(days=+1) - """ - 请注意输入如格式\' 11:32:00','分-秒-毫秒','且前面要放一个空格 - """ - next_time = datetime.datetime.strptime( - str(next_time.date().year) + "-" + str(next_time.date().month) + "-" + str( - next_time.date().day) + time_line, "%Y-%m-%d %H:%M:%S") - - timer_start_time = (next_time - now_time).total_seconds() - if timer_start_time > self.day_time: - timer_start_time -= self.day_time - print('timer_start_time', timer_start_time) - # 定时器,参数为(多少时间后执行,单位为秒,执行的方法) - timer = threading.Timer(timer_start_time, self.func, args=(time,)) - timer.start() - - def func(self, time): - # 如果需要循环调用,就要添加以下方法 - print('改变T') - global T - T = time - timer = threading.Timer(self.day_time, self.func, args=(time,)) - timer.start() - - -class Calculate(): - """ - 得到全局变量: - nonmotor_vehicle, motor_vehicle,vehicle_sum - """ - - def init(self): - global nonmotor_vehicle, motor_vehicle - nonmotor_vehicle = motor_vehicle = 0 - - def count(self): - pass - - - def total_count(self): - motor, nonmotor = self.count() - global vehicle_sum, nonmotor_vehicle, motor_vehicle - # get_num 以开启用于接收其他检测数据的线程 - nonmotor_vehicle += nonmotor - motor_vehicle += motor - vehicle_sum = nonmotor_vehicle + motor_vehicle - self.init() - - -class Algorithm(): - def __init__(self, T=400, delta_t=3): - self.delta_t = delta_t - self.sleep_time = delta_t - self.time1 = 0 - self.time_consumption = 0 - self.T = T - self.T_init = T / 4 - self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4] - self.T_statistical = [self.T / 4, self.T / 4, self.T / 4, self.T / 4] - self.red = 'R' - self.yellow = 1 - self.green = 'G' - self.yellow_time = delta_t - self.cal = Calculate() - - self.time_switch = True - self.T_nightday = 300 - self.T_day = 400 - - self.weather_switch = True - self.extreme_weather_time = 100 - - self.emergency_vehicle_switch = True - - def update_init(self): - self.T = T - self.T_init = T / 4 - self.T_direction = [self.T / 4, self.T / 4, self.T / 4, self.T / 4] - - def sleep(self, time1): - print('进入{}s等待时间'.format(self.sleep_time)) - while (time.time() - time1) <= self.sleep_time: - pass - - - def calculate_time(self, i): - self.time_consumption = time.time() - self.time1 - print('vehicle_count',vehicle_count) - return (vehicle_count[i] + vehicle_count[(i + 2) % 4] / vehicle_sum) * (self.T - self.time_consumption) - - def update(self): - pass - - def transform_single(self, i): - """ - 更新所有绿灯时间 - 信号灯i:绿->黄->3s->红 - 信号灯i+1:红->绿 - """ - self.update() - self.signaling(i, self.yellow, self.yellow_time) - time.sleep(self.yellow_time) - self.signaling(i, self.red) - self.signaling((i + 1) % direction, self.green, self.T_direction[(i + 1) % direction]) - - - def time(self): - day = Time() - day.loop(day_time, T_day) - night = Time() - night.loop(night_time, T_nightday) - - def emergency_vehicle(self): - pass - # TODO 在硬件处直接异步控制??? - - def distribution_time(self): - if self.time_switch: - self.time() - if self.emergency_vehicle_switch: - self.emergency_vehicle() - if self.weather_switch: - self.weather() - self.get_num() - self.signaling(0, self.green, self.T_init) - while True: - for i in range(direction): - T_remain = self.T_direction[i] - while True: - self.time1 = time.time() - self.sleep(self.time1) - # TODO:此处需要考虑计算时间 - self.cal.total_count() - T_remain -= (time.time() - self.time1) - T_now_remain = self.calculate_time(i) - - if T_remain >= self.delta_t: - if T_now_remain <= T_remain: - T_remain = T_now_remain - self.update() - else: - self.transform_single(i) - break - self.update_init() - - -def main(): - a = Algorithm(T, delta_t) - a.distribution_time() - - -if __name__ == '__main__': - main()