Fluent Python 笔记 —— 装饰器和闭包

装饰器

函数装饰器用于在源码中“标记”函数,以某种方式增强函数的行为。它是一种以另一个函数(被装饰的函数)为参数的可调用对象,可能会处理被装饰的函数并将其返回,或者将其替换为另一个函数。

装饰器严格来说只是语法糖。假如有个名为 decorate 的装饰器:

1
2
3
@decorate
def target():
print('running target()')

上述代码效果等同于如下写法:

1
2
3
4
def target():
print('running target()')

target = decorate(target)

即原来的 target 函数会被替换为 decorate(target) 返回的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> def deco(func):
... def inner():
... print('running inner()')
... return inner
...
>>> @deco
... def target():
... print('running target()')
...
>>> target()
running inner()
>>> target
<function deco.<locals>.inner at 0x7ff01bad9a60>

如上述代码,deco 返回 inner 函数对象,使用 deco 装饰 target,调用被装饰的 target 实际会运行 inner。target 对象变为 inner 的引用。

装饰器有如下两大特性:

  • 能把被装饰的函数替换成其他函数
  • 装饰器在加载模块时立即执行

装饰器何时执行

装饰器会在被装饰的函数定义之后立即运行,这通常是在 Python 加载模块时。

参考如下 registration.py 模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
registry = []

def register(func):
print(f'running register({func})')
registry.append(func)
return func

@register
def f1():
print('running f1()')

@register
def f2():
print('running f2()')

def f3():
print('running f3()')

def main():
print('running main()')
print('registry ->', registry)
f1()
f2()
f3()

if __name__ == '__main__':
main()

运行后输出如下:

1
2
3
4
5
6
7
running register(<function f1 at 0x7fbc852d43a0>)
running register(<function f2 at 0x7fbc852d4430>)
running main()
registry -> [<function f1 at 0x7fbc852d43a0>, <function f2 at 0x7fbc852d4430>]
running f1()
running f2()
running f3()

Python 加载模块后,装饰器 register 会在其他函数之前运行,将被装饰的函数(f1 和 f2)的引用添加到 registry 列表中。原本的函数 f1 和 f2,以及未被装饰的 f3,则只在 main 明确调用它们时才执行。

如果导入 registration.py 模块(不作为脚本运行),输出如下:

1
2
3
>>> import registration
running register(<function f1 at 0x7f8fbd8c3b80>)
running register(<function f2 at 0x7f8fbd8c3c10>)

函数装饰器在导入模块时立即执行,而被装饰的函数只在明确调用时执行。突出了导入时和运行时之间的区别。

变量作用域规则

测试如下函数,它读取两个变量的值,一个是局部变量 a,是函数的参数;另一个是未被定义的变量 b:

1
2
3
4
5
6
7
8
9
10
>>> def f1(a):
... print(a)
... print(b)
...
>>> f1(3)
3
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 3, in f1
NameError: name 'b' is not defined

运行时变量 a 的值正常输出,接着报出 name ‘b’ is not defined。

若先给全局变量 b 赋值,再调用 f1 函数,就不会报错:

1
2
3
4
>>> b = 6
>>> f1(3)
3
6

但如下代码的结果可能会让人意想不到:

1
2
3
4
5
6
7
8
9
10
11
12
>>> b=6
>>> def f2(a):
... print(a)
... print(b)
... b = 9
...
>>> f2(3)
3
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 3, in f2
UnboundLocalError: local variable 'b' referenced before assignment

代码运行后首先输出了 3(print(a)),但是第二个语句 print(b) 执行报错。按照直觉第二个 print 语句应该输出 6,因为全局变量 b 已经在函数执行之前赋值,局部变量 b 的赋值动作也是在 print 语句后面。

事实上是,Python 在编译函数定义体时,会判断 b 是局部变量,Python 会尝试从本地环境获取 b。调用 f2(3) 时,f2 的定义体尝试获取局部变量 b 的值,发现 b 没有绑定后报错。

