Cookbook
{Back to Index}
Table of Contents
- 1. 数据结构
- 2. 函数
- 3. IO 编程
- 4. 面向对象编程
- 5. 流程控制
- 5.1. 归约函数
- 5.2. 标准库中的生成器函数
- 5.2.1. 用于过滤的生成器函数
- 5.2.1.1. itertools.compress(it, selector_it)
- 5.2.1.2. itertools.dropwhile(predicate, it)
- 5.2.1.3. builtin.filter(predicate, it)
- 5.2.1.4. itertools.filterfalse(predicate, it)
- 5.2.1.5. itertools.islice(it, [start], stop, step=1)
- 5.2.1.6. itertools.takewhile(predicate, it)
- 5.2.1.7. builtin.iter(callable, sentinel)
- 5.2.2. 用于映射的生成器函数
- 5.2.3. 用于合并可迭代对象的生成器函数
- 5.2.4. 用于扩展输出元素的生成器函数
- 5.2.5. 用于重新排列元素的生成器函数
- 5.2.1. 用于过滤的生成器函数
- 5.3. 上下文管理
- 6. 并发编程
- 7. 元编程
1 数据结构
1.1 序列操作
1.1.1 使用 * 处理元组拆包
*
前缀只能用在 一个 变量名前面,但是这个变量可以出现在赋值表达式的任意位置。
a, b, *rest1 = range(5) log("rest1", rest1) a, b, *rest2 = range(3) log("rest2", rest2) a, b, *rest3 = range(2) log("rest3", rest3) a, *body, c, d = range(5) log("body", body) *head, b, c, d = range(5) log("head", head)
=========== rest1 ============ [2, 3, 4] =========== rest2 ============ [2] =========== rest3 ============ [] ============ body ============ [1, 2] ============ head ============ [0, 1]
1.1.2 对序列重排位(itemgetter)
from operator import itemgetter data = [ ('c', 2), ('b', 3), ['a', 1] ] log("[itemgetter(1, 0)(t) for t in data]", [itemgetter(1, 0)(t) for t in data])
===================== [itemgetter(1, 0)(t) for t in data] ====================== [(2, 'c'), (3, 'b'), (1, 'a')]
1.1.3 给切片赋值
如果赋值的对象是一个切片, 赋值语句的右侧必须是个可迭代对象 。
l = list(range(10)) log("l", l) l[2:5] = [20, 30] log("after l[2:5] = [20, 30]", l) del l[5:7] log("after del l[5:7]", l) l[3::2] = [11, 22] log("after l[3::2] = [11, 22]", l) try: l[2:5] = 100 except Exception as e: log("e", e) l[2:5] = [100] log("after l[2:5] = [100]", l)
============= l ============== [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] == after l[2:5] = [20, 30] === [0, 1, 20, 30, 5, 6, 7, 8, 9] ====== after del l[5:7] ====== [0, 1, 20, 30, 5, 8, 9] == after l[3::2] = [11, 22] == [0, 1, 20, 11, 5, 22, 9] ============= e ============== can only assign an iterable ==== after l[2:5] = [100] ==== [0, 1, 100, 22, 9]
1.1.4 使用 bisect 管理已排序序列
1.1.4.1 使用 bisect 搜索
import bisect import sys HAYSTACK = [1, 4, 5, 6, 8, 12, 15, 20, 21, 23, 23, 26, 29, 30] NEEDLES = [0, 1, 2, 5, 8, 10, 22, 23, 29, 30, 31] ROW_FMT = '{0:2d} @ {1:2d} {2}{0:<2d}' def demo(bisect_fn): print('DEMO:', bisect_fn.__name__) print('haystack ->', ' '.join('%2d' % n for n in HAYSTACK)) for needle in reversed(NEEDLES): position = bisect_fn(HAYSTACK, needle) offset = position * ' |' print(ROW_FMT.format(needle, position, offset)) demo(bisect.bisect_left) print('=' * 60) demo(bisect.bisect)
DEMO: bisect_left haystack -> 1 4 5 6 8 12 15 20 21 23 23 26 29 30 31 @ 14 | | | | | | | | | | | | | |31 30 @ 13 | | | | | | | | | | | | |30 29 @ 12 | | | | | | | | | | | |29 23 @ 9 | | | | | | | | |23 22 @ 9 | | | | | | | | |22 10 @ 5 | | | | |10 8 @ 4 | | | |8 5 @ 2 | |5 2 @ 1 |2 1 @ 0 1 0 @ 0 0 ============================================================ DEMO: bisect_right haystack -> 1 4 5 6 8 12 15 20 21 23 23 26 29 30 31 @ 14 | | | | | | | | | | | | | |31 30 @ 14 | | | | | | | | | | | | | |30 29 @ 13 | | | | | | | | | | | | |29 23 @ 11 | | | | | | | | | | |23 22 @ 9 | | | | | | | | |22 10 @ 5 | | | | |10 8 @ 5 | | | | |8 5 @ 3 | | |5 2 @ 1 |2 1 @ 1 |1 0 @ 0 0
1.1.4.2 使用 bisect 建立查询表格
def grade(score, breakpoints=[60, 70, 80, 90], grades='FDCBA'): i = bisect.bisect(breakpoints, score) return grades[i] log("[grade(score) for score in [33, 99, 77, 70, 89, 90, 100]]", [grade(score) for score in [33, 99, 77, 70, 89, 90, 100]])
========== [grade(score) for score in [33, 99, 77, 70, 89, 90, 100]] =========== ['F', 'A', 'C', 'C', 'B', 'A', 'A']
1.1.4.3 使用 bisect.insort 插入新元素
import bisect import random SIZE=7 random.seed(1729) my_list = [] for i in range(SIZE): new_item = random.randrange(SIZE*2) bisect.insort(my_list, new_item) print('%2d ->' % new_item, my_list)
10 -> [10] 0 -> [0, 10] 6 -> [0, 6, 10] 8 -> [0, 6, 8, 10] 7 -> [0, 6, 7, 8, 10] 2 -> [0, 2, 6, 7, 8, 10] 10 -> [0, 2, 6, 7, 8, 10, 10]
1.1.5 列表中删除数据的陷阱
lst = ['a', 'b', 'c', '', ''] for i in lst: if i is '': lst.remove(i) print(lst)
['a', 'b', 'c', '']
1.1.5.1 原理
list 属于线性表,它的连续在于用一块连续的内存空间存储元素,在调用 remove 时,只是删除了地址内的元素。
回到问题,当删除列表中的元素时, for in
是对下标进行操作,
而 remove 是对值进行操作, 当 for 到达索引为 3 ,即第一个空字符位置时,符合条件即删除。
因为 list 是线性表,所以删除这个空字符时, 同时后面的所有元素自动移动位置 1 ,
此时的 lst 等于 ['a', 'b', 'c', '']
,因为 for in
已经遍历到了索引 3 处,所以循环结束。
1.1.5.2 解决方法
lst = ['a','','b','','c','',''] # 做法一 while '' in lst: lst.remove('') print(lst) # 做法二 lst_new = [i for i in lst if i != '']
1.1.6 列表中出现频率最高
words = [ 'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes', 'the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around', 'the', 'eyes', "don't", 'look', 'around', 'the', 'eyes', 'look', 'into', 'my', 'eyes', "you're", 'under' ] from collections import Counter word_counts = Counter(words) top_three = word_counts.most_common(3) print(top_three)
[('eyes', 8), ('the', 5), ('look', 4)]
1.1.7 列表中最大或最小的几项
import heapq portfolio = [ {'name': 'IBM', 'shares': 100, 'price': 91.1}, {'name': 'AAPL', 'shares': 50, 'price': 543.22}, {'name': 'FB', 'shares': 200, 'price': 21.09}, {'name': 'HPQ', 'shares': 35, 'price': 31.75}, {'name': 'YHOO', 'shares': 45, 'price': 16.35}, {'name': 'ACME', 'shares': 75, 'price': 115.65} ] cheap = heapq.nsmallest(3, portfolio, key=lambda s: s['price']) expensive = heapq.nlargest(3, portfolio, key=lambda s: s['price']) r = {'cheap': cheap, 'expensive': expensive} print(r)
1.1.8 对列表中的数据分组
rows = [ {'address': '5412 N CLARK', 'date': '07/01/2012'}, {'address': '5148 N CLARK', 'date': '07/04/2012'}, {'address': '5800 E 58TH', 'date': '07/02/2012'}, {'address': '2122 N CLARK', 'date': '07/03/2012'}, {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'}, {'address': '1060 W ADDISON', 'date': '07/02/2012'}, {'address': '4801 N BROADWAY', 'date': '07/01/2012'}, {'address': '1039 W GRANVILLE', 'date': '07/04/2012'}, ] from itertools import groupby rows.sort(key=lambda r: r['date']) for date, items in groupby(rows, key=lambda r: r['date']): print(date) for i in items: print(' ', i)
07/01/2012 {'address': '5412 N CLARK', 'date': '07/01/2012'} {'address': '4801 N BROADWAY', 'date': '07/01/2012'} 07/02/2012 {'address': '5800 E 58TH', 'date': '07/02/2012'} {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'} {'address': '1060 W ADDISON', 'date': '07/02/2012'} 07/03/2012 {'address': '2122 N CLARK', 'date': '07/03/2012'} 07/04/2012 {'address': '5148 N CLARK', 'date': '07/04/2012'} {'address': '1039 W GRANVILLE', 'date': '07/04/2012'}
或者使用 defaultdict 来实现:
from collections import defaultdict rows_by_date = defaultdict(list) for row in rows: rows_by_date[row['date']].append(row) print(rows_by_date)
defaultdict(<class 'list'>, {'07/01/2012': [{'address': '5412 N CLARK', 'date': '07/01/2012'}, {'address': '4801 N BROADWAY', 'date': '07/01/2012'}], '07/02/2012': [{'address': '5800 E 58TH', 'date': '07/02/2012'}, {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'}, {'address': '1060 W ADDISON', 'date': '07/02/2012'}], '07/03/2012': [{'address': '2122 N CLARK', 'date': '07/03/2012'}], '07/04/2012': [{'address': '5148 N CLARK', 'date': '07/04/2012'}, {'address': '1039 W GRANVILLE', 'date': '07/04/2012'}]})
1.1.9 消除序列中的重复数据,同时保持数据顺序
def dedupe(items, key=None): seen = set() for item in items: val = item if key is None else key(item) if val not in seen: yield item seen.add(val) a = [ {'x': 2, 'y': 3}, {'x': 1, 'y': 4}, {'x': 2, 'y': 3}, {'x': 2, 'y': 3}, {'x': 10, 'y': 15} ] print(list(dedupe(a, key=lambda a: (a['x'],a['y']))))
[{'x': 2, 'y': 3}, {'x': 1, 'y': 4}, {'x': 10, 'y': 15}]
def dedupe(items): seen = set() for item in items: if item not in seen: yield item seen.add(item) a = [1, 5, 2, 1, 9, 1, 5, 10] print(list(dedupe(a)))
[1, 5, 2, 9, 10]
1.1.10 序列解包 (unpack)
records = [ ('foo', 1, 2), ('bar', 'hello'), ('foo', 3, 4), ] def do_foo(x,y): print('foo', x, y) def do_bar(s): print('bar', s) for tag, *args in records: if tag == 'foo': do_foo(*args) elif tag == 'bar': do_bar(*args)
foo 1 2 bar hello foo 3 4
1.1.11 flatten 列表
from collections import Iterable def flatten(items, ignore_types=(str, bytes)): for x in items: if isinstance(x, Iterable) and not isinstance(x, ignore_types): yield from flatten(x) else: yield x items = [1, 2, [3, 4, [5, 6], 7], 8] # Produces 1 2 3 4 5 6 7 8 for x in flatten(items): print(x) items = ['Dave', 'Paula', ['Thomas', 'Lewis']] for x in flatten(items): print(x)
1 2 3 4 5 6 7 8 Dave Paula Thomas Lewis
1.1.12 优先队列
import heapq class PriorityQueue: def __init__(self): self._queue = [] self._index = 0 def push(self, item, priority): heapq.heappush(self._queue, (-priority, self._index, item)) self._index += 1 def pop(self): return heapq.heappop(self._queue)[-1] # Example use class Item: def __init__(self, name): self.name = name def __repr__(self): return 'Item({!r})'.format(self.name) q = PriorityQueue() q.push(Item('foo'), 1) q.push(Item('bar'), 5) q.push(Item('spam'), 4) q.push(Item('grok'), 1) print("Should be bar:", q.pop()) print("Should be spam:", q.pop()) print("Should be foo:", q.pop()) print("Should be grok:", q.pop())
Should be bar: Item('bar') Should be spam: Item('spam') Should be foo: Item('foo') Should be grok: Item('grok')
1.2 字典操作
1.2.1 defaultdict
在实例化一个 defaultdict 的时候,需要给构造方法提供一个可调用对象,
这个可调用对象会在 __getitem__
找不到键的时候被调用,返回默认值。
import collections d = collections.defaultdict(list) d['a'].append('b') d['c'].append('d') log("d", d)
============= d ============== defaultdict(<class 'list'>, {'a': ['b'], 'c': ['d']})
1.2.2 OrderedDict
这个类型在添加键的时候会保持顺序,因此键的迭代次序总是一致的。
OrderedDict 的 popitem 方法默认删除并返回字典里最后一个元素,
但是如果调用 popitem(last=False)
,则删除并返回第一个被添加进去的元素。
1.2.3 ChainMap
该类型可以容纳多个不同的映射对象,在进行键查找操作时,会逐个查找这些映射对象,直到键被找到为止。
这个功能在给有嵌套作用域的语言做解释器的时候很有用,可以用一个映射对象来代表一个作用域上下文。
import builtins from collections import ChainMap pylookup = ChainMap(locals(), globals(), vars(builtins))
1.2.4 Counter
这个映射类型会给键准备一个整数计数器,每次更新一个键的时候会增加这个计数器。
Counter 实现了 + 和 - 运算符来合并记录。 most_common([n])
方法会返回最常见的 n 个键和它们的计数。
from collections import Counter ct = Counter('abracadabra') print(ct) ct.update('aaaaazzz') print('after update'.center(30, '=')) print(ct) print(ct.most_common(2))
Counter({'a': 5, 'b': 2, 'r': 2, 'c': 1, 'd': 1}) =========after update========= Counter({'a': 10, 'z': 3, 'b': 2, 'r': 2, 'c': 1, 'd': 1}) [('a', 10), ('z', 3)]
1.2.5 UserDict
这个类其实是把标准 dict 用纯 Python 又实现了一遍,主要的用途是让用户继承写子类的。
更倾向于从 UserDict 而不是从 dict 继承的主要原因是后者有时会在某些方法的实现上走一些捷径, 导致不得不在子类中重写这些方法,但是 UserDict 就不会有这个问题。
继承自 dict 的 __init__
方法忽略了子类的 __setitem__
方法:
class MyDict(dict): def __setitem__(self, key, value): super().__setitem__(key, value*2) d = MyDict(one=1) print(d) d['two'] = 2 print(d)
{'one': 1} {'one': 1, 'two': 4}
继承自 dict 的 update 方法也忽略了子类的 __setitem__
方法:
d.update(three=3) print(d)
{'one': 1, 'two': 4, 'three': 3}
另外一个值得注意的地方是, UserDict 并不是 dict 的子类 。
UserDict 有一个 data
属性,是 dict 的实例,这个属性实际上是 UserDict 最终存储数据的地方 。
1.2.6 不可变映射类型(MappingProxyType)
types.MappingProxyType 会返回一个只读的映射视图。 虽然是只读视图,但是它是动态的,如果对原映射做出改动, 通过这个视图可以观察到,但是无法通过这个视图对原映射做出修改。
from types import MappingProxyType d = {1: 'A'} d_proxy = MappingProxyType(d) print(d_proxy) log("d_proxy[1]", d_proxy[1]) try: d_proxy[2] = 'x' except Exception as e: print(e) d[2] = 'B' print(d_proxy)
{1: 'A'} ========= d_proxy[1] ========= A 'mappingproxy' object does not support item assignment {1: 'A', 2: 'B'}
1.2.7 对字典作集合运算
a = { 'x' : 1, 'y' : 2, 'z' : 3 } b = { 'w' : 10, 'x' : 11, 'y' : 2 } print('Common keys:', a.keys() & b.keys()) print('Keys in a not in b:', a.keys() - b.keys()) print('(key,value) pairs in common:', a.items() & b.items())
Common keys: {'y', 'x'} Keys in a not in b: {'z'} (key,value) pairs in common: {('y', 2)}
1.2.8 组合多个字典当作一个字典使用
a = {'x': 1, 'z': 3 } b = {'y': 2, 'z': 4 } # (a) Simple example of combining from collections import ChainMap c = ChainMap(a,b) print("c:", c) print(c['x']) # Outputs 1 (from a) print(c['y']) # Outputs 2 (from b) print(c['z']) # Outputs 3 (from a) # Output some common values print('len(c):', len(c)) print('c.keys():', list(c.keys())) print('c.values():', list(c.values())) # Modify some values c['z'] = 10 c['w'] = 40 del c['x'] print("a:", a)
c: ChainMap({'x': 1, 'z': 3}, {'y': 2, 'z': 4}) 1 2 3 len(c): 3 c.keys(): ['z', 'y', 'x'] c.values(): [3, 2, 1] a: {'z': 10, 'w': 40}
1.2.9 字典栈
# Example of stacking mappings (like scopes) values = ChainMap() values['x'] = 1 # Add a new mapping values = values.new_child() values['x'] = 2 # Add a new mapping values = values.new_child() values['x'] = 3 print(values) print(values['x']) # Discard last mapping values = values.parents print(values) print(values['x']) # Discard last mapping values = values.parents print(values) print(values['x'])
ChainMap({'x': 3}, {'x': 2}, {'x': 1}) 3 ChainMap({'x': 2}, {'x': 1}) 2 ChainMap({'x': 1}) 1
1.3 文本操作
1.3.1 获取 Unicode 字符名称(unicodedata)
from unicodedata import name for i in range(250, 256): print(chr(i), ":", name(chr(i), ''))
ú : LATIN SMALL LETTER U WITH ACUTE û : LATIN SMALL LETTER U WITH CIRCUMFLEX ü : LATIN SMALL LETTER U WITH DIAERESIS ý : LATIN SMALL LETTER Y WITH ACUTE þ : LATIN SMALL LETTER THORN ÿ : LATIN SMALL LETTER Y WITH DIAERESIS
1.3.2 字符编码侦测(Chardet)
! chardetect ~/test.org
/Users/ruan/test.org: ascii with confidence 1.0
1.3.3 格式化
1.3.3.1 填充与对齐
print('{:>10}'.format('test')) print('{:10}'.format('test')) print('{:^10}'.format('test')) print('{:_<10}'.format('test'))
test test test test______
1.3.3.2 字符串截断
print('{:.5}'.format('xylophone')) print('{:10.5}'.format('xylophone'))
xylop xylop
1.3.3.3 占位符
data = {'first': 'Hodor', 'last': 'Hodor!'} print('{first} {last}'.format(**data)) print('{first} {last}'.format(first='Hodor', last='Hodor!'))
Hodor Hodor! Hodor Hodor!
person = {'first': 'Jean-Luc', 'last': 'Picard'} data = [4, 8, 15, 16, 23, 42] class Plant(object): category = 'tree' kinds = [{'name': 'oak'}, {'name': 'maple'}] print('{p[first]} {p[last]}'.format(p=person)) print('{d[4]} {d[5]}'.format(d=data)) print('{p.category}: {p.kinds[0][name]}'.format(p=Plant()))
Jean-Luc Picard 23 42 tree: oak
1.3.3.4 排版
import textwrap s = "Look into my eyes, look into my eyes, the eyes, the eyes, \ the eyes, not around the eyes, don't look around the eyes, \ look into my eyes, you're under." print(textwrap.fill(s, 70)) print() print(textwrap.fill(s, 40)) print() print(textwrap.fill(s, 40, initial_indent=' ')) print() print(textwrap.fill(s, 40, subsequent_indent=' ')) print()
Look into my eyes, look into my eyes, the eyes, the eyes, the eyes, not around the eyes, don't look around the eyes, look into my eyes, you're under. Look into my eyes, look into my eyes, the eyes, the eyes, the eyes, not around the eyes, don't look around the eyes, look into my eyes, you're under. Look into my eyes, look into my eyes, the eyes, the eyes, the eyes, not around the eyes, don't look around the eyes, look into my eyes, you're under. Look into my eyes, look into my eyes, the eyes, the eyes, the eyes, not around the eyes, don't look around the eyes, look into my eyes, you're under.
1.3.4 字符串匹配
1.3.4.1 使用 shell 风格的通配符匹配字符串
from fnmatch import fnmatchcase as match addresses = [ '5412 N CLARK ST', '1060 W ADDISON ST', '1039 W GRANVILLE AVE', '2122 N CLARK ST', '4802 N BROADWAY', ] a = [addr for addr in addresses if match(addr, '* ST')] print(a) b = [addr for addr in addresses if match(addr, '54[0-9][0-9] *CLARK*')] print(b)
['5412 N CLARK ST', '1060 W ADDISON ST', '2122 N CLARK ST'] ['5412 N CLARK ST']
1.3.4.2 贪婪和非贪婪匹配
import re # Sample text text = 'Computer says "no." Phone says "yes."' # (a) Regex that finds quoted strings - longest match str_pat = re.compile(r'\"(.*)\"') print(str_pat.findall(text)) # (b) Regex that finds quoted strings - shortest match str_pat = re.compile(r'\"(.*?)\"') print(str_pat.findall(text))
['no." Phone says "yes.'] ['no.', 'yes.']
1.3.5 输入密码
import getpass user = getpass.getuser() passwd = getpass.getpass() print('User:', user) print('Passwd:', passwd)
1.4 字节操作
1.4.1 将字节序列转换成不同类型字段组成的元组(struct)
struct
模块能处理 bytes
, bytearray
和 memoryview
对象。
import struct fmt = '<1s3sHHH' with open('img/p3_hash.png', 'rb') as fp: img = memoryview(fp.read()) header = img[:10] log("bytes(header)", bytes(header)) log("struct.unpack(fmt, header)", struct.unpack(fmt, header))
================================ bytes(header) ================================= b'\x89PNG\r\n\x1a\n\x00\x00' ========================== struct.unpack(fmt, header) ========================== (b'\x89', b'PNG', 2573, 2586, 0)
1.5 时间操作
1.5.1 time 模块
time 模块始终返回 UTC 时间。
1.5.1.1 获取 Unix Timestamp
即从 Epoch (1970年1月1日00:00:00 UTC) 开始所经过的秒数。
import time print(time.time())
1542554851.5957272
1.5.1.2 获取具体时间值
current_time = time.time() current_struct_time = time.gmtime(current_time) print(current_struct_time)
time.struct_time(tm_year=2018, tm_mon=11, tm_mday=18, tm_hour=15, tm_min=27, tm_sec=37, tm_wday=6, tm_yday=322, tm_isdst=0)
current_year = current_struct_time.tm_year current_mon = current_struct_time.tm_mon current_mday = current_struct_time.tm_mday current_hour = current_struct_time.tm_hour current_min = current_struct_time.tm_min r = (current_year, current_mon, current_mday, current_hour, current_min) print(r)
(2018, 11, 18, 15, 27)
1.5.2 datatime 模块
datetime 模块简化了日期操作,如增加天数,设置时区等。
1.5.2.1 创建时间
import datetime d = datetime.datetime(year=2017, month=12, day=31, hour=12, minute=59, second=59) r = (d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond) print(r)
(2017, 12, 31, 12, 59, 59, 0)
1.5.2.2 获取 UTC 时间
print(datetime.datetime.utcnow())
2018-11-18 15:27:48.018708
1.5.2.3 获取当前时区时间
print(datetime.datetime.now())
2018-11-18 23:27:54.145839
1.5.2.4 日期运算
today = datetime.datetime.now() diff = datetime.timedelta(weeks=3, days=2) future = today + diff past = today - diff print((future, past))
(datetime.datetime(2018, 12, 11, 23, 28, 0, 332108), datetime.datetime(2018, 10, 26, 23, 28, 0, 332108))
1.5.2.5 日期转字符串
r = '{:%Y-%m-%d %H:%M}'.format(datetime.datetime(2001, 2, 3, 4, 5)) print(r)
2001-02-03 04:05
1.5.2.6 字符串转日期
r = datetime.datetime.strptime("Mar 03, 2010", "%b %d, %Y") print(r)
2010-03-03 00:00:00
1.5.3 获取月份缩写
from calendar import month_abbr print(list(month_abbr))
['', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
1.6 数字操作
1.6.1 数字格式化
print('{:06.2f}'.format(3.141592653589793)) print('{:04d}'.format(42)) print('{:+d}'.format(42)) print('{: d}'.format(42)) print('{: d}'.format(-42)) print('{:=5d}'.format((- 23))) print('{:=+5d}'.format((23)))
003.14 0042 +42 42 -42 - 23 + 23
2 函数
2.1 获取函数签名信息(inspect.signature)
from inspect import signature def foo(a, b=1, **c): pass print(signature(foo)) print() for name, param in sig.parameters.items(): print(param.kind, ':', name, '=', param.default)
(a, b=1, **c) POSITIONAL_OR_KEYWORD : a = <class 'inspect._empty'> POSITIONAL_OR_KEYWORD : b = 1 VAR_KEYWORD : c = <class 'inspect._empty'>
inspect.signature
返回一个 inspect.Signature
对象,
它有一个 parameters
属性,这是一个有序映射,将参数名和 inspect.Parameter
对象对应起来。
Parameter
对象的属性有 name
, default
, kind
。
inspect._empty
表示没有默认值。(这里不能使用 None
,是因为 None
可以作为默认值)
2.1.1 绑定参数到函数签名中(inspect.Signature.bind)
inspect.Signature
对象有个 bind
方法,可以把任意个参数绑定到签名中的形参上,
所用的规则与实参到形参的匹配方式一样。
在框架中可以使用这个方法 在真正调用函数前验证参数 。
import inspect def foo(a, b=1, **c): pass args = {'a': 1, 'b': 2, 'x': '1', 'y': '2'} sig = inspect.signature(foo) bound_args = sig.bind(**args) log("bound_args", bound_args) print() for name, value in bound_args.arguments.items(): print(name, '=', value) print() del args['a'] try: bound_args = sig.bind(**args) except Exception as e: print(e)
================================== bound_args ================================== <BoundArguments (a=1, b=2, c={'x': '1', 'y': '2'})> a = 1 b = 2 c = {'x': '1', 'y': '2'} missing a required argument: 'a'
2.2 函数式编程风格(operator)
operator
模块以函数的形式提供了 Python 全部的中缀运算符,
从而避免编写类似 lambda a, b: a * b
这种平凡的匿名函数。
2.2.1 数学运算
import functools import operator log("functools.reduce(operator.xor, range(6))", functools.reduce(operator.xor, range(6))) log("functools.reduce(operator.mul, range(1, 7))", functools.reduce(operator.mul, range(1, 7)))
=================== functools.reduce(operator.xor, range(6)) =================== 1 ================= functools.reduce(operator.mul, range(1, 7)) ================== 720
2.2.2 从序列中取出元素(itemgetter)
from operator import itemgetter data = [ ('c', 2), ('b', 3), ('a', 1) ] log("sorted(data, key=itemgetter(0))", sorted(data, key=itemgetter(0)))
======================= sorted(data, key=itemgetter(0)) ======================== [('a', 1), ('b', 3), ('c', 2)]
如果把多个参数传给 itemgetter
,它构建的函数会返回提取的值构成的元组:
log("[itemgetter(1, 0)(t) for t in data]", [itemgetter(1, 0)(t) for t in data])
===================== [itemgetter(1, 0)(t) for t in data] ====================== [(2, 'c'), (3, 'b'), (1, 'a')]
2.2.3 读取对象属性(attrgetter)
attrgetter
与 itemgetter
作用类似,它创建的函数根据名称提取对象的属性。
如果把多个属性名传给 attrgetter
,它也会返回提取的值构成的元组。
此外,如果参数名中包含 . , attrgetter
会深入嵌套对象,获取指定的属性。
metro_data = [ ('Tokyo', 'JP', 36.933, (35.689722, 139.691667)), ('Delhi NCR', 'IN', 21.935, (28.613889, 77.208889)), ('Mexico City', 'MX', 20.142, (19.433333, -99.133333)), ('New York-Newark', 'US', 20.104, (40.808611, -74.020386)), ('Sao Paulo', 'BR', 19.649, (-23.547778, -46.635833)) ] from collections import namedtuple LatLong = namedtuple('LatLong', 'lat long') Metropolis = namedtuple('Metropolis', 'name cc pop coord') metro_areas = [Metropolis(name, cc, pop, LatLong(lat, long)) for name, cc, pop, (lat, long) in metro_data] log("metro_areas[0]", metro_areas[0]) name_lat = attrgetter('name', 'coord.lat') log("name_lat(metro_areas[0])", name_lat(metro_areas[0])) print() from operator import attrgetter for city in sorted(metro_areas, key=attrgetter('coord.lat')): print(name_lat(city))
================================ metro_areas[0] ================================ Metropolis(name='Tokyo', cc='JP', pop=36.933, coord=LatLong(lat=35.689722, long=139.691667)) =========================== name_lat(metro_areas[0]) =========================== ('Tokyo', 35.689722) ('Sao Paulo', -23.547778) ('Mexico City', 19.433333) ('Delhi NCR', 28.613889) ('Tokyo', 35.689722) ('New York-Newark', 40.808611)
2.2.4 调用对象方法(methodcall)
from operator import methodcaller s = "hello world" log("methodcaller('upper')(s)", methodcaller('upper')(s)) log("methodcaller('replace', ' ', '-')(s)", methodcaller('replace', ' ', '-')(s))
=========================== methodcaller('upper')(s) =========================== HELLO WORLD ===================== methodcaller('replace', ' ', '-')(s) ===================== hello-world
2.3 使用装饰器
2.3.1 记录函数运行时间
import functools import time def clock(func): @functools.wraps(func) def wrapper(*args, **kw): t0 = time.perf_counter() result = func(*args, **kw) elapsed = time.perf_counter() - t0 name = func.__name__ arg_lst = [] if args: arg_lst.append(', '.join(repr(arg) for arg in args)) if kw: pairs = ['{}={}'.format(k, w) for k, w in sorted(kw.items())] arg_lst.append(', '.join(pairs)) arg_str = ', '.join(arg_lst) print("[{:0.8f}] {}({}) -> {}".format(elapsed, name, arg_str, result)) return result return wrapper @clock def snooze(t): time.sleep(t) snooze(0.1) snooze(0.2) snooze(0.3)
[0.10157284] snooze(0.1) -> None [0.20283653] snooze(0.2) -> None [0.30416814] snooze(0.3) -> None
2.3.2 缓存函数返回值(functools.lru_cache)
它把耗时的函数的结果缓存起来,避免传入相同的参数时重复计算,可以使用两个可选的参数来配置:
maxsize
指定存储多少个调用的结果。缓存满了之后,旧的结果会被删除,腾出空间。
为了得到最佳性能,=maxsize= 应设为 2 的幂。
typed
如果设为 True ,把不同参数类型得到的结果分开保存, 即把通常认为相等的浮点数和整数参数(如 1 和 1.0)区分开。
因为 lru_cache
使用字典存储结果,而且键根据调用时传入的定位参数和关键字参数创建,
因此被 lru_cache
装饰的函数, 它的所有参数必须是可散列的 。
@clock def fib(n): if n < 2: return n return fib(n-2) + fib(n-1) print("result: ", fib(6))
[0.00000045] fib(0) -> 0 [0.00000053] fib(1) -> 1 [0.00029566] fib(2) -> 1 [0.00000040] fib(1) -> 1 [0.00000035] fib(0) -> 0 [0.00000041] fib(1) -> 1 [0.00005358] fib(2) -> 1 [0.00010595] fib(3) -> 2 [0.00048775] fib(4) -> 3 [0.00000033] fib(1) -> 1 [0.00000032] fib(0) -> 0 [0.00000110] fib(1) -> 1 [0.00010550] fib(2) -> 1 [0.00015888] fib(3) -> 2 [0.00000056] fib(0) -> 0 [0.00000050] fib(1) -> 1 [0.00012996] fib(2) -> 1 [0.00000042] fib(1) -> 1 [0.00000048] fib(0) -> 0 [0.00000041] fib(1) -> 1 [0.00005725] fib(2) -> 1 [0.00025901] fib(3) -> 2 [0.00050781] fib(4) -> 3 [0.00088358] fib(5) -> 5 [0.00145318] fib(6) -> 8 result: 8
import functools @functools.lru_cache() @clock def fib(n): if n < 2: return n return fib(n-2) + fib(n-1) print("result:", fib(6))
[0.00000047] fib(0) -> 0 [0.00000077] fib(1) -> 1 [0.00021293] fib(2) -> 1 [0.00000149] fib(3) -> 2 [0.00030547] fib(4) -> 3 [0.00000166] fib(5) -> 5 [0.00044729] fib(6) -> 8 result: 8
2.3.3 单分派泛函(functools.singledispatch)
使用 singledispatch
装饰的普通函数会变成分派泛函:
根据 第一个参数的类型 ,执行具体的操作。
(正是因为根据第一个参数的类型,所以称为单分派,
若根据多个参数选择 专门的函数 ,那就是多分派了)
专门的函数 应该处理抽象基类 ,如 numbers.Integral
, abc.MutableSequence
,
不要处理具体实现,如 int
, list
。这样,代码支持的兼容类型才能更广泛。
from functools import singledispatch from collections import abc import numbers import html @singledispatch def htmlize(obj): content = html.escape(repr(obj)) return '<pre>{}</pre>'.format(content) @htmlize.register(str) def _(text): content = html.escape(text).replace('\n', '<br>\n') return '<p>{0}</p>'.format(content) @htmlize.register(numbers.Integral) def _(n): return '<pre>{0} (0x{0:x})</pre>'.format(n) @htmlize.register(tuple) @htmlize.register(abc.MutableSequence) def _(seq): inner = '</li>\n<li>'.join(htmlize(item) for item in seq) return '<ul>\n<li>' + inner + '</li>\n</ul>' log("htmlize({1, 2, 3})", htmlize({1, 2, 3})) log("htmlize(abs)", htmlize(abs)) log("htmlize('helloworld')", htmlize('helloworld')) log("htmlize(42)", htmlize(42)) log("htmlize([1, 2, 3])", htmlize([1, 2, 3]))
============================== htmlize({1, 2, 3}) ============================== <pre>{1, 2, 3}</pre> ================================= htmlize(abs) ================================= <pre><built-in function abs></pre> ============================ htmlize('helloworld') ============================= <p>helloworld</p> ================================= htmlize(42) ================================== <pre>42 (0x2a)</pre> ============================== htmlize([1, 2, 3]) ============================== <ul> <li><pre>1 (0x1)</pre></li> <li><pre>2 (0x2)</pre></li> <li><pre>3 (0x3)</pre></li> </ul>
3 IO 编程
3.1 文件 IO
3.1.1 将文件描述符包装成文件对象
from socket import socket, AF_INET, SOCK_STREAM def echo_client(client_sock, addr): print("Got connection from", addr) # Make text-mode file wrappers for socket reading/writing client_in = open(client_sock.fileno(), 'rt', encoding='latin-1', closefd=False) client_out = open(client_sock.fileno(), 'wt', encoding='latin-1', closefd=False) # Echo lines back to the client using file I/O for line in client_in: client_out.write(line) client_out.flush() client_sock.close() def echo_server(address): sock = socket(AF_INET, SOCK_STREAM) sock.bind(address) sock.listen(1) while True: client, addr = sock.accept() echo_client(client, addr) print('Echo serving running on localhost:25000') echo_server(('', 25000))
3.1.2 改变已打开文件的编码方式
# Example of adding a text encoding to existing file-like object import urllib.request import io u = urllib.request.urlopen('http://www.python.org') f = io.TextIOWrapper(u, encoding='utf-8') text = f.read() print(text)
3.2 内存 IO
3.2.1 内存字符串
from io import StringIO f = StringIO() f.write('hello') f.write(' ') f.write('world!') print(f.getvalue())
f = StringIO('Hello!\nHi!\nGoodbye!') while True: s = f.readline() if s == '': # eof break print(s, end='')
3.2.2 内存比特流
from io import BytesIO f = BytesIO() f.write('中文'.encode('utf-8')) print(f.getvalue())
f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87') r = f.read().decode('utf-8') print(r)
4 面向对象编程
4.1 为实例属性提供默认值
类属性可用于为实例属性提供默认值:
class A: value = 'default value' pass a = A() print(a.value)
default value
4.2 创建缓存对象
class Spam: def __init__(self, name): self.name = name # Caching support import weakref _spam_cache = weakref.WeakValueDictionary() def get_spam(name): if name not in _spam_cache: s = Spam(name) _spam_cache[name] = s else: s = _spam_cache[name] return s if __name__ == '__main__': a = get_spam('foo') b = get_spam('bar') print('a is b:', a is b) c = get_spam('foo') print('a is c:', a is c)
a is b: False a is c: True
import weakref class CachedSpamManager: def __init__(self): self._cache = weakref.WeakValueDictionary() def get_spam(self, name): if name not in self._cache: s = Spam(name) self._cache[name] = s else: s = self._cache[name] return s class Spam: def __init__(self, name): self.name = name Spam.manager = CachedSpamManager() def get_spam(name): return Spam.manager.get_spam(name) if __name__ == '__main__': a = get_spam('foo') b = get_spam('bar') print('a is b:', a is b) c = get_spam('foo') print('a is c:', a is c)
a is b: False a is c: True
# Example involving new and some of its problems import weakref class Spam: _spam_cache = weakref.WeakValueDictionary() def __new__(cls, name): if name in cls._spam_cache: return cls._spam_cache[name] else: self = super().__new__(cls) cls._spam_cache[name] = self return self def __init__(self, name): print('Initializing Spam') self.name = name if __name__ == '__main__': print("This should print 'Initializing Spam' twice") s = Spam('Dave') t = Spam('Dave') print(s is t)
This should print 'Initializing Spam' twice Initializing Spam Initializing Spam True
4.3 使用抽象类
from abc import ABCMeta, abstractmethod class A(metaclass=ABCMeta): @property @abstractmethod def name(self): pass @name.setter @abstractmethod def name(self, value): pass @classmethod @abstractmethod def method1(cls): pass @staticmethod @abstractmethod def method2(): pass
4.4 处理 Json 数据
4.4.1 使用 OrderedDict 处理 Json 数据
import json # Some JSON encoded text s = '{"name": "ACME", "shares": 50, "price": 490.1}' # (a) Turning JSON into an OrderedDict from collections import OrderedDict data = json.loads(s, object_pairs_hook=OrderedDict) print(data)
OrderedDict([('name', 'ACME'), ('shares', 50), ('price', 490.1)])
4.4.2 使用 Json 数据填充对象
class JSONObject: def __init__(self, d): self.__dict__ = d data = json.loads(s, object_hook=JSONObject) print(data.name) print(data.shares) print(data.price)
ACME 50 490.1
4.4.3 将对象序列化成 Json 数据
class Point: def __init__(self, x, y): self.x = x self.y = y def serialize_instance(obj): d = { '__classname__' : type(obj).__name__ } d.update(vars(obj)) return d p = Point(3,4) s = json.dumps(p, default=serialize_instance) print(s)
{"__classname__": "Point", "x": 3, "y": 4}
4.4.4 将 Json 数据反序列化成对象
classes = { 'Point' : Point } def unserialize_object(d): clsname = d.pop('__classname__', None) if clsname: cls = classes[clsname] obj = cls.__new__(cls) for key, value in d.items(): setattr(obj, key, value) return obj else: return d a = json.loads(s, object_hook=unserialize_object) print(a) print(a.x) print(a.y)
<__main__.Point object at 0x11c5f3e48> 3 4
5 流程控制
5.1 归约函数
5.1.1 all(it)
5.1.2 any(it)
5.1.3 max(it, [key], [default])
如果可迭代对象为空,返回 default
。
可以这样调用:
max(arg1, arg2, ..., [key])
5.1.4 min(it, [key], [default])
如果可迭代对象为空,返回 default
。
可以这样调用:
min(arg1, arg2, ..., [key])
5.1.5 functools.reduce(func, it, [initial])
5.1.6 sum(it, start=0)
5.2 标准库中的生成器函数
5.2.1 用于过滤的生成器函数
5.2.1.1 itertools.compress(it, selector_it)
并行处理两个可迭代对象:如果 selector_it 中的元素是真值,产出 it 中对应的元素。
import itertools def vowel(c): return c.lower() in 'aeiou' result = list(itertools.compress('Aardvark', (1, 0, 1, 1, 0, 1))) print(result)
['A', 'r', 'd', 'a']
5.2.1.2 itertools.dropwhile(predicate, it)
处理 it ,跳过 predicate 计算结果为真值的元素,产出剩下的元素。
print(list(itertools.dropwhile(vowel, 'Aardvark')))
['r', 'd', 'v', 'a', 'r', 'k']
5.2.1.3 builtin.filter(predicate, it)
如果 predicate(item)
返回真值,产出对应的元素,如果 predicate 是 None ,则只产出真值元素。
print(list(filter(vowel, 'Aardvark')))
['A', 'a', 'a']
5.2.1.4 itertools.filterfalse(predicate, it)
如果 predicate(item)
返回假值,产出对应的元素。
5.2.1.5 itertools.islice(it, [start], stop, step=1)
产出 it 的切片,类似于 s[:stop]
或 s[start:stop:step]
。
print(list(itertools.islice('Aardvark', 4))) print(list(itertools.islice('Aardvark', 4, 7))) print(list(itertools.islice('Aardvark', 1, 7, 2)))
['A', 'a', 'r', 'd'] ['v', 'a', 'r'] ['a', 'd', 'a']
5.2.1.6 itertools.takewhile(predicate, it)
如果 predicate(item)
返回真值,产出对应的元素,然后停止。
print(list(itertools.takewhile(vowel, 'Aardvark')))
['A', 'a']
5.2.1.7 builtin.iter(callable, sentinel)
第一个参数是一个没有参数的可调用对象,用于不断调用,产出各个值;第二个值是哨符,当可调用对象返回这个值时,迭代结束 (不产出哨符) 。
iter 的文档中有个实用的例子,这段代码逐行读取文件,直到遇到空行或到达文件末尾为止:
with open('mydata.txt') as fp: for line in iter(fp.readline, ''): process_line(line)
5.2.2 用于映射的生成器函数
5.2.2.1 itertools.accumulate(it, [func])
产出累计值,默认为求和;如果提供了 func ,则把前面两个元素传个 func , 然后把计算结果和下一个元素传给它,以此类推,最后产出结果。
from operator import mul sample = [5, 4, 2, 8, 7, 6, 3, 0, 9, 1] print(list(itertools.accumulate(sample))) print(list(itertools.accumulate(sample, min))) print(list(itertools.accumulate(sample, mul)))
[5, 9, 11, 19, 26, 32, 35, 35, 44, 45] [5, 4, 2, 2, 2, 2, 2, 0, 0, 0] [5, 20, 40, 320, 2240, 13440, 40320, 0, 0, 0]
5.2.2.2 builtin.enumerate(it, start=0)
产出有两个元素组成的元组,结构是 (index, item) ,其中 index 从 start 开始计数,item 则从 iterable 中获取。
5.2.2.3 builtin.map(func, it1, [it2, …, itN])
如果传入 N 个 可迭代对象,则 func 必须能接受 N 个参数。
5.2.2.4 itertools.starmap(func, it)
把 it 中各个元素传给 func ,产出结果。
print(list(itertools.starmap(mul, enumerate('albatroz', 1)))) print(list(itertools.starmap(lambda a, b: b/a, enumerate(itertools.accumulate(sample), 1))))
['a', 'll', 'bbb', 'aaaa', 'ttttt', 'rrrrrr', 'ooooooo', 'zzzzzzzz'] [5.0, 4.5, 3.6666666666666665, 4.75, 5.2, 5.333333333333333, 5.0, 4.375, 4.888888888888889, 4.5]
5.2.3 用于合并可迭代对象的生成器函数
5.2.3.1 itertools.chain(it1, …, itN)
无缝连接多个可迭代对象。
print(list(itertools.chain('ABC', range(3))))
['A', 'B', 'C', 0, 1, 2]
5.2.3.2 itertools.chain.from_iterable(it)
产出 it 生成的各个可迭代对象中的元素,无缝连接在一起。
print(list(itertools.chain.from_iterable(enumerate('ABC'))))
[0, 'A', 1, 'B', 2, 'C']
5.2.3.3 itertaools.product(it1, …, itN, repeat=1)
计算笛卡尔积,合并成由 N 个元素组成的元组。 repeat 关键字参数告诉 product 函数重复 N 次处理输入的各个可迭代对象。
print(list(itertools.product('ABC', range(2)))) # list(itertools.product('ABC', 'ABC')) print(list(itertools.product('ABC', repeat=2)))
[('A', 0), ('A', 1), ('B', 0), ('B', 1), ('C', 0), ('C', 1)] [('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'B'), ('B', 'C'), ('C', 'A'), ('C', 'B'), ('C', 'C')]
5.2.3.4 builtin.zip(it1, .., itN)
产出由 N 个元素组成的元组,只要有一个可迭代对象到头了,即停止。
print(list(zip('ABC', range(5), [10, 20, 30, 40, 50, 60])))
[('A', 0, 10), ('B', 1, 20), ('C', 2, 30)]
5.2.3.5 itertools.zip_longest(it1, …, itN, fillvalue=None)
产出由 N 个元素组成的元组,等到最长的可迭代对象到头了,即停止。
print(list(itertools.zip_longest('ABC', range(5), [10, 20, 30, 40, 50, 60], fillvalue='?')))
[('A', 0, 10), ('B', 1, 20), ('C', 2, 30), ('?', 3, 40), ('?', 4, 50), ('?', '?', 60)]
5.2.4 用于扩展输出元素的生成器函数
5.2.4.1 itertools.combinations(it, out_len)
把 it 产出的 out_len 个元素组合在一起,然后产出。
print(list(itertools.combinations('ABC', 2)))
[('A', 'B'), ('A', 'C'), ('B', 'C')]
5.2.4.2 itertools.combinations_with_replacement(it, out_len)
把 it 产出的 out_len 个元素组合在一起,然后产出,包含相同元素的组合。
print(list(itertools.combinations_with_replacement('ABC', 2)))
[('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ('B', 'C'), ('C', 'C')]
5.2.4.3 itertools.permutation(it, out_len=None)
把 out_len 个 it 产出元素排列在一起,然后产出这些排列;out_len 的默认值等于 len(list(it))
。
print(list(itertools.permutations('ABC', 2)))
[('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]
5.2.4.4 itertools.count(start=0, step=1)
从 start 开支不断产出数字,按 step 步幅增加。
print(list(itertools.islice(itertools.count(1, .3), 3)))
[1, 1.3, 1.6]
print(list(zip(count(), ['a', 'b', 'c'])))
[(0, 'a'), (1, 'b'), (2, 'c')]
5.2.4.5 itertools.cycle(it)
从 it 中产出元素,存储各个元素的 副本 ,然后按顺序重复不断地产出各个元素。
print(list(itertools.islice(itertools.cycle('ABC'), 7)))
['A', 'B', 'C', 'A', 'B', 'C', 'A']
5.2.4.6 itertools.repeat(item, [times])
不断产出指定元素,除非指定次数。常见用途,为 map 函数提供固定参数:
print(list(map(mul, range(11), itertools.repeat(5))))
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]
5.2.5 用于重新排列元素的生成器函数
5.2.5.1 itertools.groupby(it, key=None)
产出由两个元素组成的元组,形式为 (key, group) ,其中 key 是分组标准,group 是生成器,用于产出分组里的元素。
注意, itertool.groupby
假定输入的可迭代对象已使用指定的 key 分组了各个元素。
print(list(itertools.groupby('LLLAAGGG'))) print(list(itertools.groupby('LLAALAAGGG')))
[('L', <itertools._grouper object at 0x106a79208>), ('A', <itertools._grouper object at 0x106a791d0>), ('G', <itertools._grouper object at 0x106a79080>)] [('L', <itertools._grouper object at 0x106a79cf8>), ('A', <itertools._grouper object at 0x106a791d0>), ('L', <itertools._grouper object at 0x106a79080>), ('A', <itertools._grouper object at 0x106a79160>), ('G', <itertools._grouper object at 0x106a79358>)]
animals = ['duck', 'eagle', 'rat', 'giraffe', 'bear', 'bat', 'dolphin', 'shark', 'lion'] animals.sort(key=len) for length, group in itertools.groupby(animals, len): print(length, '->', list(group))
3 -> ['rat', 'bat'] 4 -> ['duck', 'bear', 'lion'] 5 -> ['eagle', 'shark'] 7 -> ['giraffe', 'dolphin']
5.2.5.2 builtin.reversed(seq)
seq 必须是序列,或是实现了 __reversed__
特殊方法的对象。
5.2.5.3 itertools.tee(it, n=2)
产出一个由 n 个生成器组成的元组,每个生成器用于单独产出输入的可迭代对象中的元素。
print(list(zip(*itertools.tee('ABC'))))
[('A', 'A'), ('B', 'B'), ('C', 'C')]
5.3 上下文管理
5.3.1 自动关闭对象(closing)
如果对象提供了 close()
方法,但没有实现 __enter__/__exit__
协议,则可以用这个函数构建上下文管理器。
from contextlib import closing class Door: def open(self): print("door opened") def close(self): print("door closed") with closing(Door()) as door: door.open()
door opened door closed
5.3.2 忽略异常(suppress)
构建忽略指定异常的上下文管理器。
from contextlib import suppress import os with suppress(FileNotFoundError): os.remove('somefile.tmp')
5.3.3 重定向(redirect_stdout)
import io from contextlib import redirect_stdout f = io.StringIO() with redirect_stdout(f): help(pow) log("f.getvalue()", f.getvalue())
================================= f.getvalue() ================================= Help on built-in function pow in module builtins: pow(x, y, z=None, /) Equivalent to x**y (with two arguments) or x**y % z (with three arguments) Some types, such as ints, are able to use a more efficient algorithm when invoked using the three argument form.
with open('/tmp/help.txt', 'w') as f: with redirect_stdout(f): help(pow)
5.3.4 使用生成器作为上下文管理器(@contextmanager)
这个装饰器把简单的生成器函数变成上下文管理器,这样就不用创建类去实现管理器协议了。
在使用 @contextmanager
装饰的生成器中, yield
语句的作用是把函数的定义体分成两部分:
- yield 语句前面的所有代码在 with 块开始时 (即解释器调用
__enter__
方法时) 执行 - yield 语句后面的代码在 with 块结束时 (即调用
__exit__
方法时) 执行
from contextlib import contextmanager class Query(object): def __init__(self, name): self.name = name def query(self): print('Query info about %s...' % self.name) @contextmanager def create_query(name): print('Begin') with suppress(Exception): yield Query(name) # 绑定到 as 子句的目标变量 print('End') with create_query('Bob') as q: q.query()
Begin Query info about Bob... End
@contextmanager def tag(name): print("<%s>" % name, end='') with suppress(Exception): yield # 无需使用 as 子句 print("</%s>" % name) with tag("h1"): print("hello", end='')
<h1>hello</h1>
本质上,=contextlib.contextmanager= 装饰器会把函数包装成实现了 __enter__
和 __exit__
方法的类
(类的名称是 _GeneraorContextManager) 。
这个类的 __enter__
方法有如下作用:
- 调用生成器函数,保存生成器对象 (这里把它成为 gen)
- 调用
next(gen)
,执行到yield
关键字所在位置 - 返回上一步
next(gen)
产出的值,以便把产出的值绑定到 with/as 语句中的目标变量上
with 块终止时, __exit__
方法会做以下几件事:
- 检查有没有异常,如果有,调用
gen.throw(ex)
,在生成器函数定义体中包含yield
关键字的那一行抛出异常 - 否则,再调用
next(gen)
,继续执行生成器函数定义体中yield
语句之后的代码
注意:
如果在 with 块中抛出了异常,Python 解释器会将其捕获,然后会在生成器函数中 yield
表达式处再次抛出。
因此使用 @contextmanager
装饰器时,要把 yield
语句放在 try/finally 语句中 (或者放在 with 语句中)。
(因为我们永远不知道使用上下文管理器的用户会在 with 块中做什么)
另外,=@contextmanager= 装饰器提供的 __exit__
方法 假定发给生成器的所有异常都得到处理了 ,因此应该压制异常。
如果不想让 @contextmanager
压制异常,必须在被装饰的函数中显式重新抛出异常。
6 并发编程
6.1 创建子进程
from multiprocessing import Process import os def run_proc(name): print('Run child process %s (%s)...' % (name, os.getpid())) if __name__=='__main__': print('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test',)) print('Child process will start.') p.start() p.join() # 等待子进程结束后再继续往下运行 print('Child process end.')
6.2 子进程的输入输出
import subprocess r = subprocess.call(['nslookup', 'www.python.org']) print('r: {}'.format(r)) p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'set q=mx\npython.org\nexit\n') print(output.decode('utf-8')) print('Exit code:', p.returncode) # 相当于在命令行执行命令nslookup,然后手动输入: # set q=mx # python.org # exit
r: 0 Server: 64.104.123.144 Address: 64.104.123.144#53 Non-authoritative answer: python.org mail exchanger = 50 mail.python.org. Authoritative answers can be found from: org nameserver = d0.org.afilias-nst.org. org nameserver = a2.org.afilias-nst.info. org nameserver = a0.org.afilias-nst.info. org nameserver = b0.org.afilias-nst.org. org nameserver = b2.org.afilias-nst.org. org nameserver = c0.org.afilias-nst.info. a0.org.afilias-nst.info internet address = 199.19.56.1 a2.org.afilias-nst.info internet address = 199.249.112.1 b0.org.afilias-nst.org internet address = 199.19.54.1 b2.org.afilias-nst.org internet address = 199.249.120.1 c0.org.afilias-nst.info internet address = 199.19.53.1 d0.org.afilias-nst.org internet address = 199.19.57.1 a0.org.afilias-nst.info has AAAA address 2001:500:e::1 a2.org.afilias-nst.info has AAAA address 2001:500:40::1 b0.org.afilias-nst.org has AAAA address 2001:500:c::1 b2.org.afilias-nst.org has AAAA address 2001:500:48::1 c0.org.afilias-nst.info has AAAA address 2001:500:b::1 d0.org.afilias-nst.org has AAAA address 2001:500:f::1 Exit code: 0
6.3 进程间通信
父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据:
from multiprocessing import Process, Queue import os, time, random def write(q): print('Process to write: %s' % os.getpid()) for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) def read(q): print('Process to read: %s' % os.getpid()) while True: value = q.get(True) print('Get %s from queue.' % value) q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate() # pr 进程里是死循环,无法等待其结束,只能强行终止:
Process to write: 92549 Put A to queue... Process to read: 92550 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
6.4 创建线程
import time, threading def worker(): print('thread %s is running...' % threading.current_thread().name) time.sleep(1) print('thread %s ended.' % threading.current_thread().name) t = threading.Thread(target=worker, name='WorkerThread') t.start() t.join()
thread WorkerThread is running... thread WorkerThread ended.
6.5 线程加锁
import threading from concurrent.futures import ThreadPoolExecutor lock = threading.Lock() count = 0 def run_thread_without_lock(): global count while count < 10: count += 1 print("%s, " % count, end='') def run_thread_with_lock(): global count with lock: while count < 10: count += 1 print("%s, " % count, end='') with ThreadPoolExecutor(max_workers=4) as executor: for i in range(5): executor.submit(run_thread_without_lock) count = 0 with ThreadPoolExecutor(max_workers=4) as executor: for i in range(5): executor.submit(run_thread_with_lock)
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
6.6 thread local
一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
最常用的地方就是为每个线程绑定一个数据库连接,HTTP 请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
import threading, time tl = threading.local() def worker(name): tl.name = name time.sleep(3) print("in thread:", tl.name) t = threading.Thread(target=worker, args=("hello",), name='Thread-A') t.start() tl.name = 'world' print("in main:", tl.name)
in main: world
6.7 线程池
from concurrent.futures import ThreadPoolExecutor import urllib.request def fetch_url(url): u = urllib.request.urlopen(url) data = u.read() return data pool = ThreadPoolExecutor(10) # Submit work to the pool a = pool.submit(fetch_url, 'http://www.python.org') b = pool.submit(fetch_url, 'http://www.pypy.org') # Get the results back x = a.result() y = b.result()
# Out[1]:
7 元编程
7.1 控制类的属性定义的顺序
import collections class EntityMeta(type): @classmethod def __prepare__(cls, name, bases): return collections.OrderedDict()
7.2 控制类的创建
7.2.1 实例缓存
# Cached instances import weakref class Cached(type): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__cache = weakref.WeakValueDictionary() def __call__(self, *args): if args in self.__cache: return self.__cache[args] else: obj = super().__call__(*args) self.__cache[args] = obj return obj class Spam(metaclass=Cached): def __init__(self, name): print('Creating Spam({!r})'.format(name)) self.name = name if __name__ == '__main__': a = Spam('foo') b = Spam('bar') print('a is b:', a is b) c = Spam('foo') print('a is c:', a is c)
Creating Spam('foo') Creating Spam('bar') a is b: False a is c: True
7.2.2 单例
# Singleton class Singleton(type): def __init__(self, *args, **kwargs): self.__instance = None super().__init__(*args, **kwargs) def __call__(self, *args, **kwargs): if self.__instance is None: self.__instance = super().__call__(*args, **kwargs) return self.__instance else: return self.__instance class Spam(metaclass=Singleton): def __init__(self): print('Creating Spam') if __name__ == '__main__': a = Spam() b = Spam() print(a is b)
Creating Spam True
7.2.3 禁用直接实例化
# Not allowing direct instantiation class NoInstances(type): def __call__(self, *args, **kwargs): raise TypeError("Can't instantiate directly") class Spam(metaclass=NoInstances): @staticmethod def grok(x): print('Spam.grok') if __name__ == '__main__': try: s = Spam() except TypeError as e: print(e) Spam.grok(42)
Can't instantiate directly Spam.grok
7.3 不通过__init__方法创建实例
from time import localtime class Date: def __init__(self, year, month, day): self.year = year self.month = month self.day = day # Class method that bypasses __init__ @classmethod def today(cls): d = cls.__new__(cls) t = localtime() d.year = t.tm_year d.month = t.tm_mon d.day = t.tm_mday return d d = Date.__new__(Date) print(d) print(hasattr(d,'year')) data = { 'year' : 2012, 'month' : 8, 'day' : 29 } d.__dict__.update(data) print(d.year) print(d.month) d = Date.today() print(d.year, d.month, d.day)
<__main__.Date object at 0x1085e1588> False 2012 8 2018 6 13
7.4 动态定义类
# Example of making a class manually from parts # Methods def __init__(self, name, shares, price): self.name = name self.shares = shares self.price = price def cost(self): return self.shares * self.price cls_dict = { '__init__' : __init__, 'cost' : cost, } # Make a class import types Stock = types.new_class('Stock', (), {}, lambda ns: ns.update(cls_dict)) if __name__ == '__main__': s = Stock('ACME', 50, 91.1) print(s) print(s.cost())
<types.Stock object at 0x1085e1518> 4555.0
# An alternative formulation of namedtuples import operator import types import sys def named_tuple(classname, fieldnames): # Populate a dictionary of field property accessors cls_dict = { name: property(operator.itemgetter(n)) for n, name in enumerate(fieldnames) } # Make a __new__ function and add to the class dict def __new__(cls, *args): if len(args) != len(fieldnames): raise TypeError('Expected {} arguments'.format(len(fieldnames))) return tuple.__new__(cls, (args)) cls_dict['__new__'] = __new__ # Make the class cls = types.new_class(classname, (tuple,), {}, lambda ns: ns.update(cls_dict)) cls.__module__ = sys._getframe(1).f_globals['__name__'] return cls if __name__ == '__main__': Point = named_tuple('Point', ['x', 'y']) print(Point) p = Point(4, 5) print(len(p)) print(p.x, p[0]) print(p.y, p[1]) try: p.x = 2 except AttributeError as e: print(e) print('%s %s' % p)
<class '__main__.Point'> 2 4 4 5 5 can't set attribute 4 5
7.5 猴子补丁
import importlib import sys from collections import defaultdict _post_import_hooks = defaultdict(list) class PostImportFinder: def __init__(self): self._skip = set() def find_module(self, fullname, path=None): if fullname in self._skip: return None self._skip.add(fullname) return PostImportLoader(self) class PostImportLoader: def __init__(self, finder): self._finder = finder def load_module(self, fullname): importlib.import_module(fullname) module = sys.modules[fullname] for func in _post_import_hooks[fullname]: func(module) self._finder._skip.remove(fullname) return module def when_imported(fullname): def decorate(func): if fullname in sys.modules: func(sys.modules[fullname]) else: _post_import_hooks[fullname].append(func) return func return decorate sys.meta_path.insert(0, PostImportFinder())
@when_imported('threading') def warn_threads(mod): print('Threads? Are you crazy?') import threading