龙空技术网

Python异步并发编程入门指南

曹CHG 135

前言:

目前各位老铁们对“python的并发”可能比较注重,兄弟们都需要学习一些“python的并发”的相关内容。那么小编同时在网上汇集了一些有关“python的并发””的相关资讯,希望姐妹们能喜欢,咱们快快来学习一下吧!

Python是一门非常强大和灵活的编程语言,它支持多种编程范式,如面向对象、函数式、过程式等。Python也支持异步并发编程,这是一种同时处理多个任务的编程方式,可以提高程序的性能和效率,尤其是在面对IO密集型或网络密集型的任务时。

本文将介绍Python中的异步并发编程的基本概念、原理、模块、工具、设计模式、最佳实践、性能测试和评估等方面,帮助你快速掌握Python中的异步并发编程。本文还将给出一些使用Python中的异步并发编程的代码例子,让你更好地理解和应用Python中的异步并发编程。

什么是异步并发编程

异步并发编程是一种编程范式,用于同时处理多个任务,而不需要等待每个任务完成。异步并发编程可以提高程序的响应性和吞吐量,尤其是在面对IO密集型或网络密集型的任务时。

异步并发编程与同步编程、多线程、多进程的区别和联系:

同步编程是一种顺序执行任务的方式,每个任务必须等待前一个任务完成才能开始。多线程是一种在同一个进程中创建多个执行流的方式,每个线程可以执行不同的任务,但共享内存空间。多进程是一种在不同的进程中创建多个执行流的方式,每个进程可以执行不同的任务,但拥有独立的内存空间。异步并发编程是一种在单个或多个执行流中创建多个协作单元(coroutine)的方式,每个协作单元可以执行不同的任务,并在合适的时机切换。

为什么要使用异步并发编程

异步并发编程有以下优势:

可以提高程序的响应性,避免阻塞在等待IO或网络操作上,提升用户体验。可以提高程序的吞吐量,利用CPU和IO设备之间的空闲时间,提升资源利用率。可以简化程序的逻辑,使用async/await语法来表示异步操作,提升代码可读性。可以减少程序的开销,使用协作单元代替线程或进程,减少切换和通信的成本。

异步并发编程有以下局限性:

不适用于CPU密集型或计算密集型的任务,因为这类任务无法释放CPU资源给其他协作单元。不适用于依赖顺序或同步执行的任务,因为这类任务无法利用异步操作带来的优势。不适用于不支持异步操作或不兼容asyncio模块的库或框架,因为这类库或框架无法与协作单元协作。

Python中如何实现异步并发编程

Python中实现异步并发编程主要有以下模块和工具:

asyncio模块:这是Python标准库中提供的一个基于事件循环(event loop)和协作单元(coroutine)的异步IO框架。它提供了一系列的API和工具,用于创建、管理、执行和取消协作单元,以及处理网络、文件、进程、信号等异步操作。async/await语法:这是Python 3.5中引入的两个新的关键字,用于定义和调用协作单元。async关键字用于声明一个函数或方法是一个协作单元,await关键字用于暂停当前协作单元,并等待另一个协作单元或异步操作的结果。aiohttp模块:这是一个基于asyncio模块的异步HTTP客户端和服务器库。它提供了一系列的API和工具,用于创建、发送、接收和处理HTTP请求和响应,以及支持WebSocket、ClientSession、Web Application等高级功能。aiofiles模块:这是一个基于asyncio模块的异步文件操作库。它提供了一系列的API和工具,用于在协作单元中打开、读写、关闭文件,以及支持临时文件、os模块等高级功能。aiomysql模块:这是一个基于asyncio模块的异步MySQL数据库访问库。它提供了一系列的API和工具,用于在协作单元中连接、查询、插入、更新、删除数据库,以及支持连接池、游标、事务等高级功能。

Python中的异步并发编程有哪些常见的设计模式和最佳实践

Python中的异步并发编程有以下常见的设计模式和最佳实践:

使用asyncio.run()函数来运行主协程函数,它会创建一个新的事件循环,并在其上执行主协程函数,直到完成或出错。使用asyncio.create_task()函数或asyncio.gather()函数来创建和执行多个协作单元,并返回一个Task对象或Future对象,用于获取结果或异常。使用async with语句来管理异步上下文管理器(如文件对象、客户端会话对象等),它会自动调用其__aenter__()和__aexit__()方法,进行初始化和清理操作。使用async for语句来处理异步迭代器(如文件对象、响应对象等),它会自动调用其__aiter__()和__anext__()方法,进行迭代操作。使用try/except/finally语句来处理异常和清理资源,它可以捕获协作单元中抛出的异常,并执行必要的清理操作。使用asyncio.sleep()函数来模拟耗时操作或延迟执行,它可以让出CPU资源给其他协作单元,并在指定时间后恢复执行。使用asyncio.wait_for()函数或asyncio.wait()函数来设置超时或等待多个协作单元或异步操作完成,并返回结果或异常。使用asyncio.Queue()类来实现生产者-消费者模式,它可以在多个协作单元之间传递数据,并提供同步机制。

