1、并发任务管理(核心)需求如下:
(1)批量一次性执行多个任务,包含自动和自定义任务;
(2)灵活自定义并发任务;
(3)灵活调试并发任务。
2、要实现上面的并发任务管理需求,需要先熟悉两个asyncio的核心API:
asyncio.gather(*aws, return_exceptions=False)asyncio.gather()传入『协程对象』时,gather内部会自动调用asyncio.create_task()(或等价的底层写法asyncio.ensure_future()),把协程对象封装成『Task对象』(而非直接的 Future),然后将这些 Task 交给事件循环调度执行。asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)(done, pending)分别是已完成和为完成的任务。return_when):指定此函数应在何时返回,可选值:asyncio.FIRST_COMPLETETD任意一个任务完成立即返回。asyncio.FIRST_EXCEPTION任意一个任务抛出异常立即返回(若没有异常则等同于ALL_COMPLETED)。asyncio.ALL_COMPLETED所有任务完成返回。(默认)3、代码示例
asyncio.gather()API传入「协程对象」实现『批量一次性执行多个任务,包含自动和自定义任务』。import asyncio
from time import time
async def worker(num: int) -> int:
await asyncio.sleep(num)
return num
async def run_multiple_tasks():
begin_time = time()
coroutines = [worker(1), worker(2), worker(3)]
res = await asyncio.gather(*coroutines)
spent_time = time() - begin_time
print(f'结果:{res},耗时:{spent_time}')
if __name__ == '__main__':
asyncio.run(run_multiple_tasks())
上面的代码运行结果:
结果:[1, 2, 3],耗时:3.000540018081665秒
asyncio.gather()API传入asyncio.create_task(协程对象)创建「Task对象」实现『灵活自定义并发任务』。import asyncio
from time import time
async def worker(num: int) -> int:
await asyncio.sleep(num)
return num
def task_done_callback(task: asyncio.Task):
try:
# 可以在这里根据回调结果处理其他相关业务
# print(f'回调:任务完成,结果为:{task.result()}', task)
pass
except Exception as e:
print(f'回调:任务发生异常:{e}')
async def run_custom_tasks():
begin_time = time()
tasks = []
for x in range(1, 4):
now_task = asyncio.create_task(worker(x))
# 可以对 Task 对象进行自定义操作
# print(now_task.get_coro()) # 获取由 Task 包装的协程对象
# 给 Task 对象添加一个回调,将在 Task 对象完成时被运行
now_task.add_done_callback(task_done_callback)
tasks.append(now_task)
# print(tasks)
res = await asyncio.gather(*tasks)
spent_time = time() - begin_time
print(f'结果:{res}, 耗时:{spent_time}秒')
if __name__ == '__main__':
asyncio.run(run_custom_tasks())
上面的代码运行结果:
结果:[1, 2, 3], 耗时:3.0011937618255615秒
asyncio.wait()API传入asyncio.create_task(协程对象)创建「Task对象」实现『灵活调试并发任务』。import asyncio
from time import time
async def worker(num: int) -> int:
await asyncio.sleep(num)
return num
async def run_debug_tasks():
begin_time = time()
tasks = []
for x in range(1, 4):
tasks.append(asyncio.create_task(worker(x)))
# 建议运行代码前,先理解参数return_when和timeout,然后设置不同的值的组合进行测试查看结果
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION, timeout=2)
spent_time = time() - begin_time
print(f'结果:{len(done)}完成,{len(pending)}未完成, 耗时:{spent_time}秒')
if __name__ == '__main__':
asyncio.run(run_debug_tasks())
回顾:自定义同步上下文管理器,需要实现两个方法『进入
__enter__()』和『退出__exit__()』。
那么自定义异步上下文管理器,需要实现两个方法『进入__aenter__()』和『退出__aexit__()』。
示例:自定义异步上下文管理器
import asyncio
class AsyncWith:
async def __aenter__(self):
"""
作用:异步上下文管理器 async with 中的代码执行之前调用该方法
接收参数:异步上下文管理器对象本身,就是self
返回值:异步上下文管理器对象本身,就是self
"""
print('=== ENTER 异步上下文管理器')
# 模拟业务处理耗时
await asyncio.sleep(3)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
作用:异步上下文管理器 async with 中的代码执行结束后调用该方法
接收参数:
:param exc_type: 异常类型
:param exc_val: 异常信息
:param exc_tb: 异常 trace
:return: bool True 则不抛出异常,False 则抛出异常 (默认)
"""
print(f'=== EXIT')
if exc_type is not None:
print(f'异常类型:{exc_type}')
print(f'异常对象:{exc_val}')
print(f'异常追踪信息:{exc_tb}')
try:
await asyncio.sleep(3)
except Exception as e:
print(f'=== EXIT EXCEPTION {e}')
return True
async def main():
async with AsyncWith() as a:
print('成功进入异步上下文管理器代码块')
# todo: 此处可以写实际业务代码
raise IndexError # 手动抛出异常
print('即将退出上下文管理器代码块')
if __name__ == '__main__':
asyncio.run(main())
asyncio.Lock是asyncio模块提供的异步互斥锁,用于保护共享资源的协程间同步,它确保同一时刻只有一个协程能执行被锁保护的代码块,避免数据竞争。
当多个协程访问『共享资源』(如全局变量、文件、数据库连接)时,需用异步锁保证数据一致性。
"""
异步锁:模拟银行存款
"""
import asyncio
# 获取锁
lock = asyncio.Lock()
# 全局余额
balance = 0
async def worker():
global balance
for _ in range(100):
# 此处添加锁
async with lock:
current = balance
await asyncio.sleep(0.0001) # 模拟业务处理耗时
balance = current + 1
return balance
async def main():
tasks = [asyncio.create_task(worker()) for _ in range(10)]
res = await asyncio.gather(*tasks, return_exceptions=True)
print(f'[res]: {res}, balance: {balance}')
if __name__ == '__main__':
asyncio.run(main())
上面的代码运行结束输出的全局变量balance的值为 1000。
asyncio.Semaphore是async模块中的异步信号量,用于限制同时访问某资源的协程数量。
它非常适合控制协程并发量,防止资源过载,常见场景如:限制同时发起的网络请求数、数据库连接等。
下面是一段简单的测试代码:
import asyncio
async def worker(sem, worker_id):
async with sem:
print(f'✅ Worker {worker_id} 获得了信号量')
await asyncio.sleep(2)
print(f'❌ Worder {worker_id} 释放了信号量')
async def main():
sem = asyncio.Semaphore(2)
tasks = [worker(sem, i) for i in range(5)]
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
运行上面的代码输出效果:前 2 个 worker 立即开始,2 秒后结束并释放槽位,后 2 个 worker 接着运行,直到全部完成。
✅ Worker 0 获得了信号量
✅ Worker 1 获得了信号量
❌ Worder 0 释放了信号量
❌ Worder 1 释放了信号量
✅ Worker 2 获得了信号量
✅ Worker 3 获得了信号量
❌ Worder 2 释放了信号量
❌ Worder 3 释放了信号量
✅ Worker 4 获得了信号量
❌ Worder 4 释放了信号量
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。