为了防止多个进程同时打印或操作同一资源的数据错乱,可以使用 Lock。
数据错乱演示:
import time
from multiprocessing import Process
def speak():
for index in range(10):
print('好好', end='')
print('学习', end='')
print('天天', end='')
print('向上')
time.sleep(1)
def study():
for index in range(15):
print('A', end='')
print('B', end='')
print('C', end='')
print('D')
time.sleep(1)
if __name__ == '__main__':
print('我是主进程中的【第一行】打印')
p1 = Process(target=speak)
p2 = Process(target=study)
p1.start()
p2.start()
print('我是主进程中的【最后一行】打印')
多次运行上面的代码,会出现(不是每次都出现)类似如下的异常:

❏ 传统用法:
lock.acquire() # 上锁
# ... 临界区代码 ...
lock.release() # 释放锁
❏ 上下文管理器用法(推荐):
with lock:
# 自动上锁,退出块时自动释放
# 好处:即便发生异常也能保证释放锁,避免死锁
上面的【数据错乱演示】代码增加 Lock 调整后的代码如下:
import time
from multiprocessing import Process, Lock
def speak(process_lock):
for index in range(10):
# 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待
process_lock.acquire()
print('好好', end='')
print('学习', end='')
print('天天', end='')
print('向上')
# 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等)
process_lock.release()
time.sleep(1)
def study(process_lock):
for index in range(15):
# with process_lock: 会自动完成两件事:
# (1)进入前:自动执行 process_lock.acquire() 上锁
# (2)离开后:自动执行 process_lock.release() 释放锁
# 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”
with process_lock:
print('A', end='')
print('B', end='')
print('C', end='')
print('D')
time.sleep(1)
if __name__ == '__main__':
print('我是主进程中的【第一行】打印')
lock = Lock()
p1 = Process(target=speak, args=(lock,))
p2 = Process(target=study, args=(lock,))
p1.start()
p2.start()
print('我是主进程中的【最后一行】打印')
传统 Lock 在面对多次上锁时,处理不当可能会产生死锁状态,比如:连续两次『上锁』期间『没有释放前一次的锁』,就会造成死锁。原因如下:
上锁的逻辑是:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待。
所以,第一次上锁后,锁被占用,第二次上锁之前,上一次的锁没有被释放,继续上锁就会导致当前进程一直处于阻塞等待状态。
死锁演示代码:(下面只是为了演示死锁效果,实际代码中不可能连续两行进行上锁,但是实际业务中多人开发代码嵌套复用,就有可能出现连续上锁导致死锁的场景)
import time
from multiprocessing import Process, Lock
def speak(process_lock):
for index in range(10):
# 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待
process_lock.acquire()
process_lock.acquire() # 这一行就会造成死锁,因为它会一直等上一行上的锁被释放
print('好好', end='')
print('学习', end='')
print('天天', end='')
print('向上')
# 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等)
process_lock.release()
process_lock.release()
time.sleep(1)
def study(process_lock):
for index in range(15):
# with process_lock: 会自动完成两件事:
# (1)进入前:自动执行 process_lock.acquire() 上锁
# (2)离开后:自动执行 process_lock.release() 释放锁
# 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”
with process_lock:
print('A', end='')
print('B', end='')
print('C', end='')
print('D')
time.sleep(1)
if __name__ == '__main__':
print('我是主进程中的【第一行】打印')
lock = Lock()
p1 = Process(target=speak, args=(lock,))
p2 = Process(target=study, args=(lock,))
p1.start()
p2.start()
print('我是主进程中的【最后一行】打印')
上面代码运行结果如下:

