任务调度
schedule
install
pip install schedule
usage
import time

import schedule

# add schedule job: print "running" every 10 seconds
schedule.every(10).seconds.do(lambda: print("running"))

# run scheduler: execute whatever is due, then sleep so the loop does not spin
while True:
    schedule.run_pending()
    time.sleep(1)
add job with parameters
def func(name: str):
print(f"My name is {name}")
schedule.every(5).seconds.do(func, name="Tom")
while True:
schedule.run_pending()
time.sleep(1)
Apscheduler
Install
pip install apscheduler
Triggers:任务触发逻辑
- cron:cron 格式触发
- interval:固定时间间隔触发
- date:在某固定日期触发一次
- combine:组合条件触发
Scheduler
- BlockingScheduler: 阻塞式,当程序只运行这个 scheduler 时使用
- BackgroundScheduler: 调度器在后台运行
Executor
- ThreadPoolExecutor: 默认使用多线程执行器
- ProcessPoolExecutor: 如果是 CPU 密集型任务可以使用多进程执行器
Job store:如果任务调度信息存在内存中,当程序退出后会丢失,可以使用其他存储器进行持久化存储
- MemoryJobStore: 默认使用内存存储
- SQLAlchemyJobStore
- MongoDBJobStore
- etc.
创建 scheduler
# 创建 BlockingScheduler
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
# 创建 BackgroundScheduler
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
# Customize job store, executor, job defaults and time zone
import pytz
from apscheduler.jobstores.mongodb import MongoDBJobStore  # the store actually used below
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

# NOTE(review): the aliases are 'mongo' and 'processpool', not 'default'; jobs
# presumably only use them when add_job is called with jobstore='mongo' /
# executor='processpool' — confirm against the APScheduler user guide.
scheduler = BackgroundScheduler(
    jobstores={'mongo': MongoDBJobStore()},
    executors={'processpool': ProcessPoolExecutor(5)},
    job_defaults={
        'coalesce': False,
        'max_instances': 3
    },
    timezone=pytz.utc
)
新增任务
from datetime import datetime
def tick():
print(datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# add job with cron trigger
scheduler.add_job(tick, "cron", second=0) # every minute at second 0
先创建 triggers 再创建任务
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
c_trigger = CronTrigger(second=0) # every minute at second 0
i_trigger = IntervalTrigger(seconds=10) # every 10 seconds
scheduler.add_job(tick, c_trigger)
scheduler.add_job(tick, i_trigger)
带参数任务
def whoami(name: str, age: int):
    """Print the given name and age."""
    print(f"My name is {name}, age is {age}")


# positional arguments: one value for each parameter of whoami
scheduler.add_job(whoami, i_trigger, args=("Tom", 13))
# or keyword arguments
scheduler.add_job(whoami, i_trigger, kwargs={"name": "Tom", "age": 13})
启动调度器
# start
scheduler.start()
# shutdown
scheduler.shutdown()
网络模块
urllib
urllib.request
import urllib.request, json
url = 'http://your.url'
data = {"name": "Tom", "age": "16"}
req = urllib.request.Request(url)
req.add_header('Content-Type', 'application/json; charset=utf-8')
jsondata = json.dumps(data)
jsondataasbytes = jsondata.encode('utf-8')
req.add_header('Content-Length', len(jsondataasbytes))
res = urllib.request.urlopen(req, jsondataasbytes)
print(res.getcode(), res.msg)
urllib2
# Python 2 only: urllib2 does not exist in Python 3 (use urllib.request there)
import urllib, urllib2
url = 'http://your.url'
data = {"name": "Tom", "age": "16"}
# urlopen needs the body as an encoded string, not a dict
res = urllib2.urlopen(url, urllib.urlencode(data))
print(res.getcode(), res.msg)
requests
import requests

# simple GET request
url = "http://www.baidu.com"
r = requests.get(url)
print(r.status_code)
print(r.text)
# headers
headers = {"user-agent": "Chrome/90.0.4430.85 Safari/537.36"}
r = requests.get(url, headers=headers)
# proxies
proxies = {
"http": "127.0.0.1:1087",
"https": "127.0.0.1:1087"
}
r = requests.get(url, proxies=proxies)
# socks proxies
#pip install requests[socks]
proxies = {
'http': 'socks5://127.0.0.1:1087',
'https': 'socks5://127.0.0.1:1087'
}
r = requests.get(url, proxies=proxies)
# session
with requests.Session() as s:
r = s.get('https://httpbin.org/cookies')
print(r.text)
HTTPX
HTTPX has both async and sync compatibility, which means you can use it in both asynchronous and synchronous programs. The Requests library, on the other hand, is only synchronous
import httpx
with httpx.Client() as client:
r = client.get(url)
print(r.text)
send async requests
import asyncio

import httpx


async def fetch_json(url):
    """Send an asynchronous GET request to url and return the decoded JSON body."""
    async with httpx.AsyncClient() as client:
        r = await client.get(url)
        return r.json()


# `async with` / `await` are only valid inside a coroutine, so drive it with asyncio.run
result = asyncio.run(fetch_json(url))
set timeout
with httpx.Client(timeout=10) as client:
r = client.get(url)
print(r.text)
aiohttp
aiohttp:发送http请求
- 创建一个ClientSession对象
- 通过ClientSession对象去发送请求(get, post, delete等)
- await 异步等待返回结果
import aiohttp
import asyncio


async def main():
    """Fetch the URL with aiohttp and print the response status and body text."""
    url = 'http://your.url'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)
            print(await res.text())


