python快速入门学习笔记(进阶篇)三十:进程与线程(二)进程控制 —— 进程锁、阻塞进程、终止进程、守护进程

  • 原创
  • 作者:程序员三丰
  • 发布时间:2026-05-26 14:25
  • 浏览量:3
Python入门第三十课,主要是学习了几种控制进程的方式,如何添加/释放进程锁,阻塞进程,终止进程,以及守护进程。

进程锁(Lock)

为什么需要 Lock?

为了防止多个进程同时打印或操作同一资源的数据错乱,可以使用 Lock。

数据错乱演示:

import time
from multiprocessing import Process


def speak():
    for index in range(10):
        print('好好', end='')
        print('学习', end='')
        print('天天', end='')
        print('向上')
        time.sleep(1)


def study():
    for index in range(15):
        print('A', end='')
        print('B', end='')
        print('C', end='')
        print('D')
        time.sleep(1)


if __name__ == '__main__':
    print('我是主进程中的【第一行】打印')
    p1 = Process(target=speak)
    p2 = Process(target=study)
    p1.start()
    p2.start()
    print('我是主进程中的【最后一行】打印')

多次运行上面的代码,会出现(不是每次都出现)类似如下的异常:

Lock 的两种写法

❏ 传统用法:

lock.acquire() # 上锁
# ... 临界区代码 ...
lock.release() # 释放锁

上下文管理器用法(推荐):

with lock:
    # 自动上锁,退出块时自动释放
    # 好处:即便发生异常也能保证释放锁,避免死锁

Lock 的使用

上面的【数据错乱演示】代码增加 Lock 调整后的代码如下:

import time
from multiprocessing import Process, Lock

def speak(process_lock):
    for index in range(10):
        # 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待
        process_lock.acquire()
        print('好好', end='')
        print('学习', end='')
        print('天天', end='')
        print('向上')
        # 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等)
        process_lock.release()
        time.sleep(1)


def study(process_lock):
    for index in range(15):
        # with process_lock: 会自动完成两件事:
        #   (1)进入前:自动执行 process_lock.acquire() 上锁
        #   (2)离开后:自动执行 process_lock.release() 释放锁
        # 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”
        with process_lock:
            print('A', end='')
            print('B', end='')
            print('C', end='')
            print('D')
        time.sleep(1)


if __name__ == '__main__':
    print('我是主进程中的【第一行】打印')
    lock = Lock()
    p1 = Process(target=speak, args=(lock,))
    p2 = Process(target=study, args=(lock,))
    p1.start()
    p2.start()
    print('我是主进程中的【最后一行】打印')

Lock 的问题(死锁)

传统 Lock 在面对多次上锁时,处理不当可能会产生死锁状态,比如:连续两次『上锁』期间『没有释放前一次的锁』,就会造成死锁。原因如下:

上锁的逻辑是:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待。

所以,第一次上锁后,锁被占用,第二次上锁之前,上一次的锁没有被释放,继续上锁就会导致当前进程一直处于阻塞等待状态。

死锁演示代码:(下面只是为了演示死锁效果,实际代码中不可能连续两行进行上锁,但是实际业务中多人开发代码嵌套复用,就有可能出现连续上锁导致死锁的场景)

import time
from multiprocessing import Process, Lock


def speak(process_lock):
    for index in range(10):
        # 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待
        process_lock.acquire()
        process_lock.acquire() # 这一行就会造成死锁,因为它会一直等上一行上的锁被释放
        print('好好', end='')
        print('学习', end='')
        print('天天', end='')
        print('向上')
        # 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等)
        process_lock.release()
        process_lock.release()
        time.sleep(1)


def study(process_lock):
    for index in range(15):
        # with process_lock: 会自动完成两件事:
        #   (1)进入前:自动执行 process_lock.acquire() 上锁
        #   (2)离开后:自动执行 process_lock.release() 释放锁
        # 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”
        with process_lock:
            print('A', end='')
            print('B', end='')
            print('C', end='')
            print('D')
        time.sleep(1)


if __name__ == '__main__':
    print('我是主进程中的【第一行】打印')
    lock = Lock()
    p1 = Process(target=speak, args=(lock,))
    p2 = Process(target=study, args=(lock,))
    p1.start()
    p2.start()
    print('我是主进程中的【最后一行】打印')

