截至目前,我们已经学会了创建多个进程一起工作,但是我们仍然面临一个问题:如果每来一个任务就创建一个进程,会很浪费系统资源,因为进程创建/销毁成本很高,当有大量任务时,系统资源浪费严重。
那么进程池就可以解决这个问题。使用进程池的优势:如何使用进程池统一管理多个子进程,避免频繁创建/销毁进程带来的开销,因为进程池会提前创建固定数量的进程,反复使用它们来执行任务。
准确讲是创建『进程池执行器 ProcessPoolExecutor』,然后使用 submit 方法提交任务,使用 shutdown 方法等待任务完成。
注意:
submit方法只负责“提交任务”,不会阻塞主进程。shutdown(wait=True)方法的作用是不再接收新的任务,参数wait=True的作用是阻塞主进程,等待进程池中所有任务执行完毕。import os, time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'work 正在执行任务 {n} ......... {os.getpid()}')
time.sleep(1)
if __name__ == '__main__':
print('------------ Main Process Start ------------')
# 创建一个进程池执行器
executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程
# 使用 submit 方法提交任务
executor.submit(work, 1)
executor.submit(work, 2)
executor.submit(work, 3)
executor.submit(work, 4)
executor.submit(work, 5)
executor.submit(work, 6)
executor.submit(work, 7)
# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
executor.shutdown(wait=True)
print('------------ Main Process End ------------')
获取子进程执行后的返回结果(Future 类的实例对象 + result 方法)。
import os, time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'work 正在执行任务 {n} ......... {os.getpid()}')
time.sleep(1)
return f'我是任务{n}的结果'
if __name__ == '__main__':
print('------------ Main Process Start ------------')
# 创建一个进程池执行器
executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程
# 使用 submit 方法提交任务
# f1 = executor.submit(work, 1)
# f2 = executor.submit(work, 2)
# f3 = executor.submit(work, 3)
# f4 = executor.submit(work, 4)
# f5 = executor.submit(work, 5)
# f6 = executor.submit(work, 6)
# f7 = executor.submit(work, 7)
futures = [executor.submit(work, index) for index in range(1, 8)]
# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
executor.shutdown(wait=True)
# print(f1.result())
# print(f2.result())
# print(f3.result())
# print(f4.result())
# print(f5.result())
# print(f6.result())
# print(f7.result())
for future in futures:
print(future.result())
print('------------ Main Process End ------------')
使用 as_completed 按“完成顺序”获取结果。
import os, time
from concurrent.futures import ProcessPoolExecutor, as_completed
def work(n):
print(f'work 正在执行任务 {n} ......... {os.getpid()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'我是任务{n}的结果'
if __name__ == '__main__':
print('------------ Main Process Start ------------')
# 创建一个进程池执行器
executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程
# 使用 submit 方法提交任务
futures = [executor.submit(work, index) for index in range(1, 8)]
# 收集每个任务的结果
result_list = []
for future in as_completed(futures):
result_list.append(future.result())
# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
executor.shutdown(wait=True)
# 打印最终结果
print(result_list)
print('------------ Main Process End ------------')
使用 add_done_callback 方法,为任务添加完成时的回调函数。
import os, time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'work 正在执行任务 {n} ......... {os.getpid()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'我是任务{n}的结果'
if __name__ == '__main__':
print('------------ Main Process Start ------------')
# 创建一个进程池执行器
executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程
# 收集每个任务的结果
result_list = []
# 定义任务完成后的回调函数
def done_func(future):
result_list.append(future.result())
# 开启7个任务,并指定回调函数
for index in range(1, 8):
f = executor.submit(work, index)
f.add_done_callback(done_func)
# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
executor.shutdown(wait=True)
# 打印最终结果
print(result_list)
print('------------ Main Process End ------------')
使用 map 方法批量提交任务。
注意:map方法本身不阻塞,但读取其返回的生成器对象是阻塞的,并且得到结果的顺序,与任务分配的顺序是一致的。
map方法会把这一批任务提交到进程池里执行,它会立刻返回一个生成器,真正的阻塞发生在:生成器取结果时,如list(result)。
import os, time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'work 正在执行任务 {n} ......... {os.getpid()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'我是任务{n}的结果'
if __name__ == '__main__':
print('------------ Main Process Start ------------')
# 创建一个进程池执行器
executor = ProcessPoolExecutor(3) # 3 表示进程池内启动3个进程
# 通过 map 方法批量提交任务(结果按照提交的顺序来)
results = executor.map(work, range(1, 8))
# 获取 results 生成器中的内容
print(list(results))
# 使用 shutdown 方法阻塞并等待进程池中所有任务执行完毕
executor.shutdown(wait=True)
print('------------ Main Process End ------------')
使用 with:进程池的“自动回收”写法,离开 with 代码块时自动执行 shudown(wait=True)
import os, time
from concurrent.futures import ProcessPoolExecutor
def work(n):
print(f'work 正在执行任务 {n} ......... {os.getpid()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'我是任务{n}的结果'
if __name__ == '__main__':
print('------------ Main Process Start ------------')
# 创建一个进程池执行器,进程池内启动3个进程
with ProcessPoolExecutor(3) as executor:
# 通过 map 方法批量提交任务(结果按照提交的顺序来)
results = executor.map(work, range(1, 8))
# 获取 results 生成器中的内容
print(list(results))
print('------------ Main Process End ------------')
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。