如果在函数内部赋值时想让解释器把 b 当成全局变量,需要使用 global 声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
>>> b = 6
>>> def f3(a):
... global b
... print(a)
... print(b)
... b = 9
...
>>> f3(3)
3
6
>>> b
9
>>> f3(3)
3
9
>>> b = 30
>>> b
30

闭包

闭包指延伸了作用域的函数,其中包含函数定义体中引用、不在定义体中定义的非全局变量。

计算移动平均值(不断增加的系列值的均值)的类:

1
2
3
4
5
6
7
8
9
# average_oo.py
class Averager:
def __init__(self):
self.series = []

def __call__(self, new_value):
self.series.append(new_value)
total = sum(self.series)
return total/len(self.series)

效果如下:

1
2
3
4
5
6
7
8
>>> from average_oo import Averager
>>> avg = Averager()
>>> avg(10)
10.0
>>> avg(11)
10.5
>>> avg(12)
11.0

以下代码是同样功能的函数式实现:

1
2
3
4
5
6
7
8
9
def make_averager():
series = []

def averager(new_value):
series.append(new_value)
total = sum(series)
return total / len(series)

return averager

1
2
3
4
5
6
7
8
>>> from average import make_averager
>>> avg = make_averager()
>>> avg(10)
10.0
>>> avg(11)
10.5
>>> avg(12)
11.0

第一个例子中,Averager 类的实例 avg 存储历史值的位置很明显:通过 self.series 实例属性。
第二个例子中,seriesmake_averager 函数的局部变量,但调用 avg(10) 时,make_averager 函数已经返回,它的本地作用域也就不存在了。

在 averager 函数中,series 是自由变量(free variable),指未在本地作用域中绑定的变量。
closure

闭包是一种函数,它会保留定义函数时存在的自由变量的绑定。这样在调用函数时,即便定义作用域不可用了,通过闭包仍能使用那些绑定。

nolocal

前面实现 make_averager 函数的方式效率并不高,把所有值存储在历史列表中,在每次调用 averager 时使用 sum 求和。更好的实现方式是,只存储目前的总和以及元素个数,只使用这两个值计算均值。

1
2
3
4
5
6
7
8
9
10
11
# average2.py
def make_averager():
count = 0
total = 0

def averager(new_value):
count += 1
total += new_value
return total / count

return averager

上述代码运行后会报出如下错误:

1
2
3
4
5
6
7
8
>>> from average2 import make_averager
>>> avg = make_averager()
>>> avg(10)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/home/starky/program/python/algorithm/average2.py", line 6, in averager
count += 1
UnboundLocalError: local variable 'count' referenced before assignment

原因在于,当 count 是数字或其他任何不可变类型时,count += 1 的作用等同于 count = count + 1。导致在 averager 的定义体中为 count 赋值了,将 count 变成了局部变量。total 变量也是如此。
之前的 series 变量没有出现此问题,原因是只调用了 series.append,列表作为可变对象,并不存在重新赋值的情况。

Python 3 中引入了 nolocal 声明,其作用是把变量标记为自由变量。为 nolocal 声明的变量赋予新值后,闭包中保存的绑定也会更新。

1
2
3
4
5
6
7
8
9
10
11
def make_averager():
count = 0
total = 0

def averager(new_value):
nonlocal count, total
count += 1
total += new_value
return total / count

return averager

实现一个简单的装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time

def clock(func):
def clocked(*args):
t0 = time.perf_counter()
result = func(*args)
elapsed = time.perf_counter() - t0
name = func.__name__
arg_str = ', '.join(repr(arg) for arg in args)
print('[%0.8fs] %s(%s) -> %r' % (elapsed, name, arg_str, result))
return result
return clocked

@clock
def snooze(seconds):
time.sleep(seconds)

@clock
def factorial(n):
return 1 if n < 2 else n * factorial(n - 1)

if __name__ == '__main__':
print('*' * 40, 'Calling snooze(.123)')
snooze(.123)
print('*' * 40, 'Calling factorial(6)')
print('6! =', factorial(6))

执行结果如下:

