本文介绍 Python 中一些常用的语音处理相关的包,以及如何调用云服务商的 API 进行语音识别。

录音

主要使用 pyaudio 包,它可以以字节流的方式录制/播放音频

安装:

pip install pyaudio

列出可以录音的设备

import pyaudio

p = pyaudio.PyAudio()

# Get the number of audio I/O devices
devices = p.get_device_count()

try:
    for i in range(devices):
        device_info = p.get_device_info_by_index(i)
        # Only devices exposing at least one input channel can be recorded from.
        if device_info.get('maxInputChannels') > 0:
            print(f"{device_info.get('index')}: {device_info.get('name')}")
finally:
    # Release the PyAudio instance once the listing is done.
    p.terminate()

开始录音 5 秒,这里将录到的音频存到一个 io.BytesIO 对象中

import io

FORMAT = pyaudio.paInt16  # format of audio samples
CHANNELS = 1  # audio channels (1: mono, 2: stereo)
RATE = 44100  # sample rate
CHUNK = 1024  # number of frames per buffer
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
stream = p.open(
    format=FORMAT,
    channels=CHANNELS,
    rate=RATE,
    frames_per_buffer=CHUNK,
    input=True,
)

print("Recording...")
buffer = io.BytesIO()
try:
    # RATE / CHUNK buffers are read per second, so this loop lasts
    # roughly RECORD_SECONDS seconds.
    for _ in range(int(RATE / CHUNK * RECORD_SECONDS)):
        data = stream.read(CHUNK)
        buffer.write(data)
finally:
    # Always release the stream and PyAudio, even if a read fails.
    stream.stop_stream()
    stream.close()
    p.terminate()

保存音频文件

使用标准库中的 wave 包可以将音频字节保存为 wav 文件,它会自动在文件头部写入 wav 格式信息(声道数、采样位宽、采样率等),详见文档:The Python Standard Library - wave

import wave

output = 'output.wav'  # path of the wav file to create

