每个进程都有自己的独立内存空间,进程之间不共享内存,因此也就不共享任何变量。
验证代码如下:
验证代码 ➊
from multiprocessing import Process
num = 100
names = []
def test1():
global num, names
num += 10
names.append('张三')
print(f'我是 test1 进程,操作之后的 num 是{num}, names 是:{names}')
def test2():
global num, names
num += 22
names.append('李四')
print(f'我是 test2 进程,操作之后的 num 是{num}, names 是:{names}')
if __name__ == '__main__':
print('主进程中的【第一行】代码')
p1 = Process(target=test1)
p2 = Process(target=test2)
p1.start()
p2.start()
p1.join()
p2.join()
print('主进程中的【最后一行】代码', num, names)
验证代码 ➋
from multiprocessing import Process
def test1(num, names):
num += 10
names.append('张三')
print(f'我是 test1 进程,操作之后的 num 是{num}, names 是:{names}')
def test2(num, names):
num += 22
names.append('李四')
print(f'我是 test2 进程,操作之后的 num 是{num}, names 是:{names}')
if __name__ == '__main__':
print('主进程中的【第一行】代码')
num = 100
names = []
p1 = Process(target=test1, args=(num, names))
p2 = Process(target=test2, args=(num, names))
p1.start()
p2.start()
p1.join()
p2.join()
print('主进程中的【最后一行】代码', num, names)
注意:
我们正常编写的变量,在多个进程之间是不共享的,但有些对象是天然被多个进程共享的,比如:我们之前讲过的 Lock 和 RLock,我们后面还会学习到很多天然被多个进程所共享的对象,例如:
multiprocessing.Queue。
我们之前学过list、tuple、dict,它们的特点是:数据想怎么拿就怎么拿。
队列(Queue)是一种“先进先出”的数据结构(先放进去的数据,一定会先取出来)。
下面介绍常见队列操作:
创建队列之前需要先导入multiprocessing.Queue模块:
from multiprocessing import Queue
注意:千万不要导入from queue import Queue,这个模块具备跨进程共享数据的能力。
q1 = Queue()
q2 = Queue(3) # 队列 q2 最多能保存3个元素
q.put(value)方法往队列里放数据(也称:入队)。
q1.put(10)
q1.put(20)
q1.put(30)
注意,put队列具备等待模式:
put,就会进入等待模式,阻塞当前进程,直到别的进程给从该队列 get 了元素,才能结束等待put成功。put(元素, timeout=秒数)的第二个参数 timeout 指定等待的秒数,超时还不能入队则抛出异常queue.Full。put_nowait(元素)方法,会直接向队列中添加元素,不会进入等待模式,若队列已满,会抛出异常queue.Full。put_nowait等价于put(元素, block=False),block 的默认值为 True。q2 = Queue(3)
q2.put(1)
q2.put(2)
q2.put(3)
# 队列已满,下面介绍几种等待状态的不同入队写法
# 进入等待,阻塞进程
# q2.put(4)
# 等待3秒后抛出异常
# q2.put(4, timeout=3)
# 不等待,直接抛出异常
# q2.put_nowait(4)
# 不等待,直接抛出异常
q2.put(4, block=False)
q.get()方法从队列里取出数据(也称:出队)。
value1 = q1.get()
value2 = q1.get()
value3 = q1.get()
print(value1, value2, value3)
注意,get队列也具备等待模式:
get,就会进入等待模式,阻塞当前进程,直到别的进程给该队列 put 了元素,才能结束等待get成功。get(timeout=秒数)的第二个参数 timeout 指定等待的秒数,超时还不能 get 成功则抛出异常queue.Empty。get_nowait()方法,会直接向队列读取元素,不会进入等待模式,若队列已空,会抛出异常queue.Empty。get_nowait等价于get(block=False),block 的默认值为 True。q2 = Queue(3)
q2.put(1)
q2.put(2)
q2.put(3)
q2.get()
q2.get()
q2.get()
# 队列已空,下面介绍几种等待状态的不同get写法
# 进入等待,阻塞进程
# q2.get()
# 等待3秒后抛出异常
# q2.get(timeout=3)
# 不等待,直接抛出异常
# q2.get_nowait()
# 不等待,直接抛出异常
q2.get(block=False)
q.empty()方法判断队列是否为空。
result = q1.empty()
print(result)
q.full()方法判断队列是否已满。
result = q1.full()
print(result)
q.qsize()方法获取队列的长度。
result = q1.qsize()
print(result)
下面的代码实现逻辑为:当队列满了以后,再次put会等待,当有人从队列中取出元素后,put会继续。
import time
from multiprocessing import Queue, Process
def test(q):
time.sleep(3)
result = q.get()
print(f'test 进程从队列中取出了一个元素:{result}')
def main():
# 创建一个队列,长度为2
q = Queue(2)
# 填满队列
q.put('aaa')
q.put('bbb')
print(f'队列是否已满检查结果:{q.full()}')
# 创建子进程
p1 = Process(target=test, args=(q,))
p1.start()
print('向队列中添加一个元素……')
q.put('CCC')
print('目前队列中元素是:')
print(q.get())
print(q.get())
if __name__ == '__main__':
main()
通过队列可以解决进程之间不共享数据的问题,因为multiprocessing.Queue是跨进程的,也就是队列可以实现进程间的通信。
核心思想:一个进程负责生产数据,另一个进程负责消费数据,中间通过队列进行“传话”。
import time
from multiprocessing import Process, Queue
# 入队进程
def test1(q):
for index in range(5):
print(f'++ 入队 -- {index}')
q.put(index)
time.sleep(1)
# 出队进程
def test2(q):
for _ in range(5):
data = q.get()
print(f'-- 出队 -- {data}')
time.sleep(1)
if __name__ == '__main__':
q = Queue()
p1 = Process(target=test1, args=(q,))
p2 = Process(target=test2, args=(q,))
# 先入队,再出队
# p1.start()
# p1.join()
# p2.start()
# p2.join()
# 边入队边出队
p1.start()
p2.start()
p1.join()
p2.join()
Pipe是multiprocessing模块提供的进程间通信(IPC)工具,用于在两个进程之间建立一条管道,实现双向或单向的数据传递。它比Queue更轻量,适合简单的点对点通信。
创建管道的语法:
from multiprocessing import Pipe
conn1, conn2 = Pipe(duplex=True)
Pipe()会返回一对连接对象,它们分别代表管道的两端。
参数duplex用于控制管道为单向还是双向,True 表示双向,False 表示单向。
Pipe的规则:一端仅能发送,另一端仅能接收。Pipe的规则:两端都可以发送和接收。发送和接收的方法:
send()方法:向管道中发送数据。recv()方法:从管道中接收数据。❏ 数据序列化限制
pickle序列化(如基本类型、列表、字典、自定义类的实例等)。# 发送元组 (command, data)
conn.send(("sum", [1, 2, 3]))
conn.send(("exit", None))
# 接收方
while True:
cmd, data = conn.recv()
if cmd == "sum":
conn.send(sum(data))
elif cmd == "exit":
break
❏ 阻塞与死锁
recv()会阻塞直到有数据。如果双方都在等待接收而没有发送,就会死锁。recv()(除非使用非阻塞或超时)。poll(timout)检查是否有数据可读,避免无限阻塞。if conn.poll(1): # 等待1秒
msg = conn.recv()
else:
# 超时处理
❏ EOFError
recv()会抛出EOFError。(注意:操作系统只有在所有写入端关闭时,才会向接收端发送EOF信号触发EOFError异常)❏ 关闭不需要的端口
EOFError。from multiprocessing import Process, Pipe
def sender(conn):
for msg in ['你好', '任务完成']:
conn.send(msg)
print(f'发送了数据:{msg}')
conn.close() # 发送完成后关闭连接
print('数据发送任务完成,关闭连接')
def receiver(conn):
while True:
try:
print(f'开始接收数据....')
msg = conn.recv()
print(f'接收到:{msg}')
except EOFError: # 当发送端关闭时触发
break
conn.close()
print('接收数据任务结束,关闭连接')
if __name__ == '__main__':
print('---------- Main Proc Start ----------')
# 创建管道
conn1, conn2 = Pipe()
# 子进程使用 conn2 发送,主进程使用 conn1 接收
p = Process(target=sender, args=(conn2,))
p.start()
conn2.close() # 注意:主进程在启动子进程后立即关闭未使用的写入端,否则sender内conn.close无法触发抛出EOFError异常
receiver(conn1)
p.join()
print('---------- Main Proc End ----------')
特别注意:
每个进程只应持有自己需要的连接端,其他连接端必须立即关闭。这是避免 Pipe 阻塞问题的黄金法则。
如上面的代码中如果不在主进程增加
conn2.close()则会产生阻塞。
主进程:创建管道后,立即关闭『传递给子进程,但自身未使用』的连接端。
子进程:使用完连接后必须close(),避免资源泄露。
接收端逻辑:必须用try-except EOFError检测通信结束,不能依赖空消息判断。
多进程场景:Pipe 仅支持两个进程,多进程请改用Queue 或 Manager。
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。