Cookbook
{Back to Index}

Table of Contents

1 数据结构

1.1 序列操作

1.1.1 使用 * 处理元组拆包

* 前缀只能用在 一个 变量名前面,但是这个变量可以出现在赋值表达式的任意位置。

a, b, *rest1 = range(5)
log("rest1", rest1)

a, b, *rest2 = range(3)
log("rest2", rest2)

a, b, *rest3 = range(2)
log("rest3", rest3)

a, *body, c, d = range(5)
log("body", body)

*head, b, c, d = range(5)
log("head", head)
=========== rest1 ============
[2, 3, 4]
=========== rest2 ============
[2]
=========== rest3 ============
[]
============ body ============
[1, 2]
============ head ============
[0, 1]

1.1.2 对序列重排位(itemgetter)

from operator import itemgetter

data = [
    ('c', 2),
    ('b', 3),
    ['a', 1]
]

log("[itemgetter(1, 0)(t) for t in data]", [itemgetter(1, 0)(t) for t in data])
===================== [itemgetter(1, 0)(t) for t in data] ======================
[(2, 'c'), (3, 'b'), (1, 'a')]

1.1.3 给切片赋值

如果赋值的对象是一个切片, 赋值语句的右侧必须是个可迭代对象

l = list(range(10))
log("l", l)

l[2:5] = [20, 30]
log("after l[2:5] = [20, 30]", l)

del l[5:7]
log("after del l[5:7]", l)

l[3::2] = [11, 22]
log("after l[3::2] = [11, 22]", l)

try:
    l[2:5] = 100
except Exception as e:
    log("e", e)

l[2:5] = [100]
log("after l[2:5] = [100]", l)
============= l ==============
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
== after l[2:5] = [20, 30] ===
[0, 1, 20, 30, 5, 6, 7, 8, 9]
====== after del l[5:7] ======
[0, 1, 20, 30, 5, 8, 9]
== after l[3::2] = [11, 22] ==
[0, 1, 20, 11, 5, 22, 9]
============= e ==============
can only assign an iterable
==== after l[2:5] = [100] ====
[0, 1, 100, 22, 9]

1.1.4 使用 bisect 管理已排序序列

1.1.4.1 使用 bisect 搜索
import bisect
import sys
HAYSTACK = [1, 4, 5, 6, 8, 12, 15, 20, 21, 23, 23, 26, 29, 30]
NEEDLES = [0, 1, 2, 5, 8, 10, 22, 23, 29, 30, 31]
ROW_FMT = '{0:2d} @ {1:2d}    {2}{0:<2d}'

def demo(bisect_fn):
    """Print where ``bisect_fn`` places each needle in HAYSTACK.

    Args:
        bisect_fn: callable taking ``(sequence, value)`` and returning an
            insertion index, e.g. ``bisect.bisect_left`` or ``bisect.bisect``.
    """
    print('DEMO:', bisect_fn.__name__)
    print('haystack ->', ' '.join('%2d' % n for n in HAYSTACK))
    # Largest needle first, so the rows line up under the haystack header.
    for needle in reversed(NEEDLES):
        position = bisect_fn(HAYSTACK, needle)
        # One ' |' per haystack slot that precedes the insertion point.
        offset = position * ' |'
        print(ROW_FMT.format(needle, position, offset))


demo(bisect.bisect_left)
print('=' * 60)
demo(bisect.bisect)
DEMO: bisect_left
haystack ->  1  4  5  6  8 12 15 20 21 23 23 26 29 30
31 @ 14     | | | | | | | | | | | | | |31
30 @ 13     | | | | | | | | | | | | |30
29 @ 12     | | | | | | | | | | | |29
23 @  9     | | | | | | | | |23
22 @  9     | | | | | | | | |22
10 @  5     | | | | |10
 8 @  4     | | | |8
 5 @  2     | |5
 2 @  1     |2
 1 @  0    1
 0 @  0    0
============================================================
DEMO: bisect_right
haystack ->  1  4  5  6  8 12 15 20 21 23 23 26 29 30
31 @ 14     | | | | | | | | | | | | | |31
30 @ 14     | | | | | | | | | | | | | |30
29 @ 13     | | | | | | | | | | | | |29
23 @ 11     | | | | | | | | | | |23
22 @  9     | | | | | | | | |22
10 @  5     | | | | |10
 8 @  5     | | | | |8
 5 @  3     | | |5
 2 @  1     |2
 1 @  1     |1
 0 @  0    0
1.1.4.2 使用 bisect 建立查询表格
def grade(score, breakpoints=(60, 70, 80, 90), grades='FDCBA'):
    """Map a numeric score to a letter grade using a sorted breakpoint table.

    Args:
        score: the value to look up.
        breakpoints: ascending sequence of lower bounds; a score equal to a
            breakpoint belongs to the higher grade.
        grades: one label per interval, i.e. ``len(breakpoints) + 1`` items.

    Returns:
        The element of ``grades`` for the interval that contains ``score``.
    """
    # The default is a tuple so no mutable object is shared between calls.
    i = bisect.bisect(breakpoints, score)
    return grades[i]

log("[grade(score) for score in [33, 99, 77, 70, 89, 90, 100]]",
     [grade(score) for score in [33, 99, 77, 70, 89, 90, 100]])
========== [grade(score) for score in [33, 99, 77, 70, 89, 90, 100]] ===========
['F', 'A', 'C', 'C', 'B', 'A', 'A']
1.1.4.3 使用 bisect.insort 插入新元素
import bisect
import random
SIZE=7
random.seed(1729)
my_list = []
for i in range(SIZE):
    new_item = random.randrange(SIZE*2)
    bisect.insort(my_list, new_item)
    print('%2d ->' % new_item, my_list)
10 -> [10]
 0 -> [0, 10]
 6 -> [0, 6, 10]
 8 -> [0, 6, 8, 10]
 7 -> [0, 6, 7, 8, 10]
 2 -> [0, 2, 6, 7, 8, 10]
10 -> [0, 2, 6, 7, 8, 10, 10]

1.1.5 列表中删除数据的陷阱

# Pitfall demo: removing items from a list while iterating over it.
lst = ['a', 'b', 'c', '', '']
for i in lst:
    # Compare by value: `is ''` tests object identity, which only works by
    # accident of string interning and triggers a SyntaxWarning on 3.8+.
    if i == '':
        lst.remove(i)

# One empty string survives: the removal shifts it into a slot the loop
# has already passed.
print(lst)
['a', 'b', 'c', '']
1.1.5.1 原理

list 属于线性表,它的连续在于用一块连续的内存空间存储元素,在调用 remove 时,只是删除了地址内的元素。

回到问题:for in 按下标依次访问元素,而 remove 按值删除元素。当循环到达索引 3(第一个空字符串)时,条件成立,该元素被删除。因为 list 是线性表,删除后其后面的所有元素都会向前移动一个位置,此时 lst 等于 ['a', 'b', 'c', ''] ,长度变为 4。循环接下来要访问索引 4,已经超出列表范围,所以循环结束,第二个空字符串被跳过。

1.1.5.2 解决方法
lst = ['a','','b','','c','','']

# 做法一
while '' in lst:
    lst.remove('')
print(lst)


# 做法二

lst_new = [i for i in lst if i != '']

1.1.6 列表中出现频率最高

words = [
   'look', 'into', 'my', 'eyes', 'look', 'into', 'my', 'eyes',
   'the', 'eyes', 'the', 'eyes', 'the', 'eyes', 'not', 'around', 'the',
   'eyes', "don't", 'look', 'around', 'the', 'eyes', 'look', 'into',
   'my', 'eyes', "you're", 'under'
]

from collections import Counter
word_counts = Counter(words)
top_three = word_counts.most_common(3)
print(top_three)
[('eyes', 8), ('the', 5), ('look', 4)]

1.1.7 列表中最大或最小的几项

import heapq

portfolio = [
   {'name': 'IBM', 'shares': 100, 'price': 91.1},
   {'name': 'AAPL', 'shares': 50, 'price': 543.22},
   {'name': 'FB', 'shares': 200, 'price': 21.09},
   {'name': 'HPQ', 'shares': 35, 'price': 31.75},
   {'name': 'YHOO', 'shares': 45, 'price': 16.35},
   {'name': 'ACME', 'shares': 75, 'price': 115.65}
]

cheap = heapq.nsmallest(3, portfolio, key=lambda s: s['price'])
expensive = heapq.nlargest(3, portfolio, key=lambda s: s['price'])
r = {'cheap': cheap, 'expensive': expensive}
print(r)

1.1.8 对列表中的数据分组