with wave.open(output, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    # Derive the sample width from FORMAT so the header always matches
    # the format the stream was opened with.
    wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    # Write the whole recording: `data` only holds the last chunk read,
    # the complete audio lives in `buffer`.
    wf.writeframes(buffer.getvalue())
print(f"Recording saved as {output}")

如果不需要写到文件中,只希望它帮忙把 wav 文件的头部加到字节流前,可以如下操作

output = io.BytesIO()
with wave.open(output, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    # Derive the sample width from FORMAT so the header always matches
    # the format the stream was opened with.
    wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    # Write the whole recording: `data` only holds the last chunk read,
    # the complete audio lives in `buffer`.
    wf.writeframes(buffer.getvalue())
# Complete wav file (header + frames) as bytes.
wav_bytes = output.getvalue()

语音识别

腾讯云

腾讯云的语音识别服务有多种,这里只尝试了“一句话识别”和“录音文件识别极速版”

一句话识别

参考文档,将录音字节转为 base64 编码后,通过 API 接口发送,可以得到识别结果

注意,这里的字节数据必须包含对应文件格式的文件头。也就是说,如果是上文中通过 pyaudio 得到的原始字节流,需要先用 wave 模块补上文件头,否则腾讯云接口会报格式识别错误。另外,16k_zh 引擎面向 16kHz 采样率的音频,录音时应将 RATE 设为 16000(见文末 Demo)。

import base64
import json

from tencentcloud.common.common_client import CommonClient
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.profile.client_profile import ClientProfile

# HttpProfile is used below but was missing from this snippet's imports.
from tencentcloud.common.profile.http_profile import HttpProfile

try:
    cred = credential.Credential(SECRET_ID, SECRET_KEY)

    httpProfile = HttpProfile()
    httpProfile.endpoint = "asr.tencentcloudapi.com"
    clientProfile = ClientProfile()
    clientProfile.httpProfile = httpProfile

    # The audio is sent inline as base64 text; sentence_bytes must already
    # contain the file header (see the note above this snippet).
    encoded = base64.b64encode(sentence_bytes).decode()

    data = {
        'EngSerViceType': '16k_zh',
        'SourceType': 1,
        'VoiceFormat': 'wav',
        'Data': encoded,
        # Length of the raw audio, not of the base64 text.
        'DataLen': len(sentence_bytes)
    }
    common_client = CommonClient("asr", "2019-06-14", cred, "", profile=clientProfile)
    # call_json takes the parameter dict directly; no JSON round trip needed.
    print(common_client.call_json("SentenceRecognition", data))
except TencentCloudSDKException as err:
    print(err)

录音文件识别极速版

参考官方示例代码

import requests
import hmac
import hashlib
import base64
import time
import json
from tencentcloud.common.profile.http_profile import HttpProfile

class FlashRecognitionRequest:
    """Plain container for the options of one flash recognition request.

    Every attribute is later copied into the request's query parameters
    by ``FlashRecognizer._create_query_arr``. Only ``engine_type`` is
    mandatory; all other options start from the defaults set below and
    can be changed through the matching ``set_*`` method.
    """

    def __init__(self, engine_type):
        """Create a request for ``engine_type`` with default options."""
        self.engine_type = engine_type

        # Integer switches and string ids, all starting at their defaults.
        self.speaker_diarization = 0
        self.hotword_id = ""
        self.customization_id = ""
        self.filter_dirty = 0
        self.filter_modal = 0
        self.filter_punc = 0
        self.convert_num_mode = 1
        self.word_info = 0
        self.voice_format = ""
        self.first_channel_only = 1
        self.reinforce_hotword = 0
        self.sentence_max_length = 0

    # --- audio description -------------------------------------------------

    def set_voice_format(self, voice_format):
        """Store ``voice_format`` on the request."""
        self.voice_format = voice_format

    def set_first_channel_only(self, first_channel_only):
        """Store ``first_channel_only`` on the request."""
        self.first_channel_only = first_channel_only

    def set_speaker_diarization(self, speaker_diarization):
        """Store ``speaker_diarization`` on the request."""
        self.speaker_diarization = speaker_diarization

    # --- text filtering and formatting -------------------------------------

    def set_filter_dirty(self, filter_dirty):
        """Store ``filter_dirty`` on the request."""
        self.filter_dirty = filter_dirty

    def set_filter_modal(self, filter_modal):
        """Store ``filter_modal`` on the request."""
        self.filter_modal = filter_modal

    def set_filter_punc(self, filter_punc):
        """Store ``filter_punc`` on the request."""
        self.filter_punc = filter_punc

    def set_convert_num_mode(self, convert_num_mode):
        """Store ``convert_num_mode`` on the request."""
        self.convert_num_mode = convert_num_mode

    def set_word_info(self, word_info):
        """Store ``word_info`` on the request."""
        self.word_info = word_info

    def set_sentence_max_length(self, sentence_max_length):
        """Store ``sentence_max_length`` on the request."""
        self.sentence_max_length = sentence_max_length

    # --- hotwords and custom models ----------------------------------------

    def set_hotword_id(self, hotword_id):
        """Store ``hotword_id`` on the request."""
        self.hotword_id = hotword_id

    def set_customization_id(self, customization_id):
        """Store ``customization_id`` on the request."""
        self.customization_id = customization_id

    def set_reinforce_hotword(self, reinforce_hotword):
        """Store ``reinforce_hotword`` on the request."""
        self.reinforce_hotword = reinforce_hotword
        
class FlashRecognizer:
    """HTTP client for the flash recording-file recognition endpoint.

    Credentials are read from the module-level ``APP_ID``, ``SECRET_ID``
    and ``SECRET_KEY`` names, which must be defined before ``recognize``
    is called.
    """

    def __init__(self):
        pass

    def _format_sign_string(self, param):
        """Build the string that gets signed for a request.

        Args:
            param: iterable of ``(key, value)`` pairs, already sorted.

        Returns:
            ``"POST" + host + path + appid``, followed by ``?`` and the
            remaining pairs joined as ``key=value&...``. When there are no
            other pairs the ``?`` is omitted.
        """
        signstr = "POSTasr.cloud.tencent.com/asr/flash/v1/"
        # The appid is part of the URL path rather than the query string.
        for key, value in param:
            if key == 'appid':
                signstr += str(value)
                break
        # Match on the key only: a parameter whose *value* happens to be
        # "appid" must still be part of the signed query.
        query = "&".join(
            f"{key}={value}" for key, value in param if key != 'appid'
        )
        return f"{signstr}?{query}" if query else signstr

    def _build_header(self):
        """Return the base HTTP headers of a request."""
        return {"Host": "asr.cloud.tencent.com"}

    def _sign(self, signstr, secret_key):
        """Return the base64-encoded HMAC-SHA1 of ``signstr`` under ``secret_key``."""
        digest = hmac.new(secret_key.encode('utf-8'),
                          signstr.encode('utf-8'), hashlib.sha1).digest()
        return base64.b64encode(digest).decode('utf-8')

    def _build_req_with_signature(self, secret_key, params, header):
        """Sign ``params`` and return the request URL.

        The signature is stored in ``header["Authorization"]`` (``header``
        is modified in place).
        """
        query = sorted(params.items(), key=lambda d: d[0])
        signstr = self._format_sign_string(query)
        header["Authorization"] = self._sign(signstr, secret_key)
        # The URL is the signed string without its leading HTTP method.
        return "https://" + signstr[len("POST"):]

    def _create_query_arr(self, req):
        """Collect the query parameters for ``req`` into a dict."""
        return {
            'appid': APP_ID,
            'secretid': SECRET_ID,
            'timestamp': str(int(time.time())),
            'engine_type': req.engine_type,
            'voice_format': req.voice_format,
            'speaker_diarization': req.speaker_diarization,
            'hotword_id': req.hotword_id,
            'customization_id': req.customization_id,
            'filter_dirty': req.filter_dirty,
            'filter_modal': req.filter_modal,
            'filter_punc': req.filter_punc,
            'convert_num_mode': req.convert_num_mode,
            'word_info': req.word_info,
            'first_channel_only': req.first_channel_only,
            'reinforce_hotword': req.reinforce_hotword,
            'sentence_max_length': req.sentence_max_length,
        }

    def recognize(self, req, data, timeout=None):
        """POST the audio ``data`` for recognition and return the response text.

        Args:
            req: a ``FlashRecognitionRequest`` describing the options.
            data: the audio file content, sent as the request body.
            timeout: optional timeout in seconds forwarded to
                ``requests.post``. ``None`` (the default) waits
                indefinitely, which is the previous behaviour.

        Returns:
            The raw response body as text.
        """
        header = self._build_header()
        query_arr = self._create_query_arr(req)
        req_url = self._build_req_with_signature(SECRET_KEY, query_arr, header)
        r = requests.post(req_url, headers=header, data=data, timeout=timeout)
        return r.text


def flash_recognition(audio_bytes):
    """Recognize a complete audio file and return the recognized text.

    Args:
        audio_bytes: content of the audio file, including its format
            header (the request declares the format as ``wav``).

    Returns:
        The text of the first entry of ``flash_result``, or ``''`` when
        the service reports an error or the response cannot be parsed.
    """
    recognizer = FlashRecognizer()

    # Build the recognition request
    req = FlashRecognitionRequest('16k_zh')
    req.set_filter_modal(0)
    req.set_filter_punc(0)
    req.set_filter_dirty(0)
    req.set_voice_format("wav")
    req.set_word_info(0)
    req.set_convert_num_mode(1)

    # Run the recognition
    resultData = recognizer.recognize(req, audio_bytes)
    resp = json.loads(resultData)
    request_id = resp["request_id"]
    code = resp["code"]
    if code != 0:
        print("recognize faild! request_id: ", request_id, " code: ", code, ", message: ", resp["message"])
        return ''

    # Each entry of flash_result is the result of one audio channel;
    # most audio is mono, so only the first entry is used.
    try:
        return resp["flash_result"][0]['text']
    except (KeyError, IndexError, TypeError) as e:
        print(f'parse error: {e}')
        return ''

与“一句话识别”类似,上传数据同样需要带上对应格式的文件头

科大讯飞

这里试用了科大讯飞的语音听写(流式版)接口(/v2/iat),通过 websocket 的方式,将字节流推送到 websocket server,并接收识别结果

这里参考了官方示例,并用 async/await 方式改写了程序

from datetime import datetime
import time
from urllib.parse import urlencode
from wsgiref.handlers import format_date_time
import asyncio
import base64
import hashlib
import hmac
import websockets
import json

# Frame position markers, in sending order:
# first frame (0), intermediate frame (1), last frame (2).
STATUS_FIRST_FRAME, STATUS_CONTINUE_FRAME, STATUS_LAST_FRAME = range(3)

def get_url(app_key: str, app_secret: str) -> str:
    """Return the websocket URL carrying the signed authentication query.

    The query string holds three parameters: ``authorization`` (base64 of
    the key id, algorithm, signed header names and signature), ``date``
    (the current time in RFC 1123 format) and ``host``.

    Args:
        app_key: API key placed in the authorization string.
        app_secret: secret used as the HMAC-SHA256 key.
    """
    host = "ws-api.xfyun.cn"
    path = "/v2/iat"

    # Current time as an RFC 1123 date string.
    date = format_date_time(time.mktime(datetime.now().timetuple()))

    # Text that gets signed: host line, date line and request line.
    signature_origin = "\n".join((
        f"host: {host}",
        f"date: {date}",
        f"GET {path} HTTP/1.1",
    ))
    digest = hmac.new(
        app_secret.encode('utf-8'),
        signature_origin.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    signature = base64.b64encode(digest).decode(encoding='utf-8')

    authorization_origin = (
        f'api_key="{app_key}", algorithm="hmac-sha256", '
        f'headers="host date request-line", signature="{signature}"'
    )
    authorization = base64.b64encode(
        authorization_origin.encode('utf-8')
    ).decode(encoding='utf-8')

    # Authentication parameters, appended to the URL as its query string.
    query = {
        "authorization": authorization,
        "date": date,
        "host": host,
    }
    return f"wss://{host}{path}?{urlencode(query)}"

async def iflytek_recognition(data: bytes):
    """Stream audio to the iFlytek websocket API and return the recognized text.

    The audio is sent in 8000-byte frames, pausing 0.04 s between frames,
    and declared as ``audio/L16;rate=16000`` with ``raw`` encoding.
    NOTE(review): this presumably requires 16 kHz 16-bit PCM input -
    confirm against the service documentation.

    Args:
        data: the audio bytes to recognize.

    Returns:
        The concatenation of the recognized words, or ``''`` when nothing
        was recognized or the service reported an error.
    """
    result = ''
    if not data:
        # Nothing to recognize; avoid opening a connection for no audio.
        return result

    url = get_url(app_key=APP_KEY, app_secret=APP_SECRET)
    frame_size = 8000  # bytes of audio sent per frame
    interval = 0.04  # pause between two frames, in seconds

    common_args = {"app_id": APP_ID}
    business_args = {"domain": "iat", "language": "zh_cn", "accent": "mandarin", "vinfo": 1, "vad_eos": 10000}

    async with websockets.connect(url) as ws:
        status = STATUS_FIRST_FRAME
        offset = 0
        while True:
            buf = data[offset:offset + frame_size]
            offset += frame_size
            # An empty slice means all audio has been sent.
            if not buf:
                status = STATUS_LAST_FRAME

            frame = {"data": {"status": status, "format": "audio/L16;rate=16000",
                              "audio": str(base64.b64encode(buf), 'utf-8'),
                              "encoding": "raw"}}
            if status == STATUS_FIRST_FRAME:
                # Only the first frame carries the app id and business args.
                frame = {"common": common_args, "business": business_args, **frame}
            await ws.send(json.dumps(frame))

            if status == STATUS_LAST_FRAME:
                break
            status = STATUS_CONTINUE_FRAME
            # Simulate the audio sampling interval without blocking the
            # event loop (time.sleep would stall every other coroutine).
            await asyncio.sleep(interval)

        # Results may arrive split over several messages; keep reading
        # until the server marks a message as final or closes the socket.
        while True:
            try:
                message = await ws.recv()
            except websockets.ConnectionClosed:
                break
            try:
                reply = json.loads(message)
                code = reply["code"]
                sid = reply["sid"]
                if code != 0:
                    print("sid:%s call error:%s code is:%s" % (sid, reply["message"], code))
                    break
                words = reply["data"]["result"]["ws"]
                for segment in words:
                    for candidate in segment["cw"]:
                        result += candidate["w"]
                print("sid:%s call success!,data is:%s" % (sid, json.dumps(words, ensure_ascii=False)))
                # Per the iFlytek API docs, data.status == 2 marks the
                # final result message.
                if reply["data"].get("status") == 2:
                    break
            except (KeyError, TypeError, ValueError) as e:
                print("receive msg,but parse exception:", e)
                break
    return result

async def main():
    """Read ``sample.wav`` and run it through ``iflytek_recognition``."""
    with open('sample.wav', 'rb') as wav_file:
        audio = wav_file.read()
    await iflytek_recognition(audio)

一个简单 Demo

这里写一个通过键盘空格键控制录音开关,并打印语音识别结果的 demo

键盘监控使用 pynput 包,它通过一个线程监听键盘事件并响应

安装

pip install pynput

完整代码如下

import pyaudio
import wave
from pynput import keyboard
import threading
import io

from list_devices import list_devices
from tencent import sentence_recognition, flash_recognition
from iflytek import iflytek_recognition
import asyncio

# Recording parameters shared by the recorder and the wav writers.
FORMAT = pyaudio.paInt16  # sample format handed to PyAudio
CHANNELS = 1  # number of audio channels (1: mono, 2: stereo)
RATE = 16000  # sample rate
CHUNK = 1024  # number of frames read per buffer

# Shared state: written by the keyboard listener and main(),
# read by the recording thread.
is_recording = False
device_index = 0

def save(data: bytes, output: str) -> None:
    """Write raw audio frames to a wav file.

    Args:
        data: raw audio frames, recorded with the module-level
            ``FORMAT``, ``CHANNELS`` and ``RATE`` settings.
        output: path of the wav file to create.
    """
    with wave.open(output, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        # Derive the sample width from FORMAT so the header cannot drift
        # from the format the stream was opened with.
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(data)
    print(f"Recording saved as {output}")


def to_wav_bytes(data: bytes) -> bytes:
    """Return ``data`` wrapped as a complete in-memory wav file.

    Args:
        data: raw audio frames, recorded with the module-level
            ``FORMAT``, ``CHANNELS`` and ``RATE`` settings.

    Returns:
        The wav header followed by the frames, as bytes.
    """
    output = io.BytesIO()
    with wave.open(output, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        # Derive the sample width from FORMAT so the header cannot drift
        # from the format the stream was opened with.
        wf.setsampwidth(pyaudio.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(data)
    return output.getvalue()

def do_record(device_index: int, output: str):
    """Record until ``is_recording`` turns false, then recognize and save.

    Runs in its own thread (started by ``on_release``). The recorded audio
    is sent to ``flash_recognition``, the result is printed, and the audio
    is written to ``output`` as a wav file.

    Args:
        device_index: index of the input device to record from.
        output: path of the wav file to write.
    """
    global is_recording
    print("Recording...")
    p = pyaudio.PyAudio()
    buffer = io.BytesIO()
    try:
        stream = p.open(
            format=FORMAT,
            channels=CHANNELS,
            rate=RATE,
            frames_per_buffer=CHUNK,
            input=True,
            input_device_index=device_index,
        )
        try:
            # The flag is flipped by the keyboard listener thread.
            while is_recording:
                buffer.write(stream.read(CHUNK))
        finally:
            stream.stop_stream()
            stream.close()
    finally:
        # Release PyAudio even when opening or reading fails, and reset
        # the flag so the next key press starts a fresh recording instead
        # of "stopping" one that no longer exists.
        p.terminate()
        is_recording = False
    print("Finished.")

    data = buffer.getvalue()
    # Alternative recognizers:
    #   sentence_recognition(to_wav_bytes(data))
    #   asyncio.run(iflytek_recognition(data))
    result = flash_recognition(to_wav_bytes(data))
    print(result)
    save(data, output)
    

def on_release(key):
    """Handle a key release: space toggles recording, esc quits.

    Args:
        key: the key reported by the ``pynput`` keyboard listener.

    Returns:
        ``False`` when esc is released, which stops the listener;
        ``None`` otherwise.
    """
    global is_recording
    if key == keyboard.Key.space:
        if is_recording:
            # The recording thread polls this flag and stops by itself.
            is_recording = False
        else:
            is_recording = True
            threading.Thread(target=do_record, args=(device_index, 'output.wav')).start()
    elif key == keyboard.Key.esc:
        is_recording = False
        # Stop listener
        return False

def main():
    """Ask for an input device, then listen for key releases until esc."""
    global device_index
    list_devices()
    choice = input("Please select input device:")
    device_index = int(choice)

    listener = keyboard.Listener(on_release=on_release)
    # Entering the context starts the listener thread; join() blocks
    # until on_release returns False.
    with listener:
        listener.join()


if __name__ == '__main__':
    main()

尝试下来,腾讯云的录音文件识别极速版速度非常快,很适合作为日常简单应用的语音入口。