本文介绍一些 Python 中常用的语音能力的包,以及如何通过调用云服务商的 API 进行语音识别
录音
主要使用 pyaudio 包,它可以以字节流的方式录制/播放音频
安装:
pip install pyaudio
列出可以录音的设备
import pyaudio

# List every audio device that can record (i.e. has at least one input channel).
p = pyaudio.PyAudio()
# Get the number of audio I/O devices
devices = p.get_device_count()
for i in range(devices):
    device_info = p.get_device_info_by_index(i)
    # Only devices with input channels can be used for recording
    if device_info.get('maxInputChannels') > 0:
        print(f"{device_info.get('index')}: {device_info.get('name')}")
# Fix: release the PortAudio resources (the original never terminated the instance)
p.terminate()
开始录音 5 秒,这里将录到的音频存到一个 io.BytesIO 对象中
import io

# Recording parameters
FORMAT = pyaudio.paInt16   # format of audio samples (16-bit signed PCM)
CHANNELS = 1               # audio channels (1: mono, 2: stereo)
RATE = 44100               # sample rate in Hz
CHUNK = 1024               # number of frames per buffer
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
stream = p.open(
    format=FORMAT,
    channels=CHANNELS,
    rate=RATE,
    frames_per_buffer=CHUNK,
    input=True,
)
print("Recording...")
# Capture the raw PCM bytes into an in-memory buffer
buffer = io.BytesIO()
# Fix: the original loop header was missing its trailing colon (SyntaxError).
# RATE / CHUNK buffers per second, times RECORD_SECONDS.
for _ in range(int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    buffer.write(data)
stream.stop_stream()
stream.close()
p.terminate()
保存音频文件
使用标准库中的 wave 包将音频字节保存到 wav 文件中,它会将 wav 格式写入文件头部,详见文档:The Python Standard Library - wave
import wave

# Write the recorded PCM bytes to a wav file; the wave module emits the header.
# NOTE(review): `data` here must be the *complete* recording (e.g. buffer.getvalue()),
# not the last chunk read from the stream — confirm against the recording loop above.
with wave.open(output, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
    wf.setframerate(RATE)
    wf.writeframes(data)
print(f"Recording saved as {output}")
如果不需要写到文件中,只希望它帮忙把 wav 文件的头部加到字节流前,可以如下操作
# Same as above, but write the wav container into an in-memory buffer instead
# of a file, so the bytes can be sent directly to a recognition API.
output = io.BytesIO()
with wave.open(output, 'wb') as writer:
    writer.setframerate(RATE)
    writer.setnchannels(CHANNELS)
    writer.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
    writer.writeframes(data)
# The buffer now holds a complete wav file (header + frames)
output.getvalue()
语音识别
腾讯云
腾讯云的语音识别服务有多种,这里只尝试了“一句话识别”和“录音文件识别极速版”
一句话识别
参考文档,将录音字节转为 base64 编码后,通过 API 接口发送,可以得到识别结果
注意,这里的字节数据是要包含对应文件格式文件头的,也就是说如果是上文中 pyaudio 中得到字节流需要先用 wave 模块补上文件头,否则腾讯云接口会报格式识别错误。
import base64
import json
from tencentcloud.common.common_client import CommonClient
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.profile.client_profile import ClientProfile
# Fix: HttpProfile was used below but never imported (NameError at runtime)
from tencentcloud.common.profile.http_profile import HttpProfile

# One-sentence recognition via Tencent Cloud ASR.
# The uploaded bytes MUST include the wav header (see note above in the article).
try:
    cred = credential.Credential(SECRET_ID, SECRET_KEY)
    httpProfile = HttpProfile()
    httpProfile.endpoint = "asr.tencentcloudapi.com"
    clientProfile = ClientProfile()
    clientProfile.httpProfile = httpProfile
    encoded = base64.b64encode(sentence_bytes).decode()
    data = {
        'EngSerViceType': '16k_zh',   # 16 kHz Mandarin engine (API spells it this way)
        'SourceType': 1,              # 1 = audio data carried in the request body
        'VoiceFormat': 'wav',
        'Data': encoded,
        'DataLen': len(sentence_bytes),
    }
    common_client = CommonClient("asr", "2019-06-14", cred, "", profile=clientProfile)
    # Fix: pass the dict directly — the original round-tripped it through
    # json.dumps()/json.loads() for no effect, and printed the full base64 payload.
    print(common_client.call_json("SentenceRecognition", data))
except TencentCloudSDKException as err:
    print(err)
录音文件识别极速版
参考官方示例代码:
import requests
import hmac
import hashlib
import base64
import time
import json
from tencentcloud.common.profile.http_profile import HttpProfile
class FlashRecognitionRequest:
    """Parameter holder for Tencent's flash (fast file) recognition API.

    Every optional field starts at the service default and may be changed
    through the corresponding setter before the request is sent.
    """

    # Service defaults for every optional field (applied in __init__).
    _DEFAULTS = {
        'speaker_diarization': 0,
        'hotword_id': "",
        'customization_id': "",
        'filter_dirty': 0,
        'filter_modal': 0,
        'filter_punc': 0,
        'convert_num_mode': 1,
        'word_info': 0,
        'voice_format': "",
        'first_channel_only': 1,
        'reinforce_hotword': 0,
        'sentence_max_length': 0,
    }

    def __init__(self, engine_type):
        self.engine_type = engine_type
        for field, default in self._DEFAULTS.items():
            setattr(self, field, default)

    def set_first_channel_only(self, first_channel_only):
        """Recognize only the first audio channel when set to 1."""
        self.first_channel_only = first_channel_only

    def set_speaker_diarization(self, speaker_diarization):
        """Enable speaker diarization when set to 1."""
        self.speaker_diarization = speaker_diarization

    def set_filter_dirty(self, filter_dirty):
        """Filter profanity from the transcript when set to 1."""
        self.filter_dirty = filter_dirty

    def set_filter_modal(self, filter_modal):
        """Filter modal/filler words when set to 1."""
        self.filter_modal = filter_modal

    def set_filter_punc(self, filter_punc):
        """Strip punctuation from the transcript when set to 1."""
        self.filter_punc = filter_punc

    def set_convert_num_mode(self, convert_num_mode):
        """Control how spoken numbers are rendered (1 = smart conversion)."""
        self.convert_num_mode = convert_num_mode

    def set_word_info(self, word_info):
        """Include per-word timing info in the result when set."""
        self.word_info = word_info

    def set_hotword_id(self, hotword_id):
        """Select a hotword (boosted vocabulary) list by id."""
        self.hotword_id = hotword_id

    def set_customization_id(self, customization_id):
        """Select a customized model by id."""
        self.customization_id = customization_id

    def set_voice_format(self, voice_format):
        """Declare the audio container format, e.g. 'wav' or 'mp3'."""
        self.voice_format = voice_format

    def set_sentence_max_length(self, sentence_max_length):
        """Limit the maximum sentence length in the result (0 = no limit)."""
        self.sentence_max_length = sentence_max_length

    def set_reinforce_hotword(self, reinforce_hotword):
        """Strengthen hotword matching when set to 1."""
        self.reinforce_hotword = reinforce_hotword
class FlashRecognizer:
    """Signs and posts a flash-recognition request to Tencent's ASR endpoint."""

    def __init__(self):
        pass

    def _format_sign_string(self, param):
        """Build the string-to-sign: POST + host + path + appid + sorted query.

        *param* is a sorted list of (key, value) pairs; the appid pair is
        appended to the path and excluded from the query string.
        """
        signstr = "POSTasr.cloud.tencent.com/asr/flash/v1/"
        for kv in param:
            # membership test on the tuple, matching the official sample
            if 'appid' in kv:
                signstr += str(kv[1])
                break
        pairs = []
        for kv in param:
            if 'appid' in kv:
                continue
            pairs.append(str(kv[0]) + "=" + str(kv[1]))
        query = "&".join(pairs)
        # no '?' is emitted when there are no query parameters
        return signstr + "?" + query if query else signstr

    def _build_header(self):
        """Return the base HTTP headers for the request."""
        return {"Host": "asr.cloud.tencent.com"}

    def _sign(self, signstr, secret_key):
        """Return the base64-encoded HMAC-SHA1 of *signstr* under *secret_key*."""
        digest = hmac.new(secret_key.encode('utf-8'),
                          signstr.encode('utf-8'), hashlib.sha1).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _build_req_with_signature(self, secret_key, params, header):
        """Sign the sorted query, stash the signature in *header*, return the URL."""
        query = sorted(params.items(), key=lambda kv: kv[0])
        signstr = self._format_sign_string(query)
        header["Authorization"] = self._sign(signstr, secret_key)
        # drop the leading "POST" verb to obtain host + path + query
        return "https://" + signstr[4:]

    def _create_query_arr(self, req):
        """Flatten a FlashRecognitionRequest plus credentials into query params."""
        return {
            'appid': APP_ID,
            'secretid': SECRET_ID,
            'timestamp': str(int(time.time())),
            'engine_type': req.engine_type,
            'voice_format': req.voice_format,
            'speaker_diarization': req.speaker_diarization,
            'hotword_id': req.hotword_id,
            'customization_id': req.customization_id,
            'filter_dirty': req.filter_dirty,
            'filter_modal': req.filter_modal,
            'filter_punc': req.filter_punc,
            'convert_num_mode': req.convert_num_mode,
            'word_info': req.word_info,
            'first_channel_only': req.first_channel_only,
            'reinforce_hotword': req.reinforce_hotword,
            'sentence_max_length': req.sentence_max_length,
        }

    def recognize(self, req, data):
        """POST the audio bytes with a signed URL; return the raw response text."""
        header = self._build_header()
        query_arr = self._create_query_arr(req)
        req_url = self._build_req_with_signature(SECRET_KEY, query_arr, header)
        response = requests.post(req_url, headers=header, data=data)
        return response.text
recognizer = FlashRecognizer()
# Build the recognition request
req = FlashRecognitionRequest('16k_zh')
req.set_filter_modal(0)
req.set_filter_punc(0)
req.set_filter_dirty(0)
req.set_voice_format("wav")
req.set_word_info(0)
req.set_convert_num_mode(1)
# Run recognition
resultData = recognizer.recognize(req, audio_bytes)
resp = json.loads(resultData)
request_id = resp["request_id"]
code = resp["code"]
# Fix: the original had a bare `return ''` at module level (SyntaxError);
# restructure as if/else instead. Also fixed the "faild" typo in the message.
if code != 0:
    print("recognize failed! request_id: ", request_id, " code: ", code, ", message: ", resp["message"])
else:
    # One channel_result per audio channel; most audio is mono, so take [0]
    try:
        result = resp["flash_result"][0]['text']
    except Exception as e:
        print(f'parse error: {e}')
与“一句话识别”类似,上传数据同样需要带上对应格式的文件头
科大讯飞
这里试用了科大讯飞的实时语音转写接口,通过 websocket 的方式,推送字节流到 websocket server,并接收识别结果
这里参考了官方示例,并用 async/await 方式改写了程序
from datetime import datetime
import time
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import asyncio
import base64
import hashlib
import hmac
import websockets
import json
STATUS_FIRST_FRAME = 0 # marks the first audio frame (also carries common/business params)
STATUS_CONTINUE_FRAME = 1 # marks an intermediate audio frame
STATUS_LAST_FRAME = 2 # marks the final frame, telling the server the audio has ended
def get_url(app_key: str, app_secret: str) -> str:
    """Build the authenticated websocket URL for iFlytek's v2/iat service.

    Signs host/date/request-line with HMAC-SHA256 under *app_secret* and
    packs the base64 authorization blob into the query string.
    """
    host = "ws-api.xfyun.cn"
    base_url = 'wss://ws-api.xfyun.cn/v2/iat'
    # RFC1123-formatted timestamp for the `date` header/param
    date = format_date_time(time.mktime(datetime.now().timetuple()))
    # Canonical string to sign: host line, date line, request line
    signature_origin = "\n".join([
        "host: " + host,
        "date: " + date,
        "GET /v2/iat HTTP/1.1",
    ])
    # HMAC-SHA256 signature, base64 encoded
    digest = hmac.new(app_secret.encode('utf-8'), signature_origin.encode('utf-8'),
                      digestmod=hashlib.sha256).digest()
    signature_sha = base64.b64encode(digest).decode(encoding='utf-8')
    authorization_origin = (
        f'api_key="{app_key}", algorithm="hmac-sha256", '
        f'headers="host date request-line", signature="{signature_sha}"'
    )
    authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')
    # Assemble the auth parameters into the final URL's query string
    params = {
        "authorization": authorization,
        "date": date,
        "host": host,
    }
    return base_url + '?' + urlencode(params)
async def iflytek_recognition(data: bytes):
    """Stream PCM bytes to iFlytek's websocket ASR and return the recognized text.

    `data` is sent in fixed-size frames with the first/continue/last status
    protocol; the first frame also carries the app id and business params.
    NOTE(review): expects headerless PCM matching audio/L16;rate=16000 — confirm
    against the caller.
    """
    url = get_url(app_key=APP_KEY, app_secret=APP_SECRET)
    async with websockets.connect(url) as ws:
        frame_size = 8000  # bytes of audio per frame
        interval = 0.04    # delay between frames (s), simulating real-time capture
        status = STATUS_FIRST_FRAME
        common_args = {"app_id": APP_ID}
        business_args = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}
        i = 0
        while True:
            buf = data[i * frame_size:(i + 1) * frame_size]
            i += 1
            # An empty slice means the input is exhausted -> send the closing frame
            if not buf:
                status = STATUS_LAST_FRAME
            # The "status" field mirrors our state machine: 0 first, 1 middle, 2 last
            frame = {"status": status, "format": "audio/L16;rate=16000",
                     "audio": str(base64.b64encode(buf), 'utf-8'),
                     "encoding": "raw"}
            if status == STATUS_FIRST_FRAME:
                # First frame must carry app_id (common) and business params
                await ws.send(json.dumps({"common": common_args,
                                          "business": business_args,
                                          "data": frame}))
                status = STATUS_CONTINUE_FRAME
            else:
                await ws.send(json.dumps({"data": frame}))
                if status == STATUS_LAST_FRAME:
                    break
            # Fix: time.sleep() blocks the event loop inside a coroutine;
            # use the awaitable asyncio.sleep() instead.
            await asyncio.sleep(interval)
        # NOTE(review): only one server message is read here; the service may
        # deliver results across several messages for long audio — confirm.
        message = await ws.recv()
    result = ''
    try:
        # Fix: parse the message once instead of calling json.loads repeatedly
        resp = json.loads(message)
        code = resp["code"]
        sid = resp["sid"]
        if code != 0:
            err_msg = resp["message"]
            print("sid:%s call error:%s code is:%s" % (sid, err_msg, code))
        else:
            data = resp["data"]["result"]["ws"]
            # Concatenate every candidate word of every segment
            for seg in data:
                for w in seg["cw"]:
                    result += w["w"]
            print("sid:%s call success!,data is:%s" % (sid, json.dumps(data, ensure_ascii=False)))
    except Exception as e:
        print("receive msg,but parse exception:", e)
    return result
async def main():
    """Read a local sample wav file and run iFlytek recognition on its bytes."""
    with open('sample.wav', 'rb') as audio_file:
        await iflytek_recognition(audio_file.read())
一个简单 Demo
这里写一个通过键盘空格键控制录音开关,并打印语音识别结果的 demo
键盘监控使用 pynput 包,它通过一个线程监听键盘事件并响应
安装
pip install pynput
完整代码如下
import pyaudio
import wave
from pynput import keyboard
import threading
import io
from list_devices import list_devices
from tencent import sentence_recognition, flash_recognition
from iflytek import iflytek_recognition
import asyncio
FORMAT = pyaudio.paInt16 # format of audio samples (16-bit signed PCM)
CHANNELS = 1 # audio channels (1: mono, 2: stereo)
RATE=16000 # sample rate in Hz (16 kHz, matching the 16k_zh ASR engines)
CHUNK=1024 # number of frames per buffer
# Shared flag: toggled by the keyboard listener thread, polled by the recorder
is_recording = False
# Input device index; set from user input in main()
device_index = 0
def save(data: bytes, output: str):
    """Write raw PCM bytes to *output* as a wav file using the module settings."""
    wf = wave.open(output, 'wb')
    try:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(pyaudio.get_sample_size(pyaudio.paInt16))
        wf.setframerate(RATE)
        wf.writeframes(data)
    finally:
        wf.close()
    print(f"Recording saved as {output}")
def to_wav_bytes(data: bytes) -> bytes:
    """Return *data* (raw PCM) prefixed with a wav header, as bytes."""
    stream = io.BytesIO()
    with wave.open(stream, 'wb') as wf:
        # setparams bundles nchannels/sampwidth/framerate (nframes is patched
        # by writeframes; 'NONE' is the wave module's only compression type)
        wf.setparams((CHANNELS, pyaudio.get_sample_size(pyaudio.paInt16),
                      RATE, 0, 'NONE', 'not compressed'))
        wf.writeframes(data)
    return stream.getvalue()
def do_record(device_index: int, output: str):
    """Record from *device_index* until the shared `is_recording` flag goes
    False, run speech recognition on the capture, then save it to *output*."""
    global is_recording
    print("Recording...")
    audio = pyaudio.PyAudio()
    captured = io.BytesIO()
    stream = audio.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        frames_per_buffer=CHUNK,
        input=True,
        input_device_index=device_index,
    )
    # `is_recording` is flipped from the pynput listener thread
    while is_recording:
        captured.write(stream.read(CHUNK))
    stream.stop_stream()
    stream.close()
    print("Finished.")
    audio.terminate()
    pcm = captured.getvalue()
    # Alternative back ends (uncomment one to switch):
    # result = sentence_recognition(to_wav_bytes(pcm))
    # result = asyncio.run(iflytek_recognition(pcm))
    result = flash_recognition(to_wav_bytes(pcm))
    print(result)
    save(pcm, output)
def on_release(key):
    """pynput callback: space toggles recording on/off, esc stops and exits."""
    global is_recording, device_index
    try:
        if key == keyboard.Key.space:
            if is_recording:
                is_recording = False
            else:
                # start the recorder in a background thread so the listener stays responsive
                is_recording = True
                threading.Thread(target=do_record, args=(device_index, 'output.wav')).start()
        elif key == keyboard.Key.esc:
            is_recording = False
            # Returning False stops the pynput listener
            return False
    except AttributeError:
        print('special key {0} pressed'.format(key))
def main():
    """List input devices, let the user pick one, then listen for hotkeys."""
    # device_index is shared with on_release, which spawns the recording thread
    global device_index
    list_devices()
    device_index = int(input("Please select input device:"))
    # Blocks until the listener callback returns False (esc key)
    with keyboard.Listener(on_release=on_release) as listener:
        listener.join()


if __name__ == '__main__':
    main()
尝试下来,腾讯云的录音文件识别极速版速度非常快,符合日常简单应用的语音入口的场景