StandardLib

Text Processing Services

re

正则表达式

import re

# 编译
datepat = re.compile(r'\d+/\d+/\d+')

# 匹配
text1 = '11/27/2012'
if datepat.match(text1): print('yes')

# 搜索
text = 'Today is 11/27/2012. PyCon starts 3/13/2013.'
datepat.findall(text) # ['11/27/2012', '3/13/2013']

# 通常会分组匹配
datepat = re.compile(r'(\d+)/(\d+)/(\d+)')
m = datepat.match('11/27/2012')
print(m.group(0), m.group(1), m.group(2), m.group(3), m.groups())
datepat.findall(text) # [('11', '27', '2012'), ('3', '13', '2013')]

# 返回迭代
for m in datepat.finditer(text):
    print(m.groups())

# 只是一次匹配/搜索操作的话可以无需先编译
re.findall(r'(\d+)/(\d+)/(\d+)', text)

# 替换
re.sub(r'(\d+)/(\d+)/(\d+)', r'\3-\1-\2', text) # 'Today is 2012-11-27. PyCon starts 2013-3-13.'
re.sub(r'(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)', r'\g<year>-\g<month>-\g<day>', text) # 命名分组

Data Types

datetime

from datetime import datetime
a = datetime(2012, 9, 23)

# 时间转字符串
a.strftime('%Y-%m-%d')

# 字符串转时间
text = '2012-09-20'
y = datetime.strptime(text, '%Y-%m-%d')

zoneinfo (3.9+)

from datetime import datetime
from zoneinfo import ZoneInfo

# Create a datetime object without timezone
naive_dt = datetime.now()

# Add the timezone to the datetime object
aware_dt = naive_dt.replace(tzinfo=ZoneInfo('Asia/Shanghai'))

print(aware_dt)

collections

nametuple

from collections import nametuple

# namedtuple(typename, field_names)
Point = namedtuple('Point', ['x', 'y'])
p = Point(x=11, y=22)
print(p.x + p.y)

deque

from collections import deque

d = deque(["a", "b", "c"])
d.append("f") # add to the right side
d.appendleft("z") # add to the left side
e = d.pop() # pop from the right side
e = d.popleft() # pop from the left side

d = deque(maxlen=10) # deque with max length, FIFO

Counter

collections — Container datatypes

from collections import Counter
words = ['look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes']
word_counts = Counter(words)
top_three = word_counts.most_common(3)

defaultdict

自动初始化每个 key 刚开始对应的值

from collections import defaultdict
d = defaultdict(list)
d['a'].append(1)
d['a'].append(2)
d['b'].append(4)

OrderedDict

from collections import OrderedDict
d = OrderedDict()

ChainMap

先从字典 a 中找,如果找不到再在字典 b 中找

from collections import ChainMap
a = {'x': 1, 'z': 3 }
b = {'y': 2, 'z': 4 }
c = ChainMap(a,b)

Numeric and Mathematical Modules

random

import random

# 随机生成 0-1 之间的小数
random.random()

# 随机生成一个整数
random.randint(0,10)

values = [1, 2, 3, 4, 5, 6]

# 随机选取一个
random.choice(values)

# 随机采样
random.sample(values, 2)

# 打乱顺序
random.shuffle(values)

# 设定种子
random.seed(12345)

Functional Programming Modules

opertor

operator — Standard operators as functions

# lt: <
# le: <=
# eq: ==
# ne: !
# ge: >=
# gt: >
from operator import lt
lt(a, b) # a < b

# contains
from operator import contains
contains(a, b) # b in a

# getitem
from operator import getitem
getitem(a, b) # a[b]

# getattr
getattr(obj, name) # obj.name

# attrgetter
from operator import attrgetter
f = attrgetter('name') # f(b) -> b.name

# itemgetter
from operator import itemgetter
f = itemgetter(2) # f(r) -> r[2]

itertools

itertools — Functions creating iterators for efficient looping

Infinite iterators:

# count
start, step = 10, 2
for i in itertools.count(start, step):
    print(i) # 10, 12, 14, ...

# cycle
s = "ABCD"
for c in itertools.cycle(s):
    print(c) # A, B, C, D, A, ...
    
# repeat
for i in repeat(10, 3):
    print(i) # 10, 10, 10
for i in repeat(10):
    print(i) # 10, 10, 10, ...

