python快速入门学习笔记(进阶篇)三十一:进程(三)进程间通信

  • 原创
  • 作者:程序员三丰
  • 发布时间:2026-05-28 12:16
  • 浏览量:6
Python入门第三十一课,主要是学习了进程间通信的两种通信方式:队列(Queue)和管道(Pipe)。

进程之间不共享变量

每个进程都有自己的独立内存空间,进程之间不共享内存,因此也就不共享任何变量。

  • 进程是操作系统资源分配的最小单位,每个进程拥有独立的地址空间。
  • 这种隔离提高了稳定性和安全性:一个进程崩溃不会直接影响其他进程。

验证代码如下:

验证代码 ➊

from multiprocessing import Process

num = 100
names = []

def test1():
    global num, names
    num += 10
    names.append('张三')
    print(f'我是 test1 进程,操作之后的 num 是{num}, names 是:{names}')


def test2():
    global num, names
    num += 22
    names.append('李四')
    print(f'我是 test2 进程,操作之后的 num 是{num}, names 是:{names}')

if __name__ == '__main__':
    print('主进程中的【第一行】代码')

    p1 = Process(target=test1)
    p2 = Process(target=test2)

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print('主进程中的【最后一行】代码', num, names)

验证代码 ➋

from multiprocessing import Process

def test1(num, names):
    num += 10
    names.append('张三')
    print(f'我是 test1 进程,操作之后的 num 是{num}, names 是:{names}')


def test2(num, names):
    num += 22
    names.append('李四')
    print(f'我是 test2 进程,操作之后的 num 是{num}, names 是:{names}')

if __name__ == '__main__':
    print('主进程中的【第一行】代码')

    num = 100
    names = []

    p1 = Process(target=test1, args=(num, names))
    p2 = Process(target=test2, args=(num, names))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print('主进程中的【最后一行】代码', num, names)

注意

我们正常编写的变量,在多个进程之间是不共享的,但有些对象是天然被多个进程共享的,比如:我们之前讲过的 Lock 和 RLock,我们后面还会学习到很多天然被多个进程所共享的对象,例如:multiprocessing.Queue

Queue 队列

我们之前学过listtupledict,它们的特点是:数据想怎么拿就怎么拿。

队列(Queue)是一种“先进先出”的数据结构(先放进去的数据,一定会先取出来)。

下面介绍常见队列操作:

创建队列

创建队列之前需要先导入multiprocessing.Queue模块:

from multiprocessing import Queue

注意:千万不要导入from queue import Queue,这个模块具备跨进程共享数据的能力。

  • 创建一个队列,不限制大小(即:不设置容量上限)
q1 = Queue()
  • 创建一个队列,限制大小(即:设置最多能保存的元素个数)
q2 = Queue(3) # 队列 q2 最多能保存3个元素

put 方法(入队)

q.put(value)方法往队列里放数据(也称:入队)。

q1.put(10)
q1.put(20)
q1.put(30)

注意,put队列具备等待模式:

  • 当队列已满,继续put,就会进入等待模式,阻塞当前进程,直到别的进程给从该队列 get 了元素,才能结束等待put成功。
  • 通过put(元素, timeout=秒数)的第二个参数 timeout 指定等待的秒数,超时还不能入队则抛出异常queue.Full
  • 使用put_nowait(元素)方法,会直接向队列中添加元素,不会进入等待模式,若队列已满,会抛出异常queue.Fullput_nowait等价于put(元素, block=False),block 的默认值为 True。
q2 = Queue(3)

q2.put(1)
q2.put(2)
q2.put(3)

# 队列已满,下面介绍几种等待状态的不同入队写法

# 进入等待,阻塞进程
# q2.put(4)

# 等待3秒后抛出异常
# q2.put(4, timeout=3)

# 不等待,直接抛出异常
# q2.put_nowait(4)

# 不等待,直接抛出异常
q2.put(4, block=False)

get 方法(出队)