Python中的异步并发编程有哪些性能测试和评估的方法和工具

Python中的异步并发编程有以下性能测试和评估的方法和工具:

使用time模块或timeit模块来测量程序或代码片段的运行时间,比较不同方案的效率差异。使用cProfile模块或profile模块来分析程序或代码片段的运行情况,查看各个函数或方法的调用次数、运行时间、消耗资源等信息。使用concurrent.futures模块或multiprocessing模块来实现多线程或多进程版本的程序,比较与异步并发版本的程序的性能优劣。使用requests模块或urllib.request模块来实现同步版本的程序,比较与异步并发版本的程序的性能优劣。使用aiohttp.ClientSession()类来管理HTTP客户端会话,提高网络请求的效率和稳定性。使用aiofiles.tempfile模块来创建临时文件,避免占用磁盘空间和影响文件IO的性能。使用aiomysql.create_pool()函数来创建数据库连接池,避免频繁创建和关闭数据库连接,提高数据库访问的效率和稳定性。Python中的异步并发编程的代码例子

为了让你更好地理解和应用Python中的异步并发编程,本文将给出一些使用Python中的异步并发编程的代码例子,分别涉及以下几个方面:

使用asyncio模块来实现一个简单的协作单元,并使用asyncio.run()函数来运行它。使用asyncio模块和async/await语法来实现一个简单的异步网络请求,并使用asyncio.create_task()函数和asyncio.gather()函数来创建和执行多个协作单元。使用aiohttp模块和async with语句来实现一个简单的异步HTTP客户端,并使用aiohttp.ClientSession()类来管理HTTP客户端会话。使用aiofiles模块和async with语句来实现一个简单的异步文件操作,并使用aiofiles.tempfile模块来创建临时文件。使用aiomysql模块和async with语句来实现一个简单的异步数据库访问,并使用aiomysql.create_pool()函数来创建数据库连接池。使用asyncio模块来实现一个简单的协作单元

以下是一个使用asyncio模块来实现一个简单的协作单元的代码例子:

# 导入asyncio模块

import asyncio

# 定义一个协作单元函数,用async关键字修饰

async def hello():

# 打印开始信息

print("Hello, world!")

# 模拟耗时操作,使用await关键字暂停当前协作单元,并等待另一个协作单元或异步操作的结果

await asyncio.sleep(1)

# 打印结束信息

print("Hello, again!")

# 定义一个主函数

def main():

# 使用asyncio.run()函数来运行主协程函数,它会创建一个新的事件循环,并在其上执行主协程函数,直到完成或出错

asyncio.run(hello())

# 运行主函数

if __name__ == "__main__":

main()

运行结果如下:

Hello, world!

Hello, again!

可以看到,这个代码例子中,我们定义了一个协作单元函数hello(),它打印两条信息,并在中间暂停一秒。我们使用async关键字修饰这个函数,表示它是一个协作单元。我们在这个函数中使用await关键字暂停当前协作单元,并等待另一个协作单元或异步操作的结果。在这个例子中,我们等待了asyncio.sleep(1)这个异步操作的结果,它会让出CPU资源给其他协作单元,并在一秒后恢复执行。我们在主函数中使用asyncio.run()函数来运行这个协作单元函数,它会创建一个新的事件循环,并在其上执行这个协作单元函数,直到完成或出错。

使用asyncio模块和async/await语法来实现一个简单的异步网络请求

以下是一个使用asyncio模块和async/await语法来实现一个简单的异步网络请求的代码例子:

# 导入asyncio模块

import asyncio

# 导入socket模块

import socket

# 定义一个协作单元函数,用于获取指定域名的IP地址,并打印

# 用async关键字修饰

async def get_ip(domain):

# 打印开始信息

print(f"Getting IP for {domain}...")

# 使用socket模块的gethostbyname()函数来获取IP地址,这是一个阻塞的操作,所以需要使用run_in_executor()函数来在一个线程或进程池中执行它,并返回一个Future对象

loop = asyncio.get_running_loop()

ip = await loop.run_in_executor(None, socket.gethostbyname, domain)

# 打印结束信息

print(f"{domain} IP is {ip}")

# 定义一个主协程函数,用于创建和执行多个协作单元,并等待结果

async def main():

# 定义要查询的域名列表

