龙空技术网

在Python中使用Asyncio系统(3-5)​异步的语境管理器

李保银 307

前言:

眼前朋友们对“pythoniterkeys”大约比较看重,姐妹们都想要知道一些“pythoniterkeys”的相关知识。那么小编同时在网摘上汇集了一些有关“pythoniterkeys””的相关资讯,希望我们能喜欢,看官们一起来学习一下吧!

异步的语境管理器:async with

语境管理器中对协程的支持非常方便。这是有意义的,因为许多情况需要在定义良好的范围内打开和关闭网络资源(比如连接)。

理解async with的关键是认识到语境管理器的操作是由方法调用驱动的,然后考虑:如果这些方法是协程函数会怎样?实际上,这正是它的工作方式,如示例3-20所示。

示例3-20 异步的语境管理器

class Connection:    def __init__(self, host, port):        self.host = host        self.port = port    async def __aenter__(self):          self.conn = await get_conn(self.host, self.port)        return conn    async def __aexit__(self, exc_type, exc, tb):          await self.conn.close()    async with Connection('localhost', 9001) as conn:        <do stuff with conn>
(L5) 这里使用的不是用于同步语境管理器的__enter__()特殊方法,而是新的__aenter__()特殊方法。这个特殊的方法必须是一个async def方法。(L8) 同样地,使用__aexit__(),而不是__exit__()。这些参数与__exit__()的参数相同,当语境管理器的主体中引发异常时被调用。

注意:只有你在程序中使用了asyncio才需要使用异步的语境管理器,这并不意味着你的所有语境管理器都必须像异步这么用。它们只有在你需要await进入和退出方法时才有用。如果没有阻塞的I/O代码,就使用常规的语境管理器就可以了。

但是,我不太喜欢使用这种显式风格的语境管理器,因为在标准库的contextlib模块中有一个很棒的@contextmanager装饰器。正如你可能猜到的,异步版本的@asynccontextmanager装饰器也有,它使创建简单的异步语境管理器变得更加容易。

contextlib实现方式

这种方法类似于contextlib标准库中的@contextmanager装饰器。例3-21首先介绍了阻塞方式。

示例3-21 阻塞方式

from contextlib import contextmanager@contextmanager  def web_page(url):    data = download_webpage(url)      yield data    update_stats(url)  with web_page('google.com') as data:      process(data)
(L3) @contextmanager装饰器把生成器函数转换为语境管理器管理器。(L5) 我为这个示例编写的这个函数调用看起来很像想要使用网络接口的那种东西,它比cpu-bound的代码慢很多个数量级。这个语境管理器必须在专用线程中使用;否则,整个程序将在等待数据时暂停。(L7) 假设我们每次处理来自URL的数据时都会更新一些统计数据,比如URL被下载的次数。从并发的角度来看,我们需要知道这个函数是否涉及内部I/O,例如通过网络向数据库写入。如果是,update_stats()也是一个阻塞调用。(L9) 使用了语境管理器。特别注意一下网络调用download_webpage()是如何隐藏在语境管理器中的结构中的。(L10) 这个函数调用process()也可能阻塞。我们必须看看这个函数是做什么的,因为阻塞和非阻塞之间的区别并不明确。它可能是:非阻塞(快速并且CPU-bound)轻度阻塞(快速并且I/O-bound,可能类似于快速磁盘访问而不是网络I/O)阻塞(慢并且I/O-bound)严重阻塞(慢并且CPU-bound)为了简单起见,我们假设对process()的调用是一个快速的、cpu绑定的操作,因此是非阻塞的。

示例3-22是完全相同的示例,但使用了Python 3.7中引入的新的异步感知helper。

示例3-22 非阻塞方式

