feat: 添加WiFi连接和TCP数据传输功能,重构声源定位逻辑

gaojunzhe
gaozixi 1 year ago
parent fe1721ebed
commit 42b2fb2abc

@ -1,38 +1,259 @@
from Maix import MIC_ARRAY as mic
import lcd,time
# 声源定位+WiFi传输集成程序
from fpioa_manager import fm
from Maix import GPIO, MIC_ARRAY as mic
from board import board_info
import network, socket, time
from machine import UART
import math
mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20, i2s_ws=19, i2s_sclk=18, sk9822_dat=24, sk9822_clk=25)
#-----------------------------
# 1. WiFi和服务器配置
#-----------------------------
wifi_ap_ssid = "junzekeji" # 你的WiFi SSID
wifi_ap_passwd = "234567890l" # 你的WiFi密码
PC_IP = "192.168.32.6" # 运行sound_map.py的电脑IP
UDP_PORT = 12345 # 与sound_map.py里配置的端口一致
TCP_PORT = 12345 # 替换你想要的端口
# 全局变量
uart = None
wifi_en = None
nic = None
udp_sock = None
tcp_sock = None
#-----------------------------
# 2. 引脚映射和WiFi使能
#-----------------------------
def init_wifi_hardware():
global wifi_en
# GPIOHS0 用于WiFi使能控制
fm.register(8, fm.fpioa.GPIOHS0, force=True)
wifi_en = GPIO(GPIO.GPIOHS0, GPIO.OUT)
# 配置UART用于与ESP8285通信
fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX, force=True)
fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX, force=True)
print("WiFi硬件引脚初始化完成。")
def wifi_enable(en):
# 控制WiFi模块电源
global wifi_en
if wifi_en:
wifi_en.value(en)
#-----------------------------
# 3. 复位WiFi并连接到路由器
#-----------------------------
def wifi_reset_and_connect():
global uart, nic
# 复位WiFi模块
wifi_enable(0) # 先关闭
time.sleep_ms(200)
wifi_enable(1) # 再打开
time.sleep(2) # 等待稳定
# 初始化UART低波特率
uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=4096)
_ = uart.read() # 清空缓存
# 提升波特率到921600
uart.write("AT+UART_CUR=921600,8,1,0,0\r\n")
time.sleep_ms(100)
print("AT+UART_CUR响应:", uart.read())
uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=4096)
# 测试AT指令是否正常
uart.write("AT\r\n")
time.sleep_ms(100)
at_response = uart.read()
print("AT响应:", at_response)
if not at_response or not at_response.endswith(b"OK\r\n"):
print("AT命令失败WiFi重置失败。")
return False
# 初始化网络对象
try:
nic = network.ESP8285(uart)
except Exception as e:
print("ESP8285初始化失败:", e)
return False
# 连接到WiFi网络
print("尝试连接到WiFi: {}...".format(wifi_ap_ssid))
try:
nic.connect(wifi_ap_ssid, wifi_ap_passwd)
attempts = 0
while not nic.isconnected() and attempts < 20: # 最多等待20秒
print("等待连接... {}/20".format(attempts + 1))
time.sleep(1)
attempts += 1
if nic.isconnected():
print("WiFi连接成功!")
print("IP信息:", nic.ifconfig())
return True
else:
print("WiFi连接超时或失败。")
return False
except Exception as e:
print("连接WiFi时发生异常:", e)
return False
#-----------------------------
# 4. 声源定位函数
#-----------------------------
def get_mic_dir():
AngleX=0
AngleY=0
AngleR=0
Angle=0
AngleAddPi=0
mic_list=[]
imga = mic.get_map() # 获取声音源分布图像
b = mic.get_dir(imga) # 计算、获取声源方向
AngleX = 0
AngleY = 0
AngleR = 0
Angle = 0
AngleAddPi = 0
mic_list = []
imga = mic.get_map() # 获取声音源分布图像
b = mic.get_dir(imga) # 计算声源方向
for i in range(len(b)):
if b[i]>=2:
AngleX+= b[i]*math.sin(i*math.pi/6)
AngleY+= b[i]*math.cos(i*math.pi/6)
AngleX=round(AngleX,6) #计算坐标转换值
AngleY=round(AngleY,6)
if AngleY<0:AngleAddPi=180
if AngleX<0 and AngleY > 0:AngleAddPi=360
if AngleX!=0 or AngleY!=0: #参数修正
if AngleY==0:
Angle=90 if AngleX>0 else 270 #填补X轴角度
if b[i] >= 2:
AngleX += b[i] * math.sin(i * math.pi / 6)
AngleY += b[i] * math.cos(i * math.pi / 6)
AngleX = round(AngleX, 6)
AngleY = round(AngleY, 6)
if AngleY < 0:
AngleAddPi = 180
if AngleX < 0 and AngleY > 0:
AngleAddPi = 360
if AngleX != 0 or AngleY != 0:
if AngleY == 0:
Angle = 90 if AngleX > 0 else 270
else:
Angle=AngleAddPi+round(math.degrees(math.atan(AngleX/AngleY)),4) #计算角度
AngleR=round(math.sqrt(AngleY*AngleY+AngleX*AngleX),4) #计算强度
Angle = AngleAddPi + round(math.degrees(math.atan(AngleX / AngleY)), 4)
AngleR = round(math.sqrt(AngleY * AngleY + AngleX * AngleX), 4)
mic_list.append(AngleX)
mic_list.append(AngleY)
mic_list.append(AngleR)
mic_list.append(Angle)
a = mic.set_led(b,(0,0,255))# 配置 RGB LED 颜色值
return mic_list #返回列表X坐标Y坐标强度角度
while True:
A=get_mic_dir()
if A:
print(A)
time.sleep_ms(100)
mic.set_led(b, (0, 0, 255))
return mic_list
#-----------------------------
# 5. 初始化TCP Socket
#-----------------------------
def setup_tcp_socket():
global tcp_sock
try:
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.connect((PC_IP, TCP_PORT))
print("TCP连接成功。")
return True
except Exception as e:
print("TCP连接失败:", e)
return False
#-----------------------------
# 6. 主循环连接WiFi、发送数据
#-----------------------------
def main_loop():
global nic, tcp_sock
# 初始化麦克风阵列
mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20,
i2s_ws=19, i2s_sclk=18,
sk9822_dat=24, sk9822_clk=25)
print("麦克风阵列初始化完成。")
# 初始化WiFi硬件
init_wifi_hardware()
# 不断尝试连接WiFi和设置TCP
while True:
if nic and nic.isconnected():
if not tcp_sock:
if not setup_tcp_socket():
print("TCP设置失败2秒后重试...")
time.sleep(2)
nic.disconnect()
nic = None
continue
# 如果WiFi和TCP都已准备好跳出循环
break
else:
print("尝试连接WiFi...")
if wifi_reset_and_connect():
if not setup_tcp_socket():
print("TCP设置失败断开WiFi并重试...")
nic.disconnect()
nic = None
time.sleep(2)
else:
print("WiFi连接失败2秒后重试...")
time.sleep(2)
print("WiFi和TCP准备就绪开始发送声源数据到 {}:{}".format(PC_IP, TCP_PORT))
# 发送消息计数
msg_count = 0
# 主循环:获取声源数据并发送
while True:
# 检查WiFi连接状态
if not nic or not nic.isconnected():
print("WiFi连接断开尝试重新连接...")
if tcp_sock:
tcp_sock.close()
tcp_sock = None
nic = None
# 重新启动连接流程
main_loop()
return
# 获取声源数据
source_data = get_mic_dir()
if source_data:
# 解析数据: X, Y, 强度, 角度
x, y, strength, angle = source_data
# 打印数据用于调试
print("麦克风定位: X={:.2f}, Y={:.2f}, 强度={:.2f}, 角度={:.2f}".format(
x, y, strength, angle))
# 格式化为CSV字符串发送
data_string = "{:.4f},{:.4f},{:.4f},{:.4f}".format(x, y, strength, angle)
try:
# 发送到PC
tcp_sock.send(data_string.encode('utf-8'))
msg_count += 1
# 每50条显示一次发送状态
if msg_count % 50 == 0:
print("已发送第 {} 条数据".format(msg_count))
except Exception as e:
print("TCP发送失败:", e)
# 关闭并重新连接
tcp_sock.close()
tcp_sock = None
nic.disconnect()
nic = None
print("将尝试重新建立连接...")
time.sleep(1)
main_loop()
return
# 控制采样间隔
time.sleep_ms(100)
# 主程序入口
if __name__ == "__main__":
try:
main_loop()
except KeyboardInterrupt:
print("用户中断程序。")
finally:
# 清理资源
if tcp_sock:
tcp_sock.close()
print("TCP socket已关闭。")
if nic and nic.isconnected():
nic.disconnect()
print("WiFi已断开。")
wifi_enable(0) # 关闭WiFi模块
print("程序结束。")