domains = ["python.org", "realpython.com", "google.com"]

# 定义一个空列表,用于存储任务对象

tasks = []

# 遍历每个域名

for domain in domains:

# 创建一个任务对象,并添加到列表中

task = asyncio.create_task(get_ip(domain))

tasks.append(task)

# 等待所有任务完成,并获取结果

await asyncio.gather(*tasks)

# 运行主协程函数

if __name__ == "__main__":

asyncio.run(main())

运行结果如下:

Getting IP for python.org...

Getting IP for realpython.com...

Getting IP for google.com...

python.org IP is 45.55.99.72

realpython.com IP is 104.21.11.9

google.com IP is 142.250.67.46

可以看到,这个代码例子中,我们定义了一个协作单元函数get_ip(),它用于获取指定域名的IP地址,并打印。我们使用async关键字修饰这个函数,表示它是一个协作单元。我们在这个函数中使用await关键字暂停当前协作单元,并等待另一个协作单元或异步操作的结果。在这个例子中,我们等待了loop.run_in_executor()这个异步操作的结果,它会在一个线程或进程池中执行socket.gethostbyname()这个阻塞的操作,并返回一个Future对象。我们在主协程函数中使用asyncio.create_task()函数或asyncio.gather()函数来创建和执行多个协作单元,并返回一个Task对象或Future对象,用于获取结果或异常。我们在主函数中使用asyncio.run()函数来运行这个主协程函数,它会创建一个新的事件循环,并在其上执行这个主协程函数,直到完成或出错。

使用aiohttp模块来实现一个简单的异步HTTP客户端

以下是一个使用aiohttp模块来实现一个简单的异步HTTP客户端的代码例子:

# 爬取豆瓣电影Top250

import aiohttp

import asyncio

import time

# 定义一个异步函数,用于发送HTTP请求,并解析响应数据

async def fetch(session, url):

async with session.get(url) as response:

# 判断响应状态码是否为200

if response.status == 200:

# 获取响应内容

html = await response.text()

# 解析响应内容,提取电影信息

movies = parse(html)

# 打印电影信息

print(movies)

else:

# 打印错误信息

print(f"Error: {response.status}")

# 定义一个普通函数,用于解析HTML内容,并提取电影信息

def parse(html):

# 使用BeautifulSoup库进行解析

from bs4 import BeautifulSoup

soup = BeautifulSoup(html, "lxml")

# 获取电影列表

movie_list = soup.find("ol", class_="grid_view")

# 定义一个空列表,用于存储电影信息

movies = []

# 遍历每个电影项

for movie in movie_list.find_all("li"):

# 获取电影名称、评分、排名、链接等信息

name = movie.find("span", class_="title").text

score = movie.find("span", class_="rating_num").text

rank = movie.find("em").text

link = movie.find("a")["href"]

# 将电影信息组合成一个字典,并添加到列表中

movies.append({

"name": name,

"score": score,

"rank": rank,

"link": link

})

# 返回电影列表

return movies

# 定义一个主协程函数,用于创建客户端会话,并发发送多个请求

async def main():

# 创建一个客户端会话对象

async with aiohttp.ClientSession() as session:

# 定义一个空列表,用于存储任务对象

tasks = []

# 定义起始URL和页数

base_url = ";

page_num = 10

# 遍历每一页

for i in range(page_num):

# 拼接完整的URL

url = base_url + str(i * 25)

# 创建一个任务对象,并添加到列表中

task = asyncio.create_task(fetch(session, url))

tasks.append(task)

# 等待所有任务完成,并获取结果

results = await asyncio.gather(*tasks)

# 返回结果

return results

# 运行主协程函数,并记录运行时间

if __name__ == "__main__":

# 获取开始时间

start = time.time()

# 运行主协程函数

asyncio.run(main())

# 获取结束时间

end = time.time()

# 打印运行时间

print(f"Time: {end - start} seconds")

Web服务

# 提供一个简单的Web服务,返回当前时间和IP地址

import aiohttp

import asyncio

import datetime

# 定义一个异步函数,用于处理请求,并返回响应

async def handle(request):

# 获取当前时间和IP地址

now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

ip = request.remote

# 构造响应内容

text = f"Hello, the current time is {now}, and your IP address is {ip}."

# 返回响应对象

return aiohttp.web.Response(text=text)

# 定义一个主函数,用于创建Web应用和服务器

def main():

# 创建一个Web应用对象

app = aiohttp.web.Application()

# 添加一个路由到应用中,绑定请求处理函数

app.add_routes([aiohttp.web.get("/", handle)])

# 运行Web应用,监听本地8000端口

aiohttp.web.run_app(app, host="127.0.0.1", port=8000)

