创建『线程池执行器 ProcessPoolExecutor』,然后使用 submit 方法提交任务,使用 shutdown 方法等待任务完成。
注意:
submit方法只负责“提交任务”,不会阻塞。shutdown(wait=True)方法的作用是不再接收新的任务,参数wait=True的作用是阻塞,等待线程池中所有任务执行完毕。import time, os
from concurrent.futures import ThreadPoolExecutor
from threading import get_native_id, RLock
def work(n, lock):
with lock:
print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
time.sleep(1)
if __name__ == '__main__':
print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
# 创建一个线程池执行器,默认启动3个线程
executor = ThreadPoolExecutor(3)
# 创建线程锁
lock = RLock()
# 使用 submit 方法提交任务
executor.submit(work, 1, lock)
executor.submit(work, 2, lock)
executor.submit(work, 3, lock)
executor.submit(work, 4, lock)
executor.submit(work, 5, lock)
executor.submit(work, 6, lock)
executor.submit(work, 7, lock)
# 阻塞等待线程池中所有任务执行完毕。
executor.shutdown(wait=True)
print('--------- Main Process End -------------')
获取线程执行后的返回结果(Future 类的实例对象 + result 方法)。
import time, os
from concurrent.futures import ThreadPoolExecutor
from threading import get_native_id, RLock
def work(n, lock):
with lock:
print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
time.sleep(1)
return f'任务{n}的结果'
if __name__ == '__main__':
print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
# 创建一个线程池执行器,默认启动3个线程
executor = ThreadPoolExecutor(3)
# 创建线程锁
lock = RLock()
# 使用 submit 方法提交任务
futures = [executor.submit(work, index, lock) for index in range(1, 8)]
# 阻塞等待线程池中所有任务执行完毕。
executor.shutdown(wait=True)
# 打印结果
for future in futures:
print(future.result())
print('--------- Main Process End -------------')
使用 as_completed 按“完成顺序”获取结果。
import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock
def work(n, lock):
with lock:
print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'任务{n}的结果'
if __name__ == '__main__':
print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
# 创建一个线程池执行器,默认启动3个线程
executor = ThreadPoolExecutor(3)
# 创建线程锁
lock = RLock()
# 使用 submit 方法提交任务
futures = [executor.submit(work, index, lock) for index in range(1, 8)]
# 收集每个线程返回的结果
result_list = []
for future in as_completed(futures):
result_list.append(future.result())
# 阻塞等待线程池中所有任务执行完毕。
executor.shutdown(wait=True)
# 打印结果
print(result_list)
print('--------- Main Process End -------------')
使用 add_done_callback 方法,为任务添加完成时的回调函数。
import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock
def work(n, lock):
with lock:
print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'任务{n}的结果'
if __name__ == '__main__':
print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
# 创建一个线程池执行器,默认启动3个线程
executor = ThreadPoolExecutor(3)
# 创建线程锁
lock = RLock()
# 收集每个线程返回的结果
result_list = []
# 定义一个线程执行成功后的回调函数
def done_func(f):
result_list.append(f.result())
# 使用 submit 方法提交任务,并指定回调函数
for index in range(1, 8):
f = executor.submit(work, index, lock)
f.add_done_callback(done_func)
# 阻塞等待线程池中所有任务执行完毕。
executor.shutdown(wait=True)
# 打印结果
print(result_list)
print('--------- Main Process End -------------')
使用 map 方法批量提交任务。
注意:map方法本身不阻塞,但读取其返回的生成器对象是阻塞的,并且得到结果的顺序,与任务分配的顺序是一致的。
map方法会把这一批任务提交到线程池里执行,它会立刻返回一个生成器,真正的阻塞发生在:生成器取结果时,如list(result)。
import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock
def work(n, lock):
with lock:
print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'任务{n}的结果'
if __name__ == '__main__':
print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
# 创建一个线程池执行器,默认启动3个线程
executor = ThreadPoolExecutor(3)
# 创建线程锁
lock = RLock()
# 使用 map 方法批量提交任务
result = executor.map(work, range(1, 8), [lock] * 7)
# 打印结果
print(list(result))
# 阻塞等待线程池中所有任务执行完毕。
executor.shutdown(wait=True)
print('--------- Main Process End -------------')
使用 with:线程池的“自动回收”写法,离开 with 代码块时自动执行 shudown(wait=True)
import time, os
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import get_native_id, RLock
def work(n, lock):
with lock:
print(f'work正在执行任务{n}.........进程PID:{os.getpid()},线程编号:{get_native_id()}')
if n == 1:
time.sleep(15)
elif n == 2:
time.sleep(10)
else:
time.sleep(1)
return f'任务{n}的结果'
if __name__ == '__main__':
print(f'--------- Main Process Start -------------进程PID:{os.getpid()}')
with ThreadPoolExecutor(3) as executor:
# 创建线程锁
lock = RLock()
# 使用 map 方法批量提交任务
result = executor.map(work, range(1, 8), [lock] * 7)
# 打印结果
print(list(result))
print('--------- Main Process End -------------')
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。