龙空技术网

用一万字从0到1讲解Python中的Async IO

SuperOps 3061

前言:

当前各位老铁们对“python中sleep怎么用”大约比较关怀,大家都需要剖析一些“python中sleep怎么用”的相关内容。那么小编同时在网摘上搜集了一些关于“python中sleep怎么用””的相关资讯,希望兄弟们能喜欢,兄弟们一起来学习一下吧!

Async IO是一种并行编程设计,它在Python中得到了专门的支持,从Python 3.4开始支持,目前越来越完善,可能还会进一步发展。

到目前为止,python的编程模型已经具备: “并发、并行、多线程、多进程”。已经有很多东西需要理解了。Async IO适合在哪里?

本教程旨在帮助您回答这个问题,使您更牢固地掌握Python实现Async IO的方法。

以下是你将涵盖的内容:

Async IO (async IO):一种与语言无关的范式(模型),它具有跨一系列编程语言的实现

async/await:两个用于定义协程的新的Python关键字asyncio: Python包,为运行和管理协程提供基础API协程(专门的生成器函数)是Python中Async IO的核心,稍后我们将深入研究它们。

注意:在本文中,我使用术语async IO来表示异步IO的语言无关设计,而asyncio指的是Python包。

在开始之前,您需要确保已经设置好使用本教程中提供的asyncio和其他库。

设置环境

你需要Python 3.7或以上版本才能完整理解本文,以及aiohttp和aiofiles包:

$ python3.7 -m venv ./py37async$ source ./py37async/bin/activate  # Windows: .\py37async\Scripts\activate.bat$ pip install --upgrade pip aiohttp aiofiles  # Optional: aiodns

有关安装python环境的安装和pip的使用参考我的上篇!

Async IO深入理解

Async IO比它久经考验的表兄弟多进程和多线程更鲜为人知。本节将让您更全面地了解什么是Async IO以及它如何与周围环境相适应。

Async IO适合在哪里?

并发性和并行性是扩展的课题,本文只是简单说明。虽然本文主要讨论的是Async IO及其在Python中的实现,但还是有必要花点时间来比较一下Async IO与它的对应版本,以便了解Async IO如何适用于这个更大的、有时令人眼花缭乱的课题。

并行性包括同时执行多个操作。多进程是实现并行的一种方法,它需要将任务分散到计算机的中央处理单元(cpu或核心)上。多进程非常适合cpu绑定的任务:紧密绑定的for循环和数学计算通常属于这一类。

并发性是一个比并行性更宽泛的术语。它表明多个任务能够以重叠的方式运行。(有一种说法是,并发并不意味着并行。)

线程是一种并发执行模型,多个线程轮流执行任务。一个进程可以包含多个线程。由于GIL的存在,Python与线程之间的关系非常复杂,但这超出了本文的范围。

了解线程的重要之处在于它更适合 IO-bound的任务。CPU-bound的任务的特征是计算机的核心从开始到结束都在努力工作,而io绑定的任务主要是等待输入/输出完成。

综上所述,并发包括多进程(非常适合CPU-bound的任务)和线程(适合 IO-bound的任务)。多进程是并行的一种形式,并行是并发的一种特定类型(子集)。Python标准库通过其多进程、线程和并发为这两者提供了长期的支持。

现在是时候加入一个新成员了。在过去的几年里,CPython中更全面地内置了一个单独的设计:Async IO,通过标准库的asyncio包以及新的async和await语言关键字启用。需要明确的是,Async IO不是一个新发明的概念,它已经存在或正在构建到其他语言和运行时环境中,如Go、c#或Scala。

Python文档将asyncio包作为编写并发代码的库。然而,Async IO不是线程,也不是多进程。它不是建立在这两者之上的。

事实上,Async IO是一种单线程、单进程的设计:它使用协同多任务处理,这个术语将在本教程的最后进一步充实。换句话说,Async IO给出了一个使用一个线程并发的感觉,尽管在一个处理器上。协程(Async IO的一个核心特性)可以并发调度,但它们本身并不是并发的。

重申一下,Async IO是并发编程的一种风格,但它不是并行。与多进程相比,它与线程更接近,但与这两者有很大的不同,是并发的技巧包中的一个独立成员。

还剩下一项。异步是什么意思?这不是一个严格的定义,但为了我们的目的,我可以想到两个性质:

异步协程能够在等待最终结果时“暂停”,同时让其他协程运行。通过上述机制,异步代码促进了并发执行。换句话说,异步代码提供了并发的外观和感觉。

这里有一个图表,把所有这些放在一起。白色的词代表概念,绿色的词代表它们被实现或影响的方式:

并发编程模型之间的比较上点到为止…不再深入细节。本教程将重点介绍Async IO的子组件、如何使用它以及围绕它涌现的api。

Async IO解释

Async IO乍一看似乎是违反直觉和矛盾的。促进并发代码的东西是如何使用单个线程和单个CPU核心的?我没有很匹配的例子,所以我想套用一个来自米格尔格林贝格2017 PyCon说的话,他的话很漂亮的解释了一切

国际象棋大师Judit波尔加主持一个象棋展览中,她扮演多个业余选手。她有两种方式进行展览:同步和异步。

假设:

24的对手

Judit每下一盘棋都需要5秒

对手每花费55秒移动

游戏平均30对移动(总共60步)

同步版本:Judit每次只玩一款游戏,而不会同时玩两款游戏,直到游戏完成。每款游戏耗时(55 + 5)* 30 == 1800秒,即30分钟。整个展览需要24 * 30 == 720分钟,即12个小时。

异步版本:Judit并发版本。她离开桌子,让对手在等待时间内采取下一步行动。在所有24局游戏中,Judit的一次移动需要24 * 5 == 120秒,也就是2分钟。整个展览现在减少120 * 30 = = 3600秒,或1小时。(Source)

只有一个Judit Polgár,她只有两只手,一次只能自己做一个动作。但异步体验将展览时间从12小时缩短至1小时。所以,合作多任务的一种方式,程序的事件循环(稍后详细介绍)与多个任务,让每一个轮流运行在最佳的时间。

Async IO需要很长的等待时间,否则函数将被阻塞,并允许其他函数在停机期间运行。(阻塞函数有效地禁止其他函数从它启动到它返回的时间内运行。)