上面代码运行结果如下:

针对这个问题解决办法就是使用RLock替代Lock

RLock 解决死锁

对于上面死锁的问题,解决办法只需要在定义进程的时候,把传统的Lock替换成RLock即可,完整示例代码如下:

import time
from multiprocessing import Process, RLock


def speak(process_lock):
    for index in range(10):
        # 上锁:如果锁是空的,立刻上锁,继续往下执行;如果锁被别人拿着,当前进程会阻塞等待
        process_lock.acquire()
        process_lock.acquire() # 这一行就会造成死锁,因为它会一直等上一行上的锁被释放
        print('好好', end='')
        print('学习', end='')
        print('天天', end='')
        print('向上')
        # 释放锁:acquire 和 release 必须成对出现,否则会永远卡主(死等)
        process_lock.release() # 注意:上了几次锁,这里就要释放几次锁
        process_lock.release()
        time.sleep(1)


def study(process_lock):
    for index in range(15):
        # with process_lock: 会自动完成两件事:
        #   (1)进入前:自动执行 process_lock.acquire() 上锁
        #   (2)离开后:自动执行 process_lock.release() 释放锁
        # 好处:即便代码里发生异常,也能保证释放锁,避免“卡死”
        with process_lock:
            print('A', end='')
            print('B', end='')
            print('C', end='')
            print('D')
        time.sleep(1)


if __name__ == '__main__':
    print('我是主进程中的【第一行】打印')
    lock = RLock()
    p1 = Process(target=speak, args=(lock,))
    p2 = Process(target=study, args=(lock,))
    p1.start()
    p2.start()
    print('我是主进程中的【最后一行】打印')

注意:如果是传统上锁的写法,上了几次锁,就必须对应释放几次锁。

process_lock.acquire()
process_lock.acquire() 

# ... 其他代码 ...

# 注意:上了几次锁,这里就要释放几次锁
process_lock.release()
process_lock.release()

阻塞进程 join 方法

❏ 作用

阻塞当前进程(通常是主进程),直到调用join的那个进程执行完毕。

❏ 语法

join(timeout),参数timeout为超时时间(单位:秒),该参数是可选的,表示等多久。

如果时间到了进程还没结束,主进程就不等了,会继续执行。

❏ 注意

  • join必须在start()之后调用。
  • p.join()不是让进程 p 等,而是让“执行这行 join 代码的那个进程”去等,等的是 p 这个进程。
  • 当达到了timeout所指定的时间后,进程并不会终止,只是“不再等”了。
import os
import time
from multiprocessing import Process