# 运行主函数

if __name__ == "__main__":

main()

以下是两个使用aiofiles的例子,一个是异步读取文件,另一个是异步写入文件:异步读取文件

# 异步读取文件

import aiofiles

import asyncio

# 定义一个异步函数,用于读取文件内容,并打印

async def read_file(filename):

# 使用aiofiles打开文件

async with aiofiles.open(filename, mode="r") as f:

# 读取所有内容

content = await f.read()

# 打印内容

print(content)

# 定义一个主协程函数,用于创建多个任务,并等待完成

async def main():

# 定义要读取的文件列表

filenames = ["file1.txt", "file2.txt", "file3.txt"]

# 定义一个空列表,用于存储任务对象

tasks = []

# 遍历每个文件名

for filename in filenames:

# 创建一个任务对象,并添加到列表中

task = asyncio.create_task(read_file(filename))

tasks.append(task)

# 等待所有任务完成,并获取结果

results = await asyncio.gather(*tasks)

# 返回结果

return results

# 运行主协程函数

if __name__ == "__main__":

asyncio.run(main())

异步写入文件

# 异步写入文件

import aiofiles

import asyncio

# 定义一个异步函数,用于写入一些内容到文件中,并返回写入的字节数

async def write_file(filename, content):

# 使用aiofiles打开文件

async with aiofiles.open(filename, mode="w") as f:

# 写入内容

n = await f.write(content)

# 返回写入的字节数

return n

# 定义一个主协程函数,用于创建多个任务,并等待完成

async def main():

# 定义要写入的文件和内容列表

files_and_contents = [

("file1.txt", "Hello, world!"),

("file2.txt", "Python is awesome!"),

("file3.txt", "Async IO is cool!")

]

# 定义一个空列表,用于存储任务对象

tasks = []

# 遍历每个文件和内容

for filename, content in files_and_contents:

# 创建一个任务对象,并添加到列表中

task = asyncio.create_task(write_file(filename, content))

tasks.append(task)

# 等待所有任务完成,并获取结果

results = await asyncio.gather(*tasks)

# 打印结果

print(results)

运行主协程函数

If name== “main”: asyncio.run(main())

以下是两个使用aiomysql的例子,一个是异步查询数据,另一个是异步插入数据:异步查询数据

# 异步查询数据

import aiomysql

定义一个异步函数,用于查询数据,并打印

async def query_data():

# 使用aiomysql连接数据库

conn = await aiomysql.connect(host=“localhost”, user=“root”, password=“123456”, db=“test”)

# 创建一个游标对象

cur = await conn.cursor()

# 执行一条SQL语句,查询数据

await cur.execute(“SELECT * FROM users”)

# 获取结果集

result = await cur.fetchall()

# 打印结果集

print(result)

# 关闭游标

await cur.close()

#关闭连接

await conn.close()

定义一个主协程函数,用于创建多个任务,并等待完成

async def main():

# 定义要查询的次数

query_num = 3

# 定义一个空列表,用于存储任务对象

tasks = []

#遍历每次查询

for i in range(query_num):

# 创建一个任务对象,并添加到列表中

task = asyncio.create_task(query_data())

tasks.append(task)

# 等待所有任务完成,并获取结果

results = await asyncio.gather(*tasks)

# 返回结果

return results

运行主协程函数

If name == “main”: asyncio.run(main())

异步插入数据

# 异步插入数据

import aiomysql

import asyncio

# 定义一个异步函数,用于插入数据,并返回影响的行数

async def insert_data(name, age):

# 使用aiomysql连接数据库

conn = await aiomysql.connect(host="localhost", user="root", password="123456", db="test")

# 创建一个游标对象

cur = await conn.cursor()

# 执行一条SQL语句,插入数据

n = await cur.execute("INSERT INTO users (name, age) VALUES (%s, %s)", (name, age))

# 提交事务

await conn.commit()

# 关闭游标

await cur.close()

# 关闭连接

await conn.close()

# 返回影响的行数

return n

定义一个主协程函数,用于创建多个任务,并等待完成

async def main():

# 定义要插入的数据列表

data_list = [ (“Alice”, 20), (“Bob”, 21), (“Charlie”, 22) ]

# 定义一个空列表,用于存储任务对象

tasks = []

# 遍历每个数据

for name, age in data_list:

# 创建一个任务对象,并添加到列表中

task = asyncio.create_task(insert_data(name, age)) tasks.append(task)

# 等待所有任务完成,并获取结果

results = await asyncio.gather(*tasks)

# 打印结果

print(results)

运行主协程函数

If name == “main”: asyncio.run(main())

标签: #python的并发 #python发送异步请求