rows = [
    {'address': '5412 N CLARK', 'date': '07/01/2012'},
    {'address': '5148 N CLARK', 'date': '07/04/2012'},
    {'address': '5800 E 58TH', 'date': '07/02/2012'},
    {'address': '2122 N CLARK', 'date': '07/03/2012'},
    {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'},
    {'address': '1060 W ADDISON', 'date': '07/02/2012'},
    {'address': '4801 N BROADWAY', 'date': '07/01/2012'},
    {'address': '1039 W GRANVILLE', 'date': '07/04/2012'},
]

from itertools import groupby

# groupby() only merges adjacent items, so sort by the same key first.
rows.sort(key=lambda r: r['date'])
for date, items in groupby(rows, key=lambda r: r['date']):
    print(date)
    for i in items:
        print('    ', i)
07/01/2012
     {'address': '5412 N CLARK', 'date': '07/01/2012'}
     {'address': '4801 N BROADWAY', 'date': '07/01/2012'}
07/02/2012
     {'address': '5800 E 58TH', 'date': '07/02/2012'}
     {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'}
     {'address': '1060 W ADDISON', 'date': '07/02/2012'}
07/03/2012
     {'address': '2122 N CLARK', 'date': '07/03/2012'}
07/04/2012
     {'address': '5148 N CLARK', 'date': '07/04/2012'}
     {'address': '1039 W GRANVILLE', 'date': '07/04/2012'}

或者使用 defaultdict 来实现:

from collections import defaultdict
rows_by_date = defaultdict(list)
for row in rows:
    rows_by_date[row['date']].append(row)
print(rows_by_date)
defaultdict(<class 'list'>, {'07/01/2012': [{'address': '5412 N CLARK', 'date': '07/01/2012'}, {'address': '4801 N BROADWAY', 'date': '07/01/2012'}], '07/02/2012': [{'address': '5800 E 58TH', 'date': '07/02/2012'}, {'address': '5645 N RAVENSWOOD', 'date': '07/02/2012'}, {'address': '1060 W ADDISON', 'date': '07/02/2012'}], '07/03/2012': [{'address': '2122 N CLARK', 'date': '07/03/2012'}], '07/04/2012': [{'address': '5148 N CLARK', 'date': '07/04/2012'}, {'address': '1039 W GRANVILLE', 'date': '07/04/2012'}]})

1.1.9 消除序列中的重复数据,同时保持数据顺序

def dedupe(items, key=None):
    """Yield the items of ``items`` in order, skipping duplicates.

    Args:
        items: any iterable.
        key: optional callable that maps an item to a hashable value used
            for the duplicate check; needed when the items themselves are
            unhashable (e.g. dicts). With ``None`` the item itself is used.

    Yields:
        The first item seen for each distinct key value.
    """
    seen = set()
    for item in items:
        val = item if key is None else key(item)
        if val not in seen:
            yield item
            seen.add(val)


a = [
	{'x': 2, 'y': 3},
	{'x': 1, 'y': 4},
	{'x': 2, 'y': 3},
	{'x': 2, 'y': 3},
	{'x': 10, 'y': 15}
    ]
print(list(dedupe(a, key=lambda a: (a['x'],a['y']))))
[{'x': 2, 'y': 3}, {'x': 1, 'y': 4}, {'x': 10, 'y': 15}]
def dedupe(items):
    """Yield each distinct item of ``items`` once, in first-seen order.

    The items are stored in a set, so they must be hashable.
    """
    seen = set()
    for item in items:
        if item not in seen:
            yield item
            seen.add(item)

a = [1, 5, 2, 1, 9, 1, 5, 10]
print(list(dedupe(a)))
[1, 5, 2, 9, 10]

1.1.10 序列解包 (unpack)

records = [
     ('foo', 1, 2),
     ('bar', 'hello'),
     ('foo', 3, 4),
]

def do_foo(x,y):
    """Handle a 'foo' record by printing its two fields."""
    print('foo', x, y)

def do_bar(s):
    """Handle a 'bar' record by printing its single field."""
    print('bar', s)

# Star-unpacking: the first field of each record is its tag, the remaining
# fields are collected into ``args`` and passed on to the handler.
for tag, *args in records:
    if tag == 'foo':
        do_foo(*args)
    elif tag == 'bar':
        do_bar(*args)
foo 1 2
bar hello
foo 3 4

1.1.11 flatten 列表

# The ABC aliases were removed from ``collections`` in Python 3.10.
from collections.abc import Iterable

def flatten(items, ignore_types=(str, bytes)):
    """Yield the leaf values of an arbitrarily nested iterable, depth-first.

    Args:
        items: iterable whose elements may themselves be iterables.
        ignore_types: type or tuple of types that are yielded whole even
            though they are iterable (by default ``str`` and ``bytes``).

    Yields:
        Every element that is not iterable or is an instance of
        ``ignore_types``.
    """
    for x in items:
        if isinstance(x, Iterable) and not isinstance(x, ignore_types):
            # Forward ignore_types so a caller's override also applies to
            # the nested levels, not only to the outermost one.
            yield from flatten(x, ignore_types)
        else:
            yield x

items = [1, 2, [3, 4, [5, 6], 7], 8]

# Produces 1 2 3 4 5 6 7 8
for x in flatten(items):
    print(x)

items = ['Dave', 'Paula', ['Thomas', 'Lewis']]
for x in flatten(items):
    print(x)

1
2
3
4
5
6
7
8
Dave
Paula
Thomas
Lewis

1.1.12 优先队列

import heapq

class PriorityQueue:
    """Priority queue built on ``heapq`` that pops the highest priority first.

    Items pushed with equal priority are popped in insertion order.
    """

    def __init__(self):
        self._queue = []   # heap of (-priority, index, item) tuples
        self._index = 0    # running push counter, used as a tie-breaker

    def push(self, item, priority):
        """Add ``item`` with the given numeric ``priority``."""
        # heapq is a min-heap, so the priority is negated. The index orders
        # equal-priority entries, so the items themselves are never compared.
        heapq.heappush(self._queue, (-priority, self._index, item))
        self._index += 1

    def pop(self):
        """Remove and return the highest-priority item.

        Raises:
            IndexError: if the queue is empty.
        """
        return heapq.heappop(self._queue)[-1]

# Example use
class Item:
    """Minimal named payload used to exercise ``PriorityQueue``.

    It defines no ordering of its own.
    """

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return 'Item({!r})'.format(self.name)

q = PriorityQueue()
q.push(Item('foo'), 1)
q.push(Item('bar'), 5)
q.push(Item('spam'), 4)
q.push(Item('grok'), 1)

print("Should be bar:", q.pop())
print("Should be spam:", q.pop())
print("Should be foo:", q.pop())
print("Should be grok:", q.pop())
Should be bar: Item('bar')
Should be spam: Item('spam')
Should be foo: Item('foo')
Should be grok: Item('grok')

1.2 字典操作

1.2.1 defaultdict

在实例化一个 defaultdict 的时候,需要给构造方法提供一个可调用对象, 这个可调用对象会在 __getitem__ 找不到键的时候被调用,返回默认值。

import collections
d = collections.defaultdict(list)
d['a'].append('b')
d['c'].append('d')
log("d", d)
============= d ==============
defaultdict(<class 'list'>, {'a': ['b'], 'c': ['d']})

1.2.2 OrderedDict

这个类型在添加键的时候会保持顺序,因此键的迭代次序总是一致的。

OrderedDict 的 popitem 方法默认删除并返回字典里最后一个元素, 但是如果调用 popitem(last=False) ,则删除并返回第一个被添加进去的元素。

1.2.3 ChainMap

该类型可以容纳多个不同的映射对象,在进行键查找操作时,会逐个查找这些映射对象,直到键被找到为止。

这个功能在给有嵌套作用域的语言做解释器的时候很有用,可以用一个映射对象来代表一个作用域上下文。

import builtins
from collections import ChainMap
pylookup = ChainMap(locals(), globals(), vars(builtins))

1.2.4 Counter

这个映射类型会给键准备一个整数计数器,每次更新一个键的时候会增加这个计数器。

Counter 实现了 + 和 - 运算符来合并记录。 most_common([n]) 方法会返回最常见的 n 个键和它们的计数。

from collections import Counter
ct = Counter('abracadabra')
print(ct)
ct.update('aaaaazzz')
print('after update'.center(30, '='))
print(ct)
print(ct.most_common(2))
Counter({'a': 5, 'b': 2, 'r': 2, 'c': 1, 'd': 1})
=========after update=========
Counter({'a': 10, 'z': 3, 'b': 2, 'r': 2, 'c': 1, 'd': 1})
[('a', 10), ('z', 3)]

1.2.5 UserDict

这个类其实是把标准 dict 用纯 Python 又实现了一遍,主要的用途是让用户继承写子类的。

更倾向于从 UserDict 而不是从 dict 继承的主要原因是后者有时会在某些方法的实现上走一些捷径, 导致不得不在子类中重写这些方法,但是 UserDict 就不会有这个问题。

继承自 dict 的 __init__ 方法忽略了子类的 __setitem__ 方法:

class MyDict(dict):
    """dict subclass that stores twice the assigned value on ``d[key] = value``.

    Used to show that the inherited ``__init__`` and ``update`` do not go
    through this ``__setitem__`` override.
    """

    def __setitem__(self, key, value):
        super().__setitem__(key, value*2)

d = MyDict(one=1)
print(d)

d['two'] = 2
print(d)
{'one': 1}
{'one': 1, 'two': 4}

继承自 dict 的 update 方法也忽略了子类的 __setitem__ 方法:

d.update(three=3)
print(d)
{'one': 1, 'two': 4, 'three': 3}

另外一个值得注意的地方是, UserDict 并不是 dict 的子类 。 UserDict 有一个 data 属性,是 dict 的实例,这个属性实际上是 UserDict 最终存储数据的地方

1.2.6 不可变映射类型(MappingProxyType)

types.MappingProxyType 会返回一个只读的映射视图。 虽然是只读视图,但是它是动态的,如果对原映射做出改动, 通过这个视图可以观察到,但是无法通过这个视图对原映射做出修改。

from types import MappingProxyType
d = {1: 'A'}
d_proxy = MappingProxyType(d)
print(d_proxy)

log("d_proxy[1]", d_proxy[1])

try:
    d_proxy[2] = 'x'
except Exception as e:
    print(e)

d[2] = 'B'
print(d_proxy)
{1: 'A'}
========= d_proxy[1] =========
A
'mappingproxy' object does not support item assignment
{1: 'A', 2: 'B'}

1.2.7 对字典作集合运算

a = {
   'x' : 1,
   'y' : 2,
   'z' : 3
}

b = {
   'w' : 10,
   'x' : 11,
   'y' : 2
}

print('Common keys:', a.keys() & b.keys())
print('Keys in a not in b:', a.keys() - b.keys())
print('(key,value) pairs in common:', a.items() & b.items())
Common keys: {'y', 'x'}
Keys in a not in b: {'z'}
(key,value) pairs in common: {('y', 2)}

1.2.8 组合多个字典当作一个字典使用

a = {'x': 1, 'z': 3 }
b = {'y': 2, 'z': 4 }

# (a) Simple example of combining
from collections import ChainMap
c = ChainMap(a,b)

print("c:", c)

print(c['x'])      # Outputs 1  (from a)
print(c['y'])      # Outputs 2  (from b)
print(c['z'])      # Outputs 3  (from a)

# Output some common values
print('len(c):', len(c))
print('c.keys():', list(c.keys()))
print('c.values():', list(c.values()))

# Modify some values
c['z'] = 10
c['w'] = 40
del c['x']
print("a:", a)
c: ChainMap({'x': 1, 'z': 3}, {'y': 2, 'z': 4})
1
2
3
len(c): 3
c.keys(): ['z', 'y', 'x']
c.values(): [3, 2, 1]
a: {'z': 10, 'w': 40}

1.2.9 字典栈

# Example of stacking mappings (like scopes)
values = ChainMap()
values['x'] = 1

# Add a new mapping
values = values.new_child()
values['x'] = 2

# Add a new mapping
values = values.new_child()
values['x'] = 3

print(values)
print(values['x'])

# Discard last mapping
values = values.parents
print(values)
print(values['x'])

# Discard last mapping
values = values.parents
print(values)
print(values['x'])
ChainMap({'x': 3}, {'x': 2}, {'x': 1})
3
ChainMap({'x': 2}, {'x': 1})
2
ChainMap({'x': 1})
1

1.3 文本操作

1.3.1 获取 Unicode 字符名称(unicodedata)

from unicodedata import name

for i in range(250, 256):
    print(chr(i), ":", name(chr(i), ''))
ú : LATIN SMALL LETTER U WITH ACUTE
û : LATIN SMALL LETTER U WITH CIRCUMFLEX
ü : LATIN SMALL LETTER U WITH DIAERESIS
ý : LATIN SMALL LETTER Y WITH ACUTE
þ : LATIN SMALL LETTER THORN
ÿ : LATIN SMALL LETTER Y WITH DIAERESIS

1.3.2 字符编码侦测(Chardet)

! chardetect ~/test.org
/Users/ruan/test.org: ascii with confidence 1.0

1.3.3 格式化

1.3.3.1 填充与对齐
print('{:>10}'.format('test'))
print('{:10}'.format('test'))
print('{:^10}'.format('test'))
print('{:_<10}'.format('test'))
      test
test
   test
test______
1.3.3.2 字符串截断
print('{:.5}'.format('xylophone'))
print('{:10.5}'.format('xylophone'))
xylop
xylop
1.3.3.3 占位符
data = {'first': 'Hodor', 'last': 'Hodor!'}
print('{first} {last}'.format(**data))

print('{first} {last}'.format(first='Hodor', last='Hodor!'))
Hodor Hodor!
Hodor Hodor!
person = {'first': 'Jean-Luc', 'last': 'Picard'}
data = [4, 8, 15, 16, 23, 42]
class Plant(object):
    """Sample object for attribute and item access inside format fields."""
    # read by the format string below as {p.category}
    category = 'tree'
    # list of dicts, read by the format string below as {p.kinds[0][name]}
    kinds = [{'name': 'oak'}, {'name': 'maple'}]

print('{p[first]} {p[last]}'.format(p=person))
print('{d[4]} {d[5]}'.format(d=data))
print('{p.category}: {p.kinds[0][name]}'.format(p=Plant()))
Jean-Luc Picard
23 42
tree: oak
1.3.3.4 排版
import textwrap

s = "Look into my eyes, look into my eyes, the eyes, the eyes, \
the eyes, not around the eyes, don't look around the eyes, \
look into my eyes, you're under."

print(textwrap.fill(s, 70))
print()

print(textwrap.fill(s, 40))
print()

print(textwrap.fill(s, 40, initial_indent='    '))
print()

print(textwrap.fill(s, 40, subsequent_indent='    '))
print()
Look into my eyes, look into my eyes, the eyes, the eyes, the eyes,
not around the eyes, don't look around the eyes, look into my eyes,
you're under.

Look into my eyes, look into my eyes,
the eyes, the eyes, the eyes, not around
the eyes, don't look around the eyes,
look into my eyes, you're under.

    Look into my eyes, look into my
eyes, the eyes, the eyes, the eyes, not
around the eyes, don't look around the
eyes, look into my eyes, you're under.

Look into my eyes, look into my eyes,
    the eyes, the eyes, the eyes, not
    around the eyes, don't look around
    the eyes, look into my eyes, you're
    under.

1.3.4 字符串匹配

1.3.4.1 使用 shell 风格的通配符匹配字符串
from fnmatch import fnmatchcase as match

addresses = [
    '5412 N CLARK ST',
    '1060 W ADDISON ST',
    '1039 W GRANVILLE AVE',
    '2122 N CLARK ST',
    '4802 N BROADWAY',
]

a = [addr for addr in addresses if match(addr, '* ST')]
print(a)

b = [addr for addr in addresses if match(addr, '54[0-9][0-9] *CLARK*')]
print(b)
['5412 N CLARK ST', '1060 W ADDISON ST', '2122 N CLARK ST']
['5412 N CLARK ST']
1.3.4.2 贪婪和非贪婪匹配
import re

# Sample text
text = 'Computer says "no." Phone says "yes."'

# (a) Regex that finds quoted strings - longest match
str_pat = re.compile(r'\"(.*)\"')
print(str_pat.findall(text))

# (b) Regex that finds quoted strings - shortest match
str_pat = re.compile(r'\"(.*?)\"')
print(str_pat.findall(text))
['no." Phone says "yes.']
['no.', 'yes.']

1.3.5 输入密码

import getpass

user = getpass.getuser()
passwd = getpass.getpass()

print('User:', user)
print('Passwd:', passwd)

1.4 字节操作

1.4.1 将字节序列转换成不同类型字段组成的元组(struct)

struct 模块能处理 bytes 、 bytearray 和 memoryview 对象。

import struct
fmt = '<1s3sHHH'
with open('img/p3_hash.png', 'rb') as fp:
    img = memoryview(fp.read())

header = img[:10]
log("bytes(header)", bytes(header))
log("struct.unpack(fmt, header)", struct.unpack(fmt, header))
================================ bytes(header) =================================
b'\x89PNG\r\n\x1a\n\x00\x00'
========================== struct.unpack(fmt, header) ==========================
(b'\x89', b'PNG', 2573, 2586, 0)

1.5 时间操作

1.5.1 time 模块

time.time() 返回的时间戳与时区无关; time.gmtime() 把它转换成 UTC 时间(下面的例子使用的就是它),而 time.localtime() 则转换成本地时间。

1.5.1.1 获取 Unix Timestamp

即从 Epoch (1970年1月1日00:00:00 UTC) 开始所经过的秒数。

import time
print(time.time())
1542554851.5957272
1.5.1.2 获取具体时间值
current_time = time.time()
current_struct_time = time.gmtime(current_time)
print(current_struct_time)
time.struct_time(tm_year=2018, tm_mon=11, tm_mday=18, tm_hour=15, tm_min=27, tm_sec=37, tm_wday=6, tm_yday=322, tm_isdst=0)
current_year = current_struct_time.tm_year
current_mon  = current_struct_time.tm_mon
current_mday = current_struct_time.tm_mday
current_hour = current_struct_time.tm_hour
current_min  = current_struct_time.tm_min
r = (current_year, current_mon, current_mday, current_hour, current_min)
print(r)
(2018, 11, 18, 15, 27)

1.5.2 datetime 模块

datetime 模块简化了日期操作,如增加天数,设置时区等。

1.5.2.1 创建时间
import datetime
d = datetime.datetime(year=2017, month=12, day=31, hour=12, minute=59, second=59)
r = (d.year, d.month, d.day, d.hour, d.minute, d.second, d.microsecond)
print(r)
(2017, 12, 31, 12, 59, 59, 0)
1.5.2.2 获取 UTC 时间
print(datetime.datetime.utcnow())
2018-11-18 15:27:48.018708
1.5.2.3 获取当前时区时间
print(datetime.datetime.now())
2018-11-18 23:27:54.145839
1.5.2.4 日期运算
today = datetime.datetime.now()
diff = datetime.timedelta(weeks=3, days=2)
future = today + diff
past = today - diff
print((future, past))
(datetime.datetime(2018, 12, 11, 23, 28, 0, 332108), datetime.datetime(2018, 10, 26, 23, 28, 0, 332108))
1.5.2.5 日期转字符串
r = '{:%Y-%m-%d %H:%M}'.format(datetime.datetime(2001, 2, 3, 4, 5))
print(r)
2001-02-03 04:05
1.5.2.6 字符串转日期
r = datetime.datetime.strptime("Mar 03, 2010", "%b %d, %Y")
print(r)
2010-03-03 00:00:00

1.5.3 获取月份缩写

from calendar import month_abbr
print(list(month_abbr))
['', 'Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

1.6 数字操作

1.6.1 数字格式化

print('{:06.2f}'.format(3.141592653589793))
print('{:04d}'.format(42))
print('{:+d}'.format(42))
print('{: d}'.format(42))
print('{: d}'.format(-42))
print('{:=5d}'.format((- 23)))
print('{:=+5d}'.format((23)))
003.14
0042
+42
 42
-42
-  23
+  23

2 函数

2.1 获取函数签名信息(inspect.signature)

from inspect import signature

def foo(a, b=1, **c):
    """Sample function whose signature is inspected below."""
    pass

# Keep the Signature object in ``sig``: the loop below reads its
# ``parameters`` mapping, which would otherwise be an undefined name.
sig = signature(foo)
print(sig)
print()
for name, param in sig.parameters.items():
    print(param.kind, ':', name, '=', param.default)
(a, b=1, **c)

POSITIONAL_OR_KEYWORD : a = <class 'inspect._empty'>
POSITIONAL_OR_KEYWORD : b = 1
VAR_KEYWORD : c = <class 'inspect._empty'>

inspect.signature 返回一个 inspect.Signature 对象, 它有一个 parameters 属性,这是一个有序映射,将参数名和 inspect.Parameter 对象对应起来。 Parameter 对象的属性有 name, default, kind

inspect._empty 表示没有默认值。(这里不能使用 None ,是因为 None 可以作为默认值)

2.1.1 绑定参数到函数签名中(inspect.Signature.bind)

inspect.Signature 对象有个 bind 方法,可以把任意个参数绑定到签名中的形参上, 所用的规则与实参到形参的匹配方式一样。 在框架中可以使用这个方法 在真正调用函数前验证参数

import inspect
def foo(a, b=1, **c):
    """Placeholder function; only its signature is used (for ``Signature.bind``)."""
    pass

args = {'a': 1, 'b': 2, 'x': '1', 'y': '2'}
sig = inspect.signature(foo)
bound_args = sig.bind(**args)
log("bound_args", bound_args)
print()
for name, value in bound_args.arguments.items():
    print(name, '=', value)

print()
del args['a']
try:
    bound_args = sig.bind(**args)
except Exception as e:
    print(e)


================================== bound_args ==================================
<BoundArguments (a=1, b=2, c={'x': '1', 'y': '2'})>

a = 1
b = 2
c = {'x': '1', 'y': '2'}

missing a required argument: 'a'

2.2 函数式编程风格(operator)

operator 模块以函数的形式提供了 Python 全部的中缀运算符, 从而避免编写类似 lambda a, b: a * b 这种平凡的匿名函数。

2.2.1 数学运算

import functools
import operator
log("functools.reduce(operator.xor, range(6))", functools.reduce(operator.xor, range(6)))
log("functools.reduce(operator.mul, range(1, 7))", functools.reduce(operator.mul, range(1, 7)))
=================== functools.reduce(operator.xor, range(6)) ===================
1
================= functools.reduce(operator.mul, range(1, 7)) ==================
720

2.2.2 从序列中取出元素(itemgetter)

from operator import itemgetter

data = [
    ('c', 2),
    ('b', 3),
    ('a', 1)
]

log("sorted(data, key=itemgetter(0))", sorted(data, key=itemgetter(0)))
======================= sorted(data, key=itemgetter(0)) ========================
[('a', 1), ('b', 3), ('c', 2)]

如果把多个参数传给 itemgetter ,它构建的函数会返回提取的值构成的元组:

log("[itemgetter(1, 0)(t) for t in data]", [itemgetter(1, 0)(t) for t in data])
===================== [itemgetter(1, 0)(t) for t in data] ======================
[(2, 'c'), (3, 'b'), (1, 'a')]

2.2.3 读取对象属性(attrgetter)

attrgetter 与 itemgetter 作用类似,它创建的函数根据名称提取对象的属性。 如果把多个属性名传给 attrgetter ,它也会返回提取的值构成的元组。

此外,如果参数名中包含 . (点号), attrgetter 会深入嵌套对象,获取指定的属性。

metro_data = [
    ('Tokyo', 'JP', 36.933, (35.689722, 139.691667)),
    ('Delhi NCR', 'IN', 21.935, (28.613889, 77.208889)),
    ('Mexico City', 'MX', 20.142, (19.433333, -99.133333)),
    ('New York-Newark', 'US', 20.104, (40.808611, -74.020386)),
    ('Sao Paulo', 'BR', 19.649, (-23.547778, -46.635833))
]

from collections import namedtuple
LatLong = namedtuple('LatLong', 'lat long')
Metropolis = namedtuple('Metropolis', 'name cc pop coord')
metro_areas = [Metropolis(name, cc, pop, LatLong(lat, long))
	       for name, cc, pop, (lat, long) in metro_data]


# Import before first use: ``name_lat`` below is built with attrgetter.
from operator import attrgetter

log("metro_areas[0]", metro_areas[0])
# A dotted name makes attrgetter descend into the nested ``coord`` object.
name_lat = attrgetter('name', 'coord.lat')
log("name_lat(metro_areas[0])", name_lat(metro_areas[0]))
print()

for city in sorted(metro_areas, key=attrgetter('coord.lat')):
    print(name_lat(city))
================================ metro_areas[0] ================================
Metropolis(name='Tokyo', cc='JP', pop=36.933, coord=LatLong(lat=35.689722, long=139.691667))
=========================== name_lat(metro_areas[0]) ===========================
('Tokyo', 35.689722)

('Sao Paulo', -23.547778)
('Mexico City', 19.433333)
('Delhi NCR', 28.613889)
('Tokyo', 35.689722)
('New York-Newark', 40.808611)

2.2.4 调用对象方法(methodcaller)

from operator import methodcaller
s = "hello world"
log("methodcaller('upper')(s)", methodcaller('upper')(s))
log("methodcaller('replace', ' ', '-')(s)", methodcaller('replace', ' ', '-')(s))
=========================== methodcaller('upper')(s) ===========================
HELLO WORLD
===================== methodcaller('replace', ' ', '-')(s) =====================
hello-world

2.3 使用装饰器

2.3.1 记录函数运行时间

import functools
import time

def clock(func):
    """Decorator that prints timing, arguments and result of every call.

    Each call prints one line of the form
    ``[<elapsed seconds>] <name>(<arguments>) -> <result>`` and then returns
    the wrapped function's result unchanged.
    """
    @functools.wraps(func)
    def wrapper(*args, **kw):
        t0 = time.perf_counter()
        result = func(*args, **kw)
        elapsed = time.perf_counter() - t0
        name = func.__name__
        arg_lst = []
        if args:
            arg_lst.append(', '.join(repr(arg) for arg in args))
        if kw:
            # repr() the values so keyword arguments are rendered the same
            # way as positional ones (e.g. strings keep their quotes).
            pairs = ['{}={!r}'.format(k, w) for k, w in sorted(kw.items())]
            arg_lst.append(', '.join(pairs))
        arg_str = ', '.join(arg_lst)
        print("[{:0.8f}] {}({}) -> {}".format(elapsed, name, arg_str, result))
        return result
    return wrapper

@clock
def snooze(t):
    """Sleep for ``t`` seconds; used to exercise the ``clock`` decorator."""
    time.sleep(t)

snooze(0.1)
snooze(0.2)
snooze(0.3)
[0.10157284] snooze(0.1) -> None
[0.20283653] snooze(0.2) -> None
[0.30416814] snooze(0.3) -> None

2.3.2 缓存函数返回值(functools.lru_cache)

它把耗时的函数的结果缓存起来,避免传入相同的参数时重复计算,可以使用两个可选的参数来配置:

  • maxsize

    指定存储多少个调用的结果。缓存满了之后,旧的结果会被删除,腾出空间。

    为了得到最佳性能, maxsize 应设为 2 的幂。

  • typed

    如果设为 True ,把不同参数类型得到的结果分开保存, 即把通常认为相等的浮点数和整数参数(如 1 和 1.0)区分开。

因为 lru_cache 使用字典存储结果,而且键根据调用时传入的定位参数和关键字参数创建, 因此被 lru_cache 装饰的函数, 它的所有参数必须是可散列的

@clock
def fib(n):
    """Naive recursive Fibonacci; ``clock`` reports every recursive call."""
    return n if n < 2 else fib(n - 2) + fib(n - 1)

print("result: ", fib(6))
[0.00000045] fib(0) -> 0
[0.00000053] fib(1) -> 1
[0.00029566] fib(2) -> 1
[0.00000040] fib(1) -> 1
[0.00000035] fib(0) -> 0
[0.00000041] fib(1) -> 1
[0.00005358] fib(2) -> 1
[0.00010595] fib(3) -> 2
[0.00048775] fib(4) -> 3
[0.00000033] fib(1) -> 1
[0.00000032] fib(0) -> 0
[0.00000110] fib(1) -> 1
[0.00010550] fib(2) -> 1
[0.00015888] fib(3) -> 2
[0.00000056] fib(0) -> 0
[0.00000050] fib(1) -> 1
[0.00012996] fib(2) -> 1
[0.00000042] fib(1) -> 1
[0.00000048] fib(0) -> 0
[0.00000041] fib(1) -> 1
[0.00005725] fib(2) -> 1
[0.00025901] fib(3) -> 2
[0.00050781] fib(4) -> 3
[0.00088358] fib(5) -> 5
[0.00145318] fib(6) -> 8
result:  8
import functools

# lru_cache is the outer decorator, so a cached result is returned without
# entering the clocked function again.
@functools.lru_cache()
@clock
def fib(n):
    """Recursive Fibonacci whose results are memoized by ``lru_cache``."""
    if n < 2:
        return n
    return fib(n-2) + fib(n-1)

print("result:", fib(6))
[0.00000047] fib(0) -> 0
[0.00000077] fib(1) -> 1
[0.00021293] fib(2) -> 1
[0.00000149] fib(3) -> 2
[0.00030547] fib(4) -> 3
[0.00000166] fib(5) -> 5
[0.00044729] fib(6) -> 8
result: 8

2.3.3 单分派泛函(functools.singledispatch)

使用 singledispatch 装饰的普通函数会变成分派泛函: 根据 第一个参数的类型 ,执行具体的操作。 (正是因为根据第一个参数的类型,所以称为单分派, 若根据多个参数选择 专门的函数 ,那就是多分派了)

专门的函数 应该处理抽象基类 ,如 numbers.Integral, abc.MutableSequence , 不要处理具体实现,如 int, list 。这样,代码支持的兼容类型才能更广泛。

from functools import singledispatch
from collections import abc
import numbers
import html

@singledispatch
def htmlize(obj):
    """Fallback renderer: the escaped ``repr`` of ``obj`` inside ``<pre>``."""
    return '<pre>{}</pre>'.format(html.escape(repr(obj)))

@htmlize.register(str)
def _(text):
    """Strings: escaped text in ``<p>``, with ``<br>`` before each newline."""
    escaped = html.escape(text)
    with_breaks = escaped.replace('\n', '<br>\n')
    return '<p>{0}</p>'.format(with_breaks)

@htmlize.register(numbers.Integral)
def _(n):
    """Integers: the decimal value followed by its hexadecimal form."""
    template = '<pre>{0} (0x{0:x})</pre>'
    return template.format(n)

@htmlize.register(tuple)
@htmlize.register(abc.MutableSequence)
def _(seq):
    """Tuples and mutable sequences: a ``<ul>`` of recursively rendered items."""
    rendered = [htmlize(element) for element in seq]
    return '<ul>\n<li>' + '</li>\n<li>'.join(rendered) + '</li>\n</ul>'


log("htmlize({1, 2, 3})", htmlize({1, 2, 3}))
log("htmlize(abs)", htmlize(abs))
log("htmlize('helloworld')", htmlize('helloworld'))
log("htmlize(42)", htmlize(42))
log("htmlize([1, 2, 3])", htmlize([1, 2, 3]))

============================== htmlize({1, 2, 3}) ==============================
<pre>{1, 2, 3}</pre>
================================= htmlize(abs) =================================
<pre>&lt;built-in function abs&gt;</pre>
============================ htmlize('helloworld') =============================
<p>helloworld</p>
================================= htmlize(42) ==================================
<pre>42 (0x2a)</pre>
============================== htmlize([1, 2, 3]) ==============================
<ul>
<li><pre>1 (0x1)</pre></li>
<li><pre>2 (0x2)</pre></li>
<li><pre>3 (0x3)</pre></li>
</ul>

3 IO 编程

3.1 文件 IO

3.1.1 将文件描述符包装成文件对象

from socket import socket, AF_INET, SOCK_STREAM

def echo_client(client_sock, addr):
    """Echo every line received on ``client_sock`` back to the sender.

    Args:
        client_sock: connected socket object; it is closed before returning.
        addr: peer address, only used for the log line.
    """
    print("Got connection from", addr)

    try:
        # Make text-mode file wrappers for socket reading/writing.
        # closefd=False: the wrappers must not close the descriptor, which
        # stays owned by the socket object.
        client_in = open(client_sock.fileno(), 'rt', encoding='latin-1', closefd=False)
        client_out = open(client_sock.fileno(), 'wt', encoding='latin-1', closefd=False)

        # Echo lines back to the client using file I/O
        for line in client_in:
            client_out.write(line)
            client_out.flush()
    finally:
        # Close the connection even if reading or writing raised.
        client_sock.close()

def echo_server(address):
    """Accept TCP connections on ``address`` forever, one client at a time.

    Args:
        address: ``(host, port)`` pair passed to ``socket.bind``.
    """
    # The with-block closes the listening socket if the accept loop is left
    # through an exception (for example KeyboardInterrupt).
    with socket(AF_INET, SOCK_STREAM) as sock:
        sock.bind(address)
        sock.listen(1)
        while True:
            client, addr = sock.accept()
            echo_client(client, addr)

print('Echo serving running on localhost:25000')
echo_server(('', 25000))

3.1.2 改变已打开文件的编码方式

# Example of adding a text encoding to existing file-like object

import urllib.request
import io

u = urllib.request.urlopen('http://www.python.org')
f = io.TextIOWrapper(u, encoding='utf-8')
text = f.read()

print(text)

3.2 内存 IO

3.2.1 内存字符串

from io import StringIO
f = StringIO()
f.write('hello')
f.write(' ')
f.write('world!')
print(f.getvalue())
f = StringIO('Hello!\nHi!\nGoodbye!')
# readline() returns '' only once the buffer is exhausted, which ends the loop.
while True:
    s = f.readline()
    if s == '':  # eof
        break
    print(s, end='')

3.2.2 内存比特流

from io import BytesIO
f = BytesIO()
f.write('中文'.encode('utf-8'))
print(f.getvalue())
f = BytesIO(b'\xe4\xb8\xad\xe6\x96\x87')
r = f.read().decode('utf-8')
print(r)

4 面向对象编程

4.1 为实例属性提供默认值

类属性可用于为实例属性提供默认值:

class A:
    """Class whose ``value`` class attribute acts as a per-instance default."""

    value = 'default value'

a = A()
print(a.value)
default value

4.2 创建缓存对象

class Spam:
    """Named object that the caching examples create and hand out."""

    def __init__(self, name):
        self.name = name

# Caching support
import weakref
_spam_cache = weakref.WeakValueDictionary()

def get_spam(name):
    """Return the cached ``Spam`` for ``name``, creating and caching it if absent."""
    # A single .get() replaces the membership test plus second lookup: the
    # cache holds its values weakly, so an entry may disappear between the two.
    s = _spam_cache.get(name)
    if s is None:
        s = Spam(name)
        _spam_cache[name] = s
    return s

if __name__ == '__main__':
    a = get_spam('foo')
    b = get_spam('bar')
    print('a is b:', a is b)
    c = get_spam('foo')
    print('a is c:', a is c)
a is b: False
a is c: True
import weakref

class CachedSpamManager:
    """Owns a weak-value cache that maps names to ``Spam`` instances."""

    def __init__(self):
        # Values are held weakly: an entry disappears once nothing else
        # references the cached instance.
        self._cache = weakref.WeakValueDictionary()

    def get_spam(self, name):
        """Return the cached ``Spam`` for ``name``, creating it if absent."""
        # A single .get() avoids a membership test followed by a second
        # lookup, between which a weakly held entry could vanish.
        s = self._cache.get(name)
        if s is None:
            s = Spam(name)
            self._cache[name] = s
        return s

class Spam:
    """Named object whose instances are handed out by the cache manager."""

    def __init__(self, name):
        self.name = name

Spam.manager = CachedSpamManager()

def get_spam(name):
    """Return the Spam for ``name`` by delegating to the manager stored on ``Spam.manager``."""
    return Spam.manager.get_spam(name)

if __name__ == '__main__':
    a = get_spam('foo')
    b = get_spam('bar')
    print('a is b:', a is b)
    c = get_spam('foo')
    print('a is c:', a is c)
a is b: False
a is c: True
# Example involving new and some of its problems

import weakref

class Spam:
    """Caches instances per name inside ``__new__``.

    This demonstrates a pitfall: ``__init__`` still runs on every
    ``Spam(name)`` call, including calls that return a cached instance.
    """

    # name -> instance, held weakly
    _spam_cache = weakref.WeakValueDictionary()

    def __new__(cls, name):
        if name in cls._spam_cache:
            return cls._spam_cache[name]
        else:
            self = super().__new__(cls)
            cls._spam_cache[name] = self
            return self

    def __init__(self, name):
        # Runs again for cached instances; that is the problem being shown.
        print('Initializing Spam')
        self.name = name

if __name__ == '__main__':
    print("This should print 'Initializing Spam' twice")
    s = Spam('Dave')
    t = Spam('Dave')
    print(s is t)
This should print 'Initializing Spam' twice
Initializing Spam
Initializing Spam
True

4.3 使用抽象类

from abc import ABCMeta, abstractmethod

class A(metaclass=ABCMeta):
    """Shows ``abstractmethod`` combined with property, classmethod and staticmethod.

    In every case ``@abstractmethod`` is the innermost decorator.
    """

    @property
    @abstractmethod
    def name(self):
        pass

    @name.setter
    @abstractmethod
    def name(self, value):
        pass

    @classmethod
    @abstractmethod
    def method1(cls):
        pass

    @staticmethod
    @abstractmethod
    def method2():
        pass

4.4 处理 Json 数据

4.4.1 使用 OrderedDict 处理 Json 数据

import json

# Some JSON encoded text
s = '{"name": "ACME", "shares": 50, "price": 490.1}'

# (a) Turning JSON into an OrderedDict

from collections import OrderedDict
data = json.loads(s, object_pairs_hook=OrderedDict)
print(data)
OrderedDict([('name', 'ACME'), ('shares', 50), ('price', 490.1)])

4.4.2 使用 Json 数据填充对象

class JSONObject:
    """Expose the keys of a decoded JSON object as instance attributes."""

    def __init__(self, d):
        # The dict itself becomes the instance namespace; no copy is made.
        self.__dict__ = d

data = json.loads(s, object_hook=JSONObject)
print(data.name)
print(data.shares)
print(data.price)
ACME
50
490.1

4.4.3 将对象序列化成 Json 数据

class Point:
    """Simple two-attribute object used by the JSON serialization examples."""

    def __init__(self, x, y):
        self.x = x
        self.y = y

def serialize_instance(obj):
    """Return a dict holding the class name of ``obj`` plus its instance attributes.

    Intended as the ``default=`` hook of ``json.dumps``. The class name is
    stored under ``'__classname__'``; an instance attribute with that same
    name overrides it.
    """
    payload = {}
    payload['__classname__'] = type(obj).__name__
    for attr, value in vars(obj).items():
        payload[attr] = value
    return payload

p = Point(3,4)
s = json.dumps(p, default=serialize_instance)
print(s)
{"__classname__": "Point", "x": 3, "y": 4}

4.4.4 将 Json 数据反序列化成对象

classes = {
    'Point' : Point
}

def unserialize_object(d):
    """``json.loads`` object hook that rebuilds an instance from a tagged dict.

    Args:
        d: decoded JSON object. A ``'__classname__'`` entry, when present,
            is removed from ``d`` and looked up in ``classes``.

    Returns:
        A new instance carrying the remaining entries as attributes, or
        ``d`` itself when it has no (truthy) ``'__classname__'`` entry.

    Raises:
        KeyError: if the class name is not a key of ``classes``.
    """
    clsname = d.pop('__classname__', None)
    if clsname:
        cls = classes[clsname]
        # __new__ creates the instance without running __init__.
        obj = cls.__new__(cls)
        for key, value in d.items():
            setattr(obj, key, value)
        return obj
    else:
        return d

a = json.loads(s, object_hook=unserialize_object)
print(a)
print(a.x)
print(a.y)
<__main__.Point object at 0x11c5f3e48>
3
4

5 流程控制

5.1 归约函数

5.1.1 all(it)

5.1.2 any(it)

5.1.3 max(it, [key], [default])

如果可迭代对象为空,返回 default

可以这样调用:

max(arg1, arg2, ..., [key])

5.1.4 min(it, [key], [default])

如果可迭代对象为空,返回 default

可以这样调用:

min(arg1, arg2, ..., [key])

5.1.5 functools.reduce(func, it, [initial])

5.1.6 sum(it, start=0)

5.2 标准库中的生成器函数

5.2.1 用于过滤的生成器函数

5.2.1.1 itertools.compress(it, selector_it)

并行处理两个可迭代对象:如果 selector_it 中的元素是真值,产出 it 中对应的元素。

import itertools
def vowel(c):
    """Return True when c, lower-cased, occurs in 'aeiou'."""
    vowels = 'aeiou'
    return c.lower() in vowels

# Keep the characters paired with a truthy selector; the selector tuple is
# shorter than the text, so iteration stops after six characters.
result = [*itertools.compress('Aardvark', (1, 0, 1, 1, 0, 1))]
print(result)
['A', 'r', 'd', 'a']
5.2.1.2 itertools.dropwhile(predicate, it)

处理 it ,跳过 predicate 计算结果为真值的元素,产出剩下的元素。

# The leading vowels are dropped; everything from the first consonant on is kept.
print([*itertools.dropwhile(vowel, 'Aardvark')])
['r', 'd', 'v', 'a', 'r', 'k']
5.2.1.3 builtin.filter(predicate, it)

如果 predicate(item) 返回真值,产出对应的元素,如果 predicate 是 None ,则只产出真值元素。

# Only the characters for which vowel() is true are kept.
print([*filter(vowel, 'Aardvark')])
['A', 'a', 'a']
5.2.1.4 itertools.filterfalse(predicate, it)

如果 predicate(item) 返回假值,产出对应的元素。

5.2.1.5 itertools.islice(it, [start], stop, step=1)

产出 it 的切片,类似于 s[:stop] 或 s[start:stop:step]

# islice(it, stop)
print([*itertools.islice('Aardvark', 4)])
# islice(it, start, stop)
print([*itertools.islice('Aardvark', 4, 7)])
# islice(it, start, stop, step)
print([*itertools.islice('Aardvark', 1, 7, 2)])
['A', 'a', 'r', 'd']
['v', 'a', 'r']
['a', 'd', 'a']
5.2.1.6 itertools.takewhile(predicate, it)

如果 predicate(item) 返回真值,产出对应的元素,然后停止。

# Yields the leading vowels and stops at the first consonant.
print([*itertools.takewhile(vowel, 'Aardvark')])
['A', 'a']
5.2.1.7 builtin.iter(callable, sentinel)

第一个参数是一个没有参数的可调用对象,用于不断调用,产出各个值;第二个值是哨符,当可调用对象返回这个值时,迭代结束 (不产出哨符) 。

iter 的文档中有个实用的例子,这段代码逐行读取文件,直到 readline 返回空字符串 (即到达文件末尾) 为止;注意空行读出来是 '\n' ,并不会使迭代结束:

with open('mydata.txt') as fp:
    # iter(callable, sentinel): keep calling fp.readline() until it returns ''.
    for line in iter(fp.readline, ''):
        process_line(line)

5.2.2 用于映射的生成器函数

5.2.2.1 itertools.accumulate(it, [func])

产出累计值,默认为求和;如果提供了 func ,则把前面两个元素传给 func , 然后把计算结果和下一个元素传给它,以此类推,依次产出每一步的结果。

from operator import mul
sample = [5, 4, 2, 8, 7, 6, 3, 0, 9, 1]

# Running sum (the default), running minimum, running product.
print([*itertools.accumulate(sample)])
print([*itertools.accumulate(sample, min)])
print([*itertools.accumulate(sample, mul)])

[5, 9, 11, 19, 26, 32, 35, 35, 44, 45]
[5, 4, 2, 2, 2, 2, 2, 0, 0, 0]
[5, 20, 40, 320, 2240, 13440, 40320, 0, 0, 0]
5.2.2.2 builtin.enumerate(it, start=0)

产出由两个元素组成的元组,结构是 (index, item) ,其中 index 从 start 开始计数,item 则从 iterable 中获取。

5.2.2.3 builtin.map(func, it1, [it2, …, itN])

如果传入 N 个 可迭代对象,则 func 必须能接受 N 个参数。

5.2.2.4 itertools.starmap(func, it)

把 it 中各个元素解包后传给 func (即调用 func(*item)) ,产出结果。

# Each (index, char) pair is unpacked into mul(index, char): repeat the letter.
print([*itertools.starmap(mul, enumerate('albatroz', 1))])
# Each (count, running_total) pair becomes running_total / count: running mean.
print([*itertools.starmap(lambda a, b: b/a,
                          enumerate(itertools.accumulate(sample), 1))])
['a', 'll', 'bbb', 'aaaa', 'ttttt', 'rrrrrr', 'ooooooo', 'zzzzzzzz']
[5.0, 4.5, 3.6666666666666665, 4.75, 5.2, 5.333333333333333, 5.0, 4.375, 4.888888888888889, 4.5]

5.2.3 用于合并可迭代对象的生成器函数

5.2.3.1 itertools.chain(it1, …, itN)

无缝连接多个可迭代对象。

# The items of the string are followed directly by the items of the range.
print([*itertools.chain('ABC', range(3))])
['A', 'B', 'C', 0, 1, 2]
5.2.3.2 itertools.chain.from_iterable(it)

产出 it 生成的各个可迭代对象中的元素,无缝连接在一起。

# enumerate yields (index, char) tuples; from_iterable flattens them.
print([*itertools.chain.from_iterable(enumerate('ABC'))])
[0, 'A', 1, 'B', 2, 'C']
5.2.3.3 itertools.product(it1, …, itN, repeat=1)

计算笛卡尔积,合并成由 N 个元素组成的元组。 repeat 关键字参数告诉 product 函数重复 N 次处理输入的各个可迭代对象。

print([*itertools.product('ABC', range(2))])
# repeat=2 gives the same pairs as itertools.product('ABC', 'ABC').
print([*itertools.product('ABC', repeat=2)])
[('A', 0), ('A', 1), ('B', 0), ('B', 1), ('C', 0), ('C', 1)]
[('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'B'), ('B', 'C'), ('C', 'A'), ('C', 'B'), ('C', 'C')]
5.2.3.4 builtin.zip(it1, .., itN)

产出由 N 个元素组成的元组,只要有一个可迭代对象到头了,即停止。

# The shortest input ('ABC') decides how many tuples are produced.
print([*zip('ABC', range(5), [10, 20, 30, 40, 50, 60])])
[('A', 0, 10), ('B', 1, 20), ('C', 2, 30)]
5.2.3.5 itertools.zip_longest(it1, …, itN, fillvalue=None)

产出由 N 个元素组成的元组,等到最长的可迭代对象到头了,即停止。

# The longest input decides the length; exhausted inputs contribute fillvalue.
print([*itertools.zip_longest(
    'ABC',
    range(5),
    [10, 20, 30, 40, 50, 60],
    fillvalue='?',
)])
[('A', 0, 10), ('B', 1, 20), ('C', 2, 30), ('?', 3, 40), ('?', 4, 50), ('?', '?', 60)]

5.2.4 用于扩展输出元素的生成器函数

5.2.4.1 itertools.combinations(it, out_len)

把 it 产出的 out_len 个元素组合在一起,然后产出。

# Every 2-element combination, without repeating an element.
print([*itertools.combinations('ABC', 2)])
[('A', 'B'), ('A', 'C'), ('B', 'C')]
5.2.4.2 itertools.combinations_with_replacement(it, out_len)

把 it 产出的 out_len 个元素组合在一起,然后产出,包含相同元素的组合。

# As above, but an element may be paired with itself.
print([*itertools.combinations_with_replacement('ABC', 2)])
[('A', 'A'), ('A', 'B'), ('A', 'C'), ('B', 'B'), ('B', 'C'), ('C', 'C')]
5.2.4.3 itertools.permutations(it, out_len=None)

把 out_len 个 it 产出元素排列在一起,然后产出这些排列;out_len 的默认值等于 len(list(it))

# Every ordered arrangement of 2 distinct elements.
print([*itertools.permutations('ABC', 2)])
[('A', 'B'), ('A', 'C'), ('B', 'A'), ('B', 'C'), ('C', 'A'), ('C', 'B')]
5.2.4.4 itertools.count(start=0, step=1)

从 start 开始不断产出数字,按 step 步幅增加。

# count() never ends, so islice() is used to take only the first three values.
print([*itertools.islice(itertools.count(1, .3), 3)])
[1, 1.3, 1.6]
# count() must be reached through the itertools module: only `itertools`
# itself is imported here, so the bare name `count` raises NameError.
# zip() stops as soon as the three-element list is exhausted.
print(list(zip(itertools.count(), ['a', 'b', 'c'])))
[(0, 'a'), (1, 'b'), (2, 'c')]
5.2.4.5 itertools.cycle(it)

从 it 中产出元素,存储各个元素的 副本 ,然后按顺序重复不断地产出各个元素。

# cycle() repeats 'ABC' forever; take the first seven items.
print([*itertools.islice(itertools.cycle('ABC'), 7)])
['A', 'B', 'C', 'A', 'B', 'C', 'A']
5.2.4.6 itertools.repeat(item, [times])

不断产出指定元素,除非指定次数。常见用途,为 map 函数提供固定参数:

# repeat(5) supplies the constant second argument; map stops with range(11).
print([*map(mul, range(11), itertools.repeat(5))])
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50]

5.2.5 用于重新排列元素的生成器函数

5.2.5.1 itertools.groupby(it, key=None)

产出由两个元素组成的元组,形式为 (key, group) ,其中 key 是分组标准,group 是生成器,用于产出分组里的元素。

注意, itertools.groupby 假定输入的可迭代对象已使用指定的 key 分组 (排序) 了各个元素,它只会把相邻且 key 相同的元素归为一组。

# Only consecutive equal items are grouped, so the second, unsorted string
# yields separate groups for each run of 'L' and 'A'.
print([*itertools.groupby('LLLAAGGG')])
print([*itertools.groupby('LLAALAAGGG')])
[('L', <itertools._grouper object at 0x106a79208>), ('A', <itertools._grouper object at 0x106a791d0>), ('G', <itertools._grouper object at 0x106a79080>)]
[('L', <itertools._grouper object at 0x106a79cf8>), ('A', <itertools._grouper object at 0x106a791d0>), ('L', <itertools._grouper object at 0x106a79080>), ('A', <itertools._grouper object at 0x106a79160>), ('G', <itertools._grouper object at 0x106a79358>)]
# Sort by the grouping key first so that equal lengths are adjacent.
animals = sorted(
    ['duck', 'eagle', 'rat', 'giraffe', 'bear', 'bat', 'dolphin', 'shark', 'lion'],
    key=len,
)
for length, group in itertools.groupby(animals, len):
    print(length, '->', [*group])
3 -> ['rat', 'bat']
4 -> ['duck', 'bear', 'lion']
5 -> ['eagle', 'shark']
7 -> ['giraffe', 'dolphin']
5.2.5.2 builtin.reversed(seq)

seq 必须是序列,或是实现了 __reversed__ 特殊方法的对象。

5.2.5.3 itertools.tee(it, n=2)

产出一个由 n 个生成器组成的元组,每个生成器用于单独产出输入的可迭代对象中的元素。

# tee() returns two independent iterators over the same items; zip pairs them.
print([*zip(*itertools.tee('ABC'))])
[('A', 'A'), ('B', 'B'), ('C', 'C')]

5.3 上下文管理

5.3.1 自动关闭对象(closing)

如果对象提供了 close() 方法,但没有实现 __enter__/__exit__ 协议,则可以用这个函数构建上下文管理器。

from contextlib import closing

class Door:
    """Object with a close() method but no __enter__/__exit__ of its own."""

    def open(self):
        print("door opened")

    def close(self):
        print("door closed")

# closing() wraps an object that only has close(); close() is called when
# the with block exits.
with closing(Door()) as door:
    door.open()
door opened
door closed

5.3.2 忽略异常(suppress)

构建忽略指定异常的上下文管理器。

from contextlib import suppress
import os

# A missing file is not an error here: FileNotFoundError is swallowed.
ignore_missing = suppress(FileNotFoundError)
with ignore_missing:
    os.remove('somefile.tmp')

5.3.3 重定向(redirect_stdout)

import io
from contextlib import redirect_stdout

# redirect_stdout hands back its target, so the buffer can be bound with `as`.
with redirect_stdout(io.StringIO()) as f:
    help(pow)

log("f.getvalue()", f.getvalue())
================================= f.getvalue() =================================
Help on built-in function pow in module builtins:

pow(x, y, z=None, /)
    Equivalent to x**y (with two arguments) or x**y % z (with three arguments)

    Some types, such as ints, are able to use a more efficient algorithm when
    invoked using the three argument form.


# Same redirection, but into a file; both managers are released on exit.
with open('/tmp/help.txt', 'w') as f, redirect_stdout(f):
    help(pow)

5.3.4 使用生成器作为上下文管理器(@contextmanager)

这个装饰器把简单的生成器函数变成上下文管理器,这样就不用创建类去实现管理器协议了。

在使用 @contextmanager 装饰的生成器中, yield 语句的作用是把函数的定义体分成两部分:

  • yield 语句前面的所有代码在 with 块开始时 (即解释器调用 __enter__ 方法时) 执行
  • yield 语句后面的代码在 with 块结束时 (即调用 __exit__ 方法时) 执行
from contextlib import contextmanager

class Query(object):
    """Named query object handed out by create_query()."""

    def __init__(self, name):
        self.name = name

    def query(self):
        print('Query info about %s...' % self.name)

@contextmanager
def create_query(name):
    """Context manager that yields a Query and prints Begin/End around it.

    Any Exception raised inside the with block is re-raised at the yield and
    swallowed by suppress(), so 'End' is always printed and the exception
    does not reach the caller.
    """
    print('Begin')
    with suppress(Exception):
        yield Query(name)  # bound to the target of the `as` clause
    print('End')

# q is the Query object yielded by create_query().
with create_query('Bob') as q:
    q.query()
Begin
Query info about Bob...
End
@contextmanager
def tag(name):
    """Print an opening <name> tag on entry and the closing tag on exit.

    Any Exception raised inside the with block is swallowed by suppress(),
    so the closing tag is always printed.
    """
    print("<%s>" % name, end='')
    with suppress(Exception):
        yield  # nothing to bind, so no `as` clause is needed
    print("</%s>" % name)

# The body's output lands between the opening and closing tags.
with tag("h1"):
    print("hello", end='')
<h1>hello</h1>

本质上, contextlib.contextmanager 装饰器会把函数包装成实现了 __enter__ 和 __exit__ 方法的类 (类的名称是 _GeneratorContextManager) 。

这个类的 __enter__ 方法有如下作用:

  • 调用生成器函数,保存生成器对象 (这里把它称为 gen)
  • 调用 next(gen) ,执行到 yield 关键字所在位置
  • 返回上一步 next(gen) 产出的值,以便把产出的值绑定到 with/as 语句中的目标变量上

with 块终止时, __exit__ 方法会做以下几件事:

  • 检查有没有异常,如果有,调用 gen.throw(ex) ,在生成器函数定义体中包含 yield 关键字的那一行抛出异常
  • 否则,再调用 next(gen) ,继续执行生成器函数定义体中 yield 语句之后的代码

注意:

如果在 with 块中抛出了异常,Python 解释器会将其捕获,然后会在生成器函数中 yield 表达式处再次抛出。

因此使用 @contextmanager 装饰器时,要把 yield 语句放在 try/finally 语句中 (或者放在 with 语句中)。 (因为我们永远不知道使用上下文管理器的用户会在 with 块中做什么)

另外, @contextmanager 装饰器提供的 __exit__ 方法假定:发给生成器并被生成器捕获的异常都已得到处理,因此会压制该异常。 如果生成器捕获了异常但不想让 @contextmanager 压制它,必须在被装饰的函数中显式重新抛出异常;生成器没有捕获的异常则会照常向外传播。

6 并发编程

6.1 创建子进程

from multiprocessing import Process
import os

def run_proc(name):
    """Print the given name together with the current process id."""
    pid = os.getpid()
    print('Run child process %s (%s)...' % (name, pid))

# Start run_proc in a child process and wait for it.
if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()  # wait for the child process to finish before continuing
    print('Child process end.')

6.2 子进程的输入输出

import subprocess

# Run a command and keep only its exit status.
r = subprocess.call(['nslookup', 'www.python.org'])
print('r: {}'.format(r))

# Run a command interactively: feed its stdin and capture stdout/stderr.
p = subprocess.Popen(
    ['nslookup'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

# Equivalent to running nslookup in a shell and typing:
# set q=mx
# python.org
# exit
r: 0
Server:		64.104.123.144
Address:	64.104.123.144#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:
org	nameserver = d0.org.afilias-nst.org.
org	nameserver = a2.org.afilias-nst.info.
org	nameserver = a0.org.afilias-nst.info.
org	nameserver = b0.org.afilias-nst.org.
org	nameserver = b2.org.afilias-nst.org.
org	nameserver = c0.org.afilias-nst.info.
a0.org.afilias-nst.info	internet address = 199.19.56.1
a2.org.afilias-nst.info	internet address = 199.249.112.1
b0.org.afilias-nst.org	internet address = 199.19.54.1
b2.org.afilias-nst.org	internet address = 199.249.120.1
c0.org.afilias-nst.info	internet address = 199.19.53.1
d0.org.afilias-nst.org	internet address = 199.19.57.1
a0.org.afilias-nst.info	has AAAA address 2001:500:e::1
a2.org.afilias-nst.info	has AAAA address 2001:500:40::1
b0.org.afilias-nst.org	has AAAA address 2001:500:c::1
b2.org.afilias-nst.org	has AAAA address 2001:500:48::1
c0.org.afilias-nst.info	has AAAA address 2001:500:b::1
d0.org.afilias-nst.org	has AAAA address 2001:500:f::1


Exit code: 0

6.3 进程间通信

父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据:

from multiprocessing import Process, Queue
import os, time, random

def write(q):
    """Put 'A', 'B' and 'C' on q, pausing a random fraction of a second after each."""
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    """Print every value taken from q; loops until the process is stopped."""
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)  # block until an item is available
        print('Get %s from queue.' % value)

# Guard the entry point: with the "spawn" start method each child re-imports
# this module, and unguarded code would try to start processes again.
if __name__ == '__main__':
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()
    # read() loops forever, so it cannot be joined; terminate it explicitly.
    pr.terminate()
Process to write: 92549
Put A to queue...
Process to read: 92550
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

6.4 创建线程

import time, threading

def worker():
    """Announce the current thread, sleep one second, then announce the end."""
    started_on = threading.current_thread().name
    print('thread %s is running...' % started_on)
    time.sleep(1)
    finished_on = threading.current_thread().name
    print('thread %s ended.' % finished_on)

# Run worker() on a thread named 'WorkerThread' and wait for it to finish.
t = threading.Thread(target=worker, name='WorkerThread')
t.start()
t.join()
thread WorkerThread is running...
thread WorkerThread ended.

6.5 线程加锁

import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
count = 0  # shared counter advanced by both functions below

def run_thread_without_lock():
    """Advance the shared counter to 10 without any synchronisation.

    The check and the increment are separate steps, so concurrent callers
    may interleave between them.
    """
    global count
    while count < 10:
        count += 1
        print("%s, " % count, end='')

def run_thread_with_lock():
    """Advance the shared counter to 10 while holding ``lock``.

    The whole loop runs under the lock, so only one caller at a time can
    test and update the counter.
    """
    global count
    with lock:
        while count < 10:
            count += 1
            print("%s, " % count, end='')

# First run: five tasks race on the counter without a lock.
with ThreadPoolExecutor(max_workers=4) as executor:
    for _ in range(5):
        executor.submit(run_thread_without_lock)

count = 0  # reset the shared counter before the second run

# Second run: the same five tasks, serialised by the lock.
with ThreadPoolExecutor(max_workers=4) as executor:
    for _ in range(5):
        executor.submit(run_thread_with_lock)
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,

6.6 thread local

一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

最常用的地方就是为每个线程绑定一个数据库连接,HTTP 请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

import threading, time

# Attributes set on a threading.local() object are separate for each thread.
tl = threading.local()

def worker(name):
    """Store name on the thread-local object, wait 3 seconds, then print it."""
    tl.name = name
    time.sleep(3)
    print("in thread:", tl.name)

# The main thread sets its own tl.name while the worker thread is sleeping.
# The thread is not joined here.
t = threading.Thread(target=worker, args=("hello",), name='Thread-A')
t.start()
tl.name = 'world'
print("in main:", tl.name)
in main: world

6.7 线程池

from concurrent.futures import ThreadPoolExecutor
import urllib.request

def fetch_url(url):
    """Return the raw bytes served at url.

    The response is used as a context manager so the underlying connection
    is closed even if reading fails.
    """
    with urllib.request.urlopen(url) as response:
        return response.read()

# Use the executor as a context manager so its worker threads are shut
# down once both downloads have completed.
with ThreadPoolExecutor(10) as pool:
    # Submit work to the pool
    a = pool.submit(fetch_url, 'http://www.python.org')
    b = pool.submit(fetch_url, 'http://www.pypy.org')

    # Get the results back (blocks until each download has finished)
    x = a.result()
    y = b.result()
# Out[1]:

7 元编程

7.1 控制类的属性定义的顺序

import collections

class EntityMeta(type):
    """Metaclass whose class bodies are executed in an OrderedDict namespace."""

    @classmethod
    def __prepare__(cls, name, bases):
        # Called before the class body runs; the returned mapping receives
        # the attributes in definition order.
        return collections.OrderedDict()

7.2 控制类的创建

7.2.1 实例缓存

# Cached instances

import weakref

class Cached(type):
    """Metaclass that returns the same instance for equal constructor arguments.

    Instances are held through weak references, so an entry disappears once
    nothing else refers to the instance. Constructor arguments must be
    hashable because the argument tuple is the cache key.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__cache = weakref.WeakValueDictionary()

    def __call__(self, *args):
        # A single get() avoids the gap between an `in` test and the lookup,
        # during which a weakly referenced entry could vanish.
        obj = self.__cache.get(args)
        if obj is None:
            obj = super().__call__(*args)
            self.__cache[args] = obj
        return obj

class Spam(metaclass=Cached):
    """Example class whose instances are cached per constructor argument."""

    def __init__(self, name):
        print('Creating Spam({!r})'.format(name))
        self.name = name

# Different arguments give different instances; equal arguments reuse one.
if __name__ == '__main__':
    a = Spam('foo')
    b = Spam('bar')
    print('a is b:', a is b)
    c = Spam('foo')
    print('a is c:', a is c)
Creating Spam('foo')
Creating Spam('bar')
a is b: False
a is c: True

7.2.2 单例

# Singleton

class Singleton(type):
    """Metaclass that creates at most one instance of each class using it."""

    def __init__(self, *args, **kwargs):
        self.__instance = None
        super().__init__(*args, **kwargs)

    def __call__(self, *args, **kwargs):
        # Only the first call constructs; later calls return that instance
        # and ignore their arguments.
        if self.__instance is None:
            self.__instance = super().__call__(*args, **kwargs)
        return self.__instance

class Spam(metaclass=Singleton):
    """Example class limited to a single instance by its metaclass."""

    def __init__(self):
        print('Creating Spam')

# Both calls return the same object; __init__ runs only for the first.
if __name__ == '__main__':
    a = Spam()
    b = Spam()
    print(a is b)
Creating Spam
True

7.2.3 禁用直接实例化

# Not allowing direct instantiation

class NoInstances(type):
    """Metaclass that forbids instantiating the classes that use it."""

    def __call__(self, *args, **kwargs):
        raise TypeError("Can't instantiate directly")

class Spam(metaclass=NoInstances):
    """Example class that can only be used through its static methods."""

    @staticmethod
    def grok(x):
        print('Spam.grok')

if __name__ == '__main__':
    # Instantiation is rejected by the metaclass...
    try:
        s = Spam()
    except TypeError as e:
        print(e)

    # ...but static methods remain callable on the class itself.
    Spam.grok(42)
Can't instantiate directly
Spam.grok

7.3 不通过__init__方法创建实例

from time import localtime

class Date:
    """Simple year/month/day record."""

    def __init__(self, year, month, day):
        self.year = year
        self.month = month
        self.day = day

    @classmethod
    def today(cls):
        """Return an instance for the current local date, bypassing __init__."""
        d = cls.__new__(cls)
        t = localtime()
        d.year = t.tm_year
        d.month = t.tm_mon
        d.day = t.tm_mday
        return d

# An instance created with __new__ alone has no attributes yet.
d = Date.__new__(Date)
print(d)
print(hasattr(d, 'year'))

data = dict(year=2012, month=8, day=29)

# Fill in the attributes directly through the instance dictionary.
vars(d).update(data)
print(d.year)
print(d.month)

d = Date.today()
print(d.year, d.month, d.day)
<__main__.Date object at 0x1085e1588>
False
2012
8
2018 6 13

7.4 动态定义类

# Example of making a class manually from parts

# Plain functions that will become the methods of the new class.
def __init__(self, name, shares, price):
    """Store the three constructor arguments on the instance."""
    self.name, self.shares, self.price = name, shares, price

def cost(self):
    """Return shares multiplied by price."""
    total = self.shares * self.price
    return total

cls_dict = dict(__init__=__init__, cost=cost)

# Build the class: new_class() calls the callback with the fresh class
# namespace, which is where the methods are installed.
import types

def _fill_namespace(ns):
    ns.update(cls_dict)

Stock = types.new_class('Stock', (), {}, _fill_namespace)

# Use the dynamically built class like any other class.
if __name__ == '__main__':
    s = Stock('ACME', 50, 91.1)
    print(s)
    print(s.cost())
<types.Stock object at 0x1085e1518>
4555.0
# An alternative formulation of namedtuples

import operator
import types
import sys

def named_tuple(classname, fieldnames):
    """Create a tuple subclass whose items are also readable by field name.

    Args:
        classname: name given to the new class.
        fieldnames: iterable of attribute names, one per tuple position.

    Returns:
        The new class. Its constructor takes exactly one positional argument
        per field and raises TypeError otherwise.
    """
    # Materialise once so any iterable (including a generator) can be
    # enumerated here and measured again inside __new__.
    fieldnames = tuple(fieldnames)

    # One read-only property per field, each fetching its tuple position.
    cls_dict = {name: property(operator.itemgetter(n))
                for n, name in enumerate(fieldnames)}

    def __new__(cls, *args):
        if len(args) != len(fieldnames):
            raise TypeError('Expected {} arguments'.format(len(fieldnames)))
        return tuple.__new__(cls, args)

    cls_dict['__new__'] = __new__

    cls = types.new_class(classname, (tuple,), {},
                          lambda ns: ns.update(cls_dict))
    # Record the caller's module as the class's defining module.
    cls.__module__ = sys._getframe(1).f_globals['__name__']
    return cls

if __name__ == '__main__':
    Point = named_tuple('Point', ['x', 'y'])
    print(Point)
    p = Point(4, 5)
    print(len(p))
    print(p.x, p[0])
    print(p.y, p[1])
    # The generated properties have no setter, so assignment fails.
    try:
        p.x = 2
    except AttributeError as e:
        print(e)
    print('%s %s' % p)
<class '__main__.Point'>
2
4 4
5 5
can't set attribute
4 5

7.5 猴子补丁

import importlib
import sys
from collections import defaultdict

import importlib.util  # for spec_from_loader; loaded before the finder is installed

# Callbacks registered per fully qualified module name.
_post_import_hooks = defaultdict(list)

class PostImportFinder:
    """Meta-path finder that routes hooked modules through PostImportLoader.

    Only names with registered hooks are intercepted. While such a name is
    being loaded it is kept in ``_skip`` so that the nested, real import
    started by the loader is left to the regular finders.
    """

    def __init__(self):
        self._skip = set()

    def _claims(self, fullname):
        # True when hooks exist for the name and it is not already in flight.
        return fullname in _post_import_hooks and fullname not in self._skip

    def find_spec(self, fullname, path=None, target=None):
        """Finder entry point used by the current import system.

        Python 3.12+ no longer calls find_module(), so without this method
        the finder would be ignored and no hook would ever run.
        """
        if not self._claims(fullname):
            return None
        self._skip.add(fullname)
        return importlib.util.spec_from_loader(fullname, PostImportLoader(self))

    def find_module(self, fullname, path=None):
        """Legacy finder entry point, kept for callers that still use it."""
        if not self._claims(fullname):
            return None
        self._skip.add(fullname)
        return PostImportLoader(self)

class PostImportLoader:
    """Loader that performs the real import and then runs the hooks.

    NOTE: only load_module() is implemented, so the import system reaches
    this loader through its legacy load_module() fallback.
    """

    def __init__(self, finder):
        self._finder = finder

    def load_module(self, fullname):
        try:
            # Re-enter the import system; the finder steps aside because
            # the name is in its _skip set.
            importlib.import_module(fullname)
            module = sys.modules[fullname]
            for func in _post_import_hooks[fullname]:
                func(module)
        finally:
            # Always release the name, even if the import or a hook failed.
            self._finder._skip.discard(fullname)
        return module

def when_imported(fullname):
    """Decorator: call the function with module ``fullname`` once it is imported.

    If the module is already in sys.modules the function is called right
    away; otherwise it is stored until the module is loaded.
    """
    def decorate(func):
        if fullname in sys.modules:
            func(sys.modules[fullname])
        else:
            _post_import_hooks[fullname].append(func)
        return func
    return decorate

# Install the finder ahead of the standard ones, then register a hook.
sys.meta_path.insert(0, PostImportFinder())
@when_imported('threading')
def warn_threads(mod):
    """Hook for the 'threading' module: print a warning message."""
    print('Threads? Are you crazy?')

import threading

Author: Hao Ruan (ruanhao1116@gmail.com)

Created: 2018-02-22 Thu 00:00

Updated: 2021-08-27 Fri 11:05

Emacs 27.1 (Org mode 9.3)