针对这个问题解决办法就是使用RLock替代Lock。
对于上面死锁的问题,解决办法只需要在定义进程的时候,把传统的Lock替换成RLock即可,完整示例代码如下:
import time
from multiprocessing import Process, RLock
def speak(process_lock):
for index in range(10):
# 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待
process_lock.acquire()
process_lock.acquire() # 这一行就会造成死锁,因为它会一直等上一行上的锁被释放
print('好好', end='')
print('学习', end='')
print('天天', end='')
print('向上')
# 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等)
process_lock.release() # 注意:上了几次锁,这里就要释放几次锁
process_lock.release()
time.sleep(1)
def study(process_lock):
for index in range(15):
# with process_lock: 会自动完成两件事:
# (1)进入前:自动执行 process_lock.acquire() 上锁
# (2)离开后:自动执行 process_lock.release() 释放锁
# 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”
with process_lock:
print('A', end='')
print('B', end='')
print('C', end='')
print('D')
time.sleep(1)
if __name__ == '__main__':
print('我是主进程中的【第一行】打印')
lock = RLock()
p1 = Process(target=speak, args=(lock,))
p2 = Process(target=study, args=(lock,))
p1.start()
p2.start()
print('我是主进程中的【最后一行】打印')
注意:如果是传统上锁的写法,上了几次锁,就必须对应释放几次锁。
process_lock.acquire()
process_lock.acquire()
# ... 其他代码 ...
# 注意:上了几次锁,这里就要释放几次锁
process_lock.release()
process_lock.release()
❏ 作用
阻塞当前进程(通常是主进程),直到调用join的那个进程执行完毕。
❏ 语法
join(timeout),参数timeout为超时时间(单位:秒),该参数是可选的,表示等多久。
如果时间到了进程还没结束,主进程就不等了,会继续执行。
❏ 注意
join必须在start()之后调用。p.join()不是让进程 p 等,而是让“执行这行 join 代码的那个进程”去等,等的是 p 这个进程。timeout所指定的时间后,进程并不会终止,只是“不再等”了。import os
import time
from multiprocessing import Process
def speak():
for index in range(10):
print(f'我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
def study():
for index in range(15):
print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
print('我是主进程中的【第一行】打印')
p1 = Process(target=speak)
p2 = Process(target=study)
p1.start()
p1.join(5) # 让主进程等 p1 5秒钟
p2.start()
# p1.join() # 让主进程等 p1 进程执行完毕后,主进程在继续执行
# p2.join() # 让主进程等 p2 进程执行完毕后,主进程再继续执行
print('我是主进程中的【最后一行】打印')
建议:尝试在不同的位置使用join,并观察运行结果。
❏ 作用
向操作系统申请强制终止进程。
❏ 注意
使用terminate终止进程, 不会执行finally代码块。
❏ 辅助方法
is_alive()可用于查看进程是否还“活着”。
import os
import time
from multiprocessing import Process
def speak():
try:
for index in range(10):
print(f'我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
finally:
print('我是finally里的逻辑')
def study():
for index in range(15):
print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
time.sleep(1)
if __name__ == '__main__':
print('我是主进程中的【第一行】打印')
p1 = Process(target=speak)
p2 = Process(target=study)
p1.start()
p2.start()
# 3 秒后终止 p1 进程
time.sleep(3)
print('xxxxx 终止了p1进程....')
# 向操作系统申请强制终止p1进程
p1.terminate()
# 等操作系统彻底终止掉了p1进程
p1.join()
# 查看p1进程是否还“活着”
print(p1.is_alive()) # 由于p1.terminate() 方法是异步的,需要先调用 p1.join() 在检查进程存活状态
print('我是主进程中的【最后一行】打印')
Python中守护进程是一种“依附于主进程存在的子进程”,一旦主进程结束,它就会被自动终止。
它的核心行为是『自动结束』:一旦父进程代码执行完毕,守护进程就会立即被强制终止,无论它自身的任务是否完成。
基础示例:
import time
from multiprocessing import Process
def background_task():
index: int = 1
while True:
print(f'[Daemon] 守护进程工作中{index}')
time.sleep(1)
index += 1
def main():
p = Process(target=background_task, daemon=True)
p.start()
for index in range(3):
print(f'[Main] 主进程工作了 {index + 1} 秒')
time.sleep(1)
print(f'[Main] 主进程工作结束@@')
if __name__ == '__main__':
main()
运行上面的代码,会发现程序在3秒后就退出了,守护进程也随之被强制终止,具体效果如下:

注意点:
start之前,start()之后,不能再设置daemon。有时候,即使父进程代码执行完了,我们看起来主进程还没退出,这通常是因为还有其他的非守护进程在运行。主进程会等待所有非守护进程结束,而守护进程则会随着父进程(此时是代码执行结束的主进程)被“清理”掉。
守护进程适用于非关键、辅助性、可随时丢弃的任务。
注意:不适用于任何关键数据操纵、必须完成的结算任务、需要可靠关闭的网络服务。
import os
import time
from multiprocessing import Process
# 定义【守护进程】业务逻辑
def monitor():
while True:
try:
with open('log.txt', 'r', encoding='utf-8') as f:
lines = sum(1 for _ in f)
except FileNotFoundError:
lines = 0
print(f'我是【守护进程({os.getpid()})】,log.txt 共有{lines}行')
time.sleep(1)
#定义【非守护进程】业务逻辑
def test():
for index in range(30):
print(f'我是test({os.getpid()}), {index}')
time.sleep(1)
if __name__ == '__main__':
print(f'我是主进程({os.getpid()})中的【第一行】代码')
# 创建 p1 守护进程
p1 = Process(target=monitor, daemon=True)
# 创建 p2 非守护进程
p2 = Process(target=test)
p1.start()
p2.start()
# 记录日志:向文件中写入数据
with open('log.txt', 'a', encoding='utf-8') as f:
for index in range(10):
f.write(f'测试日志{index}\n')
f.flush()
time.sleep(1)
print(f'我是主进程({os.getpid()})中的【最后一行】代码')
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。