Compare commits

...

No commits in common. 'main' and 'master' have entirely different histories.
main ... master

8
.idea/.gitignore vendored

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.11 (UDP_test)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.9" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.11 (UDP_test)" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/UDP_test.iml" filepath="$PROJECT_DIR$/.idea/UDP_test.iml" />
</modules>
</component>
</project>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

@ -1,2 +0,0 @@
# udp_test

@ -0,0 +1,665 @@
# 相当于天线默认目标地址为192.168.49.160,端口号为 8080
# 下行报文指上位机发送给接收机
import tkinter as tk
from threading import Thread
import socket
class ReceiveApp:
def __init__(self, window):
self.window = window
self.window.title("Receive端")
# 接收区域
self.recv_frame = tk.Frame(window)
self.recv_frame.pack(padx=10, pady=10)
self.recv_label = tk.Label(self.recv_frame, text="接收数据")
self.recv_label.pack()
self.recv_text = tk.Text(self.recv_frame, height=15, width=50)
self.recv_text.pack()
# 发送区域
self.send_frame = tk.Frame(window)
self.send_frame.pack(padx=10, pady=10)
self.style_label = tk.Label(self.send_frame, text="指令类型:")
self.style_label.pack(side=tk.LEFT)
self.style_entry = tk.Entry(self.send_frame)
self.style_entry.pack(side=tk.LEFT)
self.content_label = tk.Label(self.send_frame, text="指令内容:")
self.content_label.pack(side=tk.LEFT)
self.content_entry = tk.Entry(self.send_frame)
self.content_entry.pack(side=tk.LEFT)
#获取完整指令
self.all_msg = tk.Label(self.send_frame, text="完整指令:")
self.all_msg.pack(side=tk.LEFT)
self.all_msg = tk.Entry(self.send_frame)
self.all_msg.pack(side=tk.LEFT)
self.send_button = tk.Button(self.send_frame, text="发送", command=self.send_message)
self.send_button.pack(side=tk.LEFT)
self.sk = socket.socket(type=socket.SOCK_DGRAM)
self.sk.bind(("127.0.0.1", 9000))
# 开始接收数据的线程
self.thread = Thread(target=self.receive_data)
self.thread.daemon = True
self.thread.start()
# 下行报文18字节固定长度
"""帧格式:
0~3 帧头 BYTE[4] 4 固定为0x58443341
4 指令类型 UINT8 1 0x00:开始停止 0x01:工况查询 0x02:设备重置 0x03:授时指令 0x04:频率设置
0x05:本机地址设置 0x06:本机位置设置 0x07:开启关闭测试模式
5~14 指令内容 BYTE[10] 10 根据指令类型填充当指令类型为0x010x02时可不填充
15~17 帧尾 BYTE[3] 3 固定为0x334441
"""
def send_message(self):
# 1.创建udp对象
style = self.style_entry.get()
print(style)
content = self.content_entry.get()
print(content)
if style != "" or content != "":
head = "58443341"
tail = "334441"
msg = head + style + content + tail
else:
all_message = self.all_msg.get()
msg = all_message.split(" ")
msg = "".join(msg)
print(msg)
# sendto( 二进制字节流ip端口号 )
self.sk.sendto(msg.encode(), ("127.0.0.1", 5010))
def receive_data(self):
while True:
msg, addr = self.sk.recvfrom(1024)
message_to_display = "Received: " + msg.decode() + " from " + str(addr)
self.recv_text.insert(tk.END, message_to_display + "\n")
self.analysis_message(msg)
# 上行报文采用不定长帧
"""帧格式:
0~3 帧头 BYTE[4] 4 固定为0x58443341
4~7 流水号 UINT32 4 0x00000000~0xFFFFFFFF循环用于判断帧连续
8~13 时标年月日时分秒 BYTE[6] 6 若报文类别为0x010x020x03此时间为FPGA采到第一bit数据时间或者第一个脉冲的起始时间若报文类别为0xFF此时间为组包时的时间
14~17 时标秒内计数器 UINT32 4
18 报文类型 BYTE 1 0x01AIS通道1信息0x02AIS通道2信息0x03ACARS通道1信息0x04ACARS通道2信息0x05ACARS通道3信息0x06ACARS通道4信息0x07ADS-B信息0x08IFF信息0xFF工况19~20数据长度UINT16 2指数据段的长度不包括帧头帧尾和其它字段
21~20+n 数据段 BYTE[n] n 根据报文类型区分
21+n~23+n 帧尾 BYTE[3] 3 固定为0x334441
"""
# 解析下行报文
def analysis_message(self, msg):
global water
getmessage = msg.decode()
# 如果帧头不是0x58443341则报文错误
if getmessage[0:8] != "58443341":
first_type = "错误类型:帧头错误"
self.recv_text.insert(tk.END, first_type + "\n")
return
else:
first_type = "帧头正确"
self.recv_text.insert(tk.END, first_type + "\n")
#判断是否连续
Water = '{:08X}'.format(water)
water = water + 1
if getmessage[8:16] == Water:
k = "帧连续"
self.recv_text.insert(tk.END, k + "\n")
else:
k = "帧不连续,有漏帧"
self.recv_text.insert(tk.END, k + "\n")
return
#汇报时间
"""
授时指令内容
0 UINT8 1 00-99对应0x00-0x63,基数为2000例如2001年对应0x01,2099年对应0x63
1 UINT8 1 01-12对应0x01-0x0C
2 UINT8 1 01-31对应0x01-0x1F
3 UINT8 1 00-23对应0x00-0x17
4 UINT8 1 00-59对应0x00-0x3B
5 UINT8 1 00-59对应0x00-0x3B
6-9 保留 UINT8 4
"""
year = int(getmessage[16:18], 16) + 2000
month = int(getmessage[18:20], 16)
day = int(getmessage[20:22], 16)
hour = int(getmessage[22:24], 16)
minute = int(getmessage[24:26], 16)
second = int(getmessage[26:28], 16)
self.recv_text.insert(tk.END, "当前时间为: " + str(year) + "" + str(month) + "" + str(day) + "" + str(
hour) + "" + str(minute) + "" + str(second) + "\n")
#秒内计数器
one = int(getmessage[28:30],16)
two = int(getmessage[30:32],16)
three = int(getmessage[32:34], 16)
four = int(getmessage[34:36], 16)
self.recv_text.insert(tk.END, "秒内计数器: " + str(one) + "." + str(two) + "." + str(three) + "." + str(four) + "\n")
#报文类型
if getmessage[36:38] == "01":
k = "AIS通道1信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "02":
k = "AIS通道2信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "03":
k = "ACARS通道1信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "04":
k = "ACARS通道2信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "05":
k = "ACARS通道3信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "06":
k = "ACARS通道4信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "07":
k = "ADS-B信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "08":
k = "IFF信息"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
elif getmessage[36:38] == "FF":
k = "工况"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
else:
k = "错误"
self.recv_text.insert(tk.END,"报文类型:" + k + "\n")
#数据长度
self.recv_text.insert(tk.END, "数据长度:" + getmessage[38:42] + "\n")
#AIS信息
if getmessage[36:38] == "01" or getmessage[36:38] == "02":
if getmessage[42:44] == "01" or getmessage[42:44] == "02" or getmessage[42:44] == "03":
k = "A类船位置报告消息"
self.recv_text.insert(tk.END, "消息ID" + k + "\n")
elif getmessage[42:44] == "05":
k = "A类船的静态消息"
self.recv_text.insert(tk.END, "消息ID" + k + "\n")
elif getmessage[42:44] == "12":
k = "B类船位置报告消息"
self.recv_text.insert(tk.END, "消息ID" + k + "\n")
elif getmessage[42:44] == "24":
k = "B类船的静态消息"
self.recv_text.insert(tk.END, "消息ID" + k + "\n")
else:
k = "错误"
self.recv_text.insert(tk.END, "消息ID" + k + "\n")
#用户ID
self.recv_text.insert(tk.END, "用户ID" + getmessage[44:54] + "\n")
#IMO编号
self.recv_text.insert(tk.END, "IMO编号" + getmessage[54:62] + "\n")
#呼号
self.recv_text.insert(tk.END, "呼号:" + getmessage[62:76] + "\n")
#船名
self.recv_text.insert(tk.END, "船名:" + getmessage[76:116] + "\n")
#船舶和货物类型
if getmessage[116:118] == "00":
k = "捕捞"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "01":
k = "拖船"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "02":
k = "拖船且推带长度超过200米或宽度超过25米"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "03":
k = "从事挖掘或水下作业"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "04":
k = "从事潜水作业"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "05":
k = "从事军事行动"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "06":
k = "帆船"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "07":
k = "游艇"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "21":
k = "运载DG、HS或者MP、IMO危险品或X2类污染物"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "32":
k = "运载DG、HS或者MP、IMO危险品或Y2类污染物"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "43":
k = "运载DG、HS或者MP、IMO危险品或Z2类污染物"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "50":
k = "引航船舶"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "51":
k = "搜救船舶"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "52":
k = "拖轮"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "53":
k = "港口补给船"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "54":
k = "安装有防污染设施或设备的船舶"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "55":
k = "执法船舶"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "58":
k = "医疗运输船舶"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "59":
k = "非武装冲突参与国的船舶和航空器"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "60":
k = "客轮"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "70":
k = "货轮"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
elif getmessage[116:118] == "80":
k = "油轮"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
else:
k = "错误"
self.recv_text.insert(tk.END, "船舶和货物类型:" + k + "\n")
# 船舶宽度
self.recv_text.insert(tk.END, "船舶宽度:" + getmessage[118:122] + "\n")
# 船舶长度
self.recv_text.insert(tk.END, "船舶长度:" + getmessage[122:126] + "\n")
#预计到达时间
month = int(getmessage[126:128], 16)
day = int(getmessage[128:130], 16)
hour = int(getmessage[130:132], 16)
minute = int(getmessage[132:134], 16)
self.recv_text.insert(tk.END, "预计到达时间为: " + str(month) + "" + str(day) + "" + str(
hour) + "" + str(minute) + "" "\n")
#目前最大静态吃水
k = int(getmessage[134:136],16)
k = k / 10
self.recv_text.insert(tk.END, "目前最大静态吃水:" + str(k) + "\n")
#目的地
self.recv_text.insert(tk.END, "目的地:" + getmessage[136:176] + "\n")
#导航状态
if getmessage[176:178] == "00":
k = "发动机使用中"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "01":
k = "锚泊"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "02":
k = "未操纵"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "03":
k = "有限适航性"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "04":
k = "受船舶吃水限制"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "05":
k = "系泊"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "06":
k = "搁浅"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "07":
k = "从事捕捞"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "08":
k = "航行中"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
elif getmessage[176:178] == "15":
k = "无效信息"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
else:
k = "错误"
self.recv_text.insert(tk.END, "导航状态:" + k + "\n")
#地面航速
k = int(getmessage[178:182],16) / 10
self.recv_text.insert(tk.END, "地面航速:" + str(k) + "\n")
#经纬度
get_longitude = getmessage[182:190]
if get_longitude[0:3] == "FFFF":
longitude = "无效"
else:
byte_string = bytes.fromhex(get_longitude)
longitude = int.from_bytes(byte_string, byteorder='little', signed=True)
if (longitude < 0):
long_tag = "南纬: "
longitude = str((0 - longitude) / 10000)
else:
long_tag = "北纬: "
longitude = str((longitude) / 10000)
get_latitude = getmessage[190:198]
if get_latitude[0:3] == "FFFF":
latitude = "无效"
else:
byte_string = bytes.fromhex(get_latitude)
latitude = int.from_bytes(byte_string, byteorder='little', signed=True)
if (latitude < 0):
lat_tag = " 西经: "
latitude = str((0 - latitude) / 10000)
else:
lat_tag = " 东经:"
latitude = str((latitude) / 10000)
if (longitude == "无效" or latitude == "无效"):
self.recv_text.insert(tk.END, "无效位置\n")
else:
self.recv_text.insert(tk.END, "本机位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(
latitude) + "\n")
#地面航线
k = int(getmessage[198:202],16) / 10
self.recv_text.insert(tk.END, "地面航线:" + str(k) + "\n")
# 实际航向
k = int(getmessage[202:206], 16)
self.recv_text.insert(tk.END, "实际航向:" + str(k) + "\n")
#帧尾
if getmessage[206:212] != "334441":
last_type = "错误类型:帧尾错误"
self.recv_text.insert(tk.END, last_type + "\n")
else:
last_type = "帧尾正确"
self.recv_text.insert(tk.END, last_type + "\n")
return
#ACARS数据
if getmessage[36:38] == "03" or getmessage[36:38] == "04" or getmessage[36:38] == "05" or getmessage[36:38] == "06":
if getmessage[40:42] == "01":
t = "飞机发往地面"
self.recv_text.insert(tk.END, "消息ID" + t + "\n")
elif getmessage[42:44] == "01":
t = "A类广播模式"
self.recv_text.insert(tk.END, "消息ID" + t + "\n")
elif getmessage[42:44] == "02":
t = "B类点对点模式"
self.recv_text.insert(tk.END, "消息ID" + t + "\n")
elif getmessage[44:58] == "FFFFFFFFFFFFFF" or getmessage[70:78] == "FFFFFFFFFFFFFF" or getmessage[78:86] == "FFFFFFFFFFFFFF":
t = "无效信息"
self.recv_text.insert(tk.END, "消息ID" + t + "\n")
else:
t = "错误"
self.recv_text.insert(tk.END, "消息ID" + t + "\n")
self.recv_text.insert(tk.END, "飞机注册码:" + getmessage[44:58] + "\n")
self.recv_text.insert(tk.END, "航班号:" + getmessage[58:70] + "\n")
self.recv_text.insert(tk.END, "起飞机场:" + getmessage[70:78] + "\n")
self.recv_text.insert(tk.END, "目的机场:" + getmessage[78:86] + "\n")
month = int(getmessage[86:88], 16)
day = int(getmessage[88:90], 16)
hour = int(getmessage[90:92], 16)
minute = int(getmessage[92:94], 16)
self.recv_text.insert(tk.END, "预计到达时间为: " + str(month) + "" + str(day) + "" + str(
hour) + "" + str(minute) + "" "\n")
t = int(getmessage[94:98], 16) / 10
self.recv_text.insert(tk.END, "空速:" + t + "\n")
get_longitude = getmessage[98:116]
if get_longitude[0:3] == "FFFF":
longitude = "无效"
else:
byte_string = bytes.fromhex(get_longitude)
longitude = int.from_bytes(byte_string, byteorder='little', signed=True)
if (longitude < 0):
long_tag = "南纬: "
longitude = str((0 - longitude) / 10000)
else:
long_tag = "北纬: "
longitude = str((longitude) / 10000)
get_latitude = getmessage[116:124]
if get_latitude[0:3] == "FFFF":
latitude = "无效"
else:
byte_string = bytes.fromhex(get_latitude)
latitude = int.from_bytes(byte_string, byteorder='little', signed=True)
if (latitude < 0):
lat_tag = " 西经: "
latitude = str((0 - latitude) / 10000)
else:
lat_tag = " 东经:"
latitude = str((latitude) / 10000)
if (longitude == "无效" or latitude == "无效"):
self.recv_text.insert(tk.END, "无效位置\n")
else:
self.recv_text.insert(tk.END, "位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(
latitude) + "\n")
get_height = getmessage[124:132]
t = get_height
self.recv_text.insert(tk.END, "高度:" + t + "\n")
#ADS-B数据
if getmessage[36:38]== "07":
t = getmessage[44:50]
self.recv_text.insert(tk.END, "飞机地址码:" + t + "\n")
if getmessage[50:52] == "00":
t = "无 ADS-B 发射器类型信息"
elif getmessage[50:52] == "01":
t = "轻型15500 磅);"
elif getmessage[50:52] == "02":
t = "小型15500 到 75000 磅);"
elif getmessage[50:52] == "03":
t = "大型75000 到 300000 磅);"
elif getmessage[50:52] == "04":
t = "高漩涡式大型(如 B-757 飞机)"
elif getmessage[50:52] == "05":
t = "重型300000 磅);"
elif getmessage[50:52] == "06":
t = "高性能5g 加速度且400 哩/小时);"
elif getmessage[50:52] == "07":
t = "旋冀飞机;"
self.recv_text.insert(tk.END, "飞机类别:" + t + "\n")
if getmessage[52:54] == "00":
t = "向上"
elif getmessage[52:54] == "01":
t = "向下"
self.recv_text.insert(tk.END, "飞垂直速度方向:" + t + "\n")
t = int(getmessage[54:58], 16) / 10
self.recv_text.insert(tk.END, "垂直速度:" + t + "\n")
t = int(getmessage[58:62], 16) / 10
self.recv_text.insert(tk.END, "空速:" + t + "\n")
t = int(getmessage[62:66], 16) / 100
self.recv_text.insert(tk.END, "航向:" + t + "\n")
t = int(getmessage[66:70], 16) / 100
self.recv_text.insert(tk.END, "目标航向:" + t + "\n")
get_longitude = getmessage[70:78]
if get_longitude[0:3] == "FFFF":
longitude = "无效"
else:
byte_string = bytes.fromhex(get_longitude)
longitude = int.from_bytes(byte_string, byteorder='little', signed=True)
if (longitude < 0):
long_tag = "南纬: "
longitude = str((0 - longitude) / 10000)
else:
long_tag = "北纬: "
longitude = str((longitude) / 10000)
get_latitude = getmessage[78:86]
if get_latitude[0:3] == "FFFF":
latitude = "无效"
else:
byte_string = bytes.fromhex(get_latitude)
latitude = int.from_bytes(byte_string, byteorder='little', signed=True)
if (latitude < 0):
lat_tag = " 西经: "
latitude = str((0 - latitude) / 10000)
else:
lat_tag = " 东经:"
latitude = str((latitude) / 10000)
if (longitude == "无效" or latitude == "无效"):
self.recv_text.insert(tk.END, "无效位置\n")
else:
self.recv_text.insert(tk.END, "位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(
latitude) + "\n")
get_height = getmessage[86:94]
t = get_height
self.recv_text.insert(tk.END, "高度:" + t + "\n")
get_height = getmessage[94:102]
t = get_height
self.recv_text.insert(tk.END, "目标高度:" + t + "\n")
t = getmessage[102:118]
self.recv_text.insert(tk.END, "航班号:" + t + "\n")
#工况信息
if getmessage[36:38] == "FF":
get_frequency1 = getmessage[42:50]
byte_string = bytes.fromhex(get_frequency1)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道1中心频率为" + str(frequency) + "KHz\n")
get_frequency2 = getmessage[58:66]
byte_string = bytes.fromhex(get_frequency2)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道2中心频率为" + str(frequency) + "KHz\n")
get_frequency3 = getmessage[74:82]
byte_string = bytes.fromhex(get_frequency3)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道3中心频率为" + str(frequency) + "KHz\n")
get_frequency4 = getmessage[90:98]
byte_string = bytes.fromhex(get_frequency4)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道4中心频率为" + str(frequency) + "KHz\n")
get_frequency5 = getmessage[106:114]
byte_string = bytes.fromhex(get_frequency5)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道5中心频率为" + str(frequency) + "KHz\n")
get_frequency6 = getmessage[122:130]
byte_string = bytes.fromhex(get_frequency6)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道6中心频率为" + str(frequency) + "KHz\n")
get_frequency7 = getmessage[138:146]
byte_string = bytes.fromhex(get_frequency7)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "通道7中心频率为" + str(frequency) + "KHz\n")
get_fpga = getmessage[154:170]
#去掉头部的0
fpga = get_fpga.lstrip('0')
self.recv_text.insert(tk.END, "FPGA版本号为" + str(fpga) + "\n")
get_arm = getmessage[170:186]
arm = get_arm.lstrip('0')
self.recv_text.insert(tk.END, "ARM版本号为" + str(arm) + "\n")
#四个字节,分别对应第一到第四段号码
address1 = int(getmessage[186:188],16)
address2 = int(getmessage[188:190],16)
address3 = int(getmessage[190:192],16)
address4 = int(getmessage[192:194],16)
ip_address = str(address1)+"."+str(address2)+"."+str(address3)+"."+str(address4)
self.recv_text.insert(tk.END, "接收机地址为" + str(ip_address) + "\n")
get_port = getmessage[194:198]
self.recv_text.insert(tk.END, "接收机端口号为" + str(get_port) + "\n")
#198-202 纬度
get_longitude = getmessage[198:206]
if get_longitude[0:3] == "FFFF":
longitude = "无效"
else:
byte_string = bytes.fromhex(get_longitude)
longitude = int.from_bytes(byte_string, byteorder='little',signed=True)
if(longitude < 0):
long_tag = "南纬: "
longitude = str((0-longitude)/10000)
else:
long_tag = "北纬: "
longitude = str((longitude)/10000)
get_latitude = getmessage[206:214]
if get_latitude[0:3] == "FFFF":
latitude = "无效"
else:
byte_string = bytes.fromhex(get_latitude)
latitude = int.from_bytes(byte_string, byteorder='little',signed=True)
if(latitude < 0):
lat_tag = " 西经: "
latitude = str((0-latitude)/10000)
else:
lat_tag = " 东经:"
latitude = str((latitude)/10000)
if(longitude == "无效" or latitude == "无效"):
self.recv_text.insert(tk.END, "无效位置\n")
else:
self.recv_text.insert(tk.END, "本机位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(latitude) + "\n")
cpu_temp = getmessage[214:216]
self.recv_text.insert(tk.END, "CPU温度为" + str(cpu_temp) + "\n")
#帧尾
if getmessage[216:222] != "334441":
last_type = "错误类型:帧尾错误"
self.recv_text.insert(tk.END, last_type + "\n")
return
else:
last_type = "帧尾正确"
self.recv_text.insert(tk.END, last_type + "\n")\
if __name__ == "__main__":
water = 0
root = tk.Tk()
app = ReceiveApp(root)
root.mainloop()

@ -0,0 +1,279 @@
# 接收机相当于本机192.168.49.10,端口号为 5010
#,上行报文指接收机发送给上位机
import tkinter as tk
from threading import Thread
import socket
import time
class SendApp:
def __init__(self, window):
self.window = window
self.window.title("Send端")
# 创建发送区域的主框架
self.send_frame = tk.Frame(window)
self.send_frame.pack(padx=10, pady=10)
# 创建专门的发送数据子框架
self.send_data_frame = tk.Frame(self.send_frame)
self.send_data_frame.pack(side=tk.TOP, fill=tk.X, expand=True)
self.send_label = tk.Label(self.send_data_frame, text="发送数据")
self.send_label.pack(side=tk.LEFT)
self.send_entry = tk.Entry(self.send_data_frame, width=50)
self.send_entry.pack(side=tk.LEFT)
# 创建元数据子框架
self.metadata_frame = tk.LabelFrame(self.send_frame, text='元数据', padx=10, pady=10)
self.metadata_frame.pack(side=tk.TOP, fill=tk.X, expand=True)
# 输入帧头
self.first_message_label = tk.Label(self.metadata_frame, text="帧头:")
self.first_message_label.grid(row=0, column=0)
self.first_message_entry = tk.Entry(self.metadata_frame)
self.first_message_entry.grid(row=0, column=1)
# 流水号
self.flow_msg_label = tk.Label(self.metadata_frame, text="流水号:")
self.flow_msg_label.grid(row=0, column=2)
self.flow_msg_entry = tk.Entry(self.metadata_frame)
self.flow_msg_entry.grid(row=0, column=3)
#实际时间
self.actual_time_label = tk.Label(self.metadata_frame, text="实际时间:")
self.actual_time_label.grid(row=0, column=6)
self.actual_time_entry = tk.Entry(self.metadata_frame)
self.actual_time_entry.grid(row=0, column=7)
# 时标
self.time_msg_label = tk.Label(self.metadata_frame, text="时标:")
self.time_msg_label.grid(row=0, column=4)
self.time_msg_entry = tk.Entry(self.metadata_frame)
self.time_msg_entry.grid(row=0, column=5)
# 输入帧尾
self.last_message_label = tk.Label(self.metadata_frame, text="帧尾:")
self.last_message_label.grid(row=1, column=0)
self.last_message_entry = tk.Entry(self.metadata_frame)
self.last_message_entry.grid(row=1, column=1)
# 报文类型 & 数据长度 & 数据段
self.payload_frame = tk.LabelFrame(self.send_frame, text='有效负载', padx=10, pady=10)
self.payload_frame.pack(side=tk.TOP, fill=tk.X, expand=True)
# 报文类型
self.type_msg_label = tk.Label(self.payload_frame, text="报文类型:")
self.type_msg_label.grid(row=0, column=0)
self.type_msg_entry = tk.Entry(self.payload_frame)
self.type_msg_entry.grid(row=0, column=1)
# 数据长度
self.length_msg_label = tk.Label(self.payload_frame, text="数据长度:")
self.length_msg_label.grid(row=0, column=2)
self.length_msg_entry = tk.Entry(self.payload_frame)
self.length_msg_entry.grid(row=0, column=3)
# 数据段
self.data_msg_label = tk.Label(self.payload_frame, text="数据段:")
self.data_msg_label.grid(row=1, column=0)
self.data_msg_entry = tk.Entry(self.payload_frame)
self.data_msg_entry.grid(row=1, column=1)
# 创建发送按钮
self.send_button = tk.Button(self.send_frame, text="发送", command=self.send_message)
self.send_button.pack(side=tk.LEFT, pady=10)
# 接收区域
self.recv_frame = tk.Frame(window)
self.recv_frame.pack(padx=10, pady=10)
self.recv_text = tk.Text(self.recv_frame, height=40, width=50)
self.recv_text.pack(expand=True, fill="both")
# 创建udp对象
self.sk = socket.socket(type=socket.SOCK_DGRAM)
self.sk.bind(("127.0.0.1", 5010))
# 开始接收数据的线程
self.thread = Thread(target=self.receive_data)
# 运行解析下行报文函数
self.thread.daemon = True
self.thread.start()
def send_message(self):
msg = self.send_entry.get()
msg = msg.replace(" ", "")
if msg == "":
head = self.first_message_entry.get()
flow = self.flow_msg_entry.get()
times = self.actual_time_entry.get()
time_count = self.time_msg_entry.get()
types = self.type_msg_entry.get()
length = self.length_msg_entry.get()
data_get = self.data_msg_entry.get()
data = "".join(data_get.split(" "))
last = self.last_message_entry.get()
msg = head + flow + times + time_count + types + length + data + last
#发送数据
self.sk.sendto(msg.encode(), ("127.0.0.1", 9000))
time.sleep(1)
def receive_data(self):
while True:
#接收数据
msg, addr = self.sk.recvfrom(1024)
message_to_display = "Received: " + msg.decode() + " from " + str(addr)
self.recv_text.insert(tk.END, message_to_display + "\n")
self.analysis_message(msg)
# 下行报文18字节固定长度
"""帧格式:
0~3 帧头 BYTE[8:10] 4 固定为0x58443341
4 指令类型 UINT8 1 0x00:开始停止 0x01:工况查询 0x02:设备重置 0x03:授时指令 0x04:频率设置
0x05:本机地址设置 0x06:本机位置设置 0x07:开启关闭测试模式
5~14 指令内容 BYTE[10] 10 根据指令类型填充当指令类型为0x010x02时可不填充
15~17 帧尾 BYTE[3] 3 固定为0x334441
"""
#解析下行报文
def analysis_message(self, msg):
getmessage = msg.decode()
#如果帧头不是0x58443341则报文错误
if getmessage[0:8] != "58443341":
first_type = "错误类型:帧头错误"
self.recv_text.insert(tk.END, first_type + "\n")
return
#解析指令类型
#指令为00开始停止
if getmessage[8:10] == "00":
instruction_type = "指令类型:开始停止"
self.recv_text.insert(tk.END, instruction_type + "\n")
if getmessage[10:14] == "AAAA":
start_stop = "开始上传解调数据"
self.recv_text.insert(tk.END, start_stop + "\n")
elif getmessage[10:14] == "5555":
start_stop = "停止传输解调数据"
self.recv_text.insert(tk.END, start_stop + "\n")
else:
start_stop = "指令"
self.recv_text.insert(tk.END, start_stop + "\n")
elif getmessage[8:10] == "01":
instruction_type = "指令类型:工况查询"
self.recv_text.insert(tk.END, instruction_type + "\n")
elif getmessage[8:10] == "02":
instruction_type = "指令类型:设备重置"
self.recv_text.insert(tk.END, instruction_type + "\n")
elif getmessage[8:10] == "03":
instruction_type = "指令类型:授时指令"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
授时指令内容
0 UINT8 1 00-99对应0x00-0x63,基数为2000例如2001年对应0x01,2099年对应0x63
1 UINT8 1 01-12对应0x01-0x0C
2 UINT8 1 01-31对应0x01-0x1F
3 UINT8 1 00-23对应0x00-0x17
4 UINT8 1 00-59对应0x00-0x3B
5 UINT8 1 00-59对应0x00-0x3B
6-9 保留 UINT8 4
"""
year = int(getmessage[10:12],16)+2000
month = int(getmessage[12:14],16)
day = int(getmessage[14:16],16)
hour = int(getmessage[16:18],16)
minute = int(getmessage[18:20],16)
second = int(getmessage[20:22],16)
self.recv_text.insert(tk.END, "设置时间为: " + str(year) + "" + str(month) + "" + str(day) + "" + str(hour) + "" + str(minute) + "" + str(second) + "\n")
elif getmessage[8:10] == "04":
instruction_type = "指令类型:频率设置"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
0 通道号 UINT8 1 0-3,表示ACARS的通道0-3
1-4 频率 UINT32 4 单位KHz
5-9 保留
"""
channel = int(getmessage[10:12],16)
get_frequency = getmessage[12:18]
byte_string = bytes.fromhex(get_frequency)
frequency = int.from_bytes(byte_string, byteorder='little')
self.recv_text.insert(tk.END, "ACARS通道" + str(channel) + "的频率为" + str(frequency) + "KHz\n")
elif getmessage[8:10] == "05":
instruction_type = "指令类型:本机地址设置"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
0-3 本机地址 UINT32 4 四个字节分别对应第一到第四段号码
4-9 保留
"""
#四个字节,分别对应第一到第四段号码
address1 = int(getmessage[10:12],16)
address2 = int(getmessage[12:14],16)
address3 = int(getmessage[14:16],16)
address4 = int(getmessage[16:18],16)
ip_address = str(address1)+"."+str(address2)+"."+str(address3)+"."+str(address4)
self.recv_text.insert(tk.END, "本机地址为" + str(ip_address) + "\n")
elif getmessage[8:10] == "06":
instruction_type = "指令类型:本机位置设置"
self.recv_text.insert(tk.END, instruction_type + "\n")
"""
0-3 经度 UINT32 4 单位0.0001范围-90.000090.0000北纬为正南纬为负如0xFFFB4D4D表示南纬30.78910xFFFF表示无效
4-7 纬度 UINT32 4 单位0.0001范围-180.0000180.0000东经为正西经为负如0x001128D7表示东经112.45670xFFFF表示无效
8-9 保留
"""
get_longitude = getmessage[10:18]
if get_longitude[0:3] == "FFFF":
longitude = "无效"
else:
byte_string = bytes.fromhex(get_longitude)
longitude = int.from_bytes(byte_string, byteorder='little',signed=True)
if(longitude < 0):
long_tag = "南纬: "
longitude = str((0-longitude)/10000)
else:
long_tag = "北纬: "
longitude = str((longitude)/10000)
get_latitude = getmessage[18:26]
if get_latitude[0:3] == "FFFF":
latitude = "无效"
else:
byte_string = bytes.fromhex(get_latitude)
latitude = int.from_bytes(byte_string, byteorder='little',signed=True)
if(latitude < 0):
lat_tag = " 西经: "
latitude = str((0-latitude)/10000)
else:
lat_tag = " 东经:"
latitude = str((latitude)/10000)
if(longitude == "无效" or latitude == "无效"):
self.recv_text.insert(tk.END, "无效位置\n")
else:
self.recv_text.insert(tk.END, "本机位置为: " + str(long_tag) + str(longitude) + str(lat_tag) + str(latitude) + "\n")
elif getmessage[8:10] == "07":
instruction_type = "指令类型:开启关闭测试模式"
self.recv_text.insert(tk.END, instruction_type + "\n")
if getmessage[10:14] == "AAAA":
start_stop = "开启测试模式"
self.recv_text.insert(tk.END, start_stop + "\n")
elif getmessage[10:14] == "5555":
start_stop = "关闭测试模式"
self.recv_text.insert(tk.END, start_stop + "\n")
else:
start_stop = "开关测试指令错误"
self.recv_text.insert(tk.END, start_stop + "\n")
else:
instruction_type = "指令类型:错误"
self.recv_text.insert(tk.END, instruction_type + "\n")
#如果帧尾不是0x334441则报文错误
if getmessage[30:36] != "334441":
last_type = "错误类型:帧尾错误"
self.recv_text.insert(tk.END, last_type + "\n")
return
if __name__ == "__main__":
root = tk.Tk()
app = SendApp(root)
root.mainloop()

@ -0,0 +1,38 @@
AIS数据测试
帧头58443341
流水00000000
时间111111111111
时标11111111
帧尾334441
报文类型01
数据长度0081
数据段01123456789912345678123456776543211234567899123456789912345678991234567899001234123409011020121234567899123456789912345678991234567899001234123456781234567812341234
工况数据测试:
完整数据584433410000000111111111111111111111334441FF00811234567800000000 8765432100000000 1111111100000000 2222222200000000 3333333300000000 4444444400000000 5555555500000000 0000202109070001 0000000031303334 C0A8310A 8080 4D4DFBFF D7281100 30334441
帧头58443341
流水00000001
时间111111111111
时标11111111
帧尾334441
报文类型FF
数据长度0081
数据段1234567800000000 8765432100000000 1111111100000000 2222222200000000 3333333300000000 4444444400000000 5555555500000000 0000202109070001 0000000031303334 C0A8310A 8080 4D4DFBFF D7281100 30
ACARS
帧头58443341
流水00000000
时标11111111111111111111
帧尾334441
报文类型03
数据长度40
数据段0001112233445566771234123456781234657812346578071701300100123456781234657800008888
ADSB
帧头58443341
流水00000000
时标11111111111111111111
帧尾334441
报文类型07
数据长度36
数据段11223305000100010002000200123456781234657800008888000099991122334455667788
Loading…
Cancel
Save