Async IO并不容易

我听说过这样的说法:“尽可能使用Async IO;必要时使用线程。”事实上,构建持久的多线程代码非常困难,而且容易出错。Async IO避免了在线程设计中可能遇到的一些潜在的障碍。

但这并不是说Python中的Async IO很容易。需要注意的是:当您在表面层次以下进行尝试时,异步编程也可能非常困难!Python的异步模型是围绕回调、事件、传输、协议和未来等概念构建的——仅是术语就可能令人生畏。事实上,它的API一直在变化,这一点并不容易。

幸运的是,asyncio已经成熟到一个程度,它的大多数特性不再是临时的,同时它的文档也得到了巨大的修改,一些关于这个主题的高质量资源也开始出现。

asyncio包和async/await

本篇假设你有一些Async IO基本知识的前提下,我们探索下Python的实现。Python的asyncio包(在Python 3.4中引入的)和它的两个关键词,async和await,这两个关键字共同承担起了Async IO的协程创建,执行,调度等工作。

async/await语法和本地协程

注意:你在网上读到的东西一定要小心甄别。Python的Async IO API已经从Python 3.4快速迭代到Python 3.7甚至以后的更高阶版本。一些旧的使用方式不再被推荐和废弃,之前一些被禁用的功能现在又开放了。

Async IO的核心是协程。协程是Python生成器函数的定制版本。让我们从基线协程定义开始:协程是一个可以在到达返回之前暂停执行的函数,它可以在一段时间内间接地将控制传递给另一个协程。

稍后,您将深入了解如何将传统generator重新用于协程。目前,了解协同程序如何工作的最简单方法是开始编写一些协程程序。

下面这个简短的程序是Async IO的Hello World,但在说明其核心功能方面有很长的路要走:

#!/usr/bin/env python3# countasync.pyimport asyncioasync def count():   # 注意函数的定义方式需要用async def    print("One")    await asyncio.sleep(1)  # 注意此处和常规time.sleep()的差异    print("Two")async def main():    await asyncio.gather(count(), count(), count())if __name__ == "__main__":    import time    s = time.perf_counter()    asyncio.run(main())    elapsed = time.perf_counter() - s    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

当你执行这个文件时,注意你和只使用def和time.sleep()定义函数有什么不同:

$ python3 countasync.pyOneOneOneTwoTwoTwocountasync.py executed in 1.01 seconds.

输出的顺序性是Async IO的核心。与count()的每个调用通信的是单个事件循环或调度器。当每个任务到达await asyncio.sleep(1)时,该函数向事件循环发出信号,并将控制权交还给它,说:“我将休眠1秒。继续吧,在此期间做一些有意义的事情。”

对比一下同步版本:

#!/usr/bin/env python3# countsync.pyimport timedef count():    print("One")    time.sleep(1)    print("Two")def main():    for _ in range(3):        count()        if __name__ == "__main__":    s = time.perf_counter()    main()    elapsed = time.perf_counter() - s    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

当执行时,顺序和执行时间有一个轻微但关键的变化:

$ python3 countsync.pyOneTwoOneTwoOneTwocountsync.py executed in 3.01 seconds.

虽然使用time.sleep()和asyncio.sleep()看起来很平庸,但它们被用作任何涉及到等待时间的时间密集型服务的替代品。(您可以等待的最普通的事情是sleep()调用,它基本上什么都不做。)也就是说,time.sleep()可以表示任何耗时的阻塞函数调用,而asyncio.sleep()则用于代替非阻塞调用(但也需要一些时间才能完成)。

在下一节中您将看到,等待某些东西(包括asyncio.sleep())的好处是,周围的函数可以暂时将控制权让与另一个更容易立即执行某些操作的函数。相比之下,time.sleep()或任何其他阻塞调用都与异步Python代码不兼容,因为它将在睡眠时间期间停止运行中的一切。

Async IO的规则

学到这个地方已经能够清楚async、await和它们所创建的协程函数在执行的过程中是有顺序保证的。这部分内容有点密集,但是掌握async/await是很有用的,所以如果你需要的话可以回到这里:

语法async def引入了本机协程或异步生成器。表达式async with和async for也是针对with上下文控制和for循环控制引入的协程模型,下面内容将会介绍到。

关键字await将函数控制传递回事件循环。(它暂停了周围协程的执行。)如果Python在g()的作用域内遇到await f()表达式,await就是这样告诉事件循环的:“暂停g()的执行,直到我正在等待的内容(f()的结果)返回。”与此同时, 让出执行交由其他协程去运行。”

在代码中,第二点看起来大致是这样的:

async def g():    # Pause here and come back to g() when f() is ready    r = await f()    return r

关于何时以及如何使用async/await还有一组严格的规则。无论你是还在学习语法还是已经接触到async/await,掌握这些规则都很有用:

用async def引入的函数是协程。它可以使用await、return或yield,但所有这些都是可选的。声明async def noop(): pass有效:使用await和/或return创建一个协程函数。要调用协程函数,必须等待它获得结果。在async def块中使用yield不太常见(直到最近才在Python中合法)。这将创建一个异步生成器,您可以使用async for对其进行迭代。任何用async def定义的东西都不能使用yield from,这将引发SyntaxError。就像在def函数之外使用yield是一个SyntaxError,在async def协程之外使用await也是一个SyntaxError。只能在协程的主体中使用await。

以下是一些简单的例子,旨在总结上述几条规则:

async def f(x):    y = await z(x)  # OK - `await` and `return` allowed in coroutines    return y    async def g(x):    yield x  # OK - this is an async generator    async def m(x):    yield from gen(x)  # No - SyntaxError    def m(x):    y = await z(x)  # Still no - SyntaxError (no `async def` here)    return y

最后,当使用await f()时,要求f()是一个可等待的对象。那可没什么用,不是吗?现在,只需要知道可等待对象要么是

(1) 另一个协程

要么是

(2) 定义了返回迭代器的.__await__()方法的对象。

如果您正在编写一个程序,对于大多数目的,您应该只需要担心情况#1。

这就引出了另一个你可能会看到的技术区别:将一个函数标记为协程的较老方法是用@asyncio.coroutine修饰一个普通的def函数。结果是一个基于生成器的协程。自Python 3.5中引入async/await语法以来,这种结构就已经过时了

