python快速入门学习笔记(进阶篇)三十六:线程(二)线程池 ThreadPoolExecutor

  • 原创
  • 作者:程序员三丰
  • 发布时间:2026-06-04 15:19
  • 浏览量:10
Python入门第三十六课,主要是学习了线程池,与进程池机制一样,线程池管理一组可复用的线程,减少创建销毁开销,高效处理并发任务,提升资源利用率。

一、创建线程池

创建『线程池执行器 ProcessPoolExecutor』,然后使用 submit 方法提交任务,使用 shutdown 方法等待任务完成。

注意:

  • submit方法只负责“提交任务”,不会阻塞。
  • shutdown(wait=True)方法的作用是不再接收新的任务,参数wait=True的作用是阻塞,等待线程池中所有任务执行完毕。
import time, os
from concurrent.futures import ThreadPoolExecutor
from threading import get_native_id, RLock

def work(n, lock):
    with lock:
        print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
    time.sleep(1)

if __name__ == '__main__':
    print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
    # 创建一个线程池执行器,默认启动3个线程
    executor = ThreadPoolExecutor(3)
    # 创建线程锁
    lock = RLock()
    # 使用 submit 方法提交任务
    executor.submit(work, 1, lock)
    executor.submit(work, 2, lock)
    executor.submit(work, 3, lock)
    executor.submit(work, 4, lock)
    executor.submit(work, 5, lock)
    executor.submit(work, 6, lock)
    executor.submit(work, 7, lock)
    # 阻塞等待线程池中所有任务执行完毕。
    executor.shutdown(wait=True)
    print('--------- Main Process End -------------')

二、获取线程执行结果

获取线程执行后的返回结果(Future 类的实例对象 + result 方法)。

import time, os
from concurrent.futures import ThreadPoolExecutor
from threading import get_native_id, RLock


def work(n, lock):
    with lock:
        print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
    time.sleep(1)
    return f'任务{n}的结果'


if __name__ == '__main__':
    print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
    # 创建一个线程池执行器,默认启动3个线程
    executor = ThreadPoolExecutor(3)
    # 创建线程锁
    lock = RLock()
    # 使用 submit 方法提交任务
    futures = [executor.submit(work, index, lock) for index in range(1, 8)]
    # 阻塞等待线程池中所有任务执行完毕。
    executor.shutdown(wait=True)
    # 打印结果
    for future in futures:
        print(future.result())
    print('--------- Main Process End -------------')

三、按“完成顺序”获取结果

使用 as_completed 按“完成顺序”获取结果。

import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock


def work(n, lock):
    with lock:
        print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'任务{n}的结果'


if __name__ == '__main__':
    print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
    # 创建一个线程池执行器,默认启动3个线程
    executor = ThreadPoolExecutor(3)
    # 创建线程锁
    lock = RLock()

    # 使用 submit 方法提交任务
    futures = [executor.submit(work, index, lock) for index in range(1, 8)]

    # 收集每个线程返回的结果
    result_list = []
    for future in as_completed(futures):
        result_list.append(future.result())

    # 阻塞等待线程池中所有任务执行完毕。
    executor.shutdown(wait=True)

    # 打印结果
    print(result_list)

    print('--------- Main Process End -------------')

四、完成回调函数

使用 add_done_callback 方法,为任务添加完成时的回调函数。

import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock


def work(n, lock):
    with lock:
        print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'任务{n}的结果'


if __name__ == '__main__':
    print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
    # 创建一个线程池执行器,默认启动3个线程
    executor = ThreadPoolExecutor(3)
    # 创建线程锁
    lock = RLock()

    # 收集每个线程返回的结果
    result_list = []

    # 定义一个线程执行成功后的回调函数
    def done_func(f):
        result_list.append(f.result())

    # 使用 submit 方法提交任务,并指定回调函数
    for index in range(1, 8):
        f = executor.submit(work, index, lock)
        f.add_done_callback(done_func)

    # 阻塞等待线程池中所有任务执行完毕。
    executor.shutdown(wait=True)

    # 打印结果
    print(result_list)

    print('--------- Main Process End -------------')

五、map 批量提交任务

使用 map 方法批量提交任务。

注意:map方法本身不阻塞,但读取其返回的生成器对象是阻塞的,并且得到结果的顺序,与任务分配的顺序是一致的。

map方法会把这一批任务提交到线程池里执行,它会立刻返回一个生成器,真正的阻塞发生在:生成器取结果时,如list(result)

import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock


def work(n, lock):
    with lock:
        print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'任务{n}的结果'


if __name__ == '__main__':
    print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
    # 创建一个线程池执行器,默认启动3个线程
    executor = ThreadPoolExecutor(3)
    # 创建线程锁
    lock = RLock()

    # 使用 map 方法批量提交任务
    result = executor.map(work, range(1, 8), [lock] * 7)

    # 打印结果
    print(list(result))

    # 阻塞等待线程池中所有任务执行完毕。
    executor.shutdown(wait=True)

    print('--------- Main Process End -------------')

六、自动回收的写法

使用 with:线程池的“自动回收”写法,离开 with 代码块时自动执行 shudown(wait=True)

import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock


def work(n, lock):
    with lock:
        print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
    if n == 1:
        time.sleep(15)
    elif n == 2:
        time.sleep(10)
    else:
        time.sleep(1)
    return f'任务{n}的结果'


if __name__ == '__main__':
    print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')

    with ThreadPoolExecutor(3) as executor:
        # 创建线程锁
        lock = RLock()

        # 使用 map 方法批量提交任务
        result = executor.map(work, range(1, 8), [lock] * 7)

        # 打印结果
        print(list(result))

    print('--------- Main Process End -------------')
声明:本文为原创文章,51blog.xyz和作者拥有版权,如需转载,请注明来源于51blog.xyz并保留原文链接:https://www.51blog.xyz/article/138.html

文章归档

推荐文章

buildadmin logo
Thinkphp8 Vue3 Element PLus TypeScript Vite Pinia

🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。

热门标签

PHP ThinkPHP ThinkPHP5.1 Go Mysql Mysql5.7 Redis Linux CentOS7 Git HTML CSS CSS3 Javascript JQuery Vue LayUI VMware Uniapp 微信小程序 docker wiki Confluence7 学习笔记 uView ES6 Ant Design Pro of Vue React ThinkPHP6.0 chrome 扩展 翻译工具 Nuxt SSR 服务端渲染 scrollreveal.js ThinkPHP8.0 Mac webman 跨域CORS vscode GitHub ECharts Canvas vue3 three.js 微信支付 PHP全栈开发 Python AI 人工智能 AI生成 工作经验 实战笔记