任务调度

schedule

install

pip install schedule

usage

import time

import schedule

# add schedule job: print "running" every 10 seconds
schedule.every(10).seconds.do(lambda: print("running"))

# run scheduler: poll once per second and execute any job that is due
while True:
    schedule.run_pending()
    time.sleep(1)

add job with parameters

def func(name: str):
    """Print a one-line self-introduction for ``name``."""
    introduction = f"My name is {name}"
    print(introduction)
    
schedule.every(5).seconds.do(func, name="Tom")
while True:
    schedule.run_pending()
    time.sleep(1)

Apscheduler

Install

pip install apscheduler

Triggers:任务触发逻辑

  • cron:cron 格式触发
  • interval:固定时间间隔触发
  • date:在某固定日期触发一次
  • combine:组合条件触发

Scheduler

  • BlockingScheduler: 阻塞式,当程序只运行这个 scheduler 时使用
  • BackgroundScheduler:调度器在后台运行

Executor

  • ThreadPoolExecutor:默认使用多线程执行器
  • ProcessPoolExecutor:如果是 CPU 密集型任务可以使用多进程执行器

Job store:如果任务调度信息存在内存中,当程序退出后会丢失,可以使用其他存储器进行持久化存储

  • MemoryJobStore: 默认使用内存存储
  • SQLAlchemyJobStore
  • MongoDBJobStore
  • etc.

创建 scheduler

# 创建 BlockingScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()

# 创建 BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()

# Customize the job stores, executors, job defaults and time zone
import pytz
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

scheduler = BackgroundScheduler(
    # Named (non-default) job store and executor.
    # NOTE(review): per the APScheduler user guide, a job only uses them when
    # add_job() is called with jobstore='mongo' / executor='processpool' - confirm.
    jobstores={'mongo': MongoDBJobStore()},
    executors={'processpool': ProcessPoolExecutor(5)},
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    },
    timezone=pytz.utc
)
    

新增任务

