You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

273 lines
12 KiB

8 months ago
# 接收机相当于本机192.168.49.10,端口号为 5010
8 months ago
#,上行报文指接收机发送给上位机
import tkinter as tk
from threading import Thread
8 months ago
import socket
import time
8 months ago
8 months ago
class SendApp:
def __init__(self, window):
self.window = window
self.window.title("Send端")
8 months ago
8 months ago
# 创建发送区域的主框架
8 months ago
self.send_frame = tk.Frame(window)
self.send_frame.pack(padx=10, pady=10)
8 months ago
8 months ago
# 创建专门的发送数据子框架
self.send_data_frame = tk.Frame(self.send_frame)
self.send_data_frame.pack(side=tk.TOP, fill=tk.X, expand=True)
8 months ago
8 months ago
self.send_label = tk.Label(self.send_data_frame, text="发送数据")
self.send_label.pack(side=tk.LEFT)
self.send_entry = tk.Entry(self.send_data_frame, width=90)
8 months ago
self.send_entry.pack(side=tk.LEFT)
8 months ago
# 输入帧头
self.first_message_label = tk.Label(self.send_frame, text="帧头:")
self.first_message_label.pack(side=tk.LEFT)
self.first_message_entry = tk.Entry(self.send_frame)
self.first_message_entry.pack(side=tk.LEFT)
# 流水号
self.flow_msg_label = tk.Label(self.send_frame, text="流水号:")
self.flow_msg_label.pack(side=tk.LEFT)
self.flow_msg_entry = tk.Entry(self.send_frame)
self.flow_msg_entry.pack(side=tk.LEFT)
# 时标
self.time_msg_label = tk.Label(self.send_frame, text="时标:")
self.time_msg_label.pack(side=tk.LEFT)
self.time_msg_entry = tk.Entry(self.send_frame)
self.time_msg_entry.pack(side=tk.LEFT)
# 时标(秒内计数器)
self.sec_time_msg_label = tk.Label(self.send_frame, text="时标(秒内计数器):")
self.sec_time_msg_label.pack(side=tk.LEFT)
self.sec_time_msg_entry = tk.Entry(self.send_frame)
self.sec_time_msg_entry.pack(side=tk.LEFT)
# 报文类型
self.type_msg_label = tk.Label(self.send_frame, text="报文类型:")
self.type_msg_label.pack(side=tk.LEFT)
self.type_msg_entry = tk.Entry(self.send_frame)
self.type_msg_entry.pack(side=tk.LEFT)
# 数据长度
self.length_msg_label = tk.Label(self.send_frame, text="数据长度:")
self.length_msg_label.pack(side=tk.LEFT)
self.length_msg_entry = tk.Entry(self.send_frame)
self.length_msg_entry.pack(side=tk.LEFT)
# 数据段
self.data_msg_label = tk.Label(self.send_frame, text="数据段:")
self.data_msg_label.pack(side=tk.LEFT)
self.data_msg_entry = tk.Entry(self.send_frame)
self.data_msg_entry.pack(side=tk.LEFT)
# 输入帧尾
self.last_message_label = tk.Label(self.send_frame, text="帧尾:")
self.last_message_label.pack(side=tk.LEFT)
self.last_message_entry = tk.Entry(self.send_frame)
self.last_message_entry.pack(side=tk.LEFT)
# 创建一个包含发送按钮的框架
self.send_button_frame = tk.Frame(self.send_frame)
self.send_button_frame.pack(fill=tk.X, padx=10, pady=10) # 框架会填充整个水平方向
# 将发送按钮放入这个新的框架,并让它在框架中居中
self.send_button = tk.Button(self.send_button_frame, text="发送", command=self.send_message)
self.send_button.pack(side=tk.TOP, pady=10) # 这里不需要使用 anchor 参数,因为框架是填充整个水平方向的
8 months ago
# 接收区域
self.recv_frame = tk.Frame(window)
self.recv_frame.pack(padx=10, pady=10)
self.recv_text = tk.Text(self.recv_frame, height=40, width=50)
self.recv_text.pack(expand=True, fill="both")
8 months ago
# 创建udp对象
8 months ago
self.sk = socket.socket(type=socket.SOCK_DGRAM)
self.sk.bind(("127.0.0.1", 5010))
# 开始接收数据的线程
self.thread = Thread(target=self.receive_data)
8 months ago
# 运行解析下行报文函数
8 months ago
self.thread.daemon = True
self.thread.start()
8 months ago
def send_message(self):
msg = self.send_entry.get()
8 months ago
if msg == "":
head = self.first_message_entry.get()
flow = self.flow_msg_entry.get()
times = self.time_msg_entry.get()
time_count = self.time_msg_entry.get()
types = self.type_msg_entry.get()
length = self.length_msg_entry.get()
data = self.data_msg_entry.get()
last = self.last_message_entry.get()
msg = head + flow + times + time_count + types + length + data + last
8 months ago
#发送数据
self.sk.sendto(msg.encode(), ("127.0.0.1", 9000))
time.sleep(1)
def receive_data(self):
while True:
#接收数据
msg, addr = self.sk.recvfrom(1024)
message_to_display = "Received: " + msg.decode() + " from " + str(addr)
self.recv_text.insert(tk.END, message_to_display + "\n")
self.analysis_message(msg)
# 下行报文18字节固定长度
"""帧格式:
0~3 帧头 BYTE[8:10] 4 固定为0x58443341
4 指令类型 UINT8 1 0x00:开始停止 0x01:工况查询 0x02:设备重置 0x03:授时指令 0x04:频率设置
0x05:本机地址设置 0x06:本机位置设置 0x07:开启关闭测试模式
5~14 指令内容 BYTE[10] 10 根据指令类型填充当指令类型为0x010x02时可不填充
15~17 帧尾 BYTE[3] 3 固定为0x334441
"""
#解析下行报文
def analysis_message(self, msg):
getmessage = msg.decode()
#如果帧头不是0x58443341则报文错误
if getmessage[0:8] != "58443341":
first_type = "错误类型:帧头错误"
self.recv_text.insert(tk.END, first_type + "\n")
return
#解析指令类型
#指令为00开始停止
if getmessage[8:10] == "00":
instruction_type = "指令类型:开始停止"
self.recv_text.insert(tk.END, instruction_type + "\n")
if getmessage[10:14] == "AAAA":
start_stop = "开始上传解调数据"
self.recv_text.insert(tk.END, start_stop + "\n")
elif getmessage[10:14] == "5555":
start_stop = "停止传输解调数据"
self.recv_text.insert(tk.END, start_stop + "\n")
else:
start_stop = "指令"
self.recv_text.insert(tk.END, start_stop + "\n")
elif getmessage[8:10] == "01":
instruction_type = "指令类型:工况查询"
self.recv_text.insert(tk.END, instruction_type + "\n")
elif getmessage[8:10] == "02":
instruction_type = "指令类型:设备重置"
self.recv_text.insert(tk.END, instruction_type + "\n")
elif getmessage[8:10] == "03":
instruction_type = "指令类型:授时指令"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
授时指令内容
0 UINT8 1 00-99对应0x00-0x63,基数为2000例如2001年对应0x01,2099年对应0x63
1 UINT8 1 01-12对应0x01-0x0C
2 UINT8 1 01-31对应0x01-0x1F
3 UINT8 1 00-23对应0x00-0x17
4 UINT8 1 00-59对应0x00-0x3B
5 UINT8 1 00-59对应0x00-0x3B
6-9 保留 UINT8 4
"""
year = int(getmessage[10:12],16)+2000
month = int(getmessage[12:14],16)
day = int(getmessage[14:16],16)
hour = int(getmessage[16:18],16)
minute = int(getmessage[18:20],16)
second = int(getmessage[20:22],16)
self.recv_text.insert(tk.END, "设置时间为: " + str(year) + "" + str(month) + "" + str(day) + "" + str(hour) + "" + str(minute) + "" + str(second) + "\n")
elif getmessage[8:10] == "04":
instruction_type = "指令类型:频率设置"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
0 通道号 UINT8 1 0-3,表示ACARS的通道0-3
1-4 频率 UINT32 4 单位KHz
5-9 保留
"""
channel = int(getmessage[10:12],16)
get_frequency = getmessage[12:18]
byte_string = bytes.fromhex(get_frequency)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "ACARS通道" + str(channel) + "的频率为" + str(frequency) + "KHz\n")
elif getmessage[8:10] == "05":
instruction_type = "指令类型:本机地址设置"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
0-3 本机地址 UINT32 4 四个字节分别对应第一到第四段号码
4-9 保留
"""
#四个字节,分别对应第一到第四段号码
address1 = int(getmessage[10:12],16)
address2 = int(getmessage[12:14],16)
address3 = int(getmessage[14:16],16)
address4 = int(getmessage[16:18],16)
ip_address = str(address1)+"."+str(address2)+"."+str(address3)+"."+str(address4)
self.recv_text.insert(tk.END, "本机地址为" + str(ip_address) + "\n")
elif getmessage[8:10] == "06":
instruction_type = "指令类型:本机位置设置"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
0-3 经度 UINT32 4 单位0.0001范围-90.000090.0000北纬为正南纬为负如0xFFFB4D4D表示南纬30.78910xFFFF表示无效
4-7 纬度 UINT32 4 单位0.0001范围-180.0000180.0000东经为正西经为负如0x001128D7表示东经112.45670xFFFF表示无效
8-9 保留
"""
get_longitude = getmessage[10:18]
if get_longitude[0:3] == "FFFF":
longitude = "无效"
else:
byte_string = bytes.fromhex(get_longitude)
longitude = int.from_bytes(byte_string, byteorder='little',signed=True)
if(longitude < 0):
long_tag = "南纬: "
longitude = str((0-longitude)/10000)
else:
long_tag = "北纬: "
longitude = str((longitude)/10000)
get_latitude = getmessage[18:26]
if get_latitude[0:3] == "FFFF":
latitude = "无效"
else:
byte_string = bytes.fromhex(get_latitude)
latitude = int.from_bytes(byte_string, byteorder='little',signed=True)
if(latitude < 0):
lat_tag = " 西经: "
latitude = str((0-latitude)/10000)
else:
lat_tag = " 东经:"
latitude = str((latitude)/10000)
if(longitude == "无效" or latitude == "无效"):
self.recv_text.insert(tk.END, "无效位置\n")
else:
self.recv_text.insert(tk.END, "本机位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(latitude) + "\n")
elif getmessage[8:10] == "07":
instruction_type = "指令类型:开启关闭测试模式"
self.recv_text.insert(tk.END, instruction_type + "\n")
if getmessage[10:14] == "AAAA":
start_stop = "开启测试模式"
self.recv_text.insert(tk.END, start_stop + "\n")
elif getmessage[10:14] == "5555":
start_stop = "关闭测试模式"
self.recv_text.insert(tk.END, start_stop + "\n")
else:
start_stop = "开关测试指令错误"
self.recv_text.insert(tk.END, start_stop + "\n")
else:
instruction_type = "指令类型:错误"
self.recv_text.insert(tk.END, instruction_type + "\n")
#如果帧尾不是0x334441则报文错误
if getmessage[30:36] != "334441":
last_type = "错误类型:帧尾错误"
self.recv_text.insert(tk.END, last_type + "\n")
return
8 months ago
if __name__ == "__main__":
root = tk.Tk()
app = SendApp(root)
root.mainloop()