"""Realtime speech-to-text demo for the iFlytek (xfyun) IAT websocket API.

Microphone audio is captured with PyAudio, streamed to the recognition
service over a websocket, and the recognised text is appended to a Tkinter
text box.  The GUI is created at import time, exactly as in the original
script.
"""
# The star import is kept (and kept first, so the explicit imports below win
# on any name clash) because code outside this view may rely on the names.
from tkinter import *  # noqa: F401,F403

import base64
import hashlib
import hmac
import json
import os
import ssl
import threading
import time
import tkinter
import _thread as thread  # noqa: F401  (retained from the original file)
from datetime import datetime  # noqa: F401  (retained from the original file)
from time import mktime  # noqa: F401  (retained from the original file)
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time

import pyaudio
import websocket


class RealtimeTranscriber:
    """Empty placeholder class; it has no behaviour in this file."""


STATUS_FIRST_FRAME = 0  # marks the first audio frame
STATUS_CONTINUE_FRAME = 1  # marks an intermediate audio frame
STATUS_LAST_FRAME = 2  # marks the last audio frame

# Value sent in the "format" field of every audio frame.
_AUDIO_FORMAT = "audio/L16;rate=16000"


class Ws_Param(object):
    """Connection parameters and URL signing for the recognition service."""

    def __init__(self, APPID, APIKey, APISecret, AudioFile):
        """Store the credentials and build the per-session argument dicts.

        Args:
            APPID: Application id, sent as ``common.app_id`` in the first frame.
            APIKey: API key placed in the authorization parameter.
            APISecret: Secret used as the HMAC-SHA256 key for the signature.
            AudioFile: Stored on the instance; not read anywhere in this file.
        """
        self.APPID = APPID
        self.APIKey = APIKey
        self.APISecret = APISecret
        self.AudioFile = AudioFile

        # Common parameters ("common" section of the first frame).
        self.CommonArgs = {"app_id": self.APPID}
        # Business parameters ("business" section of the first frame).
        self.BusinessArgs = {
            "domain": "iat",
            "language": "zh_cn",
            "accent": "mandarin",
            "vinfo": 1,
            "vad_eos": 10000,
        }

    def create_url(self):
        """Return the websocket URL carrying the signed auth query parameters.

        The signature is an HMAC-SHA256 over the host, date and request line,
        base64-encoded and embedded in a base64-encoded authorization string.
        """
        url = 'wss://ws-api.xfyun.cn/v2/iat'

        # RFC 1123 timestamp in GMT.  time.time() is used directly instead of
        # round-tripping through a naive local datetime, which is ambiguous
        # around DST transitions and could skew the signed date by an hour.
        date = format_date_time(time.time())

        # String to sign: host line, date line, request line.
        signature_origin = "host: " + "ws-api.xfyun.cn" + "\n"
        signature_origin += "date: " + date + "\n"
        signature_origin += "GET " + "/v2/iat " + "HTTP/1.1"

        digest = hmac.new(
            self.APISecret.encode('utf-8'),
            signature_origin.encode('utf-8'),
            digestmod=hashlib.sha256,
        ).digest()
        signature_sha = base64.b64encode(digest).decode(encoding='utf-8')

        authorization_origin = (
            "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\""
            % (self.APIKey, "hmac-sha256", "host date request-line", signature_sha)
        )
        authorization = base64.b64encode(
            authorization_origin.encode('utf-8')
        ).decode(encoding='utf-8')

        # Auth parameters appended to the URL as a query string.
        v = {
            "authorization": authorization,
            "date": date,
            "host": "ws-api.xfyun.cn",
        }
        return url + '?' + urlencode(v)


def on_message(ws, message):
    """Handle one websocket message: show recognised text or report an error.

    Args:
        ws: The websocket application object (unused).
        message: JSON text received from the service.
    """
    try:
        payload = json.loads(message)  # parse once instead of per field
        code = payload["code"]
        sid = payload["sid"]
        if code != 0:
            errMsg = payload["message"]
            print("sid:%s call error:%s code is:%s" % (sid, errMsg, code))
            return

        result = "".join(
            word["w"]
            for segment in payload["data"]["result"]["ws"]
            for word in segment["cw"]
        )
        # Results consisting only of a full stop are not displayed.
        if result in ("。", ".。", " .。", " 。"):
            return

        # `t` is the module-level Text widget created at the bottom of the file.
        t.insert(tkinter.END, result)
        # Fixed: the format string previously had no %s placeholder, so this
        # line raised TypeError on every successful result.
        print("翻译结果:%s" % result)
    except Exception as e:
        # Callback boundary: report and keep the connection alive.
        print("receive msg,but parse exception:", e)