q.get()方法从队列里取出数据(也称:出队)。

value1 = q1.get()
value2 = q1.get()
value3 = q1.get()
print(value1, value2, value3)

注意,get队列也具备等待模式:

  • 当队列已空,继续get,就会进入等待模式,阻塞当前进程,直到别的进程给该队列 put 了元素,才能结束等待get成功。
  • 通过get(timeout=秒数)的第二个参数 timeout 指定等待的秒数,超时还不能 get 成功则抛出异常queue.Empty
  • 使用get_nowait()方法,会直接向队列读取元素,不会进入等待模式,若队列已空,会抛出异常queue.Emptyget_nowait等价于get(block=False),block 的默认值为 True。
q2 = Queue(3)

q2.put(1)
q2.put(2)
q2.put(3)

q2.get()
q2.get()
q2.get()

# 队列已空,下面介绍几种等待状态的不同get写法

# 进入等待,阻塞进程
# q2.get()

# 等待3秒后抛出异常
# q2.get(timeout=3)

# 不等待,直接抛出异常
# q2.get_nowait()

# 不等待,直接抛出异常
q2.get(block=False)

empty 方法

q.empty()方法判断队列是否为空。

result = q1.empty()
print(result)

full 方法

q.full()方法判断队列是否已满。

result = q1.full()
print(result)

qsize 方法

q.qsize()方法获取队列的长度。

result = q1.qsize()
print(result)

多进程演示队列操作

下面的代码实现逻辑为:当队列满了以后,再次put会等待,当有人从队列中取出元素后,put会继续。

import time
from multiprocessing import Queue, Process


def test(q):
    time.sleep(3)
    result = q.get()
    print(f'test 进程从队列中取出了一个元素:{result}')


def main():
    # 创建一个队列,长度为2
    q = Queue(2)
    # 填满队列
    q.put('aaa')
    q.put('bbb')
    print(f'队列是否已满检查结果:{q.full()}')

    # 创建子进程
    p1 = Process(target=test, args=(q,))
    p1.start()

    print('向队列中添加一个元素……')
    q.put('CCC')

    print('目前队列中元素是:')
    print(q.get())
    print(q.get())


if __name__ == '__main__':
    main()

使用 Queue 实现进程通信

通过队列可以解决进程之间不共享数据的问题,因为multiprocessing.Queue是跨进程的,也就是队列可以实现进程间的通信。

核心思想:一个进程负责生产数据,另一个进程负责消费数据,中间通过队列进行“传话”。

import time
from multiprocessing import Process, Queue


# 入队进程
def test1(q):
    for index in range(5):
        print(f'++ 入队 -- {index}')
        q.put(index)
        time.sleep(1)


# 出队进程
def test2(q):
    for _ in range(5):
        data = q.get()
        print(f'-- 出队 -- {data}')
        time.sleep(1)


if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=test1, args=(q,))
    p2 = Process(target=test2, args=(q,))

    # 先入队,再出队
    # p1.start()
    # p1.join()
    # p2.start()
    # p2.join()

    # 边入队边出队
    p1.start()
    p2.start()
    p1.join()
    p2.join()

使用 Pipe 实现进程通信

Pipe 介绍

Pipemultiprocessing模块提供的进程间通信(IPC)工具,用于在两个进程之间建立一条管道,实现双向或单向的数据传递。它比Queue更轻量,适合简单的点对点通信。

使用语法

创建管道的语法:

from multiprocessing import Pipe

conn1, conn2 = Pipe(duplex=True)

Pipe()会返回一对连接对象,它们分别代表管道的两端。

参数duplex用于控制管道为单向还是双向,True 表示双向,False 表示单向。

  • 单向Pipe的规则:一端仅能发送,另一端仅能接收。
  • 双向Pipe的规则:两端都可以发送和接收。

发送和接收的方法:

  • send()方法:向管道中发送数据。
  • recv()方法:从管道中接收数据。

注意事项