from contextlib import asynccontextmanager@asynccontextmanager  async def web_page(url):      data = await download_webpage(url)      yield data      await update_stats(url)  async with web_page('google.com') as data:      process(data)
(L3) 新的@asynccontextmanager装饰器跟以前完全相同的使用方式。(L4) 不同的是,它要求用async def声明修饰过的生成器函数。(L5) 与前面一样,我们在语境管理器主体使用数据之前从URL获取数据。我已经添加了await关键字,它告诉我们这个协程将允许事件循环在等待网络调用完成时运行其他任务。注意,我们不能简单地将await关键字附加到任何东西上。这个更改的前提是我们也能够修改download_webpage()函数本身,并将其转换为与await关键字兼容的协程。当不可能修改函数时,就需要采用不同的方法;我们将在下一个示例中讨论它。(L6) 与前面一样,语境管理器的主体可以使用数据。我试图保持代码简单,因此我省略了通常应该编写的try/finally处理程序,该处理程序用于处理调用者主体中引发的异常。注意yield的存在将函数变成生成器函数;第1点中附加的async def关键字使它成为一个异步生成器函数。当调用它时,它将返回一个异步生成器。inspect模块有两个函数可以对此进行测试:分别是isasyncgenfunction()和isasyncgen()。(L7) 这里,假设我们还修改了update_stats()函数中的代码,允许它生成协程。然后我们可以使用await关键字,它允许在等待I/O绑定的工作完成时将语境切换到事件循环。(L9) 语境管理器本身的使用需要做另一个改动:我们需要使用async with而不是普通的with

希望这个例子展示了新的@asynccontextmanager与原来的@contextmanager装饰器完全类似。

在编号3和5中,我说过有必要修改一些函数来返回协程;它们是download_webpage()和update_stats()。当然一般情况下这并不是那么容易做到的,因为需要在socket级别添加异步支持。前面示例的重点只是展示新的@asynccontextmanager装饰器,而不是展示如何将阻塞函数转换为非阻塞函数。更常见的情况是,你想在程序中使用一个阻塞函数,但无法修改该函数中的代码。

这种情况一般会发生在使用第三方库中,一个很好的例子就是requests库,这个库从头到尾都是使用阻塞调用。如果你不能修改正在调用的代码,还有另一种方法。这是一个很方便的方法,可以向你展示如何使用单独的executor来完成这一任务,如示例3-23所示。

示例3-23 非阻塞方式,并且把阻塞函数放到单独的执行器中

from contextlib import asynccontextmanager@asynccontextmanagerasync def web_page(url):      loop = asyncio.get_event_loop()    data = await loop.run_in_executor(        None, download_webpage, url)      yield data    await loop.run_in_executor(None, update_stats, url) async with web_page('google.com') as data:    process(data)
(L4) 就这个例子来说,假如我们无法修改这两个阻塞调用的代码,download_webpage()和update_stats();也就是说,我们不能把这两个函数修改成协程函数。这当然是很麻烦的,因为基于事件编程最严重的缺点是破坏了在任何情况下都不能阻止事件循环处理事件的规则。为了解决这个问题,我们将使用一个执行器在一个单独的线程中运行阻塞调用。执行器作为事件循环本身的属性提供给我们。(L7) 执行器的表达式是AbstractEventLoop.run_in_executor(executor, func, *args)。如果您想使用默认执行器(即ThreadPoolExecutor),则必须将None作为执行器参数的值传递。(是的,这个特别烦人。每次使用这个方法调用时,我都忍不住想知道为什么不首选更常见的executor=None作为关键字参数的习惯用法。)(L11) 和download_webpage()的调用一样,我们还在执行器中运行对update_stats()的另一个阻塞调用。注意,必须在前面使用await关键字。如果你忘记了,异步生成器(即异步语境文管理器)的执行不会等到调用完成后才继续。

很可能在许多基于异步的代码库中会大量使用异步语境管理器,所以很深刻地了解它们是非常重要的。你可以在官方文档中

了解更多关于这个新的@asynccontextmanager装饰器的信息。

异步迭代器:async for

接下来是for循环的异步版本。如果你首先认识到普通迭代(就像许多其他语言特性一样)是通过使用特殊方法实现的,通过名称中的双下划线即可识别,那么就很容易理解它是如何工作的。作为参考,示例3-24展示了如何通过__iter__() 和__next__()方法定义标准(非异步)迭代器。

示例3-24 传统的非异步迭代器

>>> class A:  ...     def __iter__(self):     ...         self.x = 0    ...         return self    ...     def __next__(self):    ...         if self.x > 2:  ...             raise StopIteration    ...         else:  ...             self.x += 1  ...             return self.x    >>> for i in A():  ...     print(i)  1  2  3
(L2) 迭代器必须有一个__iter__()方法。(L3) 把一些变量的状态初始化为初始状态。(L4) __iter__()特殊方法必须返回一个可迭代对象;例如,实现__next__()特殊方法的对象。在本例中,它是同一个实例,因为A本身也实现了__next__()特殊方法。(L5) 定义了__next__()方法。迭代序列中的每一步都会调用这个函数,直到…(L7) …触发StopIteration。(L10) 生成每个迭代的返回值。