def on_error(ws, error):
    """Print an error reported by the websocket client."""
    print("### error:", error)


def on_close(ws, a, b):
    """Print a notice when the websocket is closed; `a` and `b` are unused."""
    print("### closed ###")


def _audio_frame(status, audio_bytes):
    """Return the "data" section of a frame for the given status and audio."""
    return {
        "status": status,
        "format": _AUDIO_FORMAT,
        "audio": base64.b64encode(audio_bytes).decode('utf-8'),
        "encoding": "raw",
    }


def on_open(ws):
    """Start a background thread that streams microphone audio to `ws`.

    Up to 60 seconds of 16 kHz, 16-bit mono audio is sent.  The first frame
    carries the common/business arguments from the global `wsParam`; a final
    frame with status ``STATUS_LAST_FRAME`` is always sent when capture ends.
    """

    def _stream_microphone(*args):
        CHUNK = 520  # frames read from the microphone per iteration
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        RATE = 16000

        p = pyaudio.PyAudio()
        stream = None
        try:
            stream = p.open(format=FORMAT,
                            channels=CHANNELS,
                            rate=RATE,
                            input=True,
                            frames_per_buffer=CHUNK)
            print("-------Start Recording--------")

            status = STATUS_FIRST_FRAME
            for _ in range(0, int(RATE / CHUNK * 60)):
                buf = stream.read(CHUNK)
                if not buf:
                    break

                if status == STATUS_FIRST_FRAME:
                    # Only the first frame carries the app id and the
                    # business parameters.
                    d = {"common": wsParam.CommonArgs,
                         "business": wsParam.BusinessArgs,
                         "data": _audio_frame(STATUS_FIRST_FRAME, buf)}
                    ws.send(json.dumps(d))
                    status = STATUS_CONTINUE_FRAME
                else:
                    d = {"data": _audio_frame(STATUS_CONTINUE_FRAME, buf)}
                    ws.send(json.dumps(d))

            # Fixed: stream.read() blocks until audio is available, so the
            # old "empty buffer" check never fired and the last frame was
            # never sent.  Send it explicitly once capture ends.
            d = {"data": _audio_frame(STATUS_LAST_FRAME, b"")}
            ws.send(json.dumps(d))
            # Give the service a moment to deliver its final result.
            time.sleep(1)
        except websocket.WebSocketConnectionClosedException:
            # The connection went away while we were still sending.
            print("### connection closed before recording finished ###")
        finally:
            # Always release the audio device and the socket, even when
            # opening, reading or sending fails.
            if stream is not None:
                stream.stop_stream()
                stream.close()
            p.terminate()
            ws.close()

    threading.Thread(target=_stream_microphone, daemon=True).start()


def run():
    """Open the websocket and block until the recognition session ends.

    Sets the module-level `wsParam`, which `on_open` reads for the first frame.
    """
    global wsParam
    # SECURITY: credentials are committed in source.  They remain as
    # fallbacks so the script behaves as before, but should be rotated and
    # supplied through the XFYUN_* environment variables instead.
    wsParam = Ws_Param(
        APPID=os.environ.get('XFYUN_APPID', 'c68c13ed'),
        APISecret=os.environ.get('XFYUN_API_SECRET',
                                 'NzkzNGE1OTNhMjdmODg2ZDdhYTJjNzc1'),
        APIKey=os.environ.get('XFYUN_API_KEY',
                              '3312b6aeb828b7b0a693f896a9ee3b2b'),
        AudioFile=r'',
    )
    websocket.enableTrace(False)
    wsUrl = wsParam.create_url()
    ws = websocket.WebSocketApp(wsUrl,
                                on_message=on_message,
                                on_error=on_error,
                                on_close=on_close)
    ws.on_open = on_open
    # SECURITY NOTE(review): CERT_NONE disables TLS certificate verification.
    # Left unchanged to preserve behaviour; consider ssl.CERT_REQUIRED.
    ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})


def thread_it(func, *args):
    """Run `func(*args)` in a daemon thread so the GUI stays responsive."""
    # Named `worker` rather than `t` to avoid shadowing the global Text widget.
    worker = threading.Thread(target=func, args=args)
    worker.daemon = True
    worker.start()


# GUI: a text box for results and a button that starts a recognition session.
root = tkinter.Tk()
t = tkinter.Text(root)
t.pack()
tkinter.Button(root, text='识别', command=lambda: thread_it(run,)).pack()
root.mainloop()