# 接收机,相当于本机(192.168.49.10,端口号为 5010) #,上行报文指接收机发送给上位机 import tkinter as tk from threading import Thread import socket import time class SendApp: def __init__(self, window): self.window = window self.window.title("Send端") # 发送区域 self.send_frame = tk.Frame(window) self.send_frame.pack(padx=10, pady=10) self.send_label = tk.Label(self.send_frame, text="发送数据") self.send_label.pack(side=tk.LEFT) self.send_entry = tk.Entry(self.send_frame) self.send_entry.pack(side=tk.LEFT) self.send_button = tk.Button(self.send_frame, text="发送", command=self.send_message) self.send_button.pack(side=tk.LEFT) # 接收区域 self.recv_frame = tk.Frame(window) self.recv_frame.pack(padx=10, pady=10) self.recv_text = tk.Text(self.recv_frame, height=40, width=50) self.recv_text.pack(expand=True, fill="both") #创建udp对象 self.sk = socket.socket(type=socket.SOCK_DGRAM) self.sk.bind(("127.0.0.1", 5010)) # 开始接收数据的线程 self.thread = Thread(target=self.receive_data) #运行解析下行报文函数 self.thread.daemon = True self.thread.start() def send_message(self): msg = self.send_entry.get() #发送数据 self.sk.sendto(msg.encode(), ("127.0.0.1", 9000)) time.sleep(1) def receive_data(self): while True: #接收数据 msg, addr = self.sk.recvfrom(1024) message_to_display = "Received: " + msg.decode() + " from " + str(addr) self.recv_text.insert(tk.END, message_to_display + "\n") self.analysis_message(msg) # 下行报文18字节固定长度 """帧格式: 0~3 帧头 BYTE[8:10] 4 固定为0x58443341 4 指令类型 UINT8 1 0x00:开始停止 0x01:工况查询 0x02:设备重置 0x03:授时指令 0x04:频率设置 0x05:本机地址设置 0x06:本机位置设置 0x07:开启关闭测试模式 5~14 指令内容 BYTE[10] 10 根据指令类型填充,当指令类型为0x01、0x02时可不填充 15~17 帧尾 BYTE[3] 3 固定为0x334441 """ #解析下行报文 def analysis_message(self, msg): getmessage = msg.decode() #如果帧头不是0x58443341,则报文错误 if getmessage[0:8] != "58443341": first_type = "错误类型:帧头错误" self.recv_text.insert(tk.END, first_type + "\n") return #解析指令类型 #指令为00,开始停止 if getmessage[8:10] == "00": instruction_type = "指令类型:开始停止" self.recv_text.insert(tk.END, instruction_type + "\n") if getmessage[10:14] == "AAAA": start_stop = "开始上传解调数据" self.recv_text.insert(tk.END, start_stop + "\n") elif getmessage[10:14] == "5555": start_stop = "停止传输解调数据" self.recv_text.insert(tk.END, start_stop + "\n") else: start_stop = "指令" self.recv_text.insert(tk.END, start_stop + "\n") elif getmessage[8:10] == "01": instruction_type = "指令类型:工况查询" self.recv_text.insert(tk.END, instruction_type + "\n") elif getmessage[8:10] == "02": instruction_type = "指令类型:设备重置" self.recv_text.insert(tk.END, instruction_type + "\n") elif getmessage[8:10] == "03": instruction_type = "指令类型:授时指令" self.recv_text.insert(tk.END, instruction_type + "\n") """ 授时指令内容: 0 年 UINT8 1 00-99,对应0x00-0x63,基数为2000,例如:2001年对应0x01,2099年对应0x63。 1 月 UINT8 1 01-12,对应0x01-0x0C 2 日 UINT8 1 01-31,对应0x01-0x1F 3 时 UINT8 1 00-23,对应0x00-0x17 4 分 UINT8 1 00-59,对应0x00-0x3B 5 秒 UINT8 1 00-59,对应0x00-0x3B 6-9 保留 UINT8 4 """ year = int(getmessage[10:12],16)+2000 month = int(getmessage[12:14],16) day = int(getmessage[14:16],16) hour = int(getmessage[16:18],16) minute = int(getmessage[18:20],16) second = int(getmessage[20:22],16) self.recv_text.insert(tk.END, "设置时间为: " + str(year) + "年" + str(month) + "月" + str(day) + "日" + str(hour) + "时" + str(minute) + "分" + str(second) + "秒\n") elif getmessage[8:10] == "04": instruction_type = "指令类型:频率设置" self.recv_text.insert(tk.END, instruction_type + "\n") """ 0 通道号 UINT8 1 0-3,表示ACARS的通道0-3 1-4 频率 UINT32 4 单位KHz 5-9 保留 """ channel = int(getmessage[10:12],16) get_frequency = getmessage[12:18] byte_string = bytes.fromhex(get_frequency) frequency = int.from_bytes(byte_string, byteorder='little') self.recv_text.insert(tk.END, "ACARS通道" + str(channel) + "的频率为" + str(frequency) + "KHz\n") elif getmessage[8:10] == "05": instruction_type = "指令类型:本机地址设置" self.recv_text.insert(tk.END, instruction_type + "\n") """ 0-3 本机地址 UINT32 4 四个字节,分别对应第一到第四段号码 4-9 保留 """ #四个字节,分别对应第一到第四段号码 address1 = int(getmessage[10:12],16) address2 = int(getmessage[12:14],16) address3 = int(getmessage[14:16],16) address4 = int(getmessage[16:18],16) ip_address = str(address1)+"."+str(address2)+"."+str(address3)+"."+str(address4) self.recv_text.insert(tk.END, "本机地址为" + str(ip_address) + "\n") elif getmessage[8:10] == "06": instruction_type = "指令类型:本机位置设置" self.recv_text.insert(tk.END, instruction_type + "\n") """ 0-3 经度 UINT32 4 单位:0.0001度,范围-90.000090.0000,北纬为正,南纬为负。如0xFFFB4D4D表示南纬30.7891度。0xFFFF表示无效。 4-7 纬度 UINT32 4 单位:0.0001度,范围-180.0000180.0000。东经为正,西经为负。如0x001128D7表示东经112.4567度。0xFFFF表示无效: 8-9 保留 """ get_longitude = getmessage[10:18] if get_longitude[0:3] == "FFFF": longitude = "无效" else: byte_string = bytes.fromhex(get_longitude) longitude = int.from_bytes(byte_string, byteorder='little',signed=True) if(longitude < 0): long_tag = "南纬: " longitude = str((0-longitude)/10000) else: long_tag = "北纬: " longitude = str((longitude)/10000) get_latitude = getmessage[18:26] if get_latitude[0:3] == "FFFF": latitude = "无效" else: byte_string = bytes.fromhex(get_latitude) latitude = int.from_bytes(byte_string, byteorder='little',signed=True) if(latitude < 0): lat_tag = " 西经: " latitude = str((0-latitude)/10000) else: lat_tag = " 东经:" latitude = str((latitude)/10000) if(longitude == "无效" or latitude == "无效"): self.recv_text.insert(tk.END, "无效位置\n") else: self.recv_text.insert(tk.END, "本机位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(latitude) + "\n") elif getmessage[8:10] == "07": instruction_type = "指令类型:开启关闭测试模式" self.recv_text.insert(tk.END, instruction_type + "\n") if getmessage[10:14] == "AAAA": start_stop = "开启测试模式" self.recv_text.insert(tk.END, start_stop + "\n") elif getmessage[10:14] == "5555": start_stop = "关闭测试模式" self.recv_text.insert(tk.END, start_stop + "\n") else: start_stop = "开关测试指令错误" self.recv_text.insert(tk.END, start_stop + "\n") else: instruction_type = "指令类型:错误" self.recv_text.insert(tk.END, instruction_type + "\n") #如果帧尾不是0x334441,则报文错误 if getmessage[30:36] != "334441": last_type = "错误类型:帧尾错误" self.recv_text.insert(tk.END, last_type + "\n") return if __name__ == "__main__": root = tk.Tk() app = SendApp(root) root.mainloop()