这两个协程本质上是等价的(都是可等待的),但第一个是基于生成器的,而第二个是原生协程:

import asyncio# python3.4及其之前版本@asyncio.coroutinedef py34_coro():    """Generator-based coroutine, older syntax"""    yield from stuff()    # python3.5以后的用法    async def py35_coro():    """Native coroutine, modern syntax"""    await stuff()

如果您自己编写任何代码,建议您直接使用async和await来创建协程。基于生成器的协程将在Python 3.10中删除

在本教程的后半部分,我们将接触基于生成器的协程,在这里用它只是为了进行解释引入async/await的原因是为了使协程成为Python的一个独立特性,可以很容易地与普通生成器函数区分开来,从而减少歧义。

废话不多说,让我们看几个更复杂的例子。

下面是Async IO如何减少等待时间的一个例子:给定一个协程makerrandom(),它不断生成范围为[0,10]的随机整数,直到其中一个整数超过阈值,您希望让这个协程的多个调用不需要彼此连续等待对方完成。你可以在很大程度上遵循上面两个脚本的模式,只需要稍作改动:

#!/usr/bin/env python3# rand.pyimport asyncioimport random# ANSI colorsc = (    "\033[0m",   # End of color    "\033[36m",  # Cyan    "\033[91m",  # Red    "\033[35m",  # Magenta)async def makerandom(idx: int, threshold: int = 6) -> int:    print(c[idx + 1] + f"Initiated makerandom({idx}).")    i = random.randint(0, 10)    while i <= threshold:        print(c[idx + 1] + f"makerandom({idx}) == {i} too low; retrying.")        await asyncio.sleep(idx + 1)        i = random.randint(0, 10)    print(c[idx + 1] + f"---> Finished: makerandom({idx}) == {i}" + c[0])    return i    async def main():    res = await asyncio.gather(*(makerandom(i, 10 - i - 1) for i in range(3)))    return res    if __name__ == "__main__":    random.seed(444)    r1, r2, r3 = asyncio.run(main())    print()    print(f"r1: {r1}, r2: {r2}, r3: {r3}")

彩色输出比我能说的多得多,并让你了解这个脚本是如何执行的:

这个程序使用一个主协程makerrandom(),并在3个不同的输入上并发地运行它。大多数程序将包含小型的、模块化的协同程序和一个包装器函数,用于将每个较小的协同程序链接在一起。然后使用main()通过跨一些可迭代对象或map映射中心协程来收集任务(期货)。

在这个小型的例子中,池范围(3)。在后面给出的更完整的示例中,它是一组需要并发请求、解析和处理的URL, main()为每个URL封装了整个协程。

虽然“生成随机整数”(这是cpu绑定的)可能不是asyncio的最佳候选选项,但在本例中使用asyncio.sleep()是为了模拟涉及到不确定等待时间的io绑定程序。例如,asyncio.sleep()调用可能表示在消息应用程序中的两个客户机之间发送和接收不那么随机的整数。

Async IO设计模式

Async IO自带一组易用的开箱即用的模式,比如批量,分类,链式调用等,接下来会介绍这些。

chain协同程序

协程的一个关键特性是它们可以链接在一起。(记住,一个协程对象是可等待的,所以另一个协程可以等待它。)这允许你将程序分解成更小的、可管理的、可回收的协程:

#!/usr/bin/env python3# chained.pyimport asyncioimport randomimport timeasync def part1(n: int) -> str:    i = random.randint(0, 10)    print(f"part1({n}) sleeping for {i} seconds.")    await asyncio.sleep(i)    result = f"result{n}-1"    print(f"Returning part1({n}) == {result}.")    return resultasync def part2(n: int, arg: str) -> str:    i = random.randint(0, 10)    print(f"part2{n, arg} sleeping for {i} seconds.")    await asyncio.sleep(i)    result = f"result{n}-2 derived from {arg}"    print(f"Returning part2{n, arg} == {result}.")    return result    async def chain(n: int) -> None:    start = time.perf_counter()    p1 = await part1(n)    p2 = await part2(n, p1)    end = time.perf_counter() - start    print(f"-->Chained result{n} => {p2} (took {end:0.2f} seconds).")    async def main(*args):    await asyncio.gather(*(chain(n) for n in args))    if __name__ == "__main__":    import sys    random.seed(444)    args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])    start = time.perf_counter()    asyncio.run(main(*args))    end = time.perf_counter() - start    print(f"Program finished in {end:0.2f} seconds.")

请仔细注意输出,其中part1()休眠的时间是可变的,而part2()开始处理可用的结果:

$ python3 chained.py 9 6 3part1(9) sleeping for 4 seconds.part1(6) sleeping for 4 seconds.part1(3) sleeping for 0 seconds.Returning part1(3) == result3-1.part2(3, 'result3-1') sleeping for 4 seconds.Returning part1(9) == result9-1.part2(9, 'result9-1') sleeping for 7 seconds.Returning part1(6) == result6-1.part2(6, 'result6-1') sleeping for 4 seconds.Returning part2(3, 'result3-1') == result3-2 derived from result3-1.-->Chained result3 => result3-2 derived from result3-1 (took 4.00 seconds).Returning part2(6, 'result6-1') == result6-2 derived from result6-1.-->Chained result6 => result6-2 derived from result6-1 (took 8.01 seconds).Returning part2(9, 'result9-1') == result9-2 derived from result9-1.-->Chained result9 => result9-2 derived from result9-1 (took 11.01 seconds).Program finished in 11.01 seconds.

在这种设置中,main()的运行时将等于它收集和调度的任务的最大运行时。

使用一个queue

asyncio包提供的队列类被设计成类似于queue模块的类。到目前为止,在我们的示例中,我们还没有真正需要队列结构。在chain. py中,每个任务(future)都由一组协程组成,它们显式地相互等待,并通过每个chain的单个输入。

还有一种替代结构也可以与Async IO一起工作:一些彼此不关联的生产者向队列中添加项。每个生产者可以在交错的、随机的、未通知的时间向队列中添加多个项目。一群消费者在商品出现时,贪婪地从队列中拉出商品,没有等待任何其他信号。