# asyncio.run creates the event loop, runs main() to completion and closes the loop
asyncio.run(main())
fastapi
安装
pip install fastapi
pip install "uvicorn[standard]"
编辑main.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World"}
调试
uvicorn main:app --reload --host 0.0.0.0 --port 8080
# if main.py in other dir
uvicorn src.main:app --reload --host 0.0.0.0 --port 8080
处理上报 Body
from fastapi import FastAPI
from pydantic import BaseModel
class Item(BaseModel):
name: str
description: str | None = None
price: float
tax: float | None = None
app = FastAPI()
@app.post("/items/")
async def create_item(item: Item):
return item
异步调用后台任务
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
注意:write_notification 内部是阻塞式的文件写入,不要定义成 async 函数,否则会阻塞事件循环
websockets
Server:
import asyncio
from websockets.server import serve
async def echo(websocket):
async for message in websocket:
await websocket.send(message)
async def main():
async with serve(echo, "localhost", 8765):
await asyncio.Future() # run forever
asyncio.run(main())
Client:
import asyncio
import websockets
async def hello():
uri = "ws://localhost:8765"
async with websockets.connect(uri) as websocket:
name = input("What's your name? ")
await websocket.send(name)
print(f">>> {name}")
greeting = await websocket.recv()
print(f"<<< {greeting}")
asyncio.run(hello())
数据库
records
安装
pip install pymysql
pip install records
MySQL
import records
mysql_url = f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
PostgreSQL
安装:pip install psycopg2-binary
post_url = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
查询
QUERY_SELECT = '''
SELECT name, age FROM my_table;
'''
with records.Database(mysql_url) as db:
    rows = db.query(QUERY_SELECT)
    print(rows.all())  # all rows
    print(rows.all(as_dict=True))  # all rows as dicts
    print(rows.first())  # first row
    print(rows.first(as_ordereddict=True))  # first row as an OrderedDict
    # NOTE(review): one() is meant for single-row results — verify its behavior on multi-row results
    print(rows.one())
    # export as JSON
    json_rows = rows.export('json')
    print(json_rows)
    # export as Excel
    with open('users.xlsx', 'wb') as f:
        f.write(rows.export('xlsx'))
