线程的语法与进程有很多相似之处,所以本文就简单串讲一下线程的相关知识,更多细节可以阅读本站进程相关文章或官方文档。
线程是操作系统能够调度的最小执行单元,它属于一个进程,同一个进程内的多个线程共享进程的内存空间(包括全局变量、文件描述符等)。
我们需要明确一个概念:任何一个正在运行的 Python 程序中,至少都有一个线程!以下面的代码为例:
if __name__ == '__main__':
print('主进程中的代码')
对于上面的代码来说:print('主进程中的代码')确实属于主进程,但更准确的说,是运行在主进程里的主线程中。
线程是进程中的执行单位:
与进程非常相似,语法结构基本一致,且传递参数也相同。
在实例化Thread时,可以传递以下参数:
✧ group默认值为None(应当始终为None)。
✧ target线程要执行的可调用对象,默认值为None。
✧ name线程名称,默认为None,如果设置为None,Python 会自动分配名字。
✧ args给target传的位置参数(元组)。
✧ kwargs给target传的关键字参数(字典)。
✧ daemon标记线程是否为守护线程,取值为布尔值(默认为None,表示从创建方继承)。
可以使用current_thread().name获取当前线程的名字。
线程控制:
✧ t1.join()方法的作用是阻塞t1线程,直到t1线程执行完成。(更多用法可以参考进程的join)
✧ 使用threading.RLock锁保护线程的安全共享,具体应用with lock为最佳实践,它可以通过上下文管理器自动处理锁的获取和释放,避免忘记解锁。
示例代码:
import os, time
from threading import Thread, get_native_id, RLock, current_thread
def speak(lock, thread_nums, desc):
for index in range(thread_nums):
with lock:
print(f'{desc} {current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}')
time.sleep(1)
def study(lock, thread_nums, desc):
for index in range(thread_nums):
with lock:
print(f'{desc} {current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}')
time.sleep(1)
if __name__ == '__main__':
print(f'-------- Main Process Start --------进程PID是:{os.getpid()},线程编号是:{get_native_id()}')
lock = RLock()
# 创建线程对象
t1 = Thread(target=speak, name='SpeakThread', args=(lock, 5), kwargs={'desc': '讲话线程'})
t2 = Thread(target=study, name='StudyThread', args=(lock, 3), kwargs={'desc': '学习线程'})
# 调用线程对象的 start 方法,会立刻将该线程交由操作系统进行调度
t1.start()
t2.start()
# 让主线程等 t1 和 t2 线程执行完毕后,主线程再继续执行
t1.join()
t2.join()
print('-------- Main Process End --------')
同一个进程下的多个线程,共享它们所同属的进程的内存空间,可以访问和修改全局变量,但由于GIL(全局解释器锁)和线程调度的不确定性,直接共享变量可能导致静态条件。
下面是一个完整的 Python 线程共享变量示例,演示内容:
counter)。RLock锁保护线程安全共享,得到正确结果。from threading import Thread, RLock
import time
# 共享变量
counter = 0
# 创建一个锁对象
lock = RLock()
# 目标函数:对共享变量进行 N 次自增(不安全的自增操作,缺少锁保护)
def increment_without_lock(n):
global counter
for _ in range(n):
current = counter
time.sleep(0.0001) # 模拟一些计算延迟,增加竞态条件发生的概率
counter = current + 1
# 目标函数:对共享变量进行 N 次自增(使用了锁保护)
def increment_with_lock(n):
global counter
for _ in range(n):
with lock: # 自动获取和释放锁
current = counter
# 模拟计算延迟(即使有延迟,锁也能保证安全)
time.sleep(0.0001)
counter = current + 1
def run_test(target_func, n, num_threads, description):
global counter
counter = 0 # 重置共享变量
threads = []
# 创建多个线程
for _ in range(num_threads):
t = Thread(target=target_func, args=(n,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 理论上结果应为:n * num_threads
expected = n * num_threads
print(f'{description:20} | 期望值:{expected:6} | 实际值:{counter:6} | {'正确' if counter == expected else '错误'}')
if __name__ == '__main__':
# 每个线程执行的自增次数
INC_PER_THREAD = 10_000
# 线程数量
THREAD_COUNT = 3
print('线程共享变量静态条件演示\n')
# 测试无锁版本(错误)
run_test(increment_without_lock, INC_PER_THREAD, THREAD_COUNT, '无锁版本(错误)')
# 测试有锁版本(正确)
run_test(increment_with_lock, INC_PER_THREAD, THREAD_COUNT, '有锁版本(正确)')
运行结果如下:
线程共享变量静态条件演示
无锁版本(错误) | 期望值: 30000 | 实际值: 10000 | 错误
有锁版本(正确) | 期望值: 30000 | 实际值: 30000 | 正确
如果一个线程被设为守护线程,但所有非守护线程结束时,他会自动被强制终止。
import time
from threading import Thread, RLock
lock = RLock()
def daemon_worker():
while True:
with lock:
print('守护线程工作中……')
time.sleep(1)
def normal_worker():
for _ in range(5):
with lock:
print('正常线程工作中……')
time.sleep(1)
if __name__ == '__main__':
t1 = Thread(target=daemon_worker, daemon=True)
t2 = Thread(target=normal_worker)
t1.start()
t2.start()
上面的代码运行结果是:5秒后会自动终止daemon_worker中的无限循环。
与继承Process创建进程一样,我们也可以继承Thread创建线程。
import os, time
from threading import Thread, get_native_id, RLock, current_thread
class SpeakClass(Thread):
def __init__(self, lock, thread_nums, desc, **kwargs):
super().__init__(**kwargs)
self.lock = lock
self.thread_nums = thread_nums
self.desc = desc
def run(self):
for index in range(self.thread_nums):
with self.lock:
print(
f'{self.desc} {current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}')
time.sleep(1)
class StudyClass(Thread):
def __init__(self, lock, thread_nums, desc, **kwargs):
super().__init__(**kwargs)
self.lock = lock
self.thread_nums = thread_nums
self.desc = desc
def run(self):
for index in range(self.thread_nums):
with self.lock:
print(
f'{self.desc} {current_thread().name}{index},进程PID是:{os.getpid()},线程编号是:{get_native_id()}')
time.sleep(1)
if __name__ == '__main__':
print(f'-------- Main Process Start --------进程PID是:{os.getpid()},线程编号是:{get_native_id()}')
lock = RLock()
# 创建线程对象
t1 = SpeakClass(lock, 5, '讲话线程', name='SpeakThread')
t2 = StudyClass(lock, 3, '学习线程', name='StudyThread')
# 调用线程对象的 start 方法,会立刻将该线程交由操作系统进行调度
t1.start()
t2.start()
# 让主线程等 t1 和 t2 线程执行完毕后,主线程再继续执行
t1.join()
t2.join()
print('-------- Main Process End --------')
🔥BuildAdmin是一个永久免费开源,无需授权即可商业使用,且使用了流行技术栈快速创建商业级后台管理系统。