def speak():
    for index in range(10):
        print(f'我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
        time.sleep(1)

def study():
    for index in range(15):
        print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
        time.sleep(1)

if __name__ == '__main__':
    print('我是主进程中的【第一行】打印')
    p1 = Process(target=speak)
    p2 = Process(target=study)

    p1.start()
    p1.join(5) # 让主进程等 p1 5秒钟

    p2.start()

    # p1.join() # 让主进程等 p1 进程执行完毕后,主进程在继续执行
    # p2.join() # 让主进程等 p2 进程执行完毕后,主进程再继续执行
    print('我是主进程中的【最后一行】打印')

建议:尝试在不同的位置使用join,并观察运行结果。

终止进程 terminate 方法

❏ 作用

向操作系统申请强制终止进程。

注意

使用terminate终止进程, 不会执行finally代码块。

❏ 辅助方法

is_alive()可用于查看进程是否还“活着”。

import os
import time
from multiprocessing import Process

def speak():
    try:
        for index in range(10):
            print(f'我在说话{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
            time.sleep(1)
    finally:
        print('我是finally里的逻辑')


def study():
    for index in range(15):
        print(f'我在学习{index},进程pid是:{os.getpid()},我的父进程是:{os.getppid()}')
        time.sleep(1)


if __name__ == '__main__':
    print('我是主进程中的【第一行】打印')
    p1 = Process(target=speak)
    p2 = Process(target=study)
    p1.start()
    p2.start()

    # 3 秒后终止 p1 进程
    time.sleep(3)
    print('xxxxx 终止了p1进程....')
    # 向操作系统申请强制终止p1进程
    p1.terminate()
    # 等操作系统彻底终止掉了p1进程
    p1.join()
    # 查看p1进程是否还“活着”
    print(p1.is_alive())  # 由于p1.terminate() 方法是异步的,需要先调用 p1.join() 在检查进程存活状态

    print('我是主进程中的【最后一行】打印')

守护进程(Daemon)

什么是守护进程?

Python中守护进程是一种“依附于主进程存在的子进程”,一旦主进程结束,它就会被自动终止。

它的核心行为是『自动结束』:一旦父进程代码执行完毕,守护进程就会立即被强制终止,无论它自身的任务是否完成。

基础示例:

import time
from multiprocessing import Process


def background_task():
    index: int = 1
    while True:
        print(f'[Daemon] 守护进程工作中{index}')
        time.sleep(1)
        index += 1


def main():
    p = Process(target=background_task, daemon=True)
    p.start()
    for index in range(3):
        print(f'[Main] 主进程工作了 {index + 1} 秒')
        time.sleep(1)
    print(f'[Main] 主进程工作结束@@')

if __name__ == '__main__':
    main()

运行上面的代码,会发现程序在3秒后就退出了,守护进程也随之被强制终止,具体效果如下:

注意点:

  • 守护进程必须是子进程。
  • 主进程结束,守护进程也会随之结束。
  • 守护进程中,不允许再创建新的子进程。
  • 必须在start之前,start()之后,不能再设置daemon
  • Python 中的守护进程不是真正的“后台服务”(不能与 Linux 的 systemd 服务相提并论)。

有时候,即使父进程代码执行完了,我们看起来主进程还没退出,这通常是因为还有其他的非守护进程在运行。主进程会等待所有非守护进程结束,而守护进程则会随着父进程(此时是代码执行结束的主进程)被“清理”掉。

守护进程的应用场景

守护进程适用于非关键、辅助性、可随时丢弃的任务。

  • 后台监控类任务
  • 日志/统计/采样 类任务
  • 辅助型“陪跑任务”

注意:不适用于任何关键数据操纵、必须完成的结算任务、需要可靠关闭的网络服务。

案例:基础日志记录

import os
import time
from multiprocessing import Process

# 定义【守护进程】业务逻辑
def monitor():
    while True:
        try:
            with open('log.txt', 'r', encoding='utf-8') as f:
                lines = sum(1 for _ in f)
        except FileNotFoundError:
            lines = 0
        print(f'我是【守护进程({os.getpid()})】,log.txt 共有{lines}行')
        time.sleep(1)

#定义【非守护进程】业务逻辑
def test():
    for index in range(30):
        print(f'我是test({os.getpid()}), {index}')
        time.sleep(1)

if __name__ == '__main__':
    print(f'我是主进程({os.getpid()})中的【第一行】代码')

    # 创建 p1 守护进程
    p1 = Process(target=monitor, daemon=True)
    # 创建 p2 非守护进程
    p2 = Process(target=test)

    p1.start()
    p2.start()

    # 记录日志:向文件中写入数据
    with open('log.txt', 'a', encoding='utf-8') as f:
        for index in range(10):
            f.write(f'测试日志{index}\n')
            f.flush()
            time.sleep(1)

    print(f'我是主进程({os.getpid()})中的【最后一行】代码')
声明:本文为原创文章,51blog.xyz和作者拥有版权,如需转载,请注明来源于51blog.xyz并保留原文链接:https://www.51blog.xyz/article/132.html

文章归档

推荐文章

buildadmin logo
Thinkphp8 Vue3 Element PLus TypeScript Vite Pinia

🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。

热门标签

PHP ThinkPHP ThinkPHP5.1 Go Mysql Mysql5.7 Redis Linux CentOS7 Git HTML CSS CSS3 Javascript JQuery Vue LayUI VMware Uniapp 微信小程序 docker wiki Confluence7 学习笔记 uView ES6 Ant Design Pro of Vue React ThinkPHP6.0 chrome 扩展 翻译工具 Nuxt SSR 服务端渲染 scrollreveal.js ThinkPHP8.0 Mac webman 跨域CORS vscode GitHub ECharts Canvas vue3 three.js 微信支付 PHP全栈开发 Python AI 人工智能 AI生成 工作经验 实战笔记