当子进程逻辑比较复杂,或者想把『进程 + 行为』封装成一个整体时,可以使用继承 Process 类的方式去创建进程,这种方式属于“面向对象风格”。
原理是:
- 通过继承
Process并重写run()方法来实现个性化的逻辑封装。- 当调用
start()方法时,新进程会执行run()方法。- 不要直接调用
run()方法,否则只是在当前进程中执行,不会创建新进程。__init__()在主进程中执行,run()在子进程中执行。
自定义进程类有以下几点好处:
✅ 封装复杂逻辑:把进程需要的数据和函数放在同一个类中,避免传递过多参数。
✅ 复用:可以定义通用的进程行为,然后在不同地方实例化。
✅ 添加额外的方法/属性:比如进度跟踪、状态查询等。
✧ 必须继承Process类。
✧ 把子进程要干的事情,写进run()方法里。
✧ 依然使用start()方法启动进程,不要手动调用run()方法。
✧ 若子进程不需要参数,可以不写__init__()方法,若需要参数,则需要编写__init__()方法。
✧ 传给子进程的参数,作为实例属性保存。
from multiprocessing import Process
import os, time
class SpeakProcess(Process):
def __init__(self, a, b, **kwargs):
super().__init__(**kwargs) # 必须调用父类初始化
self.a = a
self.b = b
def run(self): # 重写 run 方法
for index in range(10):
print(f'{self.a} -- {self.b} -- 我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
class StudyProcess(Process):
def run(self):
for index in range(15):
print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
print('--------- Main Proc Start ----------')
p1 = SpeakProcess(10, 20)
p2 = StudyProcess()
p1.start() # 内部会调用 run 方法
p2.start()
p1.join()
p2.join()
print('--------- Main Proc End ----------')
下面的示例代码,通过Queue实现带结果回传的自定义进程。
from multiprocessing import Process, Queue
import os
class SumProcess(Process):
def __init__(self, numbers, result_queue):
print(f'__init__() 内获取到的进程pid是{os.getpid()}')
super().__init__()
self.numbers = numbers
self.result_queue = result_queue
def run(self):
print(f'run() 内获取到的进程pid是{os.getpid()}')
total = sum(self.numbers)
self.result_queue.put(total)
if __name__ == '__main__':
print(f'--------- Main Process Start {os.getpid()} ---------')
q = Queue()
p = SumProcess([1, 2, 3, 4, 5], q)
p.start()
p.join()
print(f'计算结果是:{q.get()}')
print(f'--------- Main Process End ---------')
下面是一个在自定义进程类中同时使用Queue和Lock的综合示例。实现了下面场景描述的需求内容:
【场景描述】多个子进程分别计算一部分数据,将结果放入Queue,同时使用Lock保护共享计数器(比如统计已处理的任务数)以及对控制台输出的同步,避免打印信息混乱。
from multiprocessing import Process, Queue, Lock, Value
import os
class WorkerProcess(Process):
def __init__(self, worker_id, data_chunk, result_queue, counter, counter_lock):
super().__init__()
self.worker_id = worker_id
self.data_chunk = data_chunk # 需要处理的数据段(例如一个列表)
self.result_queue = result_queue # 用于回传结果的队列
self.counter = counter # 共享计数器(Value类型)
self.counter_lock = counter_lock # 保护计数器的锁
def run(self):
# 处理数据
processed = [x * x for x in self.data_chunk] # 平方计算
# 将结果放入队列
self.result_queue.put((self.worker_id, processed))
# 使用锁安全地更新共享计数器
# 使用锁保护打印输出(避免多个进程同时打印导致混乱)
# with self.counter_lock: # 自定义锁
with self.counter.get_lock(): # 推荐使用counter内部的锁
self.counter.value += len(self.data_chunk)
print(f'[进程{self.worker_id} {os.getpid()}-{os.getppid()}] 完成,处理了 {len(self.data_chunk)} 项,累计处理 {self.counter.value} 项')
if __name__ == '__main__':
# 准备数据:将一个大列表分成多个块
all_data = list(range(100))
chunk_size = 25
chunks = [all_data[i:i + chunk_size] for i in range(0, len(all_data), chunk_size)]
# 创建进程间共享对象
result_queue = Queue()
counter_lock = Lock() # 保护共享计数器的锁
counter = Value('i', 0) # 共享整数初始值 0
processes = []
for idx, chunk in enumerate(chunks):
p = WorkerProcess(idx, chunk, result_queue, counter, counter_lock)
processes.append(p)
p.start()
# 等待所有子进程结束
for p in processes:
p.join()
# 从队列中收集所有结果
all_results = []
while not result_queue.empty():
all_results.append(result_queue.get())
print('\n所有进程执行完毕')
print(f'共享计数器最终值:{counter.value}')
print(f'收集到的结果数量:{len(all_results)}')
print('收集到的结果如下:')
for result in all_results:
print(result)
下面是对上面示例代码的解析:
Value的 += 不是原子操作:counter.value += n 实际包含“读-改-写”三步,没有锁会导致更新丢失。enumerate() 内置函数说明enumerate()是 Python 的一个内置函数,用于将一个可迭代对象组合为一个索引序列,同时返回索引和元素值。它常用于在 for 循环中同时获取元素的索引和值,避免手动维护计数器。enumerate 对象),它是一个迭代器,每次迭代产生一个元组 (index, element)。>>> seasons = ['Spring', 'Summer', 'Fall', 'Winter']
>>> list(enumerate(seasons))
[(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]
>>> list(enumerate(seasons, start=1))
[(1, 'Spring'), (2, 'Summer'), (3, 'Fall'), (4, 'Winter')]
Value 共享内存对象介绍Value是一个可共享的、类型化的内存对象,允许多个进程访问同一块内存区域。它是 Python 多进程编程中用于在进程间共享单个值(如整数、浮点数、字符串等)的工具。Value对象存储在共享内存中,所有进程可以看到它的变化。
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。