python快速入门学习笔记(进阶篇)三十四:进程(六)进程池 ProcessPoolExecutor

  • 原创
  • 作者:程序员三丰
  • 发布时间:2026-06-02 09:59
  • 浏览量:11
Python入门第三十四课,主要是学习了如何使用进程池统一管理多个子进程,降低频繁创建/销毁进程带来的开销,避免浪费系统资源。

概述

截至目前,我们已经学会了创建多个进程一起工作,但是我们仍然面临一个问题:如果每来一个任务就创建一个进程,会很浪费系统资源,因为进程创建/销毁成本很高,当有大量任务时,系统资源浪费严重。

那么进程池就可以解决这个问题。使用进程池的优势:如何使用进程池统一管理多个子进程,避免频繁创建/销毁进程带来的开销,因为进程池会提前创建固定数量的进程,反复使用它们来执行任务。

一、创建进程池

准确讲是创建『进程池执行器 ProcessPoolExecutor』,然后使用 submit 方法提交任务,使用 shutdown 方法等待任务完成。

注意:

  • submit方法只负责“提交任务”,不会阻塞主进程。
  • shutdown(wait=True)方法的作用是不再接收新的任务,参数wait=True的作用是阻塞主进程,等待进程池中所有任务执行完毕。
import os, time
from concurrent.futures import ProcessPoolExecutor


def work(n):
    print(f'work 正在执行任务 {n} ......... {os.getpid()}')
    time.sleep(1)


if __name__ == '__main__':
    print('------------ Main Process Start ------------')

    # 创建一个进程池执行器
    executor = ProcessPoolExecutor(3)  # 3 表示进程池内启动3个进程
    # 使用 submit 方法提交任务
    executor.submit(work, 1)
    executor.submit(work, 2)
    executor.submit(work, 3)
    executor.submit(work, 4)
    executor.submit(work, 5)
    executor.submit(work, 6)
    executor.submit(work, 7)

    # 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
    executor.shutdown(wait=True)

    print('------------ Main Process End ------------')

二、获取进程执行结果

获取子进程执行后的返回结果(Future 类的实例对象 + result 方法)。

import os, time
from concurrent.futures import ProcessPoolExecutor


def work(n):
    print(f'work 正在执行任务 {n} ......... {os.getpid()}')
    time.sleep(1)
    return f'我是任务{n}的结果'


if __name__ == '__main__':
    print('------------ Main Process Start ------------')

    # 创建一个进程池执行器
    executor = ProcessPoolExecutor(3)  # 3 表示进程池内启动3个进程
    # 使用 submit 方法提交任务
    # f1 = executor.submit(work, 1)
    # f2 = executor.submit(work, 2)
    # f3 = executor.submit(work, 3)
    # f4 = executor.submit(work, 4)
    # f5 = executor.submit(work, 5)
    # f6 = executor.submit(work, 6)
    # f7 = executor.submit(work, 7)
    futures = [executor.submit(work, index) for index in range(1, 8)]

    # 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
    executor.shutdown(wait=True)

    # print(f1.result())
    # print(f2.result())
    # print(f3.result())
    # print(f4.result())
    # print(f5.result())
    # print(f6.result())
    # print(f7.result())
    for future in futures:
        print(future.result())

    print('------------ Main Process End ------------')

三、按“完成顺序”获取结果

使用 as_completed 按“完成顺序”获取结果。

import os, time
from concurrent.futures import ProcessPoolExecutor, as_completed