在这种设计中,没有任何个体消费者与生产者之间的链接。消费者事先不知道生产者的数量,甚至不知道将添加到队列中的项目的累积数量。

单个生产者或消费者分别需要花费可变的时间从队列中放置和提取项目。队列作为一个吞吐量,可以与生产者和消费者进行通信,而不需要它们彼此直接通信。

注意:由于queue.Queue()的线程安全,队列经常被用在多线程程序中,当涉及到Async IO时,您不需要担心线程安全。(唯一的例外是将两者结合起来,但在本教程中没有这样做。)

队列的一个用例(就像这里的情况一样)是让队列充当生产者和消费者的传输器,它们之间没有直接链接或关联。

该程序的同步版本看起来相当糟糕:一组阻塞生产者连续向队列添加项,每次一个生产者。只有在所有生产者完成之后,队列才能被一个消费者逐项处理。这种设计有大量的延迟。项目可能闲置在队列中,而不是被立即提取和处理。

下面是异步版本asyncq.py。该工作流程的挑战性在于,需要向消费者发出生产已经完成的信号。否则,await q.get()将无限期挂起,因为队列将被完全处理,但消费者不会知道生产已经完成。

以下是完整的脚本:

#!/usr/bin/env python3# asyncq.pyimport asyncioimport itertools as itimport osimport randomimport timeasync def makeitem(size: int = 5) -> str:    return os.urandom(size).hex()async def randsleep(caller=None) -> None:    i = random.randint(0, 10)    if caller:        print(f"{caller} sleeping for {i} seconds.")    await asyncio.sleep(i)    async def produce(name: int, q: asyncio.Queue) -> None:    n = random.randint(0, 10)    for _ in it.repeat(None, n):  # Synchronous loop for each single producer        await randsleep(caller=f"Producer {name}")        i = await makeitem()        t = time.perf_counter()        await q.put((i, t))        print(f"Producer {name} added <{i}> to queue.")        async def consume(name: int, q: asyncio.Queue) -> None:    while True:        await randsleep(caller=f"Consumer {name}")        i, t = await q.get()        now = time.perf_counter()        print(f"Consumer {name} got element <{i}>"            f" in {now-t:0.5f} seconds.")        q.task_done()        async def main(nprod: int, ncon: int):    q = asyncio.Queue()    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]    await asyncio.gather(*producers)    await q.join()  # Implicitly awaits consumers, too    for c in consumers:    c.cancel()    if __name__ == "__main__":    import argparse    random.seed(444)    parser = argparse.ArgumentParser()    parser.add_argument("-p", "--nprod", type=int, default=5)    parser.add_argument("-c", "--ncon", type=int, default=10)    ns = parser.parse_args()    start = time.perf_counter()    asyncio.run(main(**ns.__dict__))    elapsed = time.perf_counter() - start    print(f"Program completed in {elapsed:0.5f} seconds.")

前几个协程是辅助函数,它们返回一个随机字符串、一个几分之一秒的性能计数器和一个随机整数。生产者将1到5个条目放入队列中。每个项都是(i, t)的元组,其中i是一个随机字符串,t是生产者试图将元组放入队列的时间。

当使用者取出一个元素时,它只需使用放入元素时的时间戳计算该元素在队列中停留的时间。

请记住,asyncio.sleep()用于模拟其他一些更复杂的协程,如果它是一个常规的阻塞函数,则会占用时间并阻塞所有其他执行。

下面是两个生产者和五个消费者的测试:

$ python3 asyncq.py -p 2 -c 5Producer 0 sleeping for 3 seconds.Producer 1 sleeping for 3 seconds.Consumer 0 sleeping for 4 seconds.Consumer 1 sleeping for 3 seconds.Consumer 2 sleeping for 3 seconds.Consumer 3 sleeping for 5 seconds.Consumer 4 sleeping for 4 seconds.Producer 0 added <377b1e8f82> to queue.Producer 0 sleeping for 5 seconds.Producer 1 added <413b8802f8> to queue.Consumer 1 got element <377b1e8f82> in 0.00013 seconds.Consumer 1 sleeping for 3 seconds.Consumer 2 got element <413b8802f8> in 0.00009 seconds.Consumer 2 sleeping for 4 seconds.Producer 0 added <06c055b3ab> to queue.Producer 0 sleeping for 1 seconds.Consumer 0 got element <06c055b3ab> in 0.00021 seconds.Consumer 0 sleeping for 4 seconds.Producer 0 added <17a8613276> to queue.Consumer 4 got element <17a8613276> in 0.00022 seconds.Consumer 4 sleeping for 5 seconds.Program completed in 9.00954 seconds.

在本例中,条目的处理时间不到一秒。延迟可能是由于两个原因:

标准的,基本上不可避免的开销

当一个项出现在队列中时,所有消费者都处于睡眠状态

至于第二个原因,幸运的是,扩展到成百上千的消费者是很正常的。使用python3 asyncq.py -p 5 -c 100应该没有问题。这里的要点是,从理论上讲,可以让不同系统上的不同用户控制生产者和消费者的管理,队列充当缓冲。

到目前为止,您已经陷入了困境,看到了三个相关的asyncio调用用async和await定义的协程的示例。如果您没有完全理解,或者只是想深入了解Python中现代协程是如何产生的机制,那么您可以从下一节从头开始。

Async IO在生成器中的源头

在前面,您看到了一个老式的基于生成器的协同程序示例,它已经被更显式的本机协同程序淘汰了。这个例子值得重新展示,只需要稍微调整一下:

import asyncio@asyncio.coroutinedef py34_coro():    """Generator-based coroutine"""    # No need to build these yourself, but be aware of what they are    s = yield from stuff()    return s    async def py35_coro():    """Native coroutine, modern syntax"""    s = await stuff()    return s    async def stuff():    return 0x10, 0x20, 0x30

作为实验,如果单独调用py34_coro()或py35_coro(),而不调用await,或不调用asyncio.run()或其他asyncio 声明的函数,会发生什么? 孤立地调用协程将返回一个协程对象:

>>> py35_coro()<coroutine object py35_coro at 0x10126dcc8>

表面上看没什么意思。单独调用协程的结果是一个可等待的协程对象。

