ganshihao21 5 months ago
parent 998de651f7
commit 6d54f48d8d

@ -109,7 +109,6 @@ def my_record():
stream = pa.open(format=paInt16, channels=channels, stream = pa.open(format=paInt16, channels=channels,
rate=framerate, input=True, frames_per_buffer=num_samples) # 打开流 rate=framerate, input=True, frames_per_buffer=num_samples) # 打开流
my_buf = [] # 初始化缓冲区 my_buf = [] # 初始化缓冲区
# count = 0
t = time.time() t = time.time()
print('开始录音...') # 打印开始录音信息 print('开始录音...') # 打印开始录音信息
while time.time() < t + 4: while time.time() < t + 4:
@ -124,6 +123,7 @@ def speech2text(speech_data, token, dev_pid=1537):
""" """
音频转文字 音频转文字
""" """
#音频格式
FORMAT = 'wav' FORMAT = 'wav'
RATE = '16000' RATE = '16000'
CHANNEL = 1 CHANNEL = 1
@ -139,7 +139,7 @@ def speech2text(speech_data, token, dev_pid=1537):
'token': token, 'token': token,
'dev_pid': dev_pid 'dev_pid': dev_pid
} }
# 语音转文字接口 该接口可能每个人不一样,取决于你需要哪种语音识别功能,本文使用的是 语音识别极速版 # 语音转文字接口 语音识别极速版
url = 'https://vop.baidu.com/pro_api' url = 'https://vop.baidu.com/pro_api'
headers = {'Content-Type': 'application/json'} # 请求头 headers = {'Content-Type': 'application/json'} # 请求头

