跳转至

协程

函数对象和代码对象

每当定义了一个函数之后,就得到一个函数对象

1
2
3
4
5
6
>>> def func():
...     x = 1
...     return x
...
>>> func
<function func at 0x7fd462070510>

函数中的代码是保存在代码对象中的

1
2
>>> func.__code__
<code object func at 0x7fd4620e0780, file "<stdin>", line 1>

代码对象随着函数一起创建,是函数对象的一个重要属性。

代码对象中的重要的属性以co__开头

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
>>> func_code = func.__code__
>>> for attr in dir(func_code):
...     if attr.startswith('co_'):
...         print(f'{attr}\t: {getattr(func_code, attr)}')
...
co_argcount     : 0
co_cellvars     : ()
co_code : b'd\x01}\x00|\x00S\x00'
co_consts       : (None, 1)
co_filename     : <stdin>
co_firstlineno  : 1
co_flags        : 67
co_freevars     : ()
co_kwonlyargcount       : 0
co_lnotab       : b'\x00\x01\x04\x01'
co_name : func
co_names        : ()
co_nlocals      : 1
co_stacksize    : 1
co_varnames     : ('x',)

函数运行帧

函数对象和代码对象保存了函数的基本信息,当函数运行的时候,还需要一个对象来保存运行时的状态。这个对象就是帧对象(Frame object)。

每一次调用函数,都会自动创建一个帧对象,记录当次运行的状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
>>> import inspect
>>> def foo():
...     return inspect.currentframe()  # 获取到函数的运行帧并返回
...
>>>
>>> f1 = foo()  # 由于变量被引用,所以帧不会被垃圾回收
>>> f1
<frame object at 0x7f3f80ed6868>
>>>
>>> f2 = foo()  # 再调用一次,得到另一个帧
>>> f2
<frame object at 0x7f3f80ed6ba8>

函数运行栈 当一个函数中调用了另一个函数,此时前一个函数还没有结束,所以这两个函数的对象是同时存在的。 比如我们的程序一般都始于main函数, 然后又调用其它函数,以此类推 因此,一个程序的运行期,同时存在很多个帧对象。 函数之间的调用关系是先执行的后退出,所以帧对象之间的关系也是先入后出,正好以栈的形式保存。因此,函数的运行真又称为栈帧。 注意:一个线程只有一个函数运栈帧。

生成器函数仍然是函数对象,当然也包括了代码对象。

调用生成器函数不会直接运行(也就是说,不像普通函数那样创建顿对象并且压入函数栈),而是得到一个生成器对象。

当每次使用next()对生成器进行迭代是,都用这个帧(fi_frame)对象来保存状态。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
>>> import inspect
>>> def gen_foo():
...     for _ in range(10):
...         yield inspect.currentframe()
...
>>> gf = gen_foo()
>>> gi_frame = gf.gi_frame  # 存为变量,不然迭代结束后该属性会清空
>>> frames = list(gf)  # 保存所有迭代的结果
>>> print(gf.gi_frame)
None
>>> for f in frames:
...     print(f is gi_frame)
...
True
True
True
True
True
True
True
True
True
True
结论: 1. 生成器函数并不直接运行,而是借助于生成器对象来间接运行 2. 创建生成器对象的同时创建了帧对象,并由生成器对象保持引用 3. 每次使用next()调用生成器时,就是将生成器引用的对象入栈 4. 当next()返回时,也就是代码遇到yield暂停的时候,就是将出栈直到迭代结束,最后一次出栈,并且被销毁

同步和异步

普通函数

  • 调用函数:构建帧对象并入栈
  • 函数结束:帧出栈并销毁