测试时间到了:Python的其他特性是这样的吗?(Python的哪些特性在单独调用时实际上没有“做很多事情”?)

希望您正在考虑生成器作为这个问题的答案,因为协同程序在底层是增强的生成器。这方面的行为类似:

>>> def gen():...     yield 0x10, 0x20, 0x30...>>> g = gen()>>> g  # Nothing much happens - need to iterate with `.__next__()`<generator object gen at 0x1012705e8>>>> next(g)(16, 32, 48)

生成器函数是async IO的基础(不管你是用async def还是用旧的@asyncio来声明协程)。协同程序包装)。从技术上讲,await更类似于yield from而不是yield。(但是请记住,yield from x()只是替换x()中i的语法糖:yield i。)

生成器与Async IO相关的一个关键特性是,它们可以有效地随意停止和重启。例如,您可以停止对生成器对象的迭代,然后稍后对其余值恢复迭代。当生成器函数达到yield时,它会产生该值,但随后它会处于闲置状态,直到被告知产生后续值。

这可以通过一个例子来具体化:

>>> from itertools import cycle>>> def endless():...     """Yields 9, 8, 7, 6, 9, 8, 7, 6, ... forever"""...     yield from cycle((9, 8, 7, 6))>>> e = endless()>>> total = 0>>> for i in e:...     if total < 30:...         print(i, end=" ")...         total += i...     else:...         print()...         # Pause execution. We can resume later....         break9 8 7 6 9 8 7 6 9 8 7 6 9 8>>> # Resume>>> next(e), next(e), next(e)(6, 9, 8)

await关键字的行为类似,它标记一个断点,在这个断点,协程将挂起自己,并让其他协程工作。在本例中,“挂起”意味着暂时放弃控制但未完全退出或完成的协程。请记住,yield,以及延伸的yield from和await标志着生成器执行中的一个断点。

这是函数和生成器之间的基本区别。函数是全或无的。一旦它开始,它不会停止,直到它遇到一个返回,然后把那个值推给调用者(调用它的函数)。另一方面,调度器每次达到限制时就会暂停,不再继续前进。它不仅可以将这个值推入调用堆栈,而且可以在调用next()来恢复它时保留它的局部变量。

生成器还有一个鲜为人知的特性也很重要。还可以通过生成器的.send()方法将值发送到生成器。这允许生成器(和协程)相互调用(等待)而不阻塞。我不会深入讨论这个特性的具体细节,因为它主要关系到协同协程在幕后的实现,但是您不应该真的需要自己直接使用它。

如果您有兴趣深入了解,可以从PEP 342开始,在这里正式介绍了协程。Brett Cannon的《How the Heck Does Async-Await Work in Python》也是很好的阅读材料,PYMOTW关于asyncio的文章也是。最后,还有David Beazley的《关于协同程序和并发性的好奇课程》,它深入研究了协同程序运行的机制。

让我们试着把上面的所有文章都压缩成几句话:这些协同程序实际上是通过一种特别非常规的机制运行的。它们的结果是在调用.send()方法时抛出的异常对象的属性。所有这些都有一些不可靠的细节,但它可能不会帮助您在实践中使用这部分语言,所以我们现在继续。

下面是关于协程作为生成器的一些要点:

协程是利用生成器方法特性的重新使用的生成器。

旧的基于生成器的协程使用yield from来等待协程结果。原生协程中的现代Python语法简单地用await代替yield from,作为等待协程结果的方法。await类似于yield from,这样想通常会有所帮助。

await的使用是一个标记断点的信号。它允许协程暂时暂停执行,并允许程序稍后返回。

其他特性:async for和async生成器+推导式

除了普通的async/await之外,Python还支持async for在异步迭代器上进行迭代。异步迭代器的目的是让它能够在迭代时的每个阶段调用异步代码。

这个概念的自然扩展是异步生成器。回想一下,您可以在本机协程中使用await、return或yield。Python 3.6(通过PEP 525)在协程中使用yield成为可能,它引入了异步生成器,目的是允许await和yield在同一个协程函数体中使用:

>>> async def mygen(u: int = 10):...     """Yield powers of 2."""...     i = 0...     while i < u:...         yield 2 ** i...         i += 1...         await asyncio.sleep(0.1)

最后但并非最不重要的是,Python支持使用async for实现异步理解。和它的同步模式一样,让你用起来很自然:

>>> async def main():...     # This does *not* introduce concurrent execution...     # It is meant to show syntax only...     g = [i async for i in mygen()]...     f = [j async for j in mygen() if not (j // 3 % 5)]...     return g, f...>>> g, f = asyncio.run(main())>>> g[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]>>> f[1, 2, 16, 32, 256, 512]

这是一个重要的区别:异步生成器和推导式都不能使迭代并发。它们所做的只是提供它们的同步对等体的观感,但能够让所讨论的循环放弃对事件循环的控制,以便运行其他协程。

换句话说,异步迭代器和异步生成器并不是设计为在序列或迭代器上并发映射某个函数。它们的设计只是为了让封闭的协程允许其他任务轮流执行。async for和async with语句只在使用普通for或with会“破坏”协程中await的性质的情况下才需要。异步性和并发性之间的区别是需要把握的关键。

事件循环和asyncio.run()

您可以将事件循环视为类似于while True循环的东西,它监视协程,获取关于空闲内容的反馈,并查看可以在此期间执行的内容。它能够在空闲的协程所等待的东西可用时唤醒该协程。

到目前为止,事件循环的整个管理都是由一个函数调用隐式处理的:

asyncio.run(main())  # Python 3.7+

在Python 3.7中引入的asyncio.run()负责获取事件循环,运行任务直到它们被标记为完成,然后关闭事件循环。

还有一种更冗长的管理asyncio事件循环的方法,即get_event_loop()。典型的模式是这样的:

loop = asyncio.get_event_loop()try:    loop.run_until_complete(main())finally:    loop.close()

您可能会看到loop.get_event_loop()在较早的示例中出现,但除非您有对事件循环管理进行微调的特定需要,否则对于大多数程序来说asyncio.run()应该足够了。

如果确实需要与Python程序中的事件循环进行交互,loop是一个很好的老式Python对象,它支持使用loop.is_running()和loop.is_closed()进行内省。如果需要获得更精细的控制,可以对其进行操作,例如通过将循环作为参数传递来调度回调。