❏ 数据序列化限制

  • 发送的对象必须能被pickle序列化(如基本类型、列表、字典、自定义类的实例等)。
# 发送元组 (command, data)
conn.send(("sum", [1, 2, 3]))
conn.send(("exit", None))

# 接收方
while True:
    cmd, data = conn.recv()
    if cmd == "sum":
        conn.send(sum(data))
    elif cmd == "exit":
        break
  • 某些对象(如文件句柄、数据库连接)无法通过管道传递。

❏ 阻塞与死锁

  • recv()会阻塞直到有数据。如果双方都在等待接收而没有发送,就会死锁。
  • 避免在管道的两个端口上都同时调用recv()(除非使用非阻塞或超时)。
  • 可使用poll(timout)检查是否有数据可读,避免无限阻塞。
if conn.poll(1):   # 等待1秒
    msg = conn.recv()
else:
    # 超时处理

❏ EOFError

  • 当管道的另一端关闭,且没有数据可读时,recv()会抛出EOFError。(注意:操作系统只有在所有写入端关闭时,才会向接收端发送EOF信号触发EOFError异常)
  • 通常通过捕获异常来优雅退出。

❏ 关闭不需要的端口

  • 在每个进程中,应显式关闭不使用的管道端口。这可以避免资源泄露,并让另一端在关闭时正确触发EOFError

示例代码

from multiprocessing import Process, Pipe


def sender(conn):
    for msg in ['你好', '任务完成']:
        conn.send(msg)
        print(f'发送了数据:{msg}')
    conn.close()  # 发送完成后关闭连接
    print('数据发送任务完成,关闭连接')


def receiver(conn):
    while True:
        try:
            print(f'开始接收数据....')
            msg = conn.recv()
            print(f'接收到:{msg}')
        except EOFError:  # 当发送端关闭时触发
            break
    conn.close()
    print('接收数据任务结束,关闭连接')


if __name__ == '__main__':
    print('---------- Main Proc Start ----------')

    # 创建管道
    conn1, conn2 = Pipe()

    # 子进程使用 conn2 发送,主进程使用 conn1 接收
    p = Process(target=sender, args=(conn2,))
    p.start()
    conn2.close()  # 注意:主进程在启动子进程后立即关闭未使用的写入端,否则sender内conn.close无法触发抛出EOFError异常
    receiver(conn1)
    p.join()

    print('---------- Main Proc End ----------')

特别注意:

每个进程只应持有自己需要的连接端,其他连接端必须立即关闭。这是避免 Pipe 阻塞问题的黄金法则。

如上面的代码中如果不在主进程增加conn2.close()则会产生阻塞。

最佳实践总结

主进程:创建管道后,立即关闭『传递给子进程,但自身未使用』的连接端。

子进程:使用完连接后必须close(),避免资源泄露。

接收端逻辑:必须用try-except EOFError检测通信结束,不能依赖空消息判断。

多进程场景:Pipe 仅支持两个进程,多进程请改用QueueManager

声明:本文为原创文章,51blog.xyz和作者拥有版权,如需转载,请注明来源于51blog.xyz并保留原文链接:https://www.51blog.xyz/article/133.html

文章归档

推荐文章

buildadmin logo
Thinkphp8 Vue3 Element PLus TypeScript Vite Pinia

🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。

热门标签

PHP ThinkPHP ThinkPHP5.1 Go Mysql Mysql5.7 Redis Linux CentOS7 Git HTML CSS CSS3 Javascript JQuery Vue LayUI VMware Uniapp 微信小程序 docker wiki Confluence7 学习笔记 uView ES6 Ant Design Pro of Vue React ThinkPHP6.0 chrome 扩展 翻译工具 Nuxt SSR 服务端渲染 scrollreveal.js ThinkPHP8.0 Mac webman 跨域CORS vscode GitHub ECharts Canvas vue3 three.js 微信支付 PHP全栈开发 Python AI 人工智能 AI生成 工作经验 实战笔记