本文主要记录Python多线程编程相关的一些笔记。包括threading模块,Thread类,以及5种线程锁(互斥锁,重入锁,信号,事件,条件)等。
Python 多线程笔记
Created 2021.02.20 by William Yu; Last modified: 2022.09.16-v1.1.0
Contact: windmillyucong@163.com
Copyleft! 2022 William Yu. Some rights reserved.
Reference
0. 简介
在Python3中,通过threading模块提供线程。原来的thread模块已废弃。但是threading模块中有个Thread类(大写的T,类名),最主要的线程类。
1. threading模块
1.1 常用方法
方法与属性 |
描述 |
current_thread() |
返回当前线程 |
active_count() |
返回当前活跃的线程数,1个主线程+n个子线程 |
get_ident() |
返回当前线程 |
enumerate() |
返回当前活动 Thread 对象列表 |
main_thread() |
返回主 Thread 对象 |
settrace(func) |
为所有线程设置一个 trace 函数 |
setprofile(func) |
为所有线程设置一个 profile 函数 |
stack_size([size]) |
返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size |
TIMEOUT_MAX |
Lock.acquire(), RLock.acquire(), Condition.wait() 允许的最大超时时间 |
1.2 提供的类
- Thread:基本线程类
- Lock:互斥锁
- RLock:可重入锁,使单一进程再次获得已持有的锁(递归锁)
- Condition:条件锁,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值。
- Semaphore:信号锁。为线程间共享的有限资源提供一个”计数器”,如果没有可用资源则会被阻塞。
- Event:事件锁,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活
- Timer:一种计时器
- Barrier:Python3.2新增的“阻碍”类,必须达到指定数量的线程后才可以继续执行。
2. Thread类
threading模块最重要的类
2.1 Thread类的方法
方法与属性 |
说明 |
start() |
启动线程,等待CPU调度 |
run() |
线程被cpu调度后自动执行的方法 |
getName()、setName()和name |
用于获取和设置线程的名称。 |
setDaemon() |
设置为后台线程或前台线程(默认是False,前台线程)。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止。如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程执行完成后,程序才停止。 |
ident |
获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。 |
is_alive() |
判断线程是否是激活的(alive)。从调用start()方法启动线程,到run()方法执行完毕或遇到未处理异常而中断这段时间内,线程是激活的。 |
isDaemon()方法和daemon属性 |
是否为守护线程 |
join([timeout]) |
调用该方法将会使主调线程堵塞,直到被调用线程运行结束或超时。参数timeout是一个数值类型,表示超时时间,如果未提供该参数,那么主调线程将一直堵塞到被调线程结束。 |
2.2 创建线程
方法一: 类继承
- step1. 直接从 threading.Thread 继承创建一个子类
- step2. 并在子类里面实现一个重载的run()方法
- step3. 实例化之后调用start()方法启动新线程
- step4. start()会启动重载的run()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| import threading
import time
class MyThread(threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print("start thread:"+self.name)
self.MyJob()
print("exit thread:"+self.name)
pass
def MyJob(self):
while self.counter:
time.sleep(1) # 1s
print("%s,%s" % (self.name, time.ctime(time.time()) ))
self.counter -= 1
pass
|
1
2
3
4
| thread = MyThread(1, "hello", 6)
thread.start()
thread.join()
print("exit main thread")
|
1
2
3
4
5
6
7
8
9
| start thread:hello
hello,Sun Feb 7 16:40:44 2021
hello,Sun Feb 7 16:40:45 2021
hello,Sun Feb 7 16:40:46 2021
hello,Sun Feb 7 16:40:47 2021
hello,Sun Feb 7 16:40:48 2021
hello,Sun Feb 7 16:40:49 2021
exit thread:hello
exit main thread
|
方法二:实例化threading.Thread对象
- 在实例化该对象的时候,将线程要执行的任务函数作为参数传入线程
1
2
3
4
5
6
7
8
9
10
11
12
| import threading
import time
def MyJob(name, counter):
while counter:
time.sleep(1) # 1s
print("%s,%s" % (name, time.ctime(time.time()) ))
counter -= 1
pass
mythread = threading.Thread(target=MyJob, args=('hello', 4))
mythread.start()
mythread.join()
|
1
2
3
4
| hello,Mon Mar 8 15:14:09 2021
hello,Mon Mar 8 15:14:10 2021
hello,Mon Mar 8 15:14:11 2021
hello,Mon Mar 8 15:14:12 2021
|
注意:构造参数
- 第一个参数是线程函数变量
- 第二个参数args是一个数组变量参数
- 如果需要传递多个参数,写在元组里面,(参数1, 参数2, 参数3)
- 如果只传递一个值,元组中只包含一个元素时,需要在元素后面添加逗号,逗号不能省略 (参数1,)
3. 线程同步:锁
threading 提供了很多锁类型:
- Lock 互斥锁
- RLock 可重入锁
- Condition 条件
- Event 事件
- Semaphore 信号
- Barrier “阻碍”
3.1 Lock 互斥锁
使用过程也非常简单:
lock = threading.Lock()
实例化锁
lock.acquire()
上锁
lock.release()
释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
| import threading
import time
number = 0
lock = threading.Lock() # 实例化一个互斥锁
def plus(lk):
global number # global声明此处的number指的就是外面的全局变量number
lk.acquire() # 开始加锁
for _ in range(1000000): # 进行一个大数级别的循环加一运算
number += 1
print("子线程%s运算结束后,number = %s" % (threading.current_thread().getName(), number))
lk.release() # 释放锁,让别的线程也可以访问number
if __name__ == '__main__':
threads = []
for i in range(2): # 用2个子线程,就可以观察到脏数据
t = threading.Thread(target=plus, args=(lock,)) # 需要把锁当做参数传递给plus函数
t.start()
threads.append(t)
for i in threads:
i.join()
print("主线程执行完毕后,number = ", number)
|
1
2
3
| 子线程Thread-17运算结束后,number = 1000000
子线程Thread-18运算结束后,number = 2000000
主线程执行完毕后,number = 2000000
|
3.2 RLock 重入锁
- Rlock的使用方法与Lock一样,但是可重入。
- 可被一个线程多次acquire,锁内部有一个counter计数对象,记录acquire次数
- 可被一个线程多次release,release一次,counter减1
- 计数为0时,其他线程才能获取资源
3.3 Semaphore 信号
- 可被多个线程同时acquire
- 可以设置一个counter上限,表示最多有多少线程可同时拥有这把锁
- 通常用于只访问数据,不改变数据的多线程工作
- 信号量上限为1时,相当于互斥锁
使用方法
se = threading.BoundedSemaphore(2)
构造一个允许2个线程同时持有的锁
se.acquire()
获取锁
se.release()
释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| import threading
import time
def MyJob(name, counter, delay, se):
se.acquire()
while counter:
time.sleep(delay) # 1s
print("%s,%s" % (name, time.ctime(time.time()) ))
counter -= 1
se.release()
pass
se = threading.BoundedSemaphore(2) # 创建一个信号量锁,运行同时发两把钥匙
mythread1 = threading.Thread(target=MyJob, args=('thread1', 8, 1, se))
mythread2 = threading.Thread(target=MyJob, args=('thread2', 3, 2, se))
mythread1.start()
mythread2.start()
mythread1.join()
mythread2.join()
|
1
2
3
4
5
6
7
8
9
10
11
| thread1,Sun Feb 7 18:29:24 2021
thread2,Sun Feb 7 18:29:25 2021
thread1,Sun Feb 7 18:29:25 2021
thread1,Sun Feb 7 18:29:26 2021
thread2,Sun Feb 7 18:29:27 2021
thread1,Sun Feb 7 18:29:27 2021
thread1,Sun Feb 7 18:29:28 2021
thread2,Sun Feb 7 18:29:29 2021
thread1,Sun Feb 7 18:29:29 2021
thread1,Sun Feb 7 18:29:30 2021
thread1,Sun Feb 7 18:29:31 2021
|
3.4 Event 事件
事件线程锁的运行机制
- 定义全局Flag
- 在线程执行wait()方法时,如果Flag为Flase,就会被阻塞,反之放行
- 可以存在多个线程wait一个Flag的情形,放行时一次全部放行
Event提供的方法
clear()
复位 Flase
set()
置位放行 True
wait()
注册一个等待,一旦有其他线程运行了set()将Flag置位,就接着向下运行
is_set()
判断是否为true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
| import threading
import time
event_green_light_on = threading.Event()
running = False
def lighter():
green_time = 3 # 绿灯时间
red_time = 2 # 红灯时间
event_green_light_on.set() # 初始设为绿灯
while running:
print("绿灯亮...")
time.sleep(green_time)
event_green_light_on.clear()
print("红灯亮...")
time.sleep(red_time)
event_green_light_on.set()
print("红绿灯线程退出")
pass
def Cars(name):
while running:
if event_green_light_on.is_set(): # 判断当前是否"放行"状态
print("一辆[%s] 呼啸开过..." % name)
time.sleep(0.5)
else:
print("一辆[%s]开来,看到红灯,无奈的停下了..." % name)
event_green_light_on.wait() # 程序阻塞,等待事件
print("[%s] 看到绿灯亮了,瞬间飞起....." % name)
print("汽车线程退出")
pass
if __name__ == '__main__':
running = True
light = threading.Thread(target=lighter,)
light.start()
for name in ['奔驰', '宝马', '奥迪']:
car = threading.Thread(target=Cars, args=(name,))
car.start()
time.sleep(10)
running = False
if light.isAlive():
light.join()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
| 绿灯亮...
一辆[奔驰] 呼啸开过...一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[宝马] 呼啸开过...一辆[奥迪] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[奔驰] 呼啸开过...
红灯亮...
一辆[宝马]开来,看到红灯,无奈的停下了...
一辆[奥迪]开来,看到红灯,无奈的停下了...
一辆[奔驰]开来,看到红灯,无奈的停下了...
绿灯亮...[宝马] 看到绿灯亮了,瞬间飞起.....
一辆[宝马] 呼啸开过...
[奔驰] 看到绿灯亮了,瞬间飞起.....
一辆[奔驰] 呼啸开过...
[奥迪] 看到绿灯亮了,瞬间飞起.....
一辆[奥迪] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[奥迪] 呼啸开过...
一辆[宝马] 呼啸开过...
一辆[奔驰] 呼啸开过...
一辆[奥迪] 呼啸开过...
红灯亮...
一辆[宝马]开来,看到红灯,无奈的停下了...
一辆[奔驰]开来,看到红灯,无奈的停下了...
一辆[奥迪]开来,看到红灯,无奈的停下了...
红绿灯线程退出[奥迪] 看到绿灯亮了,瞬间飞起.....[奔驰] 看到绿灯亮了,瞬间飞起.....[宝马] 看到绿灯亮了,瞬间飞起.....
汽车线程退出
汽车线程退出汽车线程退出
|
3.5 条件 Condition
Condition称作条件锁
提供的方法
acquire()/release()
加锁解锁
wait([timeout])
将线程加入到condition的等待池,等待通知,并释放锁。使用前线程必须已经获得锁,否则抛出异常
notify()
从等待池中间挑选一个线程并通知,收到通知的线程自动调用acquire()获取锁,其余线程依旧等待。这个方法不会释放锁。使用线程前必须已经获得锁,否则抛出异常。
notifyAll()
通知所有等待池中的线程,这些线程都进入锁定池尝试获得锁定。调用这个方法不会释放锁定。使用前线程必须已获得锁定,否则将抛出异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
| import threading
import time
num = 0
con = threading.Condition() # 设置一个条件
running = True
class Foo(threading.Thread):
def __init__(self, name, action):
super(Foo, self).__init__()
self.name = name
self.action = action
def run(self):
global num
con.acquire() # 获取锁
print("%s开始执行..." % self.name)
while running:
if self.action == "add":
num += 1
elif self.action == 'reduce':
num -= 1
else:
exit(1)
print("num当前为:", num)
time.sleep(1)
if num == 5 or num == 0:
print("暂停执行%s!" % self.name)
con.notify() # 通知
con.wait() # 等待
print("%s开始执行..." % self.name)
con.release() # 释放锁
pass
if __name__ == '__main__':
a = Foo("线程A", 'add')
b = Foo("线程B", 'reduce')
a.start()
b.start()
time.sleep(12)
running = False
if a.isAlive():
con.notify()
a.join()
if b.isAlive():
con.notify()
b.join()
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
| 线程A开始执行...
num当前为: 1
num当前为: 2
num当前为: 3
num当前为: 4
num当前为: 5
暂停执行线程A!
线程B开始执行...
num当前为: 4
num当前为: 3
num当前为: 2
num当前为: 1
num当前为: 0
暂停执行线程B!
线程A开始执行...
num当前为: 1
num当前为: 2
---------------------------------------------------------------------------
RuntimeError Traceback (most recent call last)
<ipython-input-7-3ccbac894eac> in <module>
42 running = False
43 if a.isAlive():
---> 44 con.notify()
45 a.join()
46 if b.isAlive():
/usr/lib/python3.5/threading.py in notify(self, n)
341 """
342 if not self._is_owned():
--> 343 raise RuntimeError("cannot notify on un-acquired lock")
344 all_waiters = self._waiters
345 waiters_to_notify = _deque(_islice(all_waiters, n))
RuntimeError: cannot notify on un-acquired lock
|
3.6 Timer 定时器
- Timer定时器类,用于指定n秒之后执行某项操作
- Timer类是threading模块中的一个小工具,用于指定n秒后执行某操作。一个简单但很实用的东西。
1
2
3
4
5
6
7
| from threading import Timer
def hello():
print("hello. world")
t = Timer(3, hello)
t.start()
|
3.7 通过with语句使用的线程锁
是一种良好的代码习惯与格式。
常用于异常处理:
- 所有的线程锁都有一个加锁和释放锁的动作,类似与文件的打开与关闭
- 加锁之后,如果出现异常,没有正常释放锁,那么线程会造成致命性的影响
- 通过with上下文管理器,确保锁被正常释放
1
2
3
| with some_lock:
# do something
pass
|
1
2
3
4
5
6
| # 也相当于
some_lock.acquire()
try:
# 执行任务..
finally:
some_lock.release()
|
3.8 全局解释锁(GIL)
- python 中无论CPU有多少核,同时只能执行一个线程,这是由于GIL(Global Interpreter Lock())造成的
- GIL只在CPython解释器中存在,因为CPython调用的是c语言的原生线程,不能直接操作CPU
- 只能利用GIL保证同一时间只有一个线程拿到数据
- PyPy和JPython中都没有GIL
Python多线程的工作流程:
- 拿到公共数据
- 申请GIL
- Python解释器调用操作系统原生线程
- cpu执行运算
- 当该线程执行一段时间消耗完,无论任务是否已经执行完毕,都会释放GIL
- 下一个被CPU调度的线程重复上面的过程
特点
针对不同类型的任务,多线程的效率不同
- 对于CPU密集型的任务(各种循环,计算等),计算次数多,ticks计数很快达到阈值,会触发GIL的释放与再竞争。但是多个线程之间的来回切换非常耗时
- python的多线程对CPU密集型任务并不友好
- IO密集型任务(文件处理、网络通信等设计到数据读写的操作),IO操作会常有IO等待,在等待时切换到其他线程可不保证不浪费CPU资源,提升执行效率
- python的多线程对IO密集型任务比较友好
为什么不去除GIL
历史原因
如何是好
- Python中想要充分利用CPU资源,使用多进程。每个进程有自己独立的GIL,互不干扰。多进程才是真正意义上的Python并行
- 在Python中,多进程的执行效率优于多线程(仅仅针对多核CPU而言)
- IO密集型任务,使用多线程
- 计算密集型任务,使用多进程
- 此外,python 的协程机制 //todo(congyu)
4. 其他
注意哦:
Feel free to contact me windmillyucong@163.com anytime for anything.
License