Combinatoric iterators:

s1 = "abc"
s2 = "123"

# cartesian product
for c1, c2 in itertools.product(s1, s2):
    print(c1, c2) # (a, 1), (a, 2), (a, 3), (b, 1), ...

# permutation
for c in itertools.permutations(s1):
    print(c) # (a, b, c), (a, c, b), (b, a, c), ...

# combination
for c in itertools.combinations(s1, 2):
    print(c) # (a, b), (a, c), (b, c)

# combination with replacement
for c in itertools.combinations_with_replacement(s1, 2):
    print(c) # (a, a), (a, b), (a, c), (b, b), (b, c), (c, c)

Iterators terminating on the shortest input sequence:

itertools.accumulate([1,2,3,4,5]) # --> 1 3 6 10 15
itertools.chain("ABC", "DEF") # --> A B C D E F
itertools.compress("ABCDEF", [1,0,1,0,1,1]) # --> A C E F

itertools.takewhile(lambda x: x < 5, [1,4,6,4,1]) # --> 1 4
itertools.dropwhile(lambda x: x < 5, [1,4,6,4,1]) # --> 6 4 1
itertools.filterfalse(lambda x: x % 2, range(10)) # --> 0 2 4 6 8

itertools.islice("ABCDEFG", 2, None) # --> C D E F G
itertools.starmap(pow, [(2,5), (3,2), (10,3)]) # --> 32 9 1000

functools

functools — Higher-order functions and operations on callable objects

# cache
# Simple lightweight unbounded function cache.
@cache
def factorial(n):
    return n * factorial(n-1) if n else 1

# Decorator to wrap a function with a memoizing callable that saves up to the maxsize most recent calls.
@lru_cache(maxsize=128)
def fib(n):
    if n < 2: return n
    return fib(n-1) + fib(n-2)
  
# cmp_to_key
# Transform an old-style comparison function to a key function.
sorted(iterable, key=cmp_to_key(locale.strcoll))

# partial
basetwo = partial(int, base=2)
basetwo('10010') # --> 18

# reduce
reduce(lambda x, y: x+y, [1, 2, 3, 4, 5]) # --> ((((1+2)+3)+4)+5)

File and Directory Access

pathlib

pathlib — Object-oriented filesystem paths

from pathlib import Path

p = Path('.')

# list subdirectories
[x for x in p.iterdir() if x.is_dir()]

# list Python source files
list(p.glob('**/*.py'))

# navigate directory tree
q = p / 'init.d' / 'reboot'

# check existence
q.exists()

# check is directory
q.is_dir()

glob

glob - Unix style pathname pattern expansion

import glob

gif_files = glob.glob('*.gif')

tempfile

tempfile - Generate temporary files and directories

import tempfile

with tempfile.TemporaryFile() as fp:
    fp.write(b'Hello world!')
    fp.seek(0)
    fp.read()

shutil

shutil - High-level file operations

import shutil

# cp
shutil.copy(src, dst)

# copyfile, only copy the content of source file
shutil.copyfile(src, dst)

# chown
shutil.chown(path)

# copytree
shutil.copytree(source, destination)

# rmtree
shutil.rmtree(directory)

# make archive
archive_name = os.path.expanduser(os.path.join('~', 'myarchive'))
root_dir = os.path.expanduser(os.path.join('~', '.ssh'))
shutile.make_archive(archive_name, 'gztar', root_dir)

File Formats

configparser

配置文件

[hosts]
host = localhost
port = 27017
import configparser

config = configparser.ConfigParser()
config.read('config.ini')
HOST = config['hosts']['host']
PORT = config['hosts'].getint('port')

Generic Operating System Services

os

import os
path = "/Users/beazley/Data/data.csv"

# get basename
os.path.basename(path) # data.csv

# get dirname
os.path.dirname(path) # /Users/beazley/Data

# join path
os.path.join('tmp', 'data', os.path.basename(path)) # tmp/data/data.csv

# split file extension
os.path.splitext(path) # ('~/Data/data', '.csv')

# file/directory existence
os.path.exists('/etc/passwd')

# is file
os.path.isfile('/etc/passwd')

# is directory
os.path.isdir('/etc/passwd')

# list directory
os.listdir('/etc')

argparse

argparse - Parser for command-line options, arguments and sub-commands

Argparse Tutorial