@ -0,0 +1,38 @@
from Maix import MIC_ARRAY as mic
import lcd,time
import math
mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20, i2s_ws=19, i2s_sclk=18, sk9822_dat=24, sk9822_clk=25)
def get_mic_dir():
AngleX=0
AngleY=0
AngleR=0
Angle=0
AngleAddPi=0
mic_list=[]
imga = mic.get_map() # 获取声音源分布图像
b = mic.get_dir(imga) # 计算、获取声源方向
for i in range(len(b)):
if b[i]>=2:
AngleX+= b[i]*math.sin(i*math.pi/6)
AngleY+= b[i]*math.cos(i*math.pi/6)
AngleX=round(AngleX,6) #计算坐标转换值
AngleY=round(AngleY,6)
if AngleY<0:AngleAddPi=180
if AngleX<0 and AngleY > 0:AngleAddPi=360
if AngleX!=0 or AngleY!=0: #参数修正
if AngleY==0:
Angle=90 if AngleX>0 else 270 #填补X轴角度
else:
Angle=AngleAddPi+round(math.degrees(math.atan(AngleX/AngleY)),4) #计算角度
AngleR=round(math.sqrt(AngleY*AngleY+AngleX*AngleX),4) #计算强度
mic_list.append(AngleX)
mic_list.append(AngleY)
mic_list.append(AngleR)
mic_list.append(Angle)
a = mic.set_led(b,(0,0,255))# 配置 RGB LED 颜色值
return mic_list #返回列表X坐标Y坐标强度角度
while True:
A=get_mic_dir()
if A:
print(A)
time.sleep_ms(100)