@ -18,6 +18,7 @@ DETECT_DING = os.path.join(TOP_DIR, "resources/ding.wav")
DETECT_DONG = os.path.join(TOP_DIR, "resources/dong.wav") DETECT_DONG = os.path.join(TOP_DIR, "resources/dong.wav")
# 环形缓冲区用于保存从PortAudio接收到的音频数据
class RingBuffer(object): class RingBuffer(object):
"""Ring buffer to hold audio from PortAudio""" """Ring buffer to hold audio from PortAudio"""
@ -80,42 +81,60 @@ class HotwordDetector(object):
:param audio_gain: multiply input volume by this factor. :param audio_gain: multiply input volume by this factor.
""" """
#初始化检测器
#decoder_model:唤醒词模型文件路径,或由多个模型文件路径组成的列表(单个值会被包装成列表)
#resource:资源文件路径
#sensitivity:检测器的灵敏度,一个浮点数或浮点数列表。值越大,检测器越敏感。如果提供了一个空列表,则将使用模型中的默认灵敏度。
#audio_gain:将输入音量乘以这个因子。
def __init__(self, decoder_model, def __init__(self, decoder_model,
resource=RESOURCE_FILE, resource=RESOURCE_FILE,
sensitivity=[], sensitivity=[],
audio_gain=1): audio_gain=1):
# 定义音频回调函数,用于将音频数据添加到环形缓冲区中
def audio_callback(in_data, frame_count, time_info, status): def audio_callback(in_data, frame_count, time_info, status):
self.ring_buffer.extend(in_data) self.ring_buffer.extend(in_data)
play_data = chr(0) * len(in_data) play_data = chr(0) * len(in_data)
return play_data, pyaudio.paContinue return play_data, pyaudio.paContinue
# 判断decoder_model和sensitivity是否为列表,如果不是则分别将其包装为单元素列表
tm = type(decoder_model) tm = type(decoder_model)
ts = type(sensitivity) ts = type(sensitivity)
if tm is not list: if tm is not list:
decoder_model = [decoder_model] decoder_model = [decoder_model]
if ts is not list: if ts is not list:
sensitivity = [sensitivity] sensitivity = [sensitivity]
# 将decoder_model转换为字符串
model_str = ",".join(decoder_model) model_str = ",".join(decoder_model)
# 创建SnowboyDetect对象传入资源文件名和模型字符串
self.detector = snowboydetect.SnowboyDetect( self.detector = snowboydetect.SnowboyDetect(
resource_filename=resource.encode(), model_str=model_str.encode()) resource_filename=resource.encode(), model_str=model_str.encode())
# 设置音频增益
self.detector.SetAudioGain(audio_gain) self.detector.SetAudioGain(audio_gain)
# 获取hotword的数量
self.num_hotwords = self.detector.NumHotwords() self.num_hotwords = self.detector.NumHotwords()
# 如果decoder_model中的模型数量大于1而sensitivity只有一个值,则将该值重复num_hotwords次(列表复制,而不是数值相乘)
if len(decoder_model) > 1 and len(sensitivity) == 1: if len(decoder_model) > 1 and len(sensitivity) == 1:
sensitivity = sensitivity*self.num_hotwords sensitivity = sensitivity*self.num_hotwords
# 如果sensitivity不为空则检查hotword的数量是否与sensitivity的数量相等
if len(sensitivity) != 0: if len(sensitivity) != 0:
assert self.num_hotwords == len(sensitivity), \ assert self.num_hotwords == len(sensitivity), \
"number of hotwords in decoder_model (%d) and sensitivity " \ "number of hotwords in decoder_model (%d) and sensitivity " \
"(%d) does not match" % (self.num_hotwords, len(sensitivity)) "(%d) does not match" % (self.num_hotwords, len(sensitivity))
# 将sensitivity转换为字符串
sensitivity_str = ",".join([str(t) for t in sensitivity]) sensitivity_str = ",".join([str(t) for t in sensitivity])
# 如果sensitivity不为空则设置sensitivity
if len(sensitivity) != 0: if len(sensitivity) != 0:
self.detector.SetSensitivity(sensitivity_str.encode()) self.detector.SetSensitivity(sensitivity_str.encode())
# 创建环形缓冲区,用于存储音频数据
self.ring_buffer = RingBuffer( self.ring_buffer = RingBuffer(
self.detector.NumChannels() * self.detector.SampleRate() * 5) self.detector.NumChannels() * self.detector.SampleRate() * 5)
# 创建PyAudio对象
self.audio = pyaudio.PyAudio() self.audio = pyaudio.PyAudio()
# 打开音频输入流
self.stream_in = self.audio.open( self.stream_in = self.audio.open(
input=True, output=False, input=True, output=False,
format=self.audio.get_format_from_width( format=self.audio.get_format_from_width(
@ -125,6 +144,10 @@ class HotwordDetector(object):
frames_per_buffer=2048, frames_per_buffer=2048,
stream_callback=audio_callback) stream_callback=audio_callback)
#启动语音检测器
#detected_callback:一个函数或函数列表。如果只提供一个函数而模型中有多个热词,则该函数会被用于每一个热词;否则函数的数量必须与热词的数量相等。
#interrupt_check:一个函数,用于检查是否需要停止主循环。如果返回True,则停止主循环。
#sleep_time:缓冲区中没有音频数据时,每次循环等待的时间,单位为秒。
def start(self, detected_callback=play_audio_file, def start(self, detected_callback=play_audio_file,
interrupt_check=lambda: False, interrupt_check=lambda: False,
sleep_time=0.03): sleep_time=0.03):
@ -144,40 +167,53 @@ class HotwordDetector(object):
:param float sleep_time: how much time in second every loop waits. :param float sleep_time: how much time in second every loop waits.
:return: None :return: None
""" """
# 如果中断检查函数返回True则返回
if interrupt_check(): if interrupt_check():
logger.debug("detect voice return") logger.debug("detect voice return")
return return
# 获取detected_callback的类型
tc = type(detected_callback) tc = type(detected_callback)
# 如果detected_callback不是列表则将其转换为列表
if tc is not list: if tc is not list:
detected_callback = [detected_callback] detected_callback = [detected_callback]
# 如果detected_callback只有一个元素但是模型中有多个热词则将callback重复多次
if len(detected_callback) == 1 and self.num_hotwords > 1: if len(detected_callback) == 1 and self.num_hotwords > 1:
detected_callback *= self.num_hotwords detected_callback *= self.num_hotwords
# 检查热词的数量是否与callback的数量相等
assert self.num_hotwords == len(detected_callback), \ assert self.num_hotwords == len(detected_callback), \
"Error: hotwords in your models (%d) do not match the number of " \ "Error: hotwords in your models (%d) do not match the number of " \
"callbacks (%d)" % (self.num_hotwords, len(detected_callback)) "callbacks (%d)" % (self.num_hotwords, len(detected_callback))
logger.debug("detecting...") logger.debug("detecting...")
# 循环检测
while True: while True:
# 如果中断检查函数返回True则跳出循环
if interrupt_check(): if interrupt_check():
logger.debug("detect voice break") logger.debug("detect voice break")
break break
# 从ring_buffer中获取数据
data = self.ring_buffer.get() data = self.ring_buffer.get()
# 如果数据长度为0则休眠
if len(data) == 0: if len(data) == 0:
time.sleep(sleep_time) time.sleep(sleep_time)
continue continue
# 调用detector的RunDetection函数
ans = self.detector.RunDetection(data) ans = self.detector.RunDetection(data)
# 如果ans等于-1则输出错误信息
if ans == -1: if ans == -1:
logger.warning( logger.warning(
"Error initializing streams or reading audio data") "Error initializing streams or reading audio data")
# 如果ans大于0则输出热词检测到的信息
elif ans > 0: elif ans > 0:
message = "Keyword " + str(ans) + " detected at time: " message = "Keyword " + str(ans) + " detected at time: "
message += time.strftime("%Y-%m-%d %H:%M:%S", message += time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(time.time())) time.localtime(time.time()))
logger.info(message) logger.info(message)
# 取出与检测到的热词(编号ans)对应的callback,如果不为空则调用它
callback = detected_callback[ans-1] callback = detected_callback[ans-1]
if callback is not None: if callback is not None:
callback() callback()
@ -186,9 +222,10 @@ class HotwordDetector(object):
def terminate(self): def terminate(self):
""" """
Terminate audio stream. Users cannot call start() again to detect. 终止音频流用户不能再次调用start()来检测
:return: None :return: None
""" """
self.stream_in.stop_stream() self.stream_in.stop_stream() # 停止音频输入流
self.stream_in.close() self.stream_in.close() # 关闭音频输入流
self.audio.terminate() self.audio.terminate() # 终止音频模块

Loading…
Cancel
Save