现在你会问:如果你把__next__()特殊方法声明为一个async def协程函数会发生什么?这将允许它等待某种I/O限制的操作——这几乎就是async for工作的方式,除了一些关于命名的小细节。PEP 492中的规范表明,要在异步迭代器上使用async for,异步迭代器本身需要做以下几件事:

你必须实现def __aiter__()方法的定义。(注意:不是使用async def!)__aiter__()必须返回一个实现async def __anext__()的对象。__anext__()必须为每次迭代返回一个值,并在完成时引发StopAsyncIteration。

让我们快速了解一下它是如何工作的。假设我们在Redis数据库中有一串键,我们想要遍历它们的数据,但我们只能根据需要获取数据。它的异步迭代器可能类似于示例3-25所示。

示例3-25 从Redis获取数据的异步迭代器

import asynciofrom aioredis import create_redisasync def main():      redis = await create_redis(('localhost', 6379))      keys = ['Americas', 'Africa', 'Europe', 'Asia']      async for value in OneAtATime(redis, keys):          await do_something_with(value)  class OneAtATime:    def __init__(self, redis, keys):          self.redis = redis        self.keys = keys    def __aiter__(self):          self.ikeys = iter(self.keys)        return self    async def __anext__(self):          try:            k = next(self.ikeys)          except StopIteration:              raise StopAsyncIteration        value = await redis.get(k)          return valueasyncio.run(main())
(L4) main()函数:我们在代码示例的底部使用asyncio.run()运行它。(L5) 使用aioredis中高级接口方法获取一个redis链接。(L6) 假设每个与这些键相关联的值都相当大,并且存储在Redis实例中。(L8) 我们使用async for:的关键点是迭代能够在等待下一个数据到达时挂起自己。(L9) 为了完整起见,假设我们还对获取的值执行一些I/O-bound的动作(比如,也许会是一些简单的数据转换),然后它被发送到另一个目的端。(L12) 这个类的初始化器很普遍:存储Redis连接实例和要迭代的键列表。(L15) 就像在前面的__iter__()代码示例中一样,我们使用__aiter__()为迭代设置内容。在键上创建一个普通的迭代器self.ikeys,因为OneAtATime也实现了__anext__()协程方法。(L18) 注意,__anext__()方法是用async def声明的,而__aiter__()方法则仅用def声明。(L20) 对于每个键,我们从Redis获取值:self.ikeys是一个常规迭代器,因此我们只需要使用next()来挪动它们。(L21) 当self.ikeys耗尽的时候,我们处理StopIteration并简单地将其转换为StopAsyncIteration!这就是在异步迭代器中发出停止信号的方法。(L24) 最后我们可以从Redis获得跟这个键相关联的数据。我们可以等待数据,这意味着当我们等待网络I/O时,其他代码可以在事件循环上运行。

希望这个例子说得够清晰:async for提供了保持简单便利地操作for循环的能力,即使在迭代本身执行I/O操作的数据上进行迭代也是如此。这样做的好处是,你可以用一个循环处理大量的数据,因为你必须以很小的批次处理每个数据块。

更简单的带有异步生成器的代码

异步生成器是包含yield关键字的Async def函数。异步生成器会让代码更简单。

但是,如果你有把生成器当作协程使用的经验,例如使用Twisted框架或Tornado框架,甚至有使用Python 3.4的asyncio中的yield from的经验,那么它们的概念可能会令人困惑。因此,在我们继续以下的内容之前,你最好能说服自己:

协程和生成器是完全不同的概念。异步生成器的行为很像普通生成器对于处理可迭代对象,使用\mintinline{python}{async for}用于遍历异步生成器,而不是普通的\mintinline{python}{for}用于遍历普通生成器。

如果我们使用异步生成器的话,那么上一节中用于演示与Redis交互的异步迭代器的示例会简单得多,如示例3-26所示。

示例3-26 更简单的使用异步生成器从Redis获取数据