1
2
3
4
5
6
7
8
9
10
**************************************** Calling snooze(.123)
[0.12318703s] snooze(0.123) -> None
**************************************** Calling factorial(6)
[0.00000168s] factorial(1) -> 1
[0.00003647s] factorial(2) -> 2
[0.00006038s] factorial(3) -> 6
[0.00008216s] factorial(4) -> 24
[0.00010411s] factorial(5) -> 120
[0.00012920s] factorial(6) -> 720
6! = 720

在上述代码中:

1
2
3
@clock
def factorial(n):
return 1 if n < 2 else n * factorial(n - 1)

等同于:

1
2
3
def factorial(n):
return 1 if n < 2 else n * factorial(n - 1)
factorial = clock(factorial)

factorial 会作为 func 参数传递给 clock,返回 clocked 函数。Python 解释器在背后会把 clocked 赋值给 factorial。此后,每次调用 factorial(n),实际执行的都是 clocked(n)。总体步骤如下:

  • 记录初始时间 t0
  • 调用原来的 factorial 函数,保存结果
  • 计算执行的时间
  • 格式化收集到的数据
  • 返回第二步保存的结果

以上即装饰器的典型行为:将被装饰的函数替换为新函数,二者接收同样的参数,(通常)返回被装饰函数本该返回的值,并做些额外的操作。

参数化装饰器

参数化的注册装饰器

为了便于启用或禁用 register 的函数注册功能,可以为其提供一个可选的 active 参数,设为 False 时,不注册被装饰的函数。
从概念上讲,这个新的 register 函数不是装饰器,而是装饰器工厂函数,用来返回真正的装饰器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# registration_param.py
registry = set()
def register(active=True):
def decorate(func):
print('running register(active=%s)->decorate(%s)'
% (active, func))
if active:
registry.add(func)
else:
registry.discard(func)

return func
return decorate

@register(active=False)
def f1():
print('running f1()')

@register()
def f2():
print('running f2()')

def f3():
print('running f3()')

运行效果:

1
2
3
4
5
>>> import registration_param
running register(active=False)->decorate(<function f1 at 0x7fa801b1bc10>)
running register(active=True)->decorate(<function f2 at 0x7fa801b1bca0>)
>>> registration_param.registry
{<function f2 at 0x7fa801b1bca0>}

decorate 是装饰器,必须返回一个函数。register 是装饰器工厂函数,返回 decorate。
只有 active 参数的值为 True 时才注册 func;若 active 不为真,且 func 在 registry 中,则将 func 移除。
@register 工厂函数必须作为函数调用,传入所需参数(或 @register())。

若不使用 @ 句法,也可以像常规函数那样使用 register:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> from registration_param import *
running register(active=False)->decorate(<function f1 at 0x7fc32e6d0b80>)
running register(active=True)->decorate(<function f2 at 0x7fc32e6d0c10>)
>>> registry
{<function f2 at 0x7fc32e6d0c10>}
>>> register()(f3)
running register(active=True)->decorate(<function f3 at 0x7fc32e6d0af0>)
<function f3 at 0x7fc32e6d0af0>
>>> registry
{<function f2 at 0x7fc32e6d0c10>, <function f3 at 0x7fc32e6d0af0>}
>>> register(active=False)(f2)
running register(active=False)->decorate(<function f2 at 0x7fc32e6d0c10>)
<function f2 at 0x7fc32e6d0c10>
>>> registry
{<function f3 at 0x7fc32e6d0af0>}

参数化的 clock 装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# clock_param.py
import time

DEFAULT_FMT = '[{elapsed:0.8f}s] {name}({args}) -> {result}'

def clock(fmt=DEFAULT_FMT):
def decorate(func):
def clocked(*_args):
t0 = time.time()
_result = func(*_args)
elapsed = time.time() - t0
name = func.__name__
args = ', '.join(repr(arg) for arg in _args)
result = repr(_result)
print(fmt.format(**locals()))
return _result
return clocked
return decorate


if __name__ == '__main__':
@clock()
def snooze(seconds):
time.sleep(seconds)

for i in range(3):
snooze(.123)

