diff --git a/src/Guide_stick_system/voice_assistant/snow.py b/src/Guide_stick_system/voice_assistant/snow.py index eb3f4bb..b971833 100644 --- a/src/Guide_stick_system/voice_assistant/snow.py +++ b/src/Guide_stick_system/voice_assistant/snow.py @@ -109,7 +109,6 @@ def my_record(): stream = pa.open(format=paInt16, channels=channels, rate=framerate, input=True, frames_per_buffer=num_samples) # 打开流 my_buf = [] # 初始化缓冲区 - # count = 0 t = time.time() print('开始录音...') # 打印开始录音信息 while time.time() < t + 4: @@ -124,6 +123,7 @@ def speech2text(speech_data, token, dev_pid=1537): """ 音频转文字 """ + #音频格式 FORMAT = 'wav' RATE = '16000' CHANNEL = 1 @@ -139,7 +139,7 @@ def speech2text(speech_data, token, dev_pid=1537): 'token': token, 'dev_pid': dev_pid } - # 语音转文字接口 该接口可能每个人不一样,取决于你需要哪种语音识别功能,本文使用的是 语音识别极速版 + # 语音转文字接口 语音识别极速版 url = 'https://vop.baidu.com/pro_api' headers = {'Content-Type': 'application/json'} # 请求头 diff --git a/src/Guide_stick_system/voice_assistant/snowboydecoder.py b/src/Guide_stick_system/voice_assistant/snowboydecoder.py index 86e001c..1842e32 100644 --- a/src/Guide_stick_system/voice_assistant/snowboydecoder.py +++ b/src/Guide_stick_system/voice_assistant/snowboydecoder.py @@ -18,6 +18,7 @@ DETECT_DING = os.path.join(TOP_DIR, "resources/ding.wav") DETECT_DONG = os.path.join(TOP_DIR, "resources/dong.wav") +# 环形缓冲区,用于保存从PortAudio接收到的音频数据 class RingBuffer(object): """Ring buffer to hold audio from PortAudio""" @@ -80,42 +81,60 @@ class HotwordDetector(object): :param audio_gain: multiply input volume by this factor. 
""" + #初始化检测器 + #decoder_model:单个唤醒词模型文件路径,或包含多个模型文件路径的列表 + #resource:资源文件路径 + #sensitivity:检测器的灵敏度,一个浮点数或浮点数列表。值越大,检测器越敏感。如果提供了一个空列表,则将使用模型中的默认灵敏度。 + #audio_gain:将输入音量乘以这个因子。 def __init__(self, decoder_model, resource=RESOURCE_FILE, sensitivity=[], audio_gain=1): + # 定义音频回调函数,用于将音频数据添加到环形缓冲区中 def audio_callback(in_data, frame_count, time_info, status): self.ring_buffer.extend(in_data) play_data = chr(0) * len(in_data) return play_data, pyaudio.paContinue + # 判断decoder_model和sensitivity是否为列表,如果不是,则将其包装为单元素列表 tm = type(decoder_model) ts = type(sensitivity) if tm is not list: decoder_model = [decoder_model] if ts is not list: sensitivity = [sensitivity] + # 将decoder_model中的各个路径用逗号拼接为一个字符串 model_str = ",".join(decoder_model) + # 创建SnowboyDetect对象,传入资源文件名和模型字符串 self.detector = snowboydetect.SnowboyDetect( resource_filename=resource.encode(), model_str=model_str.encode()) + # 设置音频增益 self.detector.SetAudioGain(audio_gain) + # 获取hotword的数量 self.num_hotwords = self.detector.NumHotwords() + # 如果decoder_model中的模型数量大于1,而sensitivity只有一个,则将该sensitivity重复num_hotwords次,使每个hotword使用相同的灵敏度 if len(decoder_model) > 1 and len(sensitivity) == 1: sensitivity = sensitivity*self.num_hotwords + # 如果sensitivity不为空,则检查hotword的数量是否与sensitivity的数量相等 if len(sensitivity) != 0: assert self.num_hotwords == len(sensitivity), \ "number of hotwords in decoder_model (%d) and sensitivity " \ "(%d) does not match" % (self.num_hotwords, len(sensitivity)) + # 将sensitivity用逗号拼接为字符串 sensitivity_str = ",".join([str(t) for t in sensitivity]) + # 如果sensitivity不为空,则设置sensitivity if len(sensitivity) != 0: self.detector.SetSensitivity(sensitivity_str.encode()) + # 创建环形缓冲区,用于存储音频数据 self.ring_buffer = RingBuffer( self.detector.NumChannels() * self.detector.SampleRate() * 5) + # 创建PyAudio对象 self.audio = pyaudio.PyAudio() + # 打开音频输入流 self.stream_in = self.audio.open( input=True, output=False, format=self.audio.get_format_from_width( @@ -125,6 +144,10 @@ class HotwordDetector(object): frames_per_buffer=2048, stream_callback=audio_callback) + #启动语音检测器 +
#detected_callback:一个函数或函数列表。如果只提供一个函数而模型中有多个热词,则该函数会被所有热词共用;如果提供函数列表,则其数量必须与热词数量一致。 + #interrupt_check:一个函数,用于检查是否需要停止主循环。如果返回True,则停止主循环。 + #sleep_time:每个循环等待的时间,单位为秒。 def start(self, detected_callback=play_audio_file, interrupt_check=lambda: False, sleep_time=0.03): @@ -144,40 +167,53 @@ class HotwordDetector(object): :param float sleep_time: how much time in second every loop waits. :return: None """ + # 如果中断检查函数返回True,则返回 if interrupt_check(): logger.debug("detect voice return") return + # 获取detected_callback的类型 tc = type(detected_callback) + # 如果detected_callback不是列表,则将其转换为列表 if tc is not list: detected_callback = [detected_callback] + # 如果detected_callback只有一个元素,但是模型中有多个热词,则将callback重复多次 if len(detected_callback) == 1 and self.num_hotwords > 1: detected_callback *= self.num_hotwords + # 检查热词的数量是否与callback的数量相等 assert self.num_hotwords == len(detected_callback), \ "Error: hotwords in your models (%d) do not match the number of " \ "callbacks (%d)" % (self.num_hotwords, len(detected_callback)) logger.debug("detecting...") + # 循环检测 while True: + # 如果中断检查函数返回True,则跳出循环 if interrupt_check(): logger.debug("detect voice break") break + # 从ring_buffer中获取数据 data = self.ring_buffer.get() + # 如果数据长度为0,则休眠 if len(data) == 0: time.sleep(sleep_time) continue + # 调用detector的RunDetection函数 ans = self.detector.RunDetection(data) + # 如果ans等于-1,则输出错误信息 if ans == -1: logger.warning( "Error initializing streams or reading audio data") + # 如果ans大于0,则输出热词检测到的信息 elif ans > 0: message = "Keyword " + str(ans) + " detected at time: " message += time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())) logger.info(message) + # 如果callback不为空,则调用callback函数 callback = detected_callback[ans-1] if callback is not None: callback() @@ -186,9 +222,10 @@ class HotwordDetector(object): def terminate(self): """ - Terminate audio stream. Users cannot call start() again to detect.
+ 终止音频流。用户不能再次调用start()来检测。 :return: None """ - self.stream_in.stop_stream() - self.stream_in.close() - self.audio.terminate() + self.stream_in.stop_stream() # 停止音频输入流 + self.stream_in.close() # 关闭音频输入流 + self.audio.terminate() # 终止音频模块 +