调用科大讯飞语音听写功能,通过Python实现语音识别,将实时语音转换为文字。
为了完成这一过程,我在科大讯飞官网上下载了语音听写的SDK,并创建了几个Python脚本和文件夹。具体来说,我创建了两个Python文件 `get_audio.py` 和 `iat_demo.py`,以及一个名为 `audios` 的文件夹来存储录音文件。整个文件结构如下:
Python资源共享群:626017123
asr_SDK
├─ Readme.html
├─ audios
│ └─ input.wav
├─ bin
│ ├─ gm_continuous_digit.abnf
│ ├─ ise_cn
│ ├─ ise_en
│ ├─ msc
│ ├─ msc.dll
│ ├─ msc_x64.dll
│ ├─ source.txt
│ ├─ userwords.txt
│ └─ wav
├─ doc
├─ get_audio.py
├─ iat_demo.py
├─ include
├─ libs
├─ release.txt
└─ samples
这里使用了 `pyaudio` 库来录制音频。具体步骤包括下载必要的依赖库,并根据需要进行修改。`get_audio.py` 的完整代码如下:
```python
import pyaudio
import wave
in_path = "./audios/input.wav"


def get_audio(filepath):
    """Ask the user for confirmation, then record 5 seconds of audio to a WAV file.

    The recording is mono, 16-bit PCM. An existing file at ``filepath`` is
    overwritten.

    Args:
        filepath: Destination path of the WAV file.

    Raises:
        SystemExit: If the user answers "n" at the prompt.
    """
    answer = input("是否开始录音? (y/n) ").lower()
    if answer == "n":
        # Same effect as the interactive-only ``exit()`` helper.
        raise SystemExit
    if answer != "y":
        print("语音录入失败,请重新开始")
        return

    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 1
    # 16 kHz so the samples match the "audio/L16;rate=16000" format that
    # iat_demo.py declares when it uploads this file.
    rate = 16000
    record_seconds = 5

    p = pyaudio.PyAudio()
    try:
        # Query the sample width while the PyAudio instance is still alive.
        sample_width = p.get_sample_size(sample_format)
        stream = p.open(format=sample_format,
                        channels=channels,
                        rate=rate,
                        input=True,
                        frames_per_buffer=chunk)
        try:
            print("*" * 5, "开始录音:请在5秒内输入语音", "*" * 5)
            chunk_count = int(rate / chunk * record_seconds)
            frames = [stream.read(chunk) for _ in range(chunk_count)]
            print("*" * 5, "录音结束\n")
        finally:
            # Release the audio device even if reading fails midway.
            stream.stop_stream()
            stream.close()
    finally:
        p.terminate()

    with wave.open(filepath, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(b''.join(frames))


get_audio(in_path)
```
这段代码可以循环执行,每次重新录音都会覆盖之前的音频文件。
接下来,我们利用科大讯飞的语音听写Web API进行语音识别。`iat_demo.py` 的完整代码如下:
```python
import websocket
import requests
import datetime
import hashlib
import base64
import hmac
import json
import os
import re
import ssl
import wave
from urllib.parse import urlencode
import logging
import time
class Ws_Param(object):
    """Connection settings and request signing for the iFlytek IAT WebSocket API.

    Attributes set by the constructor:
        Host, RequestUri, url: where to connect.
        APPID, Algorithm: values embedded in the request.
        AudioFile: path of the audio file that ``on_open`` streams.
        CommonArgs, BusinessArgs: the "common" and "business" sections that
            ``on_open`` attaches to the first frame.
    """

    def __init__(self, host, audio_file="./audios/input.wav"):
        """Store the connection settings.

        Args:
            host: API host name, e.g. ``"ws-api.xfyun.cn"``.
            audio_file: Audio file to upload; defaults to the file written by
                get_audio.py.
        """
        self.Host = host
        self.HttpProto = "HTTP/1.1"
        self.HttpMethod = "GET"
        self.RequestUri = "/v2/iat"
        self.APPID = "5d312675"
        self.Algorithm = "hmac-sha256"
        self.url = f"wss://{self.Host}{self.RequestUri}"
        self.AudioFile = audio_file
        self.CommonArgs = {"app_id": self.APPID}
        # NOTE(review): values follow iFlytek's published IAT sample
        # (Mandarin dictation); confirm against the current API docs.
        self.BusinessArgs = {
            "domain": "iat",
            "language": "zh_cn",
            "accent": "mandarin",
            "vinfo": 1,
            "vad_eos": 10000,
        }

    def create_url(self):
        """Return the WebSocket URL with the signed authentication query string.

        The signature is an HMAC-SHA256 over the ``host``, ``date`` and
        request-line fields, one per line, keyed with the API secret.
        """
        # Local import: the file-level import list has no RFC 1123 date helper.
        from email.utils import formatdate

        date = formatdate(usegmt=True)
        # NOTE: credentials as published with the article; replace them with
        # your own and keep real secrets out of source control.
        APIKey = 'a6aabfcca4ae28f9b6a448f705b7e432'
        APISecret = 'e649956e14eeb085d1b0dce77a671131'
        # The three signed fields must be separated by real newlines.
        signature_origin = (
            f"host: {self.Host}\n"
            f"date: {date}\n"
            f"{self.HttpMethod} {self.RequestUri} {self.HttpProto}"
        )
        signature_sha = hmac.new(APISecret.encode('utf-8'),
                                 signature_origin.encode('utf-8'),
                                 digestmod=hashlib.sha256).digest()
        signature_sha = base64.b64encode(signature_sha).decode('utf-8')
        authorization_origin = f'api_key="{APIKey}", algorithm="{self.Algorithm}", headers="host date request-line", signature="{signature_sha}"'
        authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode('utf-8')
        v = {
            "authorization": authorization,
            "date": date,
            "host": self.Host,
        }
        return self.url + '?' + urlencode(v)
def on_message(ws, message):
    """Print the outcome of one server message.

    Args:
        ws: The WebSocket application object (unused).
        message: JSON text with ``code`` and ``sid``, plus ``message`` on
            failure or ``data.result.ws`` on success.

    A message that is not valid JSON or lacks an expected field is reported
    on stdout instead of raising out of the callback.
    """
    try:
        # Decoding happens inside the try so a malformed frame is reported
        # the same way as a missing field.
        msg = json.loads(message)
        code = msg["code"]
        sid = msg["sid"]
        if code != 0:
            errMsg = msg["message"]
            print(f"sid:{sid} call error:{errMsg} code is:{code}")
        else:
            result = msg["data"]["result"]["ws"]
            print(f"sid:{sid} call success!")
            print(f"result is:{json.dumps(result, ensure_ascii=False, sort_keys=True, indent=4)}")
    except (ValueError, KeyError, TypeError) as e:
        print("接收消息,但解析异常:", e)
def on_error(ws, error):
    """Print a WebSocket error to stdout.

    Args:
        ws: The WebSocket application object (unused).
        error: The error value to display.
    """
    print("### error:", error)
def on_close(ws, close_status_code=None, close_msg=None):
    """Print a notice when the connection closes.

    Args:
        ws: The WebSocket application object (unused).
        close_status_code: Close status passed by websocket-client 1.x
            (unused). Optional so a one-argument call still works.
        close_msg: Close reason passed by websocket-client 1.x (unused).
    """
    print("### closed ###")
def on_open(ws):
    """Start a background thread that streams the audio file to the server.

    Reads the module-level ``wsParam`` object for ``AudioFile``,
    ``CommonArgs`` and ``BusinessArgs``. The file is read as raw bytes in
    1280-byte frames, with 40 ms between frames; each frame is base64-encoded
    and sent as JSON. Frame status is 0 for the first frame, 1 for the ones
    in between and 2 for the last.

    Args:
        ws: The WebSocket application object used to send frames and to
            close the connection afterwards.
    """
    # Local import: the file-level import list has no threading module.
    import threading

    def run(*args):
        frame_size = 1280
        interval = 0.04
        status = 0

        # NOTE: the file is sent byte-for-byte as read; if it is a WAV
        # container, its header bytes are sent along with the samples.
        with open(wsParam.AudioFile, "rb") as fp:
            while True:
                buf = fp.read(frame_size)
                # A separate `if`, not part of the dispatch below: at end of
                # file the last frame must still be sent in this iteration,
                # otherwise the loop never terminates.
                if not buf:
                    status = 2

                payload = {
                    "data": {"status": status, "format": "audio/L16;rate=16000",
                             "audio": str(base64.b64encode(buf), 'utf-8'),
                             "encoding": "raw"}
                }
                if status == 0:
                    # Only the first frame carries the session parameters.
                    payload = {
                        "common": wsParam.CommonArgs,
                        "business": wsParam.BusinessArgs,
                        **payload,
                    }
                ws.send(json.dumps(payload))

                if status == 2:
                    time.sleep(1)
                    break
                status = 1
                time.sleep(interval)
        ws.close()

    threading.Thread(target=run, daemon=True).start()
if name == "main": wsParam = WsParam("ws-api.xfyun.cn") websocket.enableTrace(False) wsUrl = wsParam.createurl() ws = websocket.WebSocketApp(wsUrl, onmessage=onmessage, onerror=onerror, onclose=onclose) ws.onopen = onopen ws.runforever(sslopt={"certreqs": ssl.CERT_NONE}) ```
在程序文件夹内,右键点击 `iat_demo.py`,选择 Edit with IDLE -> Edit with IDLE 3.7 (32 bit) 打开,然后按下F5键快速启动。如果启动过程中提示缺少某些第三方库(例如 `pyaudio`、`websocket-client`),请自行安装。启动成功后,根据提示操作即可。
这是我修改后的版本,希望对你有所帮助。