插入
QUERY_INSERT = '''
INSERT INTO my_table (name, age) VALUES (:name, :age);
'''
with records.Database(mysql_url) as db:
    # bind parameters are passed by keyword, matching the :name / :age placeholders
    db.query(QUERY_INSERT, name="Tom", age=12)
    # or
    user = {"name": "Tom", "age": 20}
    db.query(QUERY_INSERT, **user)
    # bulk insert: one dict of bind parameters per row
    users = [
        {"name": "Tom", "age": 13},
        {"name": "Jack", "age": 15},
        {"name": "Wang", "age": 16}
    ]
    db.bulk_query('INSERT INTO my_table(name, age) values (:name, :age)', users)
事务
# transaction
# open the database here: a Database closed by an earlier `with` block cannot be reused
with records.Database(mysql_url) as db:
    with db.transaction() as tx:
        user = {"name": "Jimmy", "age": 20}
        tx.query('INSERT INTO my_table(name, age) values (:name, :age)', **user)
        # The statement below is invalid SQL; because it fails, the insert above is not committed.
        tx.query('blabla')
测试
Faker
Install
pip install Faker
Usage
from faker import Faker
fake = Faker()
print(fake.name())
print(fake.address())
Locust
性能测试
Install:
pip install locust
Usage:
from locust import HttpUser, task, between
# 定义用户类:继承HttpUser类,并定义用户的属性和行为
class MyUser(HttpUser):
wait_time = between(5, 15) # 定义用户思考时间(即请求间隔)在5到15秒之间随机
# @task装饰器定义了一个任务,即用户执行的HTTP请求。
@task
def my_task(self):
self.client.get("/") # 发起GET请求
运行Locust测试:
locust -f path/to/your/testfile.py
模版
Jinja2
# Comment
{# comment #}
# Variable
{{ foo.bar }}
{{ foo['bar'] }}
# For
{% for field in fields %} {{field.name}} {% endfor %}
# If
{% if kenny.sick %}
Kenny is sick.
{% elif kenny.dead %}
You killed Kenny! You bastard!!!
{% else %}
Kenny looks okay --- so far
{% endif %}
{% if field.desc is not none and field.desc is defined %} {{field.desc}} {% endif %}
{% if loop.index is divisibleby 3 %}
{% endif %}
CLI
rich
Github - rich: a Python library for rich text and beautiful formatting in the terminal.
textual
Github - textual: a Rapid Application Development framework for Python.
Typer
Official Site - Typer:a library for building CLI applications
Install: pip install "typer[all]"
Quickstart:
import typer
def main(name: str):
print(f"Hello {name}")
if __name__ == '__main__':
typer.run(main)
Multiple commands
import typer
app = typer.Typer()
@app.command()
def hello(name: str):
print(f"Hello {name}")
@app.command()
def goodbye(name: str, formal: bool = False):
if formal:
print(f"Goodbye Ms. {name}. Have a good day.")
else:
print(f"Bye {name}!")
if __name__ == "__main__":
app()
Run program from the command line:
python main.py --help
python main.py hello Camila
Argument with choices:
from enum import Enum
import typer
class NeuralNetwork(str, Enum):
simple = "simple"
conv = "conv"
lstm = "lstm"
def main(network: NeuralNetwork = NeuralNetwork.simple):
print(f"Training neural network of type: {network.value}")
python-fire
python code example.py
import fire
def hello(name):
return 'Hello {name}!'.format(name=name)
def add(a, b):
return a+b
if __name__ == '__main__':
fire.Fire()
run program from the command line:
python example.py hello World
python example.py hello --name=World
python example.py add 10 20
WebUI
Gradio
Install
pip install gradio
Quickstart
import gradio as gr
def greet(name, intensity):
return "Hello, " + name + "!" * int(intensity)
demo = gr.Interface(
fn=greet,
inputs=["text", "slider"],
outputs=["text"],
)
demo.launch()
Streamlit
Install:pip install streamlit
Quickstart
import streamlit as st
import pandas as pd
import numpy as np
st.title('Uber pickups in NYC')
DATE_COLUMN = 'date/time'
DATA_URL = ('https://s3-us-west-2.amazonaws.com/'
'streamlit-demo-data/uber-raw-data-sep14.csv.gz')
@st.cache_data
def load_data(nrows):
data = pd.read_csv(DATA_URL, nrows=nrows)
lowercase = lambda x: str(x).lower()
data.rename(lowercase, axis='columns', inplace=True)
data[DATE_COLUMN] = pd.to_datetime(data[DATE_COLUMN])
return data
data_load_state = st.text('Loading data...')
data = load_data(10000)
data_load_state.text("Done! (using st.cache_data)")
if st.checkbox('Show raw data'):
st.subheader('Raw data')
st.write(data)
st.subheader('Number of pickups by hour')
hist_values = np.histogram(data[DATE_COLUMN].dt.hour, bins=24, range=(0,24))[0]
st.bar_chart(hist_values)
# Some number in the range 0-23
hour_to_filter = st.slider('hour', 0, 23, 17)
filtered_data = data[data[DATE_COLUMN].dt.hour == hour_to_filter]
st.subheader('Map of all pickups at %s:00' % hour_to_filter)
st.map(filtered_data)
音频处理
References:
Library | Load/Write Audio | Playback | Record | Manipulate | |
---|---|---|---|---|---|
wave | WAV | ||||
soundfile | WAV/MP3 | ||||
sounddevice | yes | yes | |||
pyaudio | yes | yes | |||
librosa | yes | yes |
wave
Python 标准库中的库,用于处理 WAV 格式的文件
Usage
import wave

# Read the whole wav file into bytes
with wave.open('path/to/audio_file.wav', 'rb') as wf:
    frames = wf.readframes(wf.getnframes())

# Save bytes into a wav file ('wb': the set*/writeframes calls need a writer)
with wave.open('path/to/audio_file.wav', 'wb') as wf:
    wf.setnchannels(1)  # audio channels (1: mono, 2: stereo)
    wf.setsampwidth(2)  # sample width in bytes; 1: pyaudio.paInt8, 2: pyaudio.paInt16, 3: pyaudio.paInt24, 4: pyaudio.paInt32
    wf.setframerate(16000)  # sample rate
    wf.writeframes(frames)  # frames is already a single bytes object
pyaudio
可以以字节流的方式录制/播放音频
Install
pip install pyaudio
Mac
brew install portaudio
pip install pyaudio
列出音频设备
import pyaudio
p = pyaudio.PyAudio()
# Get the number of audio I/O devices
devices = p.get_device_count()
for i in range(devices):
device_info = p.get_device_info_by_index(i)
if device_info.get('maxInputChannels') > 0:
print(f"Microphone: {device_info.get('name')} , Device Index: {device_info.get('index')}")
Record
import wave

import pyaudio

FORMAT = pyaudio.paInt16  # format of audio samples
CHANNELS = 1  # audio channels (1: mono, 2: stereo)
RATE = 44100  # sample rate
CHUNK = 1024  # number of frames per buffer
RECORD_SECONDS = 5

p = pyaudio.PyAudio()
stream = p.open(
    format=FORMAT,
    channels=CHANNELS,
    rate=RATE,
    frames_per_buffer=CHUNK,
    input=True,  # open the stream for recording
)
print("Recording...")
# each read returns CHUNK frames, so RATE / CHUNK reads make up one second
frames = [
    stream.read(CHUNK)
    for _ in range(int(RATE / CHUNK * RECORD_SECONDS))
]
stream.stop_stream()
stream.close()
# look up the sample width before the PyAudio instance is released
sample_width = p.get_sample_size(FORMAT)
p.terminate()

# save the recording
WAVE_OUTPUT_FILENAME = "recorded_audio.wav"
with wave.open(WAVE_OUTPUT_FILENAME, 'wb') as wf:
    wf.setnchannels(CHANNELS)
    wf.setsampwidth(sample_width)
    wf.setframerate(RATE)
    wf.writeframes(b''.join(frames))
print("Recording saved as", WAVE_OUTPUT_FILENAME)
Playback
import wave

import pyaudio

filename = 'path/to/audio_file.wav'  # wav file to play
chunk = 1024  # number of frames read and played per iteration

p = pyaudio.PyAudio()
with wave.open(filename, 'rb') as wf:
    stream = p.open(format=p.get_format_from_width(wf.getsampwidth()),
                    channels=wf.getnchannels(),
                    rate=wf.getframerate(),
                    output=True)  # 'output = True' indicates that the sound will be played rather than recorded
    # Read data in chunks
    data = wf.readframes(chunk)
    # Play the sound by writing the audio data to the stream.
    # readframes returns empty bytes at end of file, which ends the loop.
    while data:
        stream.write(data)
        data = wf.readframes(chunk)
# Close and terminate the stream
stream.stop_stream()
stream.close()
p.terminate()
soundfile
Install
pip install soundfile
Usage
import soundfile as sf

# read wav file into numpy.ndarray
wav, sample_rate = sf.read('path/to/audio_file.wav')
# write numpy.ndarray to wav file (the module is imported under the alias `sf`)
sf.write('path/to/audio_file.wav', wav, sample_rate)
sounddevice
Install
pip install sounddevice
Usage
import sounddevice as sd
import soundfile as sf
wav, sample_rate = sf.read('path/to/audio_file.wav')
# play the audio file
sd.play(wav, sample_rate)
sd.wait() # wait until the file has finished playing
# record
duration, sample_rate = 5, 44100
recording = sd.rec(int(duration * sample_rate), samplerate=sample_rate, channels=2)
sd.wait() # Wait until recording is finished
librosa
Audio and music signal analysis
Install
pip install librosa
Usage:
import librosa

# load audio file (resampled to librosa's default sample rate)
wav, sample_rate = librosa.load('audio_file')
# sr=None keeps the file's native sample rate; mono=True mixes down to one channel
wav, sample_rate = librosa.load('audio_file', sr=None, offset=0, duration=None, mono=True)
# get duration in seconds
duration = librosa.get_duration(y=wav, sr=sample_rate)
Plotting
import librosa.display
import matplotlib.pyplot as plt

plt.figure(figsize=(12, 4))
# waveshow replaces waveplot, which was removed from librosa
librosa.display.waveshow(wav, sr=sample_rate)
plt.show()
Extract features
- MFCC: Mel Frequency Cepstral Coefficients are a very commonly used feature for speech/music analysis
mfcc = librosa.feature.mfcc(y=audio_data, sr=sampling_rate, n_mfcc=13)
Manipulating
import numpy as np

# resample
new_audio = librosa.resample(audio_data, orig_sr=orig_sr, target_sr=target_sr)
# trim leading/trailing silence; trim returns (trimmed_audio, [start, end] sample indices)
new_audio, trim_index = librosa.effects.trim(audio_data, top_db=10)
# join: librosa has no concatenate helper, the signals are plain numpy arrays
new_audio = np.concatenate([audio1, audio2, audio3])
# fade in/fade out: librosa has no fade helper, so scale the edges with a linear ramp
# (assumes audio_data is a 1-D float array and both lengths are > 0 — TODO confirm)
faded_in_audio = audio_data.copy()
faded_in_audio[:fade_in_len] *= np.linspace(0.0, 1.0, fade_in_len)
faded_out_audio = audio_data.copy()
faded_out_audio[-fade_out_len:] *= np.linspace(1.0, 0.0, fade_out_len)
# pitch shift by n_steps semitones (sr and n_steps must be passed by keyword)
new_audio = librosa.effects.pitch_shift(audio_data, sr=sr, n_steps=n_steps)
# time stretch: rate > 1 speeds up, rate < 1 slows down
new_audio = librosa.effects.time_stretch(audio_data, rate=rate)
pydub
Install
pip install pydub
Usage
from pydub import AudioSegment
# read audio from file
sound1 = AudioSegment.from_file("/path/to/sound.wav", format="wav")
# save to file
sound1.export("output.wav")
# get audio bytes
b = sound1.export().read()
# convert format
sound2 = sound1.export(format="mp3")
# change sample rate
sound2 = sound1.set_frame_rate(16000) # to 16000 Hz
键盘控制
pynput
安装
pip install pynput
controlling the keyboard
from pynput.keyboard import Key, Controller
keyboard = Controller()
# Press and release space
keyboard.press(Key.space)
keyboard.release(Key.space)
# Type a lower case A; this will work even if no key on the
# physical keyboard is labelled 'A'
keyboard.press('a')
keyboard.release('a')
# Type two upper case As
keyboard.press('A')
keyboard.release('A')
with keyboard.pressed(Key.shift):
keyboard.press('a')
keyboard.release('a')
# Type 'Hello World' using the shortcut type method
keyboard.type('Hello World')
monitoring the keyboard
from pynput import keyboard
def on_press(key):
try:
print('alphanumeric key {0} pressed'.format(
key.char))
except AttributeError:
print('special key {0} pressed'.format(
key))
def on_release(key):
print('{0} released'.format(
key))
if key == keyboard.Key.esc:
# Stop listener
return False
# Collect events until released
with keyboard.Listener(
on_press=on_press,
on_release=on_release) as listener:
listener.join()
# ...or, in a non-blocking fashion:
listener = keyboard.Listener(
on_press=on_press,
on_release=on_release)
listener.start()
实用工具
img2pdf
图片转PDF: Github - img2pdf
pip3 install img2pdf
img2pdf ./*.png ./*.jpg -o out.pdf
# 批量转换
for d in `ls`; do
echo $d
cd $d && img2pdf ./*.png -o ../${d}.pdf && cd ..
done
python code
import img2pdf

# convert returns the PDF as bytes, so the output file is opened in binary mode
with open("name.pdf","wb") as f:
    f.write(img2pdf.convert(["test1.jpg", "test2.png"]))
PyPDF2
Merge PDFs
# NOTE(review): PdfFileMerger/PdfFileReader are the legacy PyPDF2 names; newer
# releases rename them (PdfMerger/PdfReader) — confirm against the installed version.
from PyPDF2 import PdfFileMerger, PdfFileReader
import os

merger = PdfFileMerger()
path = "C:/Users/yizhen.chen/Downloads/"
# sorted() makes the merge order deterministic
for file in sorted(os.listdir(path)):
    # skip non-PDF files and the result of a previous run
    if not file.endswith('.pdf') or file == 'output.pdf':
        continue
    # os.listdir yields bare file names, so join them with the directory first
    merger.append(os.path.join(path, file))
merger.write(path + "output.pdf")
merger.close()
Rotate PDFs
import PyPDF2
pdf_in = open('D:/yizhen.chen/desktop/Untitled.pdf', 'rb')
pdf_reader = PyPDF2.PdfFileReader(pdf_in)
pdf_writer = PyPDF2.PdfFileWriter()
for pagenum in range(pdf_reader.numPages):
page = pdf_reader.getPage(pagenum)
page.rotateClockwise(90)
pdf_writer.addPage(page)
pdf_out = open('D:/yizhen.chen/desktop/Etude 2.pdf', 'wb')
pdf_writer.write(pdf_out)
pdf_out.close()
pdf_in.close()
Delete pages from PDF
from PyPDF2 import PdfFileWriter, PdfFileReader
import os

file_path = './input.pdf'  # must point at a PDF file, not a directory
output_path = './output.pdf'

writer = PdfFileWriter()
with open(file_path, 'rb') as pdf_in:
    pdf = PdfFileReader(pdf_in)
    # start at index 1 so the first page is left out of the output
    for i in range(1, pdf.getNumPages()):
        writer.addPage(pdf.getPage(i))
    # write while the input is still open — the writer may still read page data from it
    with open(output_path, 'wb') as f:
        writer.write(f)
comtypes
- a lightweight python COM (The Microsoft Component Object Model) package: Github - comtypes
- only available in Windows
Install:
pip install comtypes
eyeD3
用于增加、删除、修改mp3文件中ID3元数据(即歌曲信息)
# add cover to mp3
eyeD3 --add-image=image.jpg:FRONT_COVER song.mp3