@ -1,259 +0,0 @@
# 声源定位+WiFi传输集成程序
from fpioa_manager import fm
from Maix import GPIO, MIC_ARRAY as mic
from board import board_info
import network, socket, time
from machine import UART
import math
#-----------------------------
# 1. WiFi和服务器配置
#-----------------------------
wifi_ap_ssid = "junzekeji" # 你的WiFi SSID
wifi_ap_passwd = "234567890l" # 你的WiFi密码
PC_IP = "192.168.32.6" # 运行sound_map.py的电脑IP
UDP_PORT = 12345 # 与sound_map.py里配置的端口一致
TCP_PORT = 12345 # 替换你想要的端口
# 全局变量
uart = None
wifi_en = None
nic = None
udp_sock = None
tcp_sock = None
#-----------------------------
# 2. 引脚映射和WiFi使能
#-----------------------------
def init_wifi_hardware():
global wifi_en
# GPIOHS0 用于WiFi使能控制
fm.register(8, fm.fpioa.GPIOHS0, force=True)
wifi_en = GPIO(GPIO.GPIOHS0, GPIO.OUT)
# 配置UART用于与ESP8285通信
fm.register(board_info.WIFI_RX, fm.fpioa.UART2_TX, force=True)
fm.register(board_info.WIFI_TX, fm.fpioa.UART2_RX, force=True)
print("WiFi硬件引脚初始化完成。")
def wifi_enable(en):
# 控制WiFi模块电源
global wifi_en
if wifi_en:
wifi_en.value(en)
#-----------------------------
# 3. 复位WiFi并连接到路由器
#-----------------------------
def wifi_reset_and_connect():
global uart, nic
# 复位WiFi模块
wifi_enable(0) # 先关闭
time.sleep_ms(200)
wifi_enable(1) # 再打开
time.sleep(2) # 等待稳定
# 初始化UART低波特率
uart = UART(UART.UART2, 115200, timeout=1000, read_buf_len=4096)
_ = uart.read() # 清空缓存
# 提升波特率到921600
uart.write("AT+UART_CUR=921600,8,1,0,0\r\n")
time.sleep_ms(100)
print("AT+UART_CUR响应:", uart.read())
uart = UART(UART.UART2, 921600, timeout=1000, read_buf_len=4096)
# 测试AT指令是否正常
uart.write("AT\r\n")
time.sleep_ms(100)
at_response = uart.read()
print("AT响应:", at_response)
if not at_response or not at_response.endswith(b"OK\r\n"):
print("AT命令失败WiFi重置失败。")
return False
# 初始化网络对象
try:
nic = network.ESP8285(uart)
except Exception as e:
print("ESP8285初始化失败:", e)
return False
# 连接到WiFi网络
print("尝试连接到WiFi: {}...".format(wifi_ap_ssid))
try:
nic.connect(wifi_ap_ssid, wifi_ap_passwd)
attempts = 0
while not nic.isconnected() and attempts < 20: # 最多等待20秒
print("等待连接... {}/20".format(attempts + 1))
time.sleep(1)
attempts += 1
if nic.isconnected():
print("WiFi连接成功!")
print("IP信息:", nic.ifconfig())
return True
else:
print("WiFi连接超时或失败。")
return False
except Exception as e:
print("连接WiFi时发生异常:", e)
return False
#-----------------------------
# 4. 声源定位函数
#-----------------------------
def get_mic_dir():
AngleX = 0
AngleY = 0
AngleR = 0
Angle = 0
AngleAddPi = 0
mic_list = []
imga = mic.get_map() # 获取声音源分布图像
b = mic.get_dir(imga) # 计算声源方向
for i in range(len(b)):
if b[i] >= 2:
AngleX += b[i] * math.sin(i * math.pi / 6)
AngleY += b[i] * math.cos(i * math.pi / 6)
AngleX = round(AngleX, 6)
AngleY = round(AngleY, 6)
if AngleY < 0:
AngleAddPi = 180
if AngleX < 0 and AngleY > 0:
AngleAddPi = 360
if AngleX != 0 or AngleY != 0:
if AngleY == 0:
Angle = 90 if AngleX > 0 else 270
else:
Angle = AngleAddPi + round(math.degrees(math.atan(AngleX / AngleY)), 4)
AngleR = round(math.sqrt(AngleY * AngleY + AngleX * AngleX), 4)
mic_list.append(AngleX)
mic_list.append(AngleY)
mic_list.append(AngleR)
mic_list.append(Angle)
mic.set_led(b, (0, 0, 255))
return mic_list
#-----------------------------
# 5. 初始化TCP Socket
#-----------------------------
def setup_tcp_socket():
global tcp_sock
try:
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_sock.connect((PC_IP, TCP_PORT))
print("TCP连接成功。")
return True
except Exception as e:
print("TCP连接失败:", e)
return False
#-----------------------------
# 6. 主循环连接WiFi、发送数据
#-----------------------------
def main_loop():
global nic, tcp_sock
# 初始化麦克风阵列
mic.init(i2s_d0=23, i2s_d1=22, i2s_d2=21, i2s_d3=20,
i2s_ws=19, i2s_sclk=18,
sk9822_dat=24, sk9822_clk=25)
print("麦克风阵列初始化完成。")
# 初始化WiFi硬件
init_wifi_hardware()
# 不断尝试连接WiFi和设置TCP
while True:
if nic and nic.isconnected():
if not tcp_sock:
if not setup_tcp_socket():
print("TCP设置失败2秒后重试...")
time.sleep(2)
nic.disconnect()
nic = None
continue
# 如果WiFi和TCP都已准备好跳出循环
break
else:
print("尝试连接WiFi...")
if wifi_reset_and_connect():
if not setup_tcp_socket():
print("TCP设置失败断开WiFi并重试...")
nic.disconnect()
nic = None
time.sleep(2)
else:
print("WiFi连接失败2秒后重试...")
time.sleep(2)
print("WiFi和TCP准备就绪开始发送声源数据到 {}:{}".format(PC_IP, TCP_PORT))
# 发送消息计数
msg_count = 0
# 主循环:获取声源数据并发送
while True:
# 检查WiFi连接状态
if not nic or not nic.isconnected():
print("WiFi连接断开尝试重新连接...")
if tcp_sock:
tcp_sock.close()
tcp_sock = None
nic = None
# 重新启动连接流程
main_loop()
return
# 获取声源数据
source_data = get_mic_dir()
if source_data:
# 解析数据: X, Y, 强度, 角度
x, y, strength, angle = source_data
# 打印数据用于调试
print("麦克风定位: X={:.2f}, Y={:.2f}, 强度={:.2f}, 角度={:.2f}".format(
x, y, strength, angle))
# 格式化为CSV字符串发送
data_string = "{:.4f},{:.4f},{:.4f},{:.4f}".format(x, y, strength, angle)
try:
# 发送到PC
tcp_sock.send(data_string.encode('utf-8'))
msg_count += 1
# 每50条显示一次发送状态
if msg_count % 50 == 0:
print("已发送第 {} 条数据".format(msg_count))
except Exception as e:
print("TCP发送失败:", e)
# 关闭并重新连接
tcp_sock.close()
tcp_sock = None
nic.disconnect()
nic = None
print("将尝试重新建立连接...")
time.sleep(1)
main_loop()
return
# 控制采样间隔
time.sleep_ms(100)
# 主程序入口
if __name__ == "__main__":
try:
main_loop()
except KeyboardInterrupt:
print("用户中断程序。")
finally:
# 清理资源
if tcp_sock:
tcp_sock.close()
print("TCP socket已关闭。")
if nic and nic.isconnected():
nic.disconnect()
print("WiFi已断开。")
wifi_enable(0) # 关闭WiFi模块
print("程序结束。")
Loading…
Cancel
Save