更重要的是要理解事件循环机制的深层含义。下面是关于事件循环值得强调的几点。

#1:协程在被绑定到事件循环之前自己不会做太多事情。

在之前关于生成器的解释中,您已经看到了这一点,但有必要重申一下。如果你有一个主协程在等待其他协程,简单地单独调用它几乎没有效果:

>>> import asyncio>>> async def main():...     print("Hello ...")...     await asyncio.sleep(1)...     print("World!")>>> routine = main()>>> routine<coroutine object main at 0x1027a6150>

记住使用asyncio.run()在事件循环中调度main()协程(future对象)来强制执行:

>>> asyncio.run(routine)Hello ...World!

(其他协程可以用await执行。通常在asyncio.run()中只包装main(),然后从那里调用带有await的链式协程。)

#2:默认情况下,Async IO事件循环运行在单个线程和单个CPU核心上。通常,在一个CPU核心中运行一个单线程事件循环就足够了。

# 3。事件循环是可插入的。也就是说,如果您真的想,您可以编写自己的事件循环实现,并让它运行相同的任务。这在uvloop包中得到了很好的演示,它是Cython中事件循环的实现。

这就是术语“可插入事件循环”的含义:您可以使用事件循环的任何工作实现,与协程本身的结构无关。asyncio包本身附带了两个不同的事件循环实现,默认的是基于selector模块。(第二个实现仅针对Windows构建。)

完整程序:异步请求

你已经走到这一步了,现在是时候开始有趣而轻松的部分了。在本节中,您将使用aiohttp构建一个web抓取URL收集器areq.py, aiohttp是一个非常快速的异步HTTP客户端/服务器框架。(我们只需要客户部分。)这种工具可以用来绘制站点集群之间的连接图,使链接形成一个有向图。

注意:你可能想知道为什么Python的请求包不兼容Async IO。request构建在urllib3之上,而urllib3又使用Python的http和socket模块。

缺省情况下,套接字操作是阻塞的。这意味着Python不喜欢await requests.get(url),因为.get()是不可等待的。相比之下,aiohttp中的几乎所有内容都是可等待的协程,例如session.request()和response.text()。在其他方面,它是一个很棒的包,但在异步代码中使用请求会给自己带来不利影响。

高级程序结构看起来像这样:

从本地文件URLs .txt中读取url序列。为url发送GET请求并解码结果内容。如果此操作失败,则在此停止以获取URL。在响应的HTML中搜索href标记中的url。将结果写入foundals .txt。尽可能异步和并发地完成以上所有工作。(对请求使用aiohttp,对文件追加使用aiofiles。这是两个非常适合Async IO模型的主要IO示例。)

以下是urls.txt的内容。它并不大,包含了大部分高流量的网站:

$ cat urls.txt

列表中的第二个URL应该返回一个404响应,您需要优雅地处理它。如果您正在运行该程序的扩展版本,那么您可能需要处理比这更棘手的问题,比如服务器断开连接和无休止的重定向。

请求本身应该使用单个会话发出,以利用会话的内部连接池的重用。

让我们来看看完整的程序。我们将一步一步地讲完:

#!/usr/bin/env python3# areq.py"""Asynchronously get links embedded in multiple pages' HMTL."""import asyncioimport loggingimport reimport sysfrom typing import IOimport urllib.errorimport urllib.parseimport aiofilesimport aiohttpfrom aiohttp import ClientSessionlogging.basicConfig(    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",    level=logging.DEBUG,    datefmt="%H:%M:%S",    stream=sys.stderr,)logger = logging.getLogger("areq")logging.getLogger("chardet.charsetprober").disabled = TrueHREF_RE = re.compile(r'href="(.*?)"')async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:    """GET request wrapper to fetch page HTML.        kwargs are passed to `session.request()`.    """        resp = await session.request(method="GET", url=url, **kwargs)    resp.raise_for_status()    logger.info("Got response [%s] for URL: %s", resp.status, url)    html = await resp.text()    return html    async def parse(url: str, session: ClientSession, **kwargs) -> set:    """Find HREFs in the HTML of `url`."""    found = set()    try:        html = await fetch_html(url=url, session=session, **kwargs)    except (        aiohttp.ClientError,        aiohttp.http_exceptions.HttpProcessingError,    ) as e:        logger.error(            "aiohttp exception for %s [%s]: %s",            url,            getattr(e, "status", None),            getattr(e, "message", None),        )        return found    except Exception as e:        logger.exception(            "Non-aiohttp exception occured:  %s", getattr(e, "__dict__", {})        )        return found    else:        for link in HREF_RE.findall(html):            try:                abslink = urllib.parse.urljoin(url, link)            except (urllib.error.URLError, ValueError):                logger.exception("Error parsing URL: %s", link)                pass            else:                found.add(abslink)        logger.info("Found %d links for %s", len(found), url)        return found        async def write_one(file: IO, url: str, **kwargs) -> None:    """Write the found HREFs from `url` to `file`."""    res = await parse(url=url, **kwargs)    if not res:        return None    async with aiofiles.open(file, "a") as f:        for p in res:            await f.write(f"{url}\t{p}\n")        logger.info("Wrote results for source URL: %s", url)        async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:    """Crawl & write concurrently to `file` for multiple `urls`."""    async with ClientSession() as session:        tasks = []        for url in urls:            tasks.append(                write_one(file=file, url=url, session=session, **kwargs)            )        await asyncio.gather(*tasks)        if __name__ == "__main__":    import pathlib    import sys        assert sys.version_info >= (3, 7), "Script requires Python 3.7+."    here = pathlib.Path(__file__).parent        with open(here.joinpath("urls.txt")) as infile:        urls = set(map(str.strip, infile))            outpath = here.joinpath("foundurls.txt")    with open(outpath, "w") as outfile:        outfile.write("source_url\tparsed_url\n")            asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

这个脚本比我们最初的玩具程序要长,所以让我们将其分解。

常量HREF_RE是一个正则表达式,用于提取我们最终要搜索的内容,即HTML中的href标签:

>>> HREF_RE.search('Go to <a href=";>Real Python</a>')<re.Match object; span=(15, 45), match='href=";'>

