diff --git a/iat_ws_python3.py b/iat_ws_python3.py new file mode 100644 index 0000000..33a15cb --- /dev/null +++ b/iat_ws_python3.py @@ -0,0 +1,188 @@ + +# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # +import websocket +import datetime +import hashlib +import base64 +import hmac +import json +from urllib.parse import urlencode +import time +import ssl +from wsgiref.handlers import format_date_time +from datetime import datetime +from time import mktime +import _thread as thread +import pyaudio + +STATUS_FIRST_FRAME = 0 # 第一帧的标识 +STATUS_CONTINUE_FRAME = 1 # 中间帧标识 +STATUS_LAST_FRAME = 2 # 最后一帧的标识 + + +class Ws_Param(object): + # 初始化 + def __init__(self, APPID, APIKey, APISecret, AudioFile): + self.APPID = APPID + self.APIKey = APIKey + self.APISecret = APISecret + self.AudioFile = AudioFile + + # 公共参数(common) + self.CommonArgs = {"app_id": self.APPID} + # 业务参数(business),更多个性化参数可在官网查看 + self.BusinessArgs = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo":1,"vad_eos":10000} + + # 生成url + def create_url(self): + url = 'wss://ws-api.xfyun.cn/v2/iat' + # 生成RFC1123格式的时间戳 + now = datetime.now() + date = format_date_time(mktime(now.timetuple())) + + # 拼接字符串 + signature_origin = "host: " + "ws-api.xfyun.cn" + "\n" + signature_origin += "date: " + date + "\n" + signature_origin += "GET " + "/v2/iat " + "HTTP/1.1" + # 进行hmac-sha256进行加密 + signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'), + digestmod=hashlib.sha256).digest() + signature_sha = base64.b64encode(signature_sha).decode(encoding='utf-8') + + authorization_origin = "api_key=\"%s\", algorithm=\"%s\", headers=\"%s\", signature=\"%s\"" % ( + self.APIKey, "hmac-sha256", "host date request-line", signature_sha) + authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8') + # 将请求的鉴权参数组合为字典 + v = { + "authorization": authorization, + "date": date, + "host": "ws-api.xfyun.cn" + } + # 拼接鉴权参数,生成url + url = url + '?' + urlencode(v) + # print("date: ",date) + # print("v: ",v) + # 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致 + # print('websocket url :', url) + return url + + +# 收到websocket消息的处理 +def on_message(ws, message): + try: + code = json.loads(message)["code"] + sid = json.loads(message)["sid"] + if code != 0: + errMsg = json.loads(message)["message"] + print("sid:%s call error:%s code is:%s" % (sid, errMsg, code)) + + else: + data = json.loads(message)["data"]["result"]["ws"] + # print(json.loads(message)) + result = "" + for i in data: + for w in i["cw"]: + result += w["w"] + if result != "。" or result == ".>" or result == ' .。' or result == ' 。': + pass + else: + t.insert(END,result) + print("翻译结果:" % (result)) + except Exception as e: + print("receive msg,but parse exception:", e) + + + +# 收到websocket错误的处理 +def on_error(ws, error): + print("### error:", error) + + +# 收到websocket关闭的处理 +def on_close(ws,a,b): + print("### closed ###") + + +# 收到websocket连接建立的处理 +def on_open(ws): + def run(*args): + status = STATUS_FIRST_FRAME # 每一帧的音频大小 + CHUNK = 520 # 发送音频间隔(单位:s) + FORMAT=pyaudio.paInt16 + CHANNELS=1 + RATE=16000 + p=pyaudio.PyAudio() + stream=p.open(format=FORMAT, + channels=CHANNELS, + rate=RATE, + input=True, + frames_per_buffer=CHUNK) + # 音频的状态信息,标识音频是第一帧,还是中间帧、最后一帧 + print("-------Start Recording--------") + + for i in range(0, int(RATE / CHUNK * 60)): + buf = stream.read(CHUNK) + if not buf: + status = STATUS_LAST_FRAME + # 第一帧处理 + # 发送第一帧音频,带business 参数 + # appid 必须带上,只需第一帧发送 + if status == STATUS_FIRST_FRAME: + + d = {"common": wsParam.CommonArgs, + "business": wsParam.BusinessArgs, + "data": {"status": 0, "format": "audio/L16;rate=16000", + "audio": str(base64.b64encode(buf), 'utf-8'), + "encoding": "raw"}} + d = json.dumps(d) + ws.send(d) + status = STATUS_CONTINUE_FRAME + # 中间帧处理 + elif status == STATUS_CONTINUE_FRAME: + d = {"data": {"status": 1, "format": "audio/L16;rate=16000", + "audio": str(base64.b64encode(buf), 'utf-8'), + "encoding": "raw"}} + ws.send(json.dumps(d)) + # 最后一帧处理 + elif status == STATUS_LAST_FRAME: + d = {"data": {"status": 2, "format": "audio/L16;rate=16000", + "audio": str(base64.b64encode(buf), 'utf-8'), + "encoding": "raw"}} + ws.send(json.dumps(d)) + time.sleep(1) + break + # 模拟音频采样间隔 + stream.stop_stream() + stream.close() + p.terminate() + ws.close() + thread.start_new_thread(run, ()) + + +def run(): + global wsParam + # 测试时候在此处正确填写相关信息即可运行 + wsParam = Ws_Param(APPID='c68c13ed', APISecret='NzkzNGE1OTNhMjdmODg2ZDdhYTJjNzc1', + APIKey='3312b6aeb828b7b0a693f896a9ee3b2b', + AudioFile=r'') + websocket.enableTrace(False) + wsUrl = wsParam.create_url() + ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close) + ws.on_open = on_open + ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}) + + + +from tkinter import * +import threading +import tkinter +def thread_it(func,*args): + t=threading.Thread(target=func,args=args) + t.daemon = True + t.start() +root =Tk() +t=Text(root) +t.pack() + +tkinter.Button(root,text='go',command=lambda :thread_it(run,)).pack() +root.mainloop() \ No newline at end of file