python快速入门学习笔记(进阶篇)三十三:进程(五)继承Process类创建自定义进程

  • 原创
  • 作者:程序员三丰
  • 发布时间:2026-06-01 13:58
  • 浏览量:8
Python入门第三十三课,主要是学习了当子进程逻辑比较复杂,或者想把『进程 + 行为』封装成一个整体时,如何使用继承 Process 类的方式去创建自定义进程类。

为什么要自定义进程

当子进程逻辑比较复杂,或者想把『进程 + 行为』封装成一个整体时,可以使用继承 Process 类的方式去创建进程,这种方式属于“面向对象风格”。

原理是:

  • 通过继承Process并重写run()方法来实现个性化的逻辑封装。
  • 当调用start()方法时,新进程会执行run()方法。
  • 不要直接调用run()方法,否则只是在当前进程中执行,不会创建新进程。
  • __init__()在主进程中执行,run()在子进程中执行。

自定义进程类有以下几点好处:

封装复杂逻辑:把进程需要的数据和函数放在同一个类中,避免传递过多参数。

复用:可以定义通用的进程行为,然后在不同地方实例化。

添加额外的方法/属性:比如进度跟踪、状态查询等。

核心要点

✧ 必须继承Process类。

✧ 把子进程要干的事情,写进run()方法里。

✧ 依然使用start()方法启动进程,不要手动调用run()方法。

✧ 若子进程不需要参数,可以不写__init__()方法,若需要参数,则需要编写__init__()方法。

✧ 传给子进程的参数,作为实例属性保存。

示例:基本写法

from multiprocessing import Process
import os, time