params:

  • action
    • store (default): store and do nothing
    • store_const: stores the value specified by the const keyword argument
    • store_true: if the option is specified, assign the value True. Not specifying it implies False
    • count: count the number of occurrences of a specific optional arguments
  • nargs:
    • N: N arguments from the command line will be gathered together into a list
    • ?: if not present than default
    • *: 0 - all arguments are gathered into a list
    • +: at least one argument are gathered into a list
  • default: add default value
  • type: argument type
    • int
    • str
  • choices: argument value should be one of them
  • required: in default, argument starts with - or -- is treat as optional
  • metavar: changes the displayed name in help messages
  • dest: changes the name of the attribute of the optional argument
import argparse

parser = argparse.ArgumentParser(
    prog = 'ProgramName',
    description = 'What the program does')

# add positional argument
parser.add_argument(
    "echo", 
    type=str, 
    help="echo the string you use here")

# add optional argumenr
# argument starts with `-` or `--` is treat as optional
parser.add_argument(
    "--verbosity", 
    help="increase output verbosity")
# with short option
parser.add_argument(
    "-v", 
    "--verbosity",
    help="increase output verbosity")
# boolean argument
parser.add_argument(
    "-v", 
    "--verbosity",
    action="store_true",
    help="increase output verbosity")
# argument with choices
parser.add_argument(
    "-v", 
    "--verbosity",
    type=int,
    choices=[0, 1, 2],
    help="increase output verbosity")

parser.add_argument('-s', '--seed', default=0, help='the random seed',
    action='store', type='int', dest='seed')

logging

Basic config

import logging
logging.basicConfig(
    level=logging.DEBUG,
    filename='example.log',
    filemode='a',
    encoding='utf-8',
    format='%(asctime)s [%(levelname)s] %(pathname)s:%(lineno)d %(message)s',
    datefmt='%m/%d/%Y %I:%M:%S %p'
)

logging.debug("debug")
logging.info("info")
logging.warning("warning")
logging.error("error")
logging.critical("critical")

Advanced logging

import logging

# 设置日志等级
logger = logging.getLogger(__name__) # use a module-level logger
logger.setLevel(logging.DEBUG)

# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)

logger.info("info")

同时写入文件和console

logger = logging.getLogger("my-logger")
logger.setLevel(logging.DEBUG)

# 写入文件
file_handler = logging.FileHandler("../logs/log.log", encoding='UTF-8')
file_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s: %(levelname)s:%(message)s')
file_handler.setFormatter(formatter)

# 写入console
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)

logger.addHandler(file_handler)
logger.addHandler(console_handler)

curses

curses - Terminal handling for character-cell displays

用 Python 进行 Curses 编程

用于控制命令行字符级别的展示

import curses

# init
stdscr = curses.initscr()
curses.noecho()
curses.cbreak()
stdscr.keypad(True)

# exit gracefully
curses.nocbreak()
stdscr.keypad(False)
curses.echo()
curses.endwin()

使用 wrapper

from curses import wrapper

def main(stdscr):
    # Clear screen
    stdscr.clear()

    # This raises ZeroDivisionError when i == 10.
    for i in range(0, 11):
        v = i-10
        stdscr.addstr(i, 0, '10 divided by {} is {}'.format(v, 10/v))

    stdscr.refresh()
    stdscr.getkey()

wrapper(main)

Concurrent Execution

threading

threading — Thread-based parallelism

# Code to execute in an independent thread
import time
def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(5)

# Create and launch a thread
from threading import Thread
t = Thread(target=countdown, args=(10,))
t.start() # 运行线程

# 合并到当前线程,等待其结束
t.join()

给共享数据加锁

import threading

class SharedCounter:
    '''
    A counter object that can be shared by multiple threads.
    '''
    def __init__(self, initial_value = 0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self,delta=1):
        '''
        Increment the counter with locking
        '''
        with self._value_lock:
             self._value += delta

    def decr(self,delta=1):
        '''
        Decrement the counter with locking
        '''
        with self._value_lock:
             self._value -= delta

标准包内没有读写锁,如果需要可以使用第三方包 – readerwriterlock

# !pip install readerwriterlock

from readerwriterlock import rwlock
a = rwlock.RWLockFairD()

with a.gen_rlock():
    # Read stuff
    
with a.gen_wlock():
    # Write stuff

multiprocessing

multiprocessing — Process-based parallelism