from datetime import datetime
def tick():
    """Print the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    now = datetime.now()
    stamp = now.strftime('%Y-%m-%d %H:%M:%S')
    print(stamp)

# add job with cron trigger
scheduler.add_job(tick, "cron", second=0) # every minute at second 0

先创建 triggers 再创建任务

from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

c_trigger = CronTrigger(second=0) # every minute at second 0
i_trigger = IntervalTrigger(seconds=10) # every 10 seconds

scheduler.add_job(tick, c_trigger)
scheduler.add_job(tick, i_trigger)

带参数任务

def whoami(name: str, age: int):
    """Print a one-line introduction containing ``name`` and ``age``."""
    line = f"My name is {name}, age is {age}"
    print(line)

# Positional arguments: whoami() requires both name and age, so supply both.
scheduler.add_job(whoami, i_trigger, args=("Tom", 13))
# or pass them by keyword
scheduler.add_job(whoami, i_trigger, kwargs={"name":"Tom", "age":13})

启动调度器

# start
scheduler.start()

# shutdown
scheduler.shutdown()

网络模块

urllib

urllib.request

import urllib.request, json

url = 'http://your.url'
data = {"name": "Tom", "age": "16"}
req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json; charset=utf-8')
jsondata = json.dumps(data)
jsondataasbytes = jsondata.encode('utf-8')
req.add_header('Content-Length', len(jsondataasbytes))
res = urllib.request.urlopen(req, jsondataasbytes)
print(res.getcode(), res.msg)

urllib2

# Python 2 only: urllib2 does not exist in Python 3 (use urllib.request there).
import urllib, urllib2
url = 'http://your.url'
# urlopen() expects the POST body as an application/x-www-form-urlencoded
# string, not a dict, so encode the fields first.
data = urllib.urlencode({"name": "Tom", "age": "16"})
res = urllib2.urlopen(url, data)
print(res.getcode(), res.msg)

requests

import requests

# Simple GET request: print the HTTP status code and the decoded body.
url = "http://www.baidu.com"
r = requests.get(url)
print(r.status_code)
print(r.text)

# headers
headers = {"user-agent": "Chrome/90.0.4430.85 Safari/537.36"}
r = requests.get(url, headers=headers)

# proxies
proxies = {
    "http": "127.0.0.1:1087",
    "https": "127.0.0.1:1087"
}
r = requests.get(url, proxies=proxies)

# socks proxies
#pip install requests[socks]
proxies = {
    'http': 'socks5://127.0.0.1:1087',
    'https': 'socks5://127.0.0.1:1087'
}
r = requests.get(url, proxies=proxies)

# session
with requests.Session() as s:
    r = s.get('https://httpbin.org/cookies')
    print(r.text)

HTTPX

HTTPX

HTTPX has both async and sync compatibility, which means you can use it in both asynchronous and synchronous programs. The Requests library, on the other hand, is only synchronous.

import httpx

with httpx.Client() as client:
    r = client.get(url)
    print(r.text)

send async requests

async def fetch_json(url):
    """Fetch ``url`` with an async HTTPX client and return the decoded JSON body.

    ``async with`` / ``await`` / ``return`` are only valid inside a coroutine
    function, so the request is wrapped in one. Run it from synchronous code
    with ``asyncio.run(fetch_json(url))``.
    """
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        return r.json()

set timeout

with httpx.Client(timeout=10) as client:
    r = client.get(url)
    print(r.text)

aiohttp

aiohttp:发送http请求

  1. 创建一个ClientSession对象
  2. 通过ClientSession对象去发送请求(get, post, delete等)
  3. await 异步等待返回结果
import aiohttp
import asyncio

async def main():
    """Send one GET request and print the response status and body text."""
    url = 'http://your.url'
    # The session and the response are both async context managers, so the
    # connection is released when each block exits.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)
            print(await res.text())

# asyncio.run() creates the event loop, runs main() to completion and closes
# the loop; it replaces the older get_event_loop()/run_until_complete() pair.
asyncio.run(main())

The aiohttp Request Lifecycle

fastapi

安装

pip install fastapi
pip install "uvicorn[standard]"

编辑main.py

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def root():
    return {"message": "Hello World"}

调试

uvicorn main:app --reload --host 0.0.0.0 --port 8080

# if main.py in other dir
uvicorn src.main:app --reload --host 0.0.0.0 --port 8080

处理上报 Body

Request Body

from fastapi import FastAPI
from pydantic import BaseModel


class Item(BaseModel):
    name: str
    description: str | None = None
    price: float
    tax: float | None = None


app = FastAPI()


@app.post("/items/")
async def create_item(item: Item):
    return item

异步调用后台任务

Background Tasks

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    """Overwrite ``log.txt`` with a single notification line.

    Args:
        email: Address the notification is for.
        message: Text of the notification; defaults to an empty string.
    """
    content = f"notification for {email}: {message}"
    # mode="w" truncates the file, so only the latest notification is kept.
    with open("log.txt", mode="w") as email_file:
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

注意:write_notification 不要定义成 async 函数

websockets

Server:

import asyncio
from websockets.server import serve

async def echo(websocket):
    async for message in websocket:
        await websocket.send(message)

async def main():
    async with serve(echo, "localhost", 8765):
        await asyncio.Future()  # run forever

asyncio.run(main())

Client:

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")
        await websocket.send(name)
        print(f">>> {name}")
        greeting = await websocket.recv()
        print(f"<<< {greeting}")

asyncio.run(hello())

数据库

records

安装

pip install pymysql
pip install records

MySQL

import records

mysql_url = f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

PostgreSQL

安装:pip install psycopg2-binary

post_url = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

查询

QUERY_SELECT = '''
    SELECT name, age FROM my_table;
'''

with records.Database(mysql_url) as db:
    rows = db.query(QUERY_SELECT)

    # Read the results while the database is still open.
    # NOTE(review): records documents query results as lazily fetched, so
    # consuming them after the `with` block has closed the database may fail
    # - confirm for your records version.
    print(rows.all())  # all rows
    print(rows.all(as_dict=True))  # all rows as dicts
    print(rows.first())  # first row
    print(rows.first(as_ordereddict=True))  # first row as an OrderedDict
    print(rows.one())  # the single row of a query expected to return exactly one

    # export as JSON
    json_rows = rows.export('json')
    print(json_rows)

    # export as Excel
    with open('users.xlsx', 'wb') as f:
        f.write(rows.export('xlsx'))

插入

QUERY_INSERT = '''
    INSERT INTO my_table (name, age) VALUES (:name, :age);
'''

with records.Database(mysql_url) as db:
    # Bind values are matched to the :name / :age placeholders by keyword;
    # db.query() does not accept them positionally.
    db.query(QUERY_INSERT, name="Tom", age=12)

    # or unpack a dict whose keys match the placeholders
    user = {"name": "Tom", "age": 20}
    db.query(QUERY_INSERT, **user)

    # bulk insert: one parameter dict per row
    users = [
        {"name":"Tom", "age": 13},
        {"name":"Jack", "age": 15},
        {"name":"Wang", "age": 16}
    ]
    # Target my_table like the other statements (TABLE is a reserved word in SQL).
    db.bulk_query('INSERT INTO my_table(name, age) values (:name, :age)', users)

事务

# transaction
with records.Database(mysql_url) as db:
    with db.transaction() as tx:
        user = {"name": "Jimmy", "age": 20}
        # Target my_table (TABLE is a reserved word in SQL) so this statement
        # itself is valid and the rollback below is what undoes it.
        tx.query('INSERT INTO my_table(name, age) values (:name, :age)', **user)
        # The next statement is deliberately invalid SQL: because it fails,
        # the INSERT above is rolled back and never takes effect.
        tx.query('blabla')

测试

Faker

Install

pip install Faker

Usage

from faker import Faker
fake = Faker()
print(fake.name())
print(fake.address())

Locust

性能测试

Install:

pip install locust

Usage:

from locust import HttpUser, task, between

# Define the user class: subclass HttpUser and declare the user's attributes and behavior
class MyUser(HttpUser):
    wait_time = between(5, 15)  # user think time (i.e. the pause between requests): random between 5 and 15 seconds

    # The @task decorator defines a task, i.e. an HTTP request the user performs.
    @task
    def my_task(self):
        self.client.get("/")  # issue a GET request

运行Locust测试:

locust -f path/to/your/testfile.py

模版

Jinja2

Template Designer Documentation

# Comment
{# comment #}

# Variable
{{ foo.bar }}
{{ foo['bar'] }}

# For
{% for field in fields %} {{field.name}} {% endfor %}

# If
{% if kenny.sick %}
    Kenny is sick.
{% elif kenny.dead %}
    You killed Kenny!  You bastard!!!
{% else %}
    Kenny looks okay --- so far
{% endif %}

{% if field.desc is not none and field.desc is defined %} {{field.desc}} {% endif %}

{% if loop.index is divisibleby 3 %}
{% endif %}

CLI

rich

Github - rich: a Python library for rich text and beautiful formatting in the terminal.

textual

Github - textual: a Rapid Application Development framework for Python.

Typer

Official Site - Typer:a library for building CLI applications

Install: pip install "typer[all]"

Quickstart:

import typer

def main(name: str):
    print(f"Hello {name}")
    
if __name__ == '__main__':
    typer.run(main)

Multiple commands

import typer

app = typer.Typer()

@app.command()
def hello(name: str):
    print(f"Hello {name}")


@app.command()
def goodbye(name: str, formal: bool = False):
    if formal:
        print(f"Goodbye Ms. {name}. Have a good day.")
    else:
        print(f"Bye {name}!")


if __name__ == "__main__":
    app()

Run program from the command line:

python main.py --help

python main.py hello Camila

Argument with choices:

from enum import Enum
import typer

class NeuralNetwork(str, Enum):
    """Closed set of values accepted for the ``--network`` option."""
    simple = "simple"
    conv = "conv"
    lstm = "lstm"

def main(network: NeuralNetwork = NeuralNetwork.simple):
    """Print the selected network type; defaults to ``simple``."""
    print(f"Training neural network of type: {network.value}")

# Entry point: without it the script defines main() but never runs it.
if __name__ == "__main__":
    typer.run(main)

python-fire

Github - python-fire

The Python Fire Guide

python code example.py

import fire

def hello(name):
  """Return a greeting addressed to ``name``."""
  template = 'Hello {name}!'
  return template.format(name=name)

def add(a, b):
    """Return ``a + b`` using whatever ``+`` means for the operands."""
    total = a + b
    return total

if __name__ == '__main__':
  fire.Fire()

run program from the command line:

python example.py hello World
python example.py hello --name=World

python example.py add 10 20

WebUI

Gradio

Install

pip install gradio

Quickstart

import gradio as gr

def greet(name, intensity):
    """Return ``"Hello, <name>"`` followed by ``int(intensity)`` exclamation marks."""
    exclamations = "!" * int(intensity)
    return "".join(("Hello, ", name, exclamations))

demo = gr.Interface(
    fn=greet,
    inputs=["text", "slider"],
    outputs=["text"],
)

demo.launch()

Streamlit

Streamlit - Documentation

Install:pip install streamlit

Quickstart

import streamlit as st
import pandas as pd
import numpy as np

st.title('Uber pickups in NYC')

DATE_COLUMN = 'date/time'
DATA_URL = ('https://s3-us-west-2.amazonaws.com/'
            'streamlit-demo-data/uber-raw-data-sep14.csv.gz')

@st.cache_data
def load_data(nrows):
    data = pd.read_csv(DATA_URL, nrows=nrows)
    lowercase = lambda x: str(x).lower()
    data.rename(lowercase, axis='columns', inplace=True)
    data[DATE_COLUMN] = pd.to_datetime(data[DATE_COLUMN])
    return data

data_load_state = st.text('Loading data...')
data = load_data(10000)
data_load_state.text("Done! (using st.cache_data)")

if st.checkbox('Show raw data'):
    st.subheader('Raw data')
    st.write(data)

st.subheader('Number of pickups by hour')
hist_values = np.histogram(data[DATE_COLUMN].dt.hour, bins=24, range=(0,24))[0]
st.bar_chart(hist_values)

# Some number in the range 0-23
hour_to_filter = st.slider('hour', 0, 23, 17)
filtered_data = data[data[DATE_COLUMN].dt.hour == hour_to_filter]

st.subheader('Map of all pickups at %s:00' % hour_to_filter)
st.map(filtered_data)

音频处理

References:

Library Load/Write Audio Playback Record Manipulate
wave WAV
soundfile WAV/MP3
sounddevice yes yes
pyaudio yes yes
librosa yes yes

wave

Python 标准库中的库,用于处理 WAV 格式的文件

The Python Standard Library - wave

Usage

import wave

# Read wav file into bytes: getnframes() is the total frame count, so this
# reads the whole file.
with wave.open('path/to/audio_file.wav', 'rb') as wf:
    frames = wf.readframes(wf.getnframes())

# Save bytes into wav file (the file must be opened for writing: 'wb')
with wave.open('path/to/audio_file.wav', 'wb') as wf:
    wf.setnchannels(1) # audio channels(1: mono, 2: stereo)
    wf.setsampwidth(2) # 1: pyaudio.paInt8, 2: pyaudio.paInt16, 3: pyaudio.paInt24, 4: pyaudio.paInt32
    wf.setframerate(16000) # sample rate
    # frames is already a single bytes object, so write it directly
    wf.writeframes(frames)

pyaudio

可以以字节流的方式录制/播放音频

Install

pip install pyaudio

Mac

brew install portaudio
pip install pyaudio

列出音频设备

import pyaudio

p = pyaudio.PyAudio()

# Get the number of audio I/O devices
devices = p.get_device_count()

for i in range(devices):
    device_info = p.get_device_info_by_index(i)
    if device_info.get('maxInputChannels') > 0:
        print(f"Microphone: {device_info.get('name')} , Device Index: {device_info.get('index')}")

Record

import wave

import pyaudio

FORMAT = pyaudio.paInt16 # format of audio samples
CHANNELS = 1 # audio channels(1: mono, 2: stereo)
RATE=44100 # sample rate
CHUNK=1024 # number of frames per buffer
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
stream = p.open(
    format=FORMAT, # format of audio samples
    channels=CHANNELS, # audio channels(1: mono, 2: stereo)
    rate=RATE, # sample rate
    frames_per_buffer=CHUNK, # number of frames per buffer
    input=True,
)

print("Recording...")
# Read RECORD_SECONDS worth of audio, one CHUNK-frame buffer at a time.
frames = []
for _ in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
    data = stream.read(CHUNK)
    frames.append(data)

stream.stop_stream()
stream.close()
p.terminate()

# save the recording
WAVE_OUTPUT_FILENAME = "recorded_audio.wav"
with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(p.get_sample_size(FORMAT))
    wf.setframerate(RATE)
    # frames is a list of bytes chunks; join them into one buffer
    wf.writeframes(b''.join(frames))
print("Recording saved as", WAVE_OUTPUT_FILENAME)

Playback

import wave

import pyaudio

CHUNK = 1024  # number of frames read and played per iteration
filename = 'path/to/audio_file.wav'

p = pyaudio.PyAudio()
with wave.open(filename, 'rb') as wf:
    stream = p.open(format = p.get_format_from_width(wf.getsampwidth()),
        channels = wf.getnchannels(),
        rate = wf.getframerate(),
        output = True) # 'output = True' indicates that the sound will be played rather than recorded

    # Read data in chunks
    data = wf.readframes(CHUNK)

    # Play the sound by writing the audio data to the stream.
    # readframes() returns bytes; an empty bytes object marks end of file.
    # Comparing against the str '' would never be false, looping forever.
    while data:
        stream.write(data)
        data = wf.readframes(CHUNK)

    # Close and terminate the stream
    stream.stop_stream()
    stream.close()
p.terminate()

soundfile

Install

pip install soundfile

Usage

import soundfile as sf

# read wav file into numpy.ndarray
wav, sample_rate = sf.read('path/to/audio_file.wav')

# write numpy.ndarray to wav file (the module is imported under the alias sf)
sf.write('path/to/audio_file.wav', wav, sample_rate)

sounddevice

Install

pip install sounddevice

Usage

import sounddevice as sd
import soundfile as sf

wav, sample_rate = sf.read('path/to/audio_file.wav')

# play the audio file
sd.play(wav, sample_rate)
sd.wait() # wait until the file has finished playing

# record
duration, sample_rate = 5, 44100
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=2)
sd.wait()  # Wait until recording is finished

librosa

Audio and music signal analysis

Github - librosa

Introduction to LibROSA

Install

pip install librosa

Usage:

import librosa

# load audio file
wav, sample_rate = librosa.load('audio_file')
# same call with the main options spelled out; sr=None is passed here in
# place of the default sample rate
wav, sample_rate = librosa.load('audio_file', sr=None, offset=0, duration=None, mono=True)

# get duration
duration = librosa.get_duration(y=wav, sr=sample_rate)

Plotting

import librosa.display
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 4))
# waveshow replaces waveplot, which was removed in librosa 0.10;
# wav / sample_rate are the values returned by librosa.load().
librosa.display.waveshow(wav, sr=sample_rate)
plt.show()

Extract features

  • MFCC: Mel Frequency Cepstral Coefficients are a very commonly used feature for speech/music analysis
mfcc = librosa.feature.mfcc(y=audio_data, sr=sampling_rate, n_mfcc=13)

Manipulating

import librosa
import numpy as np

# resample
new_audio = librosa.resample(audio_data, orig_sr=orig_sr, target_sr=target_sr)

# trim leading/trailing silence to get a shorter segment;
# effects.trim returns (trimmed_audio, [start, end] sample indices)
new_audio, _ = librosa.effects.trim(audio_data, top_db=10)

# join: librosa has no concatenate helper; signals are NumPy arrays
new_audio = np.concatenate([audio1, audio2, audio3])

# fade in/fade out: librosa has no fade helper, so scale the first/last
# samples by a linear ramp (assumes a mono 1-D float signal - TODO confirm)
faded_in_audio = audio_data.copy()
faded_in_audio[:fade_in_len] *= np.linspace(0.0, 1.0, fade_in_len)

faded_out_audio = audio_data.copy()
fade_start = len(faded_out_audio) - fade_out_len
faded_out_audio[fade_start:] *= np.linspace(1.0, 0.0, fade_out_len)

# pitch shift (sr and n_steps are passed by keyword)
new_audio = librosa.effects.pitch_shift(audio_data, sr=sr, n_steps=n_steps)

# time stretch: takes a speed factor, not sample rates
# (rate > 1 speeds up, rate < 1 slows down)
new_audio = librosa.effects.time_stretch(audio_data, rate=rate)

pydub

Github - pydub

Install

pip install pydub

Usage

from pydub import AudioSegment

# read audio from file
sound1 = AudioSegment.from_file("/path/to/sound.wav", format="wav")

# save to file: export() defaults to format="mp3", so name the format
# explicitly or "output.wav" would contain MP3 data
sound1.export("output.wav", format="wav")

# get audio bytes: encoded WAV file bytes...
b = sound1.export(format="wav").read()
# ...or the raw sample data without any container
raw = sound1.raw_data

# convert format: export() returns the output file handle, not a new segment
mp3_file = sound1.export("output.mp3", format="mp3")

# change sample rate
sound2 = sound1.set_frame_rate(16000) # to 16000 Hz

键盘控制

pynput

pynput Package Documentation

安装

pip install pynput

controlling the keyboard

from pynput.keyboard import Key, Controller

keyboard = Controller()

# Press and release space
keyboard.press(Key.space)
keyboard.release(Key.space)

# Type a lower case A; this will work even if no key on the
# physical keyboard is labelled 'A'
keyboard.press('a')
keyboard.release('a')

# Type two upper case As
keyboard.press('A')
keyboard.release('A')
with keyboard.pressed(Key.shift):
    keyboard.press('a')
    keyboard.release('a')

# Type 'Hello World' using the shortcut type method
keyboard.type('Hello World')

monitoring the keyboard

from pynput import keyboard

def on_press(key):
    """Print which key was pressed.

    Keys exposing a ``char`` attribute are reported as alphanumeric; keys
    without one are reported as special keys.
    """
    try:
        char = key.char
    except AttributeError:
        print('special key {0} pressed'.format(key))
    else:
        print('alphanumeric key {0} pressed'.format(char))

def on_release(key):
    print('{0} released'.format(
        key))
    if key == keyboard.Key.esc:
        # Stop listener
        return False

# Collect events until released
with keyboard.Listener(
        on_press=on_press,
        on_release=on_release) as listener:
    listener.join()

# ...or, in a non-blocking fashion:
listener = keyboard.Listener(
    on_press=on_press,
    on_release=on_release)
listener.start()

实用工具

img2pdf

图片转PDF: Github - img2pdf

pip3 install img2pdf
img2pdf ./*.png ./*.jpg -o out.pdf

# 批量转换
for d in `ls`; do
  echo $d
  cd $d && img2pdf ./*.png -o ../${d}.pdf && cd .. 
done

python code

import img2pdf

# convert() returns the PDF as bytes, so the output file is opened in binary mode
with open("name.pdf","wb") as f:
    f.write(img2pdf.convert(["test1.jpg", "test2.png"]))

PyPDF2

Merge PDFs

# NOTE(review): PdfFileMerger / PdfFileReader are the PyPDF2 < 3.0 names - confirm
# against the installed version.
from PyPDF2 import PdfFileMerger, PdfFileReader
import os
merger = PdfFileMerger()
path = "C:/Users/yizhen.chen/Downloads/"
# sorted() makes the merge order deterministic (os.listdir order is arbitrary)
for file in sorted(os.listdir(path)):
    # skip a previous result so re-running does not merge the output into itself
    if file.endswith('.pdf') and file != "output.pdf":
        # os.listdir() returns bare names, so join with the directory;
        # append() accepts a file path and the merger manages the handle
        merger.append(os.path.join(path, file))
merger.write(path + "output.pdf")
merger.close()

Rotate PDFs

import PyPDF2

# Context managers close both files even if reading or writing raises.
with open('D:/yizhen.chen/desktop/Untitled.pdf', 'rb') as pdf_in:
    pdf_reader = PyPDF2.PdfFileReader(pdf_in)
    pdf_writer = PyPDF2.PdfFileWriter()

    # rotate every page 90 degrees clockwise and queue it for output
    for pagenum in range(pdf_reader.numPages):
        page = pdf_reader.getPage(pagenum)
        page.rotateClockwise(90)
        pdf_writer.addPage(page)

    # write while the input is still open, as the original does
    with open('D:/yizhen.chen/desktop/Etude 2.pdf', 'wb') as pdf_out:
        pdf_writer.write(pdf_out)

Delete pages from PDF

from PyPDF2 import PdfFileWriter, PdfFileReader
import os

# file_path must point at the PDF to edit (a directory such as './' cannot be opened)
file_path = './input.pdf'
output_path = './output.pdf'

with open(file_path, 'rb') as src:
    pdf = PdfFileReader(src)
    writer = PdfFileWriter()

    # start at index 1 to drop the first page (index 0) and keep the rest
    for i in range(1, pdf.getNumPages()):
        writer.addPage(pdf.getPage(i))

    # write while the source is still open
    with open(output_path, 'wb') as f:
        writer.write(f)

comtypes

  • a lightweight python COM (The Microsoft Component Object Model) package: Github - comtypes
  • only available in Windows

Install:

pip install comtypes

eyeD3

用于增加、删除、修改mp3文件中ID3元数据(即歌曲信息)

# add cover to mp3
eyeD3 --add-image=image.jpg:FRONT_COVER song.mp3