import asynciofrom aioredis import create_redisasync def main():      redis = await create_redis(('localhost', 6379))    keys = ['Americas', 'Africa', 'Europe', 'Asia']    async for value in one_at_a_time(redis, keys):          await do_something_with(value)async def one_at_a_time(redis, keys):      for k in keys:        value = await redis.get(k)          yield value  asyncio.run(main())
(L4) main()函数和示例3-25中的完全一样。(L8) 好吧,还不全一样,应该是几乎完全一样:我把名称从驼峰命名法改成了下划线命名法。L11) 我们的函数现在也使用async def来声明,这样它就能成为一个协程函数了,由于这个函数还包含yield关键字,因此我们将其称为异步生成器函数。(L13) 我们不需要做之前在self.ikeys例子中那些复杂的事情。在这里,我们只需要直接循环键并获得值…(L14) …然后把它交给调用者,就像普通生成器一样。

如果你不常接触这些,它可能看起来很复杂,但我建议你自己在几个玩具示例中尝试一下。很快就会觉得很自然。异步生成器很可能在基于asyncio的代码库中变得流行,因为它们带来了与普通生成器相同的好处:使代码更短、更简单。

异步的解析表达式

既然我们已经看到了Python如何支持异步迭代,那么下一个自然要问的问题就是它是否也适用于列表解析——答案是肯定的。PEP 530中引入了这种支持,我建议你自己看看PEP;它很简短而且很易读懂。示例3-27展示了典型的异步解析表达式是如何布局的。

示例3-27 更简单的使用异步生成器从Redis获取数据

>>> import asyncio>>>>>> async def doubler(n):...     for i in range(n):...         yield i, i * 2  ...         await asyncio.sleep(0.1)  ...>>> async def main():...     result = [x async for x in doubler(3)]  ...     print(result)...     result = {x: y async for x, y in doubler(3)}  ...     print(result)...     result = {x async for x in doubler(3)}  ...     print(result)...>>> asyncio.run(main())[(0, 0), (1, 2), (2, 4)]{0: 0, 1: 2, 2: 4}{(2, 4), (1, 2), (0, 0)}
(L3) doubler()是一个非常的简单的异步生成器:给定一个上限值,它将遍历一个简单范围,生成一个由该值及其两倍的值组成的元组。(L6) 休眠一会儿,强调这的确是一个异步函数。(L9) 异步列表解析:注意如何使用async for代替普通的for。这个区别与前节的“异步的迭代器: Async for”中的例子相同。(L11) 异步字典解析;所有常用的技巧都起作用了,比如将元组解压缩到x和y中,这样它们就可以满足字典解析语法。(L13) 异步集合解析的工作方式也完全符合你的预期。

你也可以在解析表达式中使用await,如PEP 530所述。这不足为奇;await coro是一个正常的表达式,可以在你期望的大多数地方使用。

是async for使解析表达式成为一个异步的解析表达式的,而不是因为await的存在才变成异步的解析表达式的。要正确在解析表达式中使用await,只需要在协程函数体中使用它。在同一个列表推导式中使用await和async for实际上是结合了两个独立的概念,但我们还是会在示例 3-28中这样做,以确保你熟悉async语言语法。

示例 3-28 把await和async for一起使用

>>> import asyncio>>>>>> async def f(x):  ...   await asyncio.sleep(0.1)...   return x + 100...>>> async def factory(n):  ...   for x in range(n):...     await asyncio.sleep(0.1)...     yield f, x  ...>>> async def main():...   results = [await f(x) async for f, x in factory(3)]  ...   print('results = ', results)...>>> asyncio.run(main())results =  [100, 101, 102]
(L3) 一个简单的协程函数:休眠一会儿,然后返回参数加100。(L7) 这是一个异步生成器,我们将在一个更深入的异步列表解析中调用它,使用async for来驱动迭代。(L10) async生成器将生成一个包含f和迭代变量x的元组。f返回值是一个协程函数,还不是协程。(L13) 最后是异步列表解析。这个例子已经被设计来演示一个同时使用async for和await的列表解析。让我们来分析依稀这列表解析表达式里面发生了什么。首先,factory(3)调用返回一个异步生成器,它必须由迭代开始遍历。因为它是异步生成器,所以不能只用for;而必须使用异步的async for。async生成器生成的值是一个由协程函数f和整数组成的元组。调用协程函数f()将生成一个协程,这个协程函数必须使用await进行计算。注意,在解析式中,await的使用与async for的使用没有任何关系:它们在完全不同的对象上执行完全不同的操作。

前几节完成了asyncio的最基本要素:协程,事件循环,Task和Future。现在要说一下同样常用的异步语境管理器,async with, async for和解析表达式等

标签: #pythoniterkeys