创建子进程

from multiprocessing import Process
import os

def say_hello(name):
    print(f'PID {os.getpid()}: hello {name}.')

if __name__ == '__main__':
    p = Process(target=say_hello, args=('Tom', )) # 创建子进程对象
    p.start() # 运行子进程
    p.join() # 等待子进程返回

进程池

from multiprocessing import Pool

def fibonacci(x):
    if x <= 1: return x
    if x >= 40: raise ValueError
    return fibonacci(x - 2) + fibonacci(x - 1)

if __name__ == '__main__':
    # takes about 5~6 seconds
    with Pool(3) as p:
        print(p.map(fibonacci, [35] * 3))
    
    # takes about 20 seconds
    with Pool(3) as p:
        print(p.map(fibonacci, [35] * 10))

进程间通信

Queue
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

    
# 使用管道
Pipe
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe() # Pipe()返回一对可以双向通信的连接
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

conccurent.futures

# 多线程
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def wait(n):
    time.sleep(n)
    print(f"Waited {n} seconds.")

def fibonacci(x):
    if x <= 1: return x
    if x >= 40: raise ValueError
    return fibonacci(x - 2) + fibonacci(x - 1)
    
with ThreadPoolExecutor(max_workers=workers) as pool:
    future = pool.submit(wait, [1] * 10) # submit后子进程开始运行

# 查看子进程是否在运行
future.running() 

# 等待结果返回
future.result()

# future.result()回阻塞主进程,如果不想阻塞,可以使用回调函数
def when_done(r):
    print('Got:', r.result())
with ThreadPoolExecutor() as pool:
     future = pool.submit(wait, 1)
     future.add_done_callback(when_done)
      
# as_completed函数返回一个包含指定的 Future 实例的迭代器,这些实例会在完成时被 yield 出来
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(wait, random.randint(1, 5)) for _ in range(10)]
    for future in as_completed(futures):
        print(future.result())

        
# map写法
# max_workers默认值为 min(32, cpu核数+4),3.7版本之前为 5*cpu核数
# 当有一个进程报错时,生成器退出,如果需要捕获异常建议用submit
# 可以指定任务超时时间timeout
# chunksize默认为1,当任务数量很大,执行时间较短时,可以将chunksize调大,每次分配多个任务给每个子进程
with ThreadPoolExecutor(max_workers=workers, timeout=60, chunksize=100) as pool:
    result = pool.map(wait, [1] * n)
return result

# 多进程
from concurrent.futures import ProcessPoolExecutor

# submit写法
# max_workers默认值为cpu核数
with ProcessPoolExecutor(max_workers=workers) as pool:
    futures = [pool.submit(fibonacci, 30) for _ in range(n)]
return [res for res in as_completed(futures)]

# map写法
with ProcessPoolExecutor(max_workers=workers) as pool:
    result = pool.map(fibonacci, [30] * n)
return result

subprocess

# doesn't capture output
subprocess.run(["ls", "-l"])

# capture output
p = subprocess.run(["ls", "-l", "/dev/null"], capture_output=True)
print(p.stdout)

# run command string
subprocess.run("ls -l", shell=True)
# or
import shlex
subprocess.run(shlex.split("ls -la"))

# raise error if command fails
subprocess.run(["ls", "noexists"], check=True)