class SpeakProcess(Process):
    def __init__(self, a, b, **kwargs):
        super().__init__(**kwargs) # 必须调用父类初始化
        self.a = a
        self.b = b

    def run(self): # 重写 run 方法
        for index in range(10):
            print(f'{self.a} -- {self.b} -- 我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
            time.sleep(1)

class StudyProcess(Process):
    def run(self):
        for index in range(15):
            print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
            time.sleep(1)

if __name__ == '__main__':
    print('--------- Main Proc Start ----------')

    p1 = SpeakProcess(10, 20)
    p2 = StudyProcess()

    p1.start() # 内部会调用 run 方法
    p2.start()

    p1.join()
    p2.join()

    print('--------- Main Proc End ----------')

示例:自定义进程使用Queue

下面的示例代码,通过Queue实现带结果回传的自定义进程。

from multiprocessing import Process, Queue
import os


class SumProcess(Process):
    def __init__(self, numbers, result_queue):
        print(f'__init__() 内获取到的进程pid是{os.getpid()}')
        super().__init__()
        self.numbers = numbers
        self.result_queue = result_queue

    def run(self):
        print(f'run() 内获取到的进程pid是{os.getpid()}')
        total = sum(self.numbers)
        self.result_queue.put(total)


if __name__ == '__main__':
    print(f'--------- Main Process Start {os.getpid()} ---------')
    q = Queue()
    p = SumProcess([1, 2, 3, 4, 5], q)
    p.start()
    p.join()
    print(f'计算结果是:{q.get()}')
    print(f'--------- Main Process End ---------')

实例:自定义进程使用Queue和Lock

下面是一个在自定义进程类中同时使用QueueLock的综合示例。实现了下面场景描述的需求内容:

【场景描述】多个子进程分别计算一部分数据,将结果放入Queue,同时使用Lock保护共享计数器(比如统计已处理的任务数)以及对控制台输出的同步,避免打印信息混乱。

from multiprocessing import Process, Queue, Lock, Value
import os


class WorkerProcess(Process):
    def __init__(self, worker_id, data_chunk, result_queue, counter, counter_lock):
        super().__init__()
        self.worker_id = worker_id
        self.data_chunk = data_chunk  # 需要处理的数据段(例如一个列表)
        self.result_queue = result_queue  # 用于回传结果的队列
        self.counter = counter  # 共享计数器(Value类型)
        self.counter_lock = counter_lock  # 保护计数器的锁

    def run(self):
        # 处理数据
        processed = [x * x for x in self.data_chunk]  # 平方计算
        # 将结果放入队列
        self.result_queue.put((self.worker_id, processed))

        # 使用锁安全地更新共享计数器
        # 使用锁保护打印输出(避免多个进程同时打印导致混乱)
        # with self.counter_lock: # 自定义锁
        with self.counter.get_lock(): # 推荐使用counter内部的锁
            self.counter.value += len(self.data_chunk)
            print(f'[进程{self.worker_id} {os.getpid()}-{os.getppid()}] 完成,处理了 {len(self.data_chunk)} 项,累计处理 {self.counter.value} 项')


if __name__ == '__main__':
    # 准备数据:将一个大列表分成多个块
    all_data = list(range(100))
    chunk_size = 25
    chunks = [all_data[i:i + chunk_size] for i in range(0, len(all_data), chunk_size)]

    # 创建进程间共享对象
    result_queue = Queue()
    counter_lock = Lock()  # 保护共享计数器的锁
    counter = Value('i', 0)  # 共享整数初始值 0

    processes = []
    for idx, chunk in enumerate(chunks):
        p = WorkerProcess(idx, chunk, result_queue, counter, counter_lock)
        processes.append(p)
        p.start()

    # 等待所有子进程结束
    for p in processes:
        p.join()

    # 从队列中收集所有结果
    all_results = []
    while not result_queue.empty():
        all_results.append(result_queue.get())

    print('\n所有进程执行完毕')
    print(f'共享计数器最终值:{counter.value}')
    print(f'收集到的结果数量:{len(all_results)}')
    print('收集到的结果如下:')
    for result in all_results:
        print(result)

下面是对上面示例代码的解析

  • 运行原理
    • 主进程创建 Queue、Value 和 Lock,并将它们传入每个自定义进程的构造函数。
    • 每个子进程在 run() 中执行真正的计算,结果放入 Queue。
    • 更新共享计数器时,使用 with counter_lock: 保证同一时间只有一个进程修改它。
    • 打印时也使用同一个锁(或单独定义一个 print_lock),防止多个进程的输出交错在一起。
    • 主进程最后通过 join() 等待所有子进程结束,再从 Queue 中取出所有结果。
  • 为什么锁在这里是必要的?
    • Value的 += 不是原子操作counter.value += n 实际包含“读-改-写”三步,没有锁会导致更新丢失。
    • 标准输出 print() 在多进程中也可能交错:锁可以保证一次输出完整的一行信息。
  • enumerate() 内置函数说明
    • enumerate()是 Python 的一个内置函数,用于将一个可迭代对象组合为一个索引序列,同时返回索引和元素值。它常用于在 for 循环中同时获取元素的索引和值,避免手动维护计数器。
    • 返回值:返回一个枚举对象(enumerate 对象),它是一个迭代器,每次迭代产生一个元组 (index, element)
    • 示例:
>>> seasons = ['Spring', 'Summer', 'Fall', 'Winter']
>>> list(enumerate(seasons))
[(0, 'Spring'), (1, 'Summer'), (2, 'Fall'), (3, 'Winter')]

>>> list(enumerate(seasons, start=1))
[(1, 'Spring'), (2, 'Summer'), (3, 'Fall'), (4, 'Winter')]
  • Value 共享内存对象介绍
    • Value是一个可共享的、类型化的内存对象,允许多个进程访问同一块内存区域。它是 Python 多进程编程中用于在进程间共享单个值(如整数、浮点数、字符串等)的工具。
    • 它与普通变量的区别在于:普通变量在每个进程中独立复制,互不干扰。Value对象存储在共享内存中,所有进程可以看到它的变化。
声明:本文为原创文章,51blog.xyz和作者拥有版权,如需转载,请注明来源于51blog.xyz并保留原文链接:https://www.51blog.xyz/article/135.html

文章归档

推荐文章

buildadmin logo
Thinkphp8 Vue3 Element PLus TypeScript Vite Pinia

🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。

热门标签

PHP ThinkPHP ThinkPHP5.1 Go Mysql Mysql5.7 Redis Linux CentOS7 Git HTML CSS CSS3 Javascript JQuery Vue LayUI VMware Uniapp 微信小程序 docker wiki Confluence7 学习笔记 uView ES6 Ant Design Pro of Vue React ThinkPHP6.0 chrome 扩展 翻译工具 Nuxt SSR 服务端渲染 scrollreveal.js ThinkPHP8.0 Mac webman 跨域CORS vscode GitHub ECharts Canvas vue3 three.js 微信支付 PHP全栈开发 Python AI 人工智能 AI生成 工作经验 实战笔记