From 42b2fb2abcf6eb17ca6ca687a39a684b46fb01ac Mon Sep 17 00:00:00 2001 From: gaozixi <3409364440@qq.com> Date: Wed, 14 May 2025 06:55:04 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0WiFi=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E5=92=8CTCP=E6=95=B0=E6=8D=AE=E4=BC=A0=E8=BE=93?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=8C=E9=87=8D=E6=9E=84=E5=A3=B0=E6=BA=90?= =?UTF-8?q?=E5=AE=9A=E4=BD=8D=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/mic-code/init-code/main.py | 281 +++++++++++++++++++++++++--- src/mic-code/init-code/main_raw.py | 38 ++++ src/mic-code/init-code/main_wifi.py | 259 ------------------------- 3 files changed, 289 insertions(+), 289 deletions(-) create mode 100644 src/mic-code/init-code/main_raw.py delete mode 100644 src/mic-code/init-code/main_wifi.py diff --git a/src/mic-code/init-code/main.py b/src/mic-code/init-code/main.py index d09d9e6..6ad6595 100644 --- a/src/mic-code/init-code/main.py +++ b/src/mic-code/init-code/main.py @@ -1,38 +1,259 @@ -from Maix import MIC_ARRAY as mic -import lcd,time +# 声源定位+WiFi传输集成程序 +from fpioa_manager import fm +from Maix import GPIO, MIC_ARRAY as mic +from board import board_info +import network, socket, time +from machine import UART import math -mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20, i2s_ws=19, i2s_sclk=18, sk9822_dat=24, sk9822_clk=25) + +#----------------------------- +# 1. WiFi和服务器配置 +#----------------------------- +wifi_ap_ssid = "junzekeji" # 你的WiFi SSID +wifi_ap_passwd = "234567890l" # 你的WiFi密码 +PC_IP = "192.168.32.6" # 运行sound_map.py的电脑IP +UDP_PORT = 12345 # 与sound_map.py里配置的端口一致 +TCP_PORT = 12345 # 替换你想要的端口 + +# 全局变量 +uart = None +wifi_en = None +nic = None +udp_sock = None +tcp_sock = None + +#----------------------------- +# 2. 引脚映射和WiFi使能 +#----------------------------- +def init_wifi_hardware(): + global wifi_en + # GPIOHS0 用于WiFi使能控制 + fm.register(8, fm.fpioa.GPIOHS0, force=True) + wifi_en = GPIO(GPIO.GPIOHS0, GPIO.OUT) + # 配置UART用于与ESP8285通信 + fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX, force=True) + fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX, force=True) + print("WiFi硬件引脚初始化完成。") + +def wifi_enable(en): + # 控制WiFi模块电源 + global wifi_en + if wifi_en: + wifi_en.value(en) + +#----------------------------- +# 3. 复位WiFi并连接到路由器 +#----------------------------- +def wifi_reset_and_connect(): + global uart, nic + # 复位WiFi模块 + wifi_enable(0) # 先关闭 + time.sleep_ms(200) + wifi_enable(1) # 再打开 + time.sleep(2) # 等待稳定 + + # 初始化UART,低波特率 + uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=4096) + _ = uart.read() # 清空缓存 + + # 提升波特率到921600 + uart.write("AT+UART_CUR=921600,8,1,0,0\r\n") + time.sleep_ms(100) + print("AT+UART_CUR响应:", uart.read()) + uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=4096) + + # 测试AT指令是否正常 + uart.write("AT\r\n") + time.sleep_ms(100) + at_response = uart.read() + print("AT响应:", at_response) + if not at_response or not at_response.endswith(b"OK\r\n"): + print("AT命令失败,WiFi重置失败。") + return False + + # 初始化网络对象 + try: + nic = network.ESP8285(uart) + except Exception as e: + print("ESP8285初始化失败:", e) + return False + + # 连接到WiFi网络 + print("尝试连接到WiFi: {}...".format(wifi_ap_ssid)) + try: + nic.connect(wifi_ap_ssid, wifi_ap_passwd) + attempts = 0 + while not nic.isconnected() and attempts < 20: # 最多等待20秒 + print("等待连接... {}/20".format(attempts + 1)) + time.sleep(1) + attempts += 1 + + if nic.isconnected(): + print("WiFi连接成功!") + print("IP信息:", nic.ifconfig()) + return True + else: + print("WiFi连接超时或失败。") + return False + except Exception as e: + print("连接WiFi时发生异常:", e) + return False + +#----------------------------- +# 4. 声源定位函数 +#----------------------------- def get_mic_dir(): - AngleX=0 - AngleY=0 - AngleR=0 - Angle=0 - AngleAddPi=0 - mic_list=[] - imga = mic.get_map() # 获取声音源分布图像 - b = mic.get_dir(imga) # 计算、获取声源方向 + AngleX = 0 + AngleY = 0 + AngleR = 0 + Angle = 0 + AngleAddPi = 0 + mic_list = [] + imga = mic.get_map() # 获取声音源分布图像 + b = mic.get_dir(imga) # 计算声源方向 for i in range(len(b)): - if b[i]>=2: - AngleX+= b[i]*math.sin(i*math.pi/6) - AngleY+= b[i]*math.cos(i*math.pi/6) - AngleX=round(AngleX,6) #计算坐标转换值 - AngleY=round(AngleY,6) - if AngleY<0:AngleAddPi=180 - if AngleX<0 and AngleY > 0:AngleAddPi=360 - if AngleX!=0 or AngleY!=0: #参数修正 - if AngleY==0: - Angle=90 if AngleX>0 else 270 #填补X轴角度 + if b[i] >= 2: + AngleX += b[i] * math.sin(i * math.pi / 6) + AngleY += b[i] * math.cos(i * math.pi / 6) + AngleX = round(AngleX, 6) + AngleY = round(AngleY, 6) + if AngleY < 0: + AngleAddPi = 180 + if AngleX < 0 and AngleY > 0: + AngleAddPi = 360 + if AngleX != 0 or AngleY != 0: + if AngleY == 0: + Angle = 90 if AngleX > 0 else 270 else: - Angle=AngleAddPi+round(math.degrees(math.atan(AngleX/AngleY)),4) #计算角度 - AngleR=round(math.sqrt(AngleY*AngleY+AngleX*AngleX),4) #计算强度 + Angle = AngleAddPi + round(math.degrees(math.atan(AngleX / AngleY)), 4) + AngleR = round(math.sqrt(AngleY * AngleY + AngleX * AngleX), 4) mic_list.append(AngleX) mic_list.append(AngleY) mic_list.append(AngleR) mic_list.append(Angle) - a = mic.set_led(b,(0,0,255))# 配置 RGB LED 颜色值 - return mic_list #返回列表,X坐标,Y坐标,强度,角度 -while True: - A=get_mic_dir() - if A: - print(A) - time.sleep_ms(100) + mic.set_led(b, (0, 0, 255)) + + return mic_list + +#----------------------------- +# 5. 初始化TCP Socket +#----------------------------- +def setup_tcp_socket(): + global tcp_sock + try: + tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + tcp_sock.connect((PC_IP, TCP_PORT)) + print("TCP连接成功。") + return True + except Exception as e: + print("TCP连接失败:", e) + return False + +#----------------------------- +# 6. 主循环:连接WiFi、发送数据 +#----------------------------- +def main_loop(): + global nic, tcp_sock + + # 初始化麦克风阵列 + mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20, + i2s_ws=19, i2s_sclk=18, + sk9822_dat=24, sk9822_clk=25) + print("麦克风阵列初始化完成。") + + # 初始化WiFi硬件 + init_wifi_hardware() + + # 不断尝试连接WiFi和设置TCP + while True: + if nic and nic.isconnected(): + if not tcp_sock: + if not setup_tcp_socket(): + print("TCP设置失败,2秒后重试...") + time.sleep(2) + nic.disconnect() + nic = None + continue + # 如果WiFi和TCP都已准备好,跳出循环 + break + else: + print("尝试连接WiFi...") + if wifi_reset_and_connect(): + if not setup_tcp_socket(): + print("TCP设置失败,断开WiFi并重试...") + nic.disconnect() + nic = None + time.sleep(2) + else: + print("WiFi连接失败,2秒后重试...") + time.sleep(2) + + print("WiFi和TCP准备就绪,开始发送声源数据到 {}:{}".format(PC_IP, TCP_PORT)) + + # 发送消息计数 + msg_count = 0 + + # 主循环:获取声源数据并发送 + while True: + # 检查WiFi连接状态 + if not nic or not nic.isconnected(): + print("WiFi连接断开,尝试重新连接...") + if tcp_sock: + tcp_sock.close() + tcp_sock = None + nic = None + # 重新启动连接流程 + main_loop() + return + + # 获取声源数据 + source_data = get_mic_dir() + if source_data: + # 解析数据: X, Y, 强度, 角度 + x, y, strength, angle = source_data + + # 打印数据用于调试 + print("麦克风定位: X={:.2f}, Y={:.2f}, 强度={:.2f}, 角度={:.2f}".format( + x, y, strength, angle)) + + # 格式化为CSV字符串发送 + data_string = "{:.4f},{:.4f},{:.4f},{:.4f}".format(x, y, strength, angle) + + try: + # 发送到PC + tcp_sock.send(data_string.encode('utf-8')) + msg_count += 1 + # 每50条显示一次发送状态 + if msg_count % 50 == 0: + print("已发送第 {} 条数据".format(msg_count)) + except Exception as e: + print("TCP发送失败:", e) + # 关闭并重新连接 + tcp_sock.close() + tcp_sock = None + nic.disconnect() + nic = None + print("将尝试重新建立连接...") + time.sleep(1) + main_loop() + return + + # 控制采样间隔 + time.sleep_ms(100) + +# 主程序入口 +if __name__ == "__main__": + try: + main_loop() + except KeyboardInterrupt: + print("用户中断程序。") + finally: + # 清理资源 + if tcp_sock: + tcp_sock.close() + print("TCP socket已关闭。") + if nic and nic.isconnected(): + nic.disconnect() + print("WiFi已断开。") + wifi_enable(0) # 关闭WiFi模块 + print("程序结束。") diff --git a/src/mic-code/init-code/main_raw.py b/src/mic-code/init-code/main_raw.py new file mode 100644 index 0000000..d09d9e6 --- /dev/null +++ b/src/mic-code/init-code/main_raw.py @@ -0,0 +1,38 @@ +from Maix import MIC_ARRAY as mic +import lcd,time +import math +mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20, i2s_ws=19, i2s_sclk=18, sk9822_dat=24, sk9822_clk=25) +def get_mic_dir(): + AngleX=0 + AngleY=0 + AngleR=0 + Angle=0 + AngleAddPi=0 + mic_list=[] + imga = mic.get_map() # 获取声音源分布图像 + b = mic.get_dir(imga) # 计算、获取声源方向 + for i in range(len(b)): + if b[i]>=2: + AngleX+= b[i]*math.sin(i*math.pi/6) + AngleY+= b[i]*math.cos(i*math.pi/6) + AngleX=round(AngleX,6) #计算坐标转换值 + AngleY=round(AngleY,6) + if AngleY<0:AngleAddPi=180 + if AngleX<0 and AngleY > 0:AngleAddPi=360 + if AngleX!=0 or AngleY!=0: #参数修正 + if AngleY==0: + Angle=90 if AngleX>0 else 270 #填补X轴角度 + else: + Angle=AngleAddPi+round(math.degrees(math.atan(AngleX/AngleY)),4) #计算角度 + AngleR=round(math.sqrt(AngleY*AngleY+AngleX*AngleX),4) #计算强度 + mic_list.append(AngleX) + mic_list.append(AngleY) + mic_list.append(AngleR) + mic_list.append(Angle) + a = mic.set_led(b,(0,0,255))# 配置 RGB LED 颜色值 + return mic_list #返回列表,X坐标,Y坐标,强度,角度 +while True: + A=get_mic_dir() + if A: + print(A) + time.sleep_ms(100) diff --git a/src/mic-code/init-code/main_wifi.py b/src/mic-code/init-code/main_wifi.py deleted file mode 100644 index 6ad6595..0000000 --- a/src/mic-code/init-code/main_wifi.py +++ /dev/null @@ -1,259 +0,0 @@ -# 声源定位+WiFi传输集成程序 -from fpioa_manager import fm -from Maix import GPIO, MIC_ARRAY as mic -from board import board_info -import network, socket, time -from machine import UART -import math - -#----------------------------- -# 1. WiFi和服务器配置 -#----------------------------- -wifi_ap_ssid = "junzekeji" # 你的WiFi SSID -wifi_ap_passwd = "234567890l" # 你的WiFi密码 -PC_IP = "192.168.32.6" # 运行sound_map.py的电脑IP -UDP_PORT = 12345 # 与sound_map.py里配置的端口一致 -TCP_PORT = 12345 # 替换你想要的端口 - -# 全局变量 -uart = None -wifi_en = None -nic = None -udp_sock = None -tcp_sock = None - -#----------------------------- -# 2. 引脚映射和WiFi使能 -#----------------------------- -def init_wifi_hardware(): - global wifi_en - # GPIOHS0 用于WiFi使能控制 - fm.register(8, fm.fpioa.GPIOHS0, force=True) - wifi_en = GPIO(GPIO.GPIOHS0, GPIO.OUT) - # 配置UART用于与ESP8285通信 - fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX, force=True) - fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX, force=True) - print("WiFi硬件引脚初始化完成。") - -def wifi_enable(en): - # 控制WiFi模块电源 - global wifi_en - if wifi_en: - wifi_en.value(en) - -#----------------------------- -# 3. 复位WiFi并连接到路由器 -#----------------------------- -def wifi_reset_and_connect(): - global uart, nic - # 复位WiFi模块 - wifi_enable(0) # 先关闭 - time.sleep_ms(200) - wifi_enable(1) # 再打开 - time.sleep(2) # 等待稳定 - - # 初始化UART,低波特率 - uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=4096) - _ = uart.read() # 清空缓存 - - # 提升波特率到921600 - uart.write("AT+UART_CUR=921600,8,1,0,0\r\n") - time.sleep_ms(100) - print("AT+UART_CUR响应:", uart.read()) - uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=4096) - - # 测试AT指令是否正常 - uart.write("AT\r\n") - time.sleep_ms(100) - at_response = uart.read() - print("AT响应:", at_response) - if not at_response or not at_response.endswith(b"OK\r\n"): - print("AT命令失败,WiFi重置失败。") - return False - - # 初始化网络对象 - try: - nic = network.ESP8285(uart) - except Exception as e: - print("ESP8285初始化失败:", e) - return False - - # 连接到WiFi网络 - print("尝试连接到WiFi: {}...".format(wifi_ap_ssid)) - try: - nic.connect(wifi_ap_ssid, wifi_ap_passwd) - attempts = 0 - while not nic.isconnected() and attempts < 20: # 最多等待20秒 - print("等待连接... {}/20".format(attempts + 1)) - time.sleep(1) - attempts += 1 - - if nic.isconnected(): - print("WiFi连接成功!") - print("IP信息:", nic.ifconfig()) - return True - else: - print("WiFi连接超时或失败。") - return False - except Exception as e: - print("连接WiFi时发生异常:", e) - return False - -#----------------------------- -# 4. 声源定位函数 -#----------------------------- -def get_mic_dir(): - AngleX = 0 - AngleY = 0 - AngleR = 0 - Angle = 0 - AngleAddPi = 0 - mic_list = [] - imga = mic.get_map() # 获取声音源分布图像 - b = mic.get_dir(imga) # 计算声源方向 - for i in range(len(b)): - if b[i] >= 2: - AngleX += b[i] * math.sin(i * math.pi / 6) - AngleY += b[i] * math.cos(i * math.pi / 6) - AngleX = round(AngleX, 6) - AngleY = round(AngleY, 6) - if AngleY < 0: - AngleAddPi = 180 - if AngleX < 0 and AngleY > 0: - AngleAddPi = 360 - if AngleX != 0 or AngleY != 0: - if AngleY == 0: - Angle = 90 if AngleX > 0 else 270 - else: - Angle = AngleAddPi + round(math.degrees(math.atan(AngleX / AngleY)), 4) - AngleR = round(math.sqrt(AngleY * AngleY + AngleX * AngleX), 4) - mic_list.append(AngleX) - mic_list.append(AngleY) - mic_list.append(AngleR) - mic_list.append(Angle) - mic.set_led(b, (0, 0, 255)) - - return mic_list - -#----------------------------- -# 5. 初始化TCP Socket -#----------------------------- -def setup_tcp_socket(): - global tcp_sock - try: - tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - tcp_sock.connect((PC_IP, TCP_PORT)) - print("TCP连接成功。") - return True - except Exception as e: - print("TCP连接失败:", e) - return False - -#----------------------------- -# 6. 主循环:连接WiFi、发送数据 -#----------------------------- -def main_loop(): - global nic, tcp_sock - - # 初始化麦克风阵列 - mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20, - i2s_ws=19, i2s_sclk=18, - sk9822_dat=24, sk9822_clk=25) - print("麦克风阵列初始化完成。") - - # 初始化WiFi硬件 - init_wifi_hardware() - - # 不断尝试连接WiFi和设置TCP - while True: - if nic and nic.isconnected(): - if not tcp_sock: - if not setup_tcp_socket(): - print("TCP设置失败,2秒后重试...") - time.sleep(2) - nic.disconnect() - nic = None - continue - # 如果WiFi和TCP都已准备好,跳出循环 - break - else: - print("尝试连接WiFi...") - if wifi_reset_and_connect(): - if not setup_tcp_socket(): - print("TCP设置失败,断开WiFi并重试...") - nic.disconnect() - nic = None - time.sleep(2) - else: - print("WiFi连接失败,2秒后重试...") - time.sleep(2) - - print("WiFi和TCP准备就绪,开始发送声源数据到 {}:{}".format(PC_IP, TCP_PORT)) - - # 发送消息计数 - msg_count = 0 - - # 主循环:获取声源数据并发送 - while True: - # 检查WiFi连接状态 - if not nic or not nic.isconnected(): - print("WiFi连接断开,尝试重新连接...") - if tcp_sock: - tcp_sock.close() - tcp_sock = None - nic = None - # 重新启动连接流程 - main_loop() - return - - # 获取声源数据 - source_data = get_mic_dir() - if source_data: - # 解析数据: X, Y, 强度, 角度 - x, y, strength, angle = source_data - - # 打印数据用于调试 - print("麦克风定位: X={:.2f}, Y={:.2f}, 强度={:.2f}, 角度={:.2f}".format( - x, y, strength, angle)) - - # 格式化为CSV字符串发送 - data_string = "{:.4f},{:.4f},{:.4f},{:.4f}".format(x, y, strength, angle) - - try: - # 发送到PC - tcp_sock.send(data_string.encode('utf-8')) - msg_count += 1 - # 每50条显示一次发送状态 - if msg_count % 50 == 0: - print("已发送第 {} 条数据".format(msg_count)) - except Exception as e: - print("TCP发送失败:", e) - # 关闭并重新连接 - tcp_sock.close() - tcp_sock = None - nic.disconnect() - nic = None - print("将尝试重新建立连接...") - time.sleep(1) - main_loop() - return - - # 控制采样间隔 - time.sleep_ms(100) - -# 主程序入口 -if __name__ == "__main__": - try: - main_loop() - except KeyboardInterrupt: - print("用户中断程序。") - finally: - # 清理资源 - if tcp_sock: - tcp_sock.close() - print("TCP socket已关闭。") - if nic and nic.isconnected(): - nic.disconnect() - print("WiFi已断开。") - wifi_enable(0) # 关闭WiFi模块 - print("程序结束。")