# pipeline
p1 = subprocess.Popen(["dmesg"], stdout=subprocess.PIPE)
p2 = subprocess.Popen(["grep", "hda"], stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close()  # Allow p1 to receive a SIGPIPE if p2 exits.
output = p2.communicate()[0]
# or (input trusted)
output = subprocess.check_output("dmesg | grep hda", shell=True)

Networking and Interprocess Communication

socket

Socket – Low-level networking interface

Socket Programming HOWTO

Socket Programming in Python (Guide)

socket connection process

server

import socket

HELLO_MSG = "Hello world"

if __name__ == "__main__":

    # Server socket doesn’t send any data.
    # It doesn’t receive any data.
    # It just produces “client” sockets.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    print(socket.gethostname())
    server.bind(('', 12358))
    server.listen(1)


    while True:
        conn, addr = server.accept()
        try:
            conn.send(HELLO_MSG.encode())
            while True:
                print("waiting...")
                data = conn.recv(1028)
                print(f"receive: {data}")
                if len(data) == 0:
                    break
                conn.send(data.upper())
        except:
            raise
        finally:
            conn.close()

client

import socket

if __name__ == "__main__":
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        conn.connect(('192.168.3.3', 12358))
        print("connected")
        while True:
            data = conn.recv(1024)
            print(data.decode())
            msg = input()
            conn.send(msg.encode())
    except:
        raise
    finally:
        conn.close()

asyncio

asyncio — Asynchronous I/O

Async IO in Python: A Complete Walkthrough

用async定义异步函数

import asyncio

async def do_something():
    l = []
    for i in range(10000):
        l.append(i ** 3)
    print(len(l))
    print("finish.")

异步函数要用await来调用

# 只会返回一个coroutine对象,并不会执行函数
def main():
    do_something()

# 要用run来执行
asyncio.run(do_something())

# 或者在别的异步函数中调用
async def main():
    await do_something() # 异步函数通过await来调用,await只能放在async函数内

asyncio.run(main())

或者通过旧式的API运行

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

顺序调用异步函数并不能同时执行

import asyncio
import time

async def do_something_cost_time(something, cost_time):
    await asyncio.sleep(cost_time) 
    print(f"{something} is done at {time.strftime('%X')}.")

async def main():
    print(f"started at {time.strftime('%X')}")
    await do_something_cost_time("WashClothes", 2) 
    await do_something_cost_time("WatchTV", 5) # 会等上一行执行完了再执行
    print(f"finished at {time.strftime('%X')}")
asyncio.run(main())

如果需要同时(concurrently)执行多个异步函数,需要将异步函数定义为task

async def main():
    print(f"started at {time.strftime('%X')}")
    task1 = asyncio.create_task(do_something_cost_time("WashClothes", 2))
    task2 = asyncio.create_task(do_something_cost_time("WatchTV", 5))
    task3 = asyncio.create_task(do_something_cost_time("FeedBaby", 3))
    # await除了可以调用异步函数,也可以调用task
    await task1 # 启动所有task,同时执行
    # print(1) # print when task1 finishes
    await task2
    # print(2) # print when task1 and task2 finish
    await task3
    # print(3) # print when task1, task2 and task3 finish
    # 只要await第一个task所有tasks就会同时开始执行,但是await后面的语句会等它前面所有的await完成才会执行
    print(f"finished at {time.strftime('%X')}") 
asyncio.run(main())

asyncio.gather的方式同时定义并执行tasks

async def main():
    await asyncio.gather(
        do_something_cost_time("WashClothes", 2),
        do_something_cost_time("WatchTV", 5),
        do_something_cost_time("FeedBaby", 3),
    )
    
asyncio.run(main())

或者通过旧式的API运行

loop = asyncio.get_event_loop()
tasks = [
    do_something_cost_time("WashClothes", 2),
    do_something_cost_time("WatchTV", 5),
    do_something_cost_time("FeedBaby", 3),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Event Loop

核心是一个 Queue,在一个循环中不断 pop 下一个 ready 的 callback 来执行

Usage:

# create and access a new asyncio event loop
loop = asyncio.new_event_loop()

print(loop)
# output: <_UnixSelectorEventLoop running=False closed=False debug=False>

# access he running event loop
loop = asyncio.get_running_loop()

# to execute a task (blocking)
loop.run_until_complete(asyncio.sleep(2))

# schedule a task (non blocking)
task = loop.create_task(asyncio.sleep(2))

# close the event loop
loop.close()

# stop the event loop
loop.stop()

# run forever until stopped
loop.run_forever()

# check loop status
loop.is_running()
loop.is_closed()

Run event loop in Process/Thread Executor

# create an executor
with ThreadPoolExecutor() as exe:
	# execute a function in event loop using executor
	loop.run_in_executor(exe, task)

Internet Data Handling

json

Import

import json

data = {
    'name' : 'ACME',
    'shares' : 100,
    'price' : 542.23
}

Working with string

json_str = json.dumps(data)
data = json.loads(json_str)

Working with file

with open('data.json', 'w') as f:
    json.dump(data, f)
with open('data.json', 'r') as f:
    data = json.load(f)

Importing Modules

importlib

for module_name in os.listdir(os.path.dirname(__file__)):
    if module_name != "__init__.py" and module_name.endswith(".py"):
        module_name = Path(module_name).stem
        module = import_module(".".join([__package__, module_name]))