python快速入门学习笔记(进阶篇)四十一:协程 Coroutine 进阶 asyncio 与 IO 密集型场景深度对比(下)

  • 原创
  • 作者:程序员三丰
  • 发布时间:2026-06-13 21:01
  • 浏览量:4
Python入门第四十一课,主要学习了 asyncio 的核心API,异步上下文管理器,异步锁和异步信号量。

四、asyncio 核心API(进阶实战)

1、并发任务管理(核心)需求如下:

(1)批量一次性执行多个任务,包含自动和自定义任务;

(2)灵活自定义并发任务;

(3)灵活调试并发任务。

2、要实现上面的并发任务管理需求,需要先熟悉两个asyncio的核心API:

  • asyncio.gather(*aws, return_exceptions=False)
    • 功能:并发执行多个可等待对象(协程对象或任务对象),并等待所有完成,按照传入顺序返回结果列表。
    • 原理:当向asyncio.gather()传入『协程对象』时,gather内部会自动调用asyncio.create_task()(或等价的底层写法asyncio.ensure_future()),把协程对象封装成『Task对象』(而非直接的 Future),然后将这些 Task 交给事件循环调度执行。
    • 异常处理:如果 return_exceptions 为 False (默认),所引发的首个异常会立即传播给等待 gather() 的任务。aws 序列中的其他可等待对象 不会被取消 并将继续运行。如果 return_exceptions 为 True,异常会和成功的结果一样处理,并聚合至结果列表。
    • 适用场景:需要一次性拿到所有任务的执行结果,并按原始顺序处理。
  • asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
    • 功能:并发运行多个可等待对象,可控制等待的结束条件。返回值是两个任务结合(done, pending)分别是已完成和为完成的任务。
    • 参数return_when):指定此函数应在何时返回,可选值:
      • asyncio.FIRST_COMPLETETD任意一个任务完成立即返回。
      • asyncio.FIRST_EXCEPTION任意一个任务抛出异常立即返回(若没有异常则等同于ALL_COMPLETED)。
      • asyncio.ALL_COMPLETED所有任务完成返回。(默认)
    • 适用场景:需要实时响应完成的任务(超时、竞速、部分结果优先处理),或需要手动管理未完成任务的生命周期。

3、代码示例

  • 示例一:使用asyncio.gather()API传入「协程对象」实现『批量一次性执行多个任务,包含自动和自定义任务』。
import asyncio
from time import time

async def worker(num: int) -> int:
    await asyncio.sleep(num)
    return num


async def run_multiple_tasks():
    begin_time = time()
    coroutines = [worker(1), worker(2), worker(3)]
    res = await asyncio.gather(*coroutines)
    spent_time = time() - begin_time
    print(f'结果:{res},耗时:{spent_time}')


if __name__ == '__main__':
    asyncio.run(run_multiple_tasks())

上面的代码运行结果:

结果:[1, 2, 3],耗时:3.000540018081665秒
  • 示例二:使用asyncio.gather()API传入asyncio.create_task(协程对象)创建「Task对象」实现『灵活自定义并发任务』。
import asyncio
from time import time

async def worker(num: int) -> int:
    await asyncio.sleep(num)
    return num

def task_done_callback(task: asyncio.Task):
    try:
        # 可以在这里根据回调结果处理其他相关业务
        # print(f'回调:任务完成,结果为:{task.result()}', task)
        pass
    except Exception as e:
        print(f'回调:任务发生异常:{e}')


async def run_custom_tasks():
    begin_time = time()
    tasks = []
    for x in range(1, 4):
        now_task = asyncio.create_task(worker(x))

        # 可以对 Task 对象进行自定义操作
        # print(now_task.get_coro()) # 获取由 Task 包装的协程对象
        # 给 Task 对象添加一个回调,将在 Task 对象完成时被运行
        now_task.add_done_callback(task_done_callback)

        tasks.append(now_task)
    # print(tasks)
    res = await asyncio.gather(*tasks)
    spent_time = time() - begin_time
    print(f'结果:{res}, 耗时:{spent_time}秒')

if __name__ == '__main__':
    asyncio.run(run_custom_tasks())

上面的代码运行结果:

结果:[1, 2, 3], 耗时:3.0011937618255615秒
  • 示例三:使用asyncio.wait()API传入asyncio.create_task(协程对象)创建「Task对象」实现『灵活调试并发任务』。
import asyncio
from time import time


async def worker(num: int) -> int:
    await asyncio.sleep(num)
    return num

async def run_debug_tasks():
    begin_time = time()
    tasks = []
    for x in range(1, 4):
        tasks.append(asyncio.create_task(worker(x)))

    # 建议运行代码前,先理解参数return_when和timeout,然后设置不同的值的组合进行测试查看结果
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION, timeout=2)
    spent_time = time() - begin_time
    print(f'结果:{len(done)}完成,{len(pending)}未完成, 耗时:{spent_time}秒')

if __name__ == '__main__':
    asyncio.run(run_debug_tasks())