生成器函数

  • 创建生成器:构建帧对象 -(多次)通过next触发执行:帧入栈 -(多次)遇到yield:帧出栈(保留
  • 选代结束:顿出栈并销毁
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
In [2]: def async_task():
   ...:     print("step1")
   ...:     yield
   ...:     print("step2")
   ...:     yield
   ...:     print("step3")
   ...:     yield
   ...:
   ...: # 任务队列
   ...: all_tasks = []
   ...:
   ...: # 创建2个任务
   ...: async_task_a = async_task()
   ...: async_task_b = async_task()
   ...:
   ...: # 加入到任务队列
   ...: all_tasks.append(async_task_a)
   ...: all_tasks.append(async_task_b)
   ...:
   ...: def async_task_run():
   ...:     """一个简陋的一部任务调度器"""
   ...:     for task in all_tasks:
   ...:         next(task)
   ...:
   ...: async_task_run()
   ...: async_task_run()
   ...: async_task_run()
step1
step1
step2
step2
step3
step3

生成器对象是一个用来选代执行生成器函数的选代器。 数据的选代器:针对一个包含很多元素的数据集,逐个返回其中的元素 生成器选代器:针对一个包含很多代码的函数,分段执行其中代码 让一个函数可以多次选代运行其中的代码才是生成器对象最最根本的作用,而不仅是字面意思上的生成数据的东西。 送代产出数据只是选代执行代码的自然结果而已 当用生成器来实现代器的时候,我们关注的是重点是:yield <value>返回出来的数据 如果把焦点集中到被选代执行的代码了上,就能对生成器有个全新的视角,那就是协程。

基于生成器的协程: 在python3.8弃用,在3.11版本移除,使用sync def代替。

协程(yiled版) yield表达式

为生成器增加一个send()方法,该方法可以接受一个入参。 send方法顾名思义,将该参数发送给生成器,使生成器恢复运行的同时,将该入参作为yield表达式的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
>>> def show_yield_value():
...     print("开始")
...     x = yield
...     print('x is ', x)
...
>>> g = show_yield_value()
>>> g.send(None)  # 第一次只能传None
开始
>>> g.send('hello')
x is  hello
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

对于刚创建好的生成器,总是需要在第一次的时候send(None)值,使其运行到yied的地方暂停,这个步骤术语称为prime(激活)。

yield表达式的优先级

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
>>> def add_yield_value():
...     x = yield + 1
...     print('x is ', x)
...
>>> g = add_yield_value()
>>> g.send(None)
1
>>> g.send(1)
x is  1  # 不复合预期
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

yield优先级比较低,需要给yield加括号

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
>>> def add_yield_value():
...     x = (yield) + 1
...     print('x is ', x)
...
>>> g = add_yield_value()
>>> g.send(None)
>>> g.send(1)
x is  2
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

send()用法总结

  1. send是生成器对象的方法
  2. 对于生成器对象g.next(g)等价于g.send(None)
  3. 只有当生成器处在暂停状态时,才能传入非None的值
  4. send方法是为了协程而增加的API,所以:
  5. 如果将生成器视作协程,就应该只用send方法
  6. 如果视作送代器,就仍用next

使用close()结束生成器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
>>> def gen_echo():
...     while True:
...         print((yield))
...
>>> echo = gen_echo()
>>> echo.send(None)
>>> for i in range(3):
...    echo.send(i)
...
0
1
2
>>> echo.send(StopIteration('stop'))
stop
上面的例子中无法使用StopIteration结束。

当生成器作为代器来用的时候,它的生命周期取决于有多少元素可以选代。 而当作协程来用的时候,通常可以视作是在执行一个任务,我们希望任务的终止能够变得可控。close()方法就是用来结束一个协程。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
>>> def gen_echo():
...     while True:
...         print((yield))
...
>>> echo = gen_echo()
>>> echo.send(None)
>>> for i in range(3):
...    echo.send(i)
...
0
1
2
>>> echo.send(StopIteration('stop'))
stop
>>> echo.close()
>>> echo.send(4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

由于echo协程的内容非常简单,所以可以直接结束 如果协程的代码比较复杂,它可能需要在结束的时候做一些善后处理,比如释放资源等类似于Stopiteration的实现机制,结束协程也是靠异常来实现的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
>>> def gen_echo():
...     while True:
...         try:
...             x = yield
...         except GeneratorExit:
...             print('exit. bye')
...             return
...         else:
...             print('x')
...
>>> echo = gen_echo()
>>> echo.send(None)
>>> echo.close()
exit. bye
>>>
>>> echo = gen_echo()
>>> echo.send(None)
>>> # 除了显式地调用close方法,如果生成器对象被垃圾回收,也会自动调用c1ose
...
>>> del echo
exit. bye

使用throw()将异常抛给yield

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
>>> def gen_echo():
...     while True:
...         try:
...             x = yield
...         except GeneratorExit:
...             print('exit. bye')
...             return
...         except KeyboardInterrupt:
...             print('按下了 Ctrl-C')
...         else:
...             print('x')
...
>>> echo = gen_echo()
>>> echo.send(None)
>>> echo.throw(KeyboardInterrupt)
按下了 Ctrl-C

yield作用 1.在yield的位置产出数据 2.在yield的位置暂停 3.在yield的位置恢复,并接受新的参数 4.在yield的位置传入结束信号 5.在yield的位置传入其它异常

生成器的三种用法

生成器三种用法: 1. yield x 2. x = yield y 3. field from sleep(1)

生成器有3种模式 1. pull:特点在于能不断向外产出数据,也就是选代器 2. push:特点在于能不断向内发送数据,是非常早期的协程概念 3. task:任务式(是asyncio里的协程)

pull和push是受数据驱动的 task是受任务驱动的

event

事件(event)是一个抽象的概念,就是指一件事情发生了 例如:要休息3秒钟后继续执行滴答,滴答,滴答3秒时间到,这就是一个事件。 再例如:要读取网络数据socket.recv(1024);如果socket还没接受到数据,此时这个调用就会阻塞在这里,直到有数据可读。socket变得可读,这就是一个事件。

事件通常都是通过回调函数(callback)来处理的

协程(yield from版)

yield from语法的实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# https://peps.python.org/pep-0380/
# RESULT = yield from EXPR 等价写法简化版
_i = iter(EXPR)  # __iter__  后面的版本转为await __await__
try:
    _y = _i.send(None)
except StopIteration as _e:  # 直接结束,一次yield都没遇上
    _r = _e.value
else:
    while 1:  # 不遇到StopIteration不算完
        try:
            _s = yield _y  # 照原样yield出去,并接收send传入的值
        except GeneratorExit as _e:  # 处理close
            _i.close
            raise _e
        except BaseException as _e:  # 处理其他异常
            sys.exc_info()
            try:
                _y = _i.throw(*_x)
            except StopIteration as _e:
                _r = _e.value
                break
        else:
            try:
                _y = _i.send(_s)  # 接收到的值原样再send下去
            except StopIteration as _e:
                _r = _e.value
                break
RESULT = _r  # StopIteration带出来的值就是结果

事实上总会send(None),再次简化得到

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
_i = iter(EXPR)  # __iter__  
while 1:  # 不遇到StopIteration不算完
    try:
        _y = _i.send(None)  # 总是None 
    except StopIteration as _e:
        _r = _e.value
        break
    else:
        yield _y  # 原样再send下去,不再接收send传入的值,因为总是None
RESULT = _r  # StopIteration带出来的值就是结果

自定义一个任务

1. 一个同步阻塞的简单任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import time
def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = small_step()
    print(" - end big task", time.time())
    return small_result + 1

def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 工作中...")
    print("   - end small task", time.time())
    return 1

main_task()

执行结果

1
2
3
4
5
6
7
8
begin task 1730016851.280093
 - begin big task 1730016851.280093
   - begin small task 1730016851.280093
     - 工作中...
   - end small task 1730016851.280093
 - end big task 1730016851.280093
end task 1730016851.280093
big_result is  2

在任务重引入一个阻塞

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import time
def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = small_step()
    print(" - end big task", time.time())
    return small_result + 1

def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    time.sleep(3)  # 引入一个阻塞
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1

main_task()

执行结果

1
2
3
4
5
6
7
8
9
begin task 1730016988.833141
 - begin big task 1730016988.833141
   - begin small task 1730016988.834105
     - 休息一下,马上回来
     - 努力工作中...
   - end small task 1730016991.8345354
 - end big task 1730016991.8345354
end task 1730016991.8345354
big_result is  2

2. 使用yield变协程,让任务不阻塞。

给阻塞的代码加一个yield

1
2
3
4
5
6
7
8
def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    yield time.sleep(3)
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1

如果只是给阻塞的代码简单加一个yield是不可行的,运行报错

1
TypeError: unsupported operand type(s) for +: 'generator' and 'int'

一旦用了yield,调用方也需要做相应的改变

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import time
def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_coro = small_step()    # 改变调用方式
    while True:
        try:
            x = small_coro.send(None)
        except StopIteration as e:
            small_result = e.value
            break
        else:
            pass
    print(" - end big task", time.time())
    return small_result + 1

def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    yield time.sleep(3)
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1

main_task()

此时的任务仍然存在阻塞

1
2
3
4
5
6
7
8
9
begin task 1730017666.1158626
 - begin big task 1730017666.1158626
   - begin small task 1730017666.1158626
     - 休息一下,马上回来
     - 努力工作中...
   - end small task 1730017669.1298668
 - end big task 1730017669.1308722
end task 1730017669.1308722
big_result is  2

将阻塞从下游传到上游

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    start_time = time.time()   
    yield time.sleep, 3  # 把这个行为传递出去
    assert time.time() - start_time > 3, '休息时间不足'  # 通过计算休息时间判断是不是真的sleep了3秒
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1

main_task()

但是并没有真正的消除阻塞

1
2
    assert time.time() - start_time > 3, '休息时间不足'
AssertionError: 休息时间不足

将yield传递到顶层

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import time
def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_coro = big_step()
    while True:
        try:
            x = big_coro.send(None)
        except StopIteration as e:
            big_result = e.value
            break
        else:
           func, arg = x  # func, arg = (time.sleep, 3)
           func(arg)  # time.sleep(3)
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_coro = small_step()
    while True:
        try:
            x = small_coro.send(None)
        except StopIteration as e:
            small_result = e.value
            break
        else:
            yield x
    print(" - end big task", time.time())
    return small_result + 1

def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    start_time = time.time()   
    yield time.sleep, 3  # 把这个行为传递出去
    assert time.time() - start_time > 3, '休息时间不足'  # 通过计算休息时间判断是不是真的sleep了3秒
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1

main_task()

此时任务阻塞被传递到了上游main_task中

1
2
3
4
5
6
7
8
9
begin task 1730018313.6238086
 - begin big task 1730018313.6238086
   - begin small task 1730018313.6238086
     - 休息一下,马上回来
     - 努力工作中...
   - end small task 1730018316.6327195
 - end big task 1730018316.6346214
end task 1730018316.6346214
big_result is  2

由此可知: - 协程自己并不能消除阻塞 - 协程具有传染性 - 协程通过yield把阻塞换个方式传递给了上游 - 最终的阻塞人人需要被解决

#### 3. 使用yield from

  1. 在big_step中使用yield from
1
2
3
4
5
6
def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = yield from small_step()
    print(" - end big task", time.time())
    return small_result + 1
  1. 在small_step使用yield from
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import time
def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_coro = big_step()
    while True:
        try:
            x = big_coro.send(None)
        except StopIteration as e:
            big_result = e.value
            break
        else:
            func, arg = x  # func, arg = (time.sleep, 3)
            func(arg)  # time.sleep(3)
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = yield from small_step()
    print(" - end big task", time.time())
    return small_result + 1


def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    start_time = time.time()
    yield from YieldFromAble((time.sleep, 3))  # 把这个行为传递出去
    assert (
        time.time() - start_time > 3
    ), "休息时间不足"  # 通过计算休息时间判断是不是真的sleep了3秒
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1


class YieldFromAble:
    def __init__(self, value):
        self.value = value

    def __iter__(self):
        yield self.value


main_task()
  1. 在main_step使用yield from
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import time


def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = yield from big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = yield from small_step()
    print(" - end big task", time.time())
    return small_result + 1


def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    start_time = time.time()
    yield from YieldFromAble((time.sleep, 3))  # 把这个行为传递出去
    assert (
        time.time() - start_time > 3
    ), "休息时间不足"  # 通过计算休息时间判断是不是真的sleep了3秒
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1


class YieldFromAble:
    def __init__(self, value):
        self.value = value

    def __iter__(self):
        yield self.value


class Task:
    def __init__(self, coro):
        self.coro = coro

    def run(self):
        print("---------------")
        while True:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                result = e.value
                break
            else:
                func, arg = x  # func, arg = (time.sleep, 3)
                func(arg)  # time.sleep(3)
        print("---------------")


t = Task(main_task())  # 改变调用方式
t.run()

运行正常

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
---------------
begin task 1730019257.0435085
 - begin big task 1730019257.0435085
   - begin small task 1730019257.0435085
     - 休息一下,马上回来
     - 努力工作中...
   - end small task 1730019260.0521905
 - end big task 1730019260.0521905
end task 1730019260.0521905
big_result is  2
---------------

进一步优化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time


def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = yield from big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = yield from small_step()
    print(" - end big task", time.time())
    return small_result + 1


def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    start_time = time.time()
    yield from YieldFromAble((time.sleep, 3))  # 把这个行为传递出去
    assert (
        time.time() - start_time > 3
    ), "休息时间不足"  # 通过计算休息时间判断是不是真的sleep了3秒
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1


class YieldFromAble:
    def __init__(self, value):
        self.value = value

    def __iter__(self):
        # yield self.value
        yield self  # 不再传值,直接传自身


class Task:
    """一个通用的任务处理"""
    def __init__(self, coro):
        self.coro = coro

    def run(self):
        print("---------------")
        while True:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                result = e.value
                break
            else:
                assert isinstance(x, YieldFromAble)
                func, arg = x.value  # func, arg = (time.sleep, 3)
                func(arg)  # time.sleep(3)
        print("---------------")


t = Task(main_task())  # 改变调用方式
t.run()

awiat/async

yield from过度到await

三步: 1. yield from替换为await 2. __iter__替换为__await__ 3. 函数前面加async

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
import time


async def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = await big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


async def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = await small_step()
    print(" - end big task", time.time())
    return small_result + 1


async def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    print("     - 休息一下,马上回来")
    start_time = time.time()
    await Awaitable((time.sleep, 3))  # 把这个行为传递出去
    assert (
        time.time() - start_time > 3
    ), "休息时间不足"  # 通过计算休息时间判断是不是真的sleep了3秒
    print("     - 努力工作中...")
    print("   - end small task", time.time())
    return 1


class Awaitable:  # 
    def __init__(self, value):
        self.value = value

    def __await__(self):
        # yield self.value
        yield self  # 不再传值,直接传自身


class Task:
    """一个通用的任务处理"""
    def __init__(self, coro):
        self.coro = coro

    def run(self):
        print("---------------")
        while True:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                result = e.value
                break
            else:
                assert isinstance(x, Awaitable)
                func, arg = x.value  # func, arg = (time.sleep, 3)
                func(arg)  # time.sleep(3)
        print("---------------")


t = Task(main_task())  # 改变调用方式
t.run()

什么是awaitable? 1. 如果一个类实现了__await__那么这个类就是awaitable 2. small_result = await small_step() small_step()就是awaitable

协作式的多任务

当前Task.run是一个普通函数,对其进行改造

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
class Task:
    """一个通用的任务处理"""
    def __init__(self, coro):
        self.coro = coro
        self._done = False
        self._result = None
    def run(self):
        print("---------------")
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                self._result = e.value
                self._done = True
            else:
                assert isinstance(x, Awaitable)
                func, arg = x.value  # func, arg = (time.sleep, 3)
                func(arg)  # time.sleep(3)
        print("---------------")

此时任务在sleep3s之后直接退出了

1
2
3
4
5
6
---------------
begin task 1730023922.2244272
 - begin big task 1730023922.2244272
   - begin small task 1730023922.2244272
     - 休息一下,马上回来
---------------

在sleep3s的过程中做其他事情。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Task:
    """一个通用的任务处理"""

    def __init__(self, coro):
        self.coro = coro
        self._done = False
        self._result = None

    def run(self):
        print("---------------")
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                self._result = e.value
                self._done = True
            else:
                assert isinstance(x, Awaitable)
        print("---------------")


t = Task(main_task())  # 改变调用方式
t.run()
# 等待3s
for _ in range(10):
    print("doing something...")
    time.sleep(0.5)
t.run()

在等待的间隙做其他事情。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
---------------
begin task 1730024119.5251992
 - begin big task 1730024119.5251992
   - begin small task 1730024119.5251992
     - 休息一下马上回来
---------------
doing something...
doing something...
doing something...
doing something...
doing something...
doing something...
doing something...
doing something...
doing something...
doing something...
---------------
     - 努力工作中...
   - end small task 1730024124.6075497
 - end big task 1730024124.6075497
end task 1730024124.6075497
big_result is  2
---------------

自定义EventLoop

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class EventLoop:
    """相当于一个任务调度器"""

    def __init__(self):
        self._ready = collections.deque()  # 一个双向任务队列,先入先出
        self._scheduled = []  # 定时任务,需要保持一个顺序,
        self._stopping = False  # 提供一个标志位,用来结束run_once中的循环

    def call_soon(self, callback, *args):
        """把任务加入任务队列"""
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        """利用堆来存储定时任务"""
        t = time.time() + delay
        heapq.heappush(self._scheduled, (t, callback, args))  ## 让时间最小的排在堆顶的位置

    def stop(self):
        self._stopping = True

    def run_forever(self):
        while True:
            self.run_once()  # 保证循环至少执行一轮
            if self._stopping:
                break

    def run_once(self):
        # 处理定时任务
        now = time.time()
        if self._scheduled:
            if self._scheduled[0][0] < now:
                _, cb, args = heapq.heappop(self._scheduled)
                self._ready.append((cb, args))  # 符合要求的定时任务加入就绪队列

        # 把就绪的任务拿出来执行
        num = len(self._ready)
        for i in range(num):
            cb, args = self._ready.popleft()
            cb(*args)


if __name__ == "__main__":

    loop = EventLoop()
    t = Task(main_task())
    loop.call_soon(t.run)
    loop.call_later(3, t.run)
    loop.call_later(4, loop.stop)
    loop.run_forever()

改造一下Task

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import collections
import heapq
import itertools
import random
import time


async def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = await big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


async def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = await small_step()
    print(" - end big task", time.time())
    return small_result + 1


async def small_step():
    """一个小任务"""
    print("   - begin small task", time.time())
    t1 = time.time()
    sleep_time = random.random()
    await Awaitable(sleep_time)  # sleep_time最终传递给Task
    assert (time.time() - t1 > sleep_time), "休息时间不足"  # 通过计算休息时间判断是不是真的sleep了3秒
    print("   - end small task", time.time())
    return sleep_time


class Awaitable:  # YieldFromAble 替换为Awaitable
    """如果一个类中实现了__await__就是Awaitable"""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield self  # 不再传值,直接传自身
task_id_counter = itertools.count(1)

class Task:
    """一个通用的任务处理"""

    def __init__(self, coro):
        self.coro = coro
        self._done = False
        self._result = None
        self._id = "Task-{}".format(next(task_id_counter))

    def run(self):
        print("--------Task {}-------".format(self._id))
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                self._result = e.value
                self._done = True
            else:
                assert isinstance(x, Awaitable)
                loop.call_later(x.value, self.run)
        print("---------------")


class EventLoop:
    """相当于一个任务调度器"""

    def __init__(self):
        self._ready = collections.deque()  # 一个双向任务队列,先入先出
        self._scheduled = []  # 定时任务,需要保持一个顺序,
        self._stopping = False  # 提供一个标志位,用来结束run_once中的循环

    def call_soon(self, callback, *args):
        """把任务加入任务队列"""
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        """利用堆来存储定时任务"""
        t = time.time() + delay
        heapq.heappush(self._scheduled, (t, callback, args))  ## 让时间最小的排在堆顶的位置

    def stop(self):
        self._stopping = True

    def run_forever(self):
        while True:
            self.run_once()  # 保证循环至少执行一轮
            if self._stopping:
                break

    def run_once(self):
        # 处理定时任务
        now = time.time()
        if self._scheduled:
            if self._scheduled[0][0] < now:
                _, cb, args = heapq.heappop(self._scheduled)
                self._ready.append((cb, args))  # 符合要求的定时任务加入就绪队列

        # 把就绪的任务拿出来执行
        num = len(self._ready)
        for i in range(num):
            cb, args = self._ready.popleft()
            cb(*args)


if __name__ == "__main__":

    loop = EventLoop()
    for i in range(3): 
        t = Task(main_task())
        loop.call_soon(t.run)
    loop.call_later(4, loop.stop)
    loop.run_forever()

先完成的任务先结束

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
--------Task Task-1-------
begin task 1730035454.8707514       
 - begin big task 1730035454.8707514
   - begin small task 1730035454.8707514
---------------
--------Task Task-2-------
begin task 1730035454.8707514
 - begin big task 1730035454.8707514
   - begin small task 1730035454.8707514
---------------
--------Task Task-3-------
begin task 1730035454.8707514
 - begin big task 1730035454.8707514
   - begin small task 1730035454.8707514
---------------
--------Task Task-3-------
   - end small task 1730035454.983271
 - end big task 1730035454.983271
end task 1730035454.983271
big_result is  1.112460818788886
---------------
--------Task Task-1-------
   - end small task 1730035454.9969466
 - end big task 1730035454.9978602
end task 1730035454.9978602
big_result is  1.1256256122808315
---------------
--------Task Task-2-------
   - end small task 1730035455.7463698
 - end big task 1730035455.7463698
end task 1730035455.7473648
big_result is  1.873241963935576
---------------

Future

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import collections
import heapq
import itertools
import random
import time
import threading


def fake_io_read(future):
    def read():
        time.sleep(random.random())
        future.set_result(random.randint(1, 100))

    threading.Thread(target=read).start()


async def main_task():
    """一个主任务"""
    print("begin task", time.time())
    big_result = await big_step()
    print("end task", time.time())
    print("big_result is ", big_result)


async def big_step():
    """一个大任务"""
    print(" - begin big task", time.time())
    small_result = await small_step()
    print(" - end big task", time.time())
    return small_result + 1


async def small_step():
    global loop
    fut = Future()

    # 指派一个目标执行set_result
    fake_io_read(fut)  # 模拟一个IO操作  把Future和IO操作绑定到一起
    result = await fut
    return result


class Future:

    def __init__(self):
        global loop
        self._loop = loop
        self._result = None
        self._done = None
        self._callbacks = []

    def set_result(self, result):
        if self._done:
            raise RuntimeError("future already done.")
        self._result = result
        self._done = True

        for cb in self._callbacks:
            self._loop.call_soon(cb)

    def result(self):
        if self._done:
            return self._result
        else:
            raise RuntimeError("future is not done")

    def add_done_callback(self, callback):
        self._callbacks.append(callback)

    def __await__(self):
        yield self
        return self.result()


task_id_counter = itertools.count(1)


class Task(Future):
    """一个通用的任务处理"""

    def __init__(self, coro):
        super().__init__()
        # self._result = None
        # self._done = None
        self.coro = coro
        self._id = "Task-{}".format(next(task_id_counter))
        self._loop.call_soon(self.run)

    def run(self):
        print("--------Task {}-------".format(self._id))
        if not self._done:
            try:
                x = self.coro.send(None)
            except StopIteration as e:
                # self._result = e.value
                # self._done = True
                self.set_result(e.value)
            else:
                assert isinstance(x, Future)
                x.add_done_callback(self.run)  # 重新激活task
        print("---------------")


class EventLoop:
    """相当于一个任务调度器"""

    def __init__(self):
        self._ready = collections.deque()  # 一个双向任务队列,先入先出
        self._scheduled = []  # 定时任务,需要保持一个顺序,
        self._stopping = False  # 提供一个标志位,用来结束run_once中的循环

    def call_soon(self, callback, *args):
        """把任务加入任务队列"""
        self._ready.append((callback, args))

    def call_later(self, delay, callback, *args):
        """利用堆来存储定时任务"""
        t = time.time() + delay
        heapq.heappush(
            self._scheduled, (t, callback, args)
        )  ## 让时间最小的排在堆顶的位置

    def stop(self):
        self._stopping = True

    def run_forever(self):
        while True:
            self.run_once()  # 保证循环至少执行一轮
            if self._stopping:
                break

    def run_once(self):
        # 处理定时任务
        now = time.time()
        if self._scheduled:
            if self._scheduled[0][0] < now:
                _, cb, args = heapq.heappop(self._scheduled)
                self._ready.append((cb, args))  # 符合要求的定时任务加入就绪队列

        # 把就绪的任务拿出来执行
        num = len(self._ready)
        for i in range(num):
            cb, args = self._ready.popleft()
            cb(*args)


if __name__ == "__main__":

    loop = EventLoop()
    for i in range(3):
        t = Task(main_task())

    loop.call_later(1, loop.stop)
    loop.run_forever()