def work(n):
    print(f'work 正在执行任务 {n} ......... {os.getpid()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'我是任务{n}的结果'


if __name__ == '__main__':
    print('------------ Main Process Start ------------')

    # 创建一个进程池执行器
    executor = ProcessPoolExecutor(3)  # 3 表示进程池内启动3个进程

    # 使用 submit 方法提交任务
    futures = [executor.submit(work, index) for index in range(1, 8)]

    # 收集每个任务的结果
    result_list = []
    for future in as_completed(futures):
        result_list.append(future.result())

    # 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
    executor.shutdown(wait=True)

    # 打印最终结果
    print(result_list)

    print('------------ Main Process End ------------')

四、完成回调函数

使用 add_done_callback 方法,为任务添加完成时的回调函数。

import os, time
from concurrent.futures import ProcessPoolExecutor


def work(n):
    print(f'work 正在执行任务 {n} ......... {os.getpid()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'我是任务{n}的结果'


if __name__ == '__main__':
    print('------------ Main Process Start ------------')

    # 创建一个进程池执行器
    executor = ProcessPoolExecutor(3)  # 3 表示进程池内启动3个进程

    # 收集每个任务的结果
    result_list = []

    # 定义任务完成后的回调函数
    def done_func(future):
        result_list.append(future.result())

    # 开启7个任务,并指定回调函数
    for index in range(1, 8):
        f = executor.submit(work, index)
        f.add_done_callback(done_func)

    # 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
    executor.shutdown(wait=True)

    # 打印最终结果
    print(result_list)

    print('------------ Main Process End ------------')

五、map 批量提交任务

使用 map 方法批量提交任务。

注意:map方法本身不阻塞,但读取其返回的生成器对象是阻塞的,并且得到结果的顺序,与任务分配的顺序是一致的。

map方法会把这一批任务提交到进程池里执行,它会立刻返回一个生成器,真正的阻塞发生在:生成器取结果时,如list(result)

import os, time
from concurrent.futures import ProcessPoolExecutor


def work(n):
    print(f'work 正在执行任务 {n} ......... {os.getpid()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'我是任务{n}的结果'


if __name__ == '__main__':
    print('------------ Main Process Start ------------')

    # 创建一个进程池执行器
    executor = ProcessPoolExecutor(3)  # 3 表示进程池内启动3个进程

    # 通过 map 方法批量提交任务(结果按照提交的顺序来)
    results = executor.map(work, range(1, 8))

    # 获取 results 生成器中的内容
    print(list(results))

    # 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
    executor.shutdown(wait=True)

    print('------------ Main Process End ------------')

六、自动回收的写法

使用 with:进程池的“自动回收”写法,离开 with 代码块时自动执行 shudown(wait=True)

import os, time
from concurrent.futures import ProcessPoolExecutor


def work(n):
    print(f'work 正在执行任务 {n} ......... {os.getpid()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'我是任务{n}的结果'


if __name__ == '__main__':
    print('------------ Main Process Start ------------')

    # 创建一个进程池执行器,进程池内启动3个进程
    with ProcessPoolExecutor(3) as executor:
        # 通过 map 方法批量提交任务(结果按照提交的顺序来)
        results = executor.map(work, range(1, 8))

        # 获取 results 生成器中的内容
        print(list(results))

    print('------------ Main Process End ------------')
声明:本文为原创文章,51blog.xyz和作者拥有版权,如需转载,请注明来源于51blog.xyz并保留原文链接:https://www.51blog.xyz/article/136.html

文章归档

推荐文章

buildadmin logo
Thinkphp8 Vue3 Element PLus TypeScript Vite Pinia

🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。

热门标签

PHP ThinkPHP ThinkPHP5.1 Go Mysql Mysql5.7 Redis Linux CentOS7 Git HTML CSS CSS3 Javascript JQuery Vue LayUI VMware Uniapp 微信小程序 docker wiki Confluence7 学习笔记 uView ES6 Ant Design Pro of Vue React ThinkPHP6.0 chrome 扩展 翻译工具 Nuxt SSR 服务端渲染 scrollreveal.js ThinkPHP8.0 Mac webman 跨域CORS vscode GitHub ECharts Canvas vue3 three.js 微信支付 PHP全栈开发 Python AI 人工智能 AI生成 工作经验 实战笔记