协程fetch_html()是一个围绕GET请求的包装器,用于发出请求并解码结果页面HTML。它发出请求,等待响应,并在非200状态的情况下立即抛出:

resp = await session.request(method="GET", url=url, **kwargs)resp.raise_for_status()

如果状态正常,fetch_html()返回页面HTML(一个str)。值得注意的是,在这个函数中没有进行异常处理。逻辑是将异常传播给调用者,并在那里处理它:

html = await resp.text()

我们等待session.request()和respect .text(),因为它们是可等待的协程。否则,请求/响应循环将是应用程序的长尾、耗时的部分,但使用Async IO, fetch_html()可以让事件循环工作在其他现成的作业上,比如解析和写入已经获取的url。

协程链中的下一个是parse(),它为给定的URL等待fetch_html(),然后从该页面的HTML中提取所有href标记,确保每个标记都是有效的,并将其格式化为绝对路径。

不可否认,parse()的第二部分是阻塞的,但它由一个快速的正则表达式匹配组成,并确保所发现的链接成为绝对路径。

在这种情况下,同步代码应该是快速且不显眼的。但是请记住,给定协程中的任何一行都将阻塞其他协程,除非该行使用yield、await或return。如果解析是一个更密集的进程,您可能会考虑使用loop.run_in_executor()在它自己的进程中运行这一部分。

接下来,协程write()接受一个文件对象和一个URL,并等待parse()返回一组解析后的URL,通过使用aiofiles(异步文件IO的一个包)将每个URL与其源URL异步写入文件。

最后,bulk_crawl_and_write()充当脚本协同程序链的主要入口点。它使用单一会话,并为最终从urls.txt中读取的每个URL创建一个任务。

这里还有一些值得一提的地方:

默认的ClientSession有一个适配器,其打开的连接最多为100个。要改变这一点,将asyncio.connector.TCPConnector的实例传递给ClientSession。您还可以在每个主机的基础上指定限制。您可以为整个会话和单个请求指定最大超时。该脚本还使用async with,它与异步上下文管理器一起工作。由于从同步到异步上下文管理器的转换相当简单,所以我没有花一整节的时间来讨论这个概念。后者必须定义.__aenter__()和.__aexit__()而不是.__exit__()和.__enter__()。如您所料,async with只能在用async def声明的协程函数中使用。

如果你想了解更多,可以在GitHub上找到本教程的配套文件,其中还附带了注释和文档字符串。

下面是执行的全部荣耀,areq.py在一秒钟内获取、解析并保存9个url的结果:

$ python3 areq.py21:33:22 DEBUG:asyncio: Using selector: KqueueSelector21:33:22 INFO:areq: Got response [200] for URL:  INFO:areq: Found 115 links for  INFO:areq: Got response [200] for URL:  INFO:areq: Got response [200] for URL:  INFO:areq: Got response [200] for URL:  ERROR:areq: aiohttp exception for  [404]: Not Found21:33:22 INFO:areq: Found 120 links for  INFO:areq: Found 143 links for  INFO:areq: Wrote results for source URL:  INFO:areq: Found 0 links for  INFO:areq: Got response [200] for URL:  INFO:areq: Wrote results for source URL:  INFO:areq: Wrote results for source URL:  INFO:areq: Got response [200] for URL:  INFO:areq: Found 3 links for  INFO:areq: Wrote results for source URL:  INFO:areq: Found 36 links for  INFO:areq: Got response [200] for URL:  INFO:areq: Found 23 links for  INFO:areq: Wrote results for source URL:  INFO:areq: Wrote results for source URL: 

这并不太寒酸!作为完整性检查,您可以检查输出的行数。就我而言,是626,但请记住,这个数字可能会有所变化:

$ wc -l foundurls.txt     626 foundurls.txt$ head -n 3 foundurls.txtsource_url  parsed_url  

下一步:如果你想提高赌注,让这个网络爬虫递归。您可以使用aio-redis来跟踪哪些url在树中被爬行,以避免两次请求它们,并使用Python的networkx库连接链接。

记住要友善。向一个毫无防备的小网站并发发送1000个请求是非常非常非常非常糟糕的。有一些方法可以限制在一个批处理中发出的并发请求的数量,例如使用asyncio的信号量对象或使用像这样的模式。如果您不注意这个警告,您可能会得到大量的TimeoutError异常,最终只会损害您自己的程序。

上下文Async IO

现在您已经看到了大量的代码,让我们后退一分钟,考虑一下何时Async IO是一个理想的选择,以及如何进行比较以得出结论或选择不同的并发模型。

为什么Async IO是正确的选择?

本教程不提供关于Async IO、线程和多进程的扩展论述。然而,了解Async IO何时可能是这三种方法中的最佳候选是很有用的。

Async IO与多进程之间的战争根本就不是一场真正的战争。事实上,它们可以协同使用。如果有多个相当统一的cpu绑定任务(一个很好的例子是在像scikit-learn或keras这样的库中进行网格搜索),那么多进程应该是一个不错的选择。

如果所有函数都使用阻塞调用,那么简单地将async放在每个函数之前是一个坏主意。(这实际上会降低你的代码性能。)但是正如前面所提到的,有些地方Async IO和多进程可以和谐共存。

Async IO和线程之间的竞争更直接一些。我在介绍中提到过“多线程处理是困难的”。整个故事是这样的,即使在线程似乎很容易实现的情况下,由于竞争条件和内存使用等原因,它仍然可能导致臭名昭著的无法跟踪的bug,比如死锁。

线程的可伸缩性也不如Async IO,因为线程是一种可用性有限的系统资源。在许多机器上创建数千个线程会失败,我不建议一开始就尝试多线程。但是创建数千个Async IO任务是完全可行的。

当你有多个IO绑定的任务时,Async IO就会排上用场,否则这些任务将被阻塞IO绑定的等待时间所支配,例如:

网络IO,无论您的程序是服务器端还是客户端无服务器设计,如点对点、多用户网络,如群聊天室读/写操作,您希望模拟“阅后即焚”风格,不用担心对正在读写的对象锁的控制