五、异步上下文管理器

回顾:自定义同步上下文管理器,需要实现两个方法『进入__enter__()』和『退出__exit__()』。

那么自定义异步上下文管理器,需要实现两个方法『进入__aenter__()』和『退出__aexit__()』。

示例:自定义异步上下文管理器

import asyncio


class AsyncWith:
    async def __aenter__(self):
        """
        作用:异步上下文管理器 async with 中的代码执行之前调用该方法
        接收参数:异步上下文管理器对象本身,就是self
        返回值:异步上下文管理器对象本身,就是self
        """
        print('=== ENTER 异步上下文管理器')
        # 模拟业务处理耗时
        await asyncio.sleep(3)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """
        作用:异步上下文管理器 async with 中的代码执行结束后调用该方法
        接收参数:
        :param exc_type: 异常类型
        :param exc_val: 异常信息
        :param exc_tb: 异常 trace
        :return: bool True 则不抛出异常,False 则抛出异常 (默认)
        """
        print(f'=== EXIT')
        if exc_type is not None:
            print(f'异常类型:{exc_type}')
            print(f'异常对象:{exc_val}')
            print(f'异常追踪信息:{exc_tb}')
        try:
            await asyncio.sleep(3)
        except Exception as e:
            print(f'=== EXIT EXCEPTION {e}')
        return True

async def main():
    async with AsyncWith() as a:
        print('成功进入异步上下文管理器代码块')
        # todo: 此处可以写实际业务代码
        raise IndexError # 手动抛出异常
        print('即将退出上下文管理器代码块')


if __name__ == '__main__':
    asyncio.run(main())

六、异步锁(Lock):解决并发资源竞争

asyncio.Lockasyncio模块提供的异步互斥锁,用于保护共享资源的协程间同步,它确保同一时刻只有一个协程能执行被锁保护的代码块,避免数据竞争。

当多个协程访问『共享资源』(如全局变量、文件、数据库连接)时,需用异步锁保证数据一致性。

"""
异步锁:模拟银行存款
"""
import asyncio

# 获取锁
lock = asyncio.Lock()

# 全局余额
balance = 0


async def worker():
    global balance
    for _ in range(100):
        # 此处添加锁
        async with lock:
            current = balance
            await asyncio.sleep(0.0001)  # 模拟业务处理耗时
            balance = current + 1
    return balance


async def main():
    tasks = [asyncio.create_task(worker()) for _ in range(10)]
    res = await asyncio.gather(*tasks, return_exceptions=True)
    print(f'[res]: {res}, balance: {balance}')

if __name__ == '__main__':
    asyncio.run(main())

上面的代码运行结束输出的全局变量balance的值为 1000。

七、异步信号量(Semaphore):控制并发数

asyncio.Semaphoreasync模块中的异步信号量,用于限制同时访问某资源的协程数量

它非常适合控制协程并发量,防止资源过载,常见场景如:限制同时发起的网络请求数、数据库连接等。

下面是一段简单的测试代码:

import asyncio

async def worker(sem, worker_id):
    async with sem:
        print(f'✅ Worker {worker_id} 获得了信号量')
        await asyncio.sleep(2)
        print(f'❌ Worder {worker_id} 释放了信号量')


async def main():
    sem = asyncio.Semaphore(2)
    tasks = [worker(sem, i) for i in range(5)]
    await asyncio.gather(*tasks)


if __name__ == '__main__':
    asyncio.run(main())

运行上面的代码输出效果:前 2 个 worker 立即开始,2 秒后结束并释放槽位,后 2 个 worker 接着运行,直到全部完成。

✅ Worker 0 获得了信号量
✅ Worker 1 获得了信号量
❌ Worder 0 释放了信号量
❌ Worder 1 释放了信号量
✅ Worker 2 获得了信号量
✅ Worker 3 获得了信号量
❌ Worder 2 释放了信号量
❌ Worder 3 释放了信号量
✅ Worker 4 获得了信号量
❌ Worder 4 释放了信号量
声明:本文为原创文章,51blog.xyz和作者拥有版权,如需转载,请注明来源于51blog.xyz并保留原文链接:https://www.51blog.xyz/article/143.html

文章归档

推荐文章

buildadmin logo
Thinkphp8 Vue3 Element PLus TypeScript Vite Pinia

🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。

热门标签

PHP ThinkPHP ThinkPHP5.1 Go Mysql Mysql5.7 Redis Linux CentOS7 Git HTML CSS CSS3 Javascript JQuery Vue LayUI VMware Uniapp 微信小程序 docker wiki Confluence7 学习笔记 uView ES6 Ant Design Pro of Vue React ThinkPHP6.0 chrome 扩展 翻译工具 Nuxt SSR 服务端渲染 scrollreveal.js ThinkPHP8.0 Mac webman 跨域CORS vscode GitHub ECharts Canvas vue3 three.js 微信支付 PHP全栈开发 Python AI 人工智能 AI生成 工作经验 实战笔记