# => [0.12320948s] snooze(0.123) -> None
# => [0.12319684s] snooze(0.123) -> None
# => [0.12318802s] snooze(0.123) -> None

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# clock_param2.py
import time
from clock_param import clock

@clock('{name}({args}) dt={elapsed:0.3f}s')
def snooze(seconds):
time.sleep(seconds)

for i in range(3):
snooze(.123)

# => snooze(0.123) dt=0.123s
# => snooze(0.123) dt=0.123s
# => snooze(0.123) dt=0.123s

标准库中的装饰器——单分派泛函数

假设需要开发一个调试 Web 应用的工具,能够生成 HTML 来显示不同类型的 Python 对象。

1
2
3
4
5
import html

def htmlize(obj):
content = html.escape(repr(obj))
return '<pre>{}</pre>'.format(content)

上述函数适用于任何 Python 类型。但如果想做进一步扩展,使其能够用不同的方式显示不同的类型:

  • str:把内部换行符替换为 ‘\
    \n’,使用

    进行格式化

  • int:以十进制和十六进制显示数字
  • list:输出 HTML 列表,根据各个元素的类型进行格式化

Python 不支持重载方法或函数,因此不能使用不同的签名定义 htmlize 的变体,也无法使用不同的方式处理不同的数据类型。
一种常见的做法是将 htmlize 变成一个分派函数,使用一系列 if/elif/elif 调用专门的函数,如 htmlize_str、htmlize_int 等。但这样不便于模块的扩展,且显得笨拙。分派函数 htmlize 会随着时间推移变得很大,与各个专门函数之间的耦合也很紧密。

Python 中的 functools.singledispatch 装饰器可以把整体方案拆分为等多个模块,甚至可以为无法修改的类提供专门函数。使用 @singledispatch 装饰的普通函数会变成泛函数(generic function),根据第一个参数的类型以不同方式执行相同操作的一组函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# htmlize.py
from functools import singledispatch
from collections import abc
import numbers
import html

@singledispatch
def htmlize(obj):
content = html.escape(repr(obj))
return '<pre>{}</pre>'.format(content)

@htmlize.register(str)
def _(text):
content = html.escape(text).replace('\n', '<br>\n')
return '<p>{0}</p>'.format(content)

@htmlize.register(numbers.Integral)
def _(n):
return '<pre>{0} (0x{0:x})</pre>'.format(n)

@htmlize.register(tuple)
@htmlize.register(abc.MutableSequence)
def _(seq):
inner = '</li>\n<li>'.join(htmlize(item) for item in seq)
return '<ul>\n<li>' + inner + '</li>\n</ul>'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> from htmlize import htmlize
>>> htmlize({1, 2, 3})
'<pre>{1, 2, 3}</pre>'
>>> htmlize(abs)
'<pre>&lt;built-in function abs&gt;</pre>'
>>> htmlize('Heimlich & Co.\n- a game')
'<p>Heimlich &amp; Co.<br>\n- a game</p>'
>>> htmlize(42)
'<pre>42 (0x2a)</pre>'
>>> print(htmlize(['alpha', 66, {3, 2, 1}]))
<ul>
<li><p>alpha</p></li>
<li><pre>66 (0x42)</pre></li>
<li><pre>{1, 2, 3}</pre></li>
</ul>

@singledispatch 标记处理 object 类型的基函数。各个专门函数使用 @<base_function>.register(<type>) 装饰。
为每个需要特殊处理的类型注册一个函数,numbers.Integral 是 int 的抽象基类。只要可能,注册的专门函数应该尽量处理抽象基类(如 numbers.Integralabc.MutableSequence),不要处理具体实现(如 intlist)。这样代码支持的兼容类型会更广泛(支持抽象基类现有的和未来的具体子类),比如用户可能通过子类化 numbers.Integral 实现固定位数的 int 类型。
可以叠放多个 register 装饰器,让同一个函数支持不同类型。

@singledispatch 可以在系统的任何地方和任何模块中注册专门函数,还可以为不是自己编写的或者不能修改的类添加自定义函数。

参考资料

Fluent Python