不使用它的最大原因是await只支持特定的对象集,这些对象集定义特定的方法集。如果您想对某个DBMS执行异步读操作,您不仅需要为该DBMS找到一个Python安装包,而且需要找到一个支持async/await语法的安装包。包含同步调用的协程会阻止其他协程和任务的运行。

有关使用async/await的库的简短列表,请参阅本教程末尾的列表。

这是Async IO,但哪一个?

本教程重点介绍async IO,即async/await语法,以及使用asyncio进行事件循环管理和指定任务。asyncio当然不是唯一的async IO库。

虽然使用了不同的api和不同的方法,但有几个著名的替代方案可以完成asyncio所做的工作,它们是curio和trio。我个人认为,如果您正在构建一个中等大小的、简单的程序,那么仅使用asyncio就足够了,而且可以理解,并且可以避免在Python标准库之外添加另一个大型依赖项。

但无论如何,看看curio和trio,你可能会发现它们以一种对用户更直观的方式完成了相同的任务。这里介绍的许多与包无关的概念也应该适用于可选的Async IO包。

零碎的东西

在接下来的几节中,您将介绍asyncio和async/await的一些杂项内容,到目前为止,这些内容还不能很好地融入本教程,但对于构建和理解完整的程序仍然很重要。

其他顶级asyncio函数

除了asyncio.run()之外,您还看到了其他一些包级函数,如asyncio.create_task()和asyncio.gather()。

你可以使用create_task()来调度协程对象的执行,后面跟着asyncio.run():

>>> import asyncio>>> async def coro(seq) -> list:...     """'IO' wait time is proportional to the max element."""...     await asyncio.sleep(max(seq))...     return list(reversed(seq))...>>> async def main():...     # This is a bit redundant in the case of one task...     # We could use `await coro([3, 2, 1])` on its own...     t = asyncio.create_task(coro([3, 2, 1]))  # Python 3.7+...     await t...     print(f't: type {type(t)}')...     print(f't done: {t.done()}')...>>> t = asyncio.run(main())t: type <class '_asyncio.Task'>t done: True

这个模式有一个微妙之处:如果不在main()中等待t,它可能会在main()本身发出完成信号之前完成。因为asyncio.run(main())调用loop.run_until_complete(main()),事件循环只关心(没有等待)main()是否完成,而不关心在main()中创建的任务是否完成。如果没有await t,循环的其他任务将被取消,可能在它们完成之前。如果需要获取当前挂起的任务列表,可以使用asyncio.Task.all_tasks()。

注意:asyncio.create_task()是在Python 3.7中引入的。在Python 3.6或更低版本中,使用asyncio.ensure_future()来代替create_task()。

另外,还有asyncio.gather()。虽然gather()没有做任何特别的事情,但它的目的是将一组协程(未来)整齐地放入一个未来中。结果,它返回一个单独的future对象,并且,如果等待asyncio.gather()并指定多个任务或协程,则是在等待它们全部完成。(这在一定程度上与前面示例中的queue.join()类似。)gather()的结果将是跨输入的结果列表:

>>> import time>>> async def main():...     t = asyncio.create_task(coro([3, 2, 1]))...     t2 = asyncio.create_task(coro([10, 5, 0]))  # Python 3.7+...     print('Start:', time.strftime('%X'))...     a = await asyncio.gather(t, t2)...     print('End:', time.strftime('%X'))  # Should be 10 seconds...     print(f'Both tasks done: {all((t.done(), t2.done()))}')...     return a...>>> a = asyncio.run(main())Start: 16:20:11End: 16:20:21Both tasks done: True>>> a[[1, 2, 3], [0, 5, 10]]

您可能注意到gather()会等待您传递给它的Futures或协程的整个结果集。或者,您可以循环使用asyncio.as_completed(),以在任务完成时按完成顺序获取任务。该函数返回一个迭代器,在任务完成时生成任务。下面,coro([3,2,1])的结果将在coro([10,5,0])完成之前可用,这与gather()不同:

>>> async def main():...     t = asyncio.create_task(coro([3, 2, 1]))...     t2 = asyncio.create_task(coro([10, 5, 0]))...     print('Start:', time.strftime('%X'))...     for res in asyncio.as_completed((t, t2)):...         compl = await res...         print(f'res: {compl} completed at {time.strftime("%X")}')...     print('End:', time.strftime('%X'))...     print(f'Both tasks done: {all((t.done(), t2.done()))}')...>>> a = asyncio.run(main())Start: 09:49:07res: [1, 2, 3] completed at 09:49:10res: [0, 5, 10] completed at 09:49:17End: 09:49:17Both tasks done: True

最后,您还可能看到asyncio.ensure_future()。您应该很少需要它,因为它是一个较低级别的管道API,在很大程度上被稍后引入的create_task()所取代。

await的优先级

虽然它们的行为有点类似,但await关键字的优先级明显高于yield。这意味着,由于绑定更紧密,在很多情况下,yield from语句中需要括号,而在类似的await语句中则不需要。有关更多信息,请参见PEP 492中的await表达式示例。

结论

您现在可以使用async/await和基于它构建的库了。以下是你所报道的内容的概述:

Async IO是一种与语言无关的模型,也是一种通过让协程相互间接通信来实现并发性的方法

Python新的async和await关键字的细节,用于标记和定义协程

asyncio,提供运行和管理协程的API的Python包

资源Python版本细节

Python中的Async IO发展迅速,很难跟踪什么时候发生了什么。下面是与asyncio相关的Python小版本更改和介绍列表:

3.3: yield from表达式允许生成器委托。

3.4: asyncio是在Python标准库中引入的,带有临时API状态。

3.5: async和await成为Python语法的一部分,用于表示协程并等待。它们还不是保留的关键字。(您仍然可以定义名为async和await的函数或变量。)

3.6:介绍了异步生成器和异步推导。asyncio的API被声明为稳定的而不是临时的。

3.7: async和await成为保留关键字。(它们不能用作标识符。)它们用于替换asyncio.coroutine()装饰器。在asyncio包中引入了asyncio.run()以及其他一些特性。

如果您希望安全(并能够使用asyncio.run()),请使用Python 3.7或更高版本以获得完整的特性集。

标签: #python中sleep怎么用 #python异步回调原理 #python 每分钟运行一次 #python等待一秒代码 #python 微秒