Python Library: Threading


1. Thread

我们可以选择继承 Thread 来实现自己的线程类,或者直接像 C# Thread 那样传个函数进去。
from threading import *

def t(a, b):
print currentThread().name, a, b

Thread(target = t, args = (1, 2)).start()

输出:
$ ./main.py

Thread-1 1 2

要实现自己的线程类,可以重写 __init__() 和 run() 就行了。不过一旦我们定义了 run(),我们传进去的 target 就不会自动执行了。
class MyThread(Thread):
def __init__(self, name, x):
Thread.__init__(self, name=name)
self.x = x
def run(self):
print currentThread().name, self.x

MyThread("My", 1234).start()

输出:
$ ./main.py

My 1234

Thread 有个重要的属性 daemon,和 .NET Thread.IsBackground 是一个意思,一旦设置为 Daemon Thread,就表示是个 "后台线程"。
def test():
for i in range(10):
print currentThread().name, i
sleep(1)

t = Thread(target = test)
#t.daemon = True
t.start()
print "main over!"

输出:
$ ./main.py # 非 Daemon 效果,进程等待所有前台线程退出。

Thread-1 0
main over!
Thread-1 1
Thread-1 2
Thread-1 3
Thread-1 4
Thread-1 5
Thread-1 6
Thread-1 7
Thread-1 8
Thread-1 9

$ ./main.py # IsDaemon,进程不等待后台线程。

Thread-1 0
main over!

2. Lock

Lock/RLock 和 C# lock 关键字差不多意思。不同的是,Lock/RLock 只需 "锁定" 自己,而 C# lock 还得另外找个引用类型对象。Lock 有个问题就是同一个线程内部也不能多次 "锁定",否则会死锁。RLock 没有这个问题,它会处理 "owning thread" 和 "recursion level" 状态,对于同一线程的多次请求锁行为,只累加计数器。每次调用 release() 将递减该计数器,直到 0 时释放锁,因此记住 acquire() 和 release() 要成对出现。

直接用 RLock,忘了 Lock 吧。

非锁定版本:
def test():
for i in range(3):
print currentThread().name, i
sleep(1)

for i in range(2):
Thread(target = test).start()

输出:
$ ./main.py

Thread-1 0
Thread-2 0
Thread-1 1
Thread-2 1
Thread-1 2
Thread-2 2

锁定版本:
lock = RLock()

def test():
lock.acquire()
try:
for i in range(3):
print currentThread().name, i
sleep(1)
finally:
lock.release()

for i in range(2):
Thread(target = test).start()

输出:
$ ./main.py

Thread-1 0
Thread-1 1
Thread-1 2
Thread-2 0
Thread-2 1
Thread-2 2

RLock 实现了 Context Management Protocol,会自动调用 acquire() 和 release() 函数,因此直接用 with/as 来实现 C# "lock(o) { ... }"。
lock = RLock()

def test():
with lock:
for i in range(3):
print currentThread().name, i
sleep(1)

for i in range(2):
Thread(target = test).start()

3. Condition

Condition 算是 Lock 和 Event 的杂交版本,除了作为 Lock 的基本功能外,还提供了 wait() 和 notify() 作为线程间 "消息通知"。
from threading import *
from time import *

condi = Condition()

def t1():
condi.acquire()
try:
for i in range(10):
print currentThread().name, i
sleep(1)
if (i == 4): condi.wait()
# wait() 释放锁,并进入等待状态。直到接收到 notify() 发送的消息后再次试图获取锁,继续后续代码执行。
finally:
condi.release()

def t2():
condi.acquire()
try:
for i in range(10):
print currentThread().name, i
sleep(1)
finally:
condi.notify() # 在释放锁前通知等待的线程准备起床。
condi.release()

Thread(target=t1).start()
Thread(target=t2).start()

输出:
$ ./main.py

Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3
Thread-1 4 <--- Thread1 释放锁,开始等待。
Thread-2 0 <--- Thread2 获得锁,开始执行。
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-2 5
Thread-2 6
Thread-2 7
Thread-2 8
Thread-2 9 <--- Thread2 发送通知,并释放锁。
Thread-1 5 <--- Thread1 收到消息,再次拿到锁,开始未完成的工作。
Thread-1 6
Thread-1 7
Thread-1 8
Thread-1 9

wait() 实际可以分解为 "condi.release(); ... acquire()" 这两个动作。我们可以使用 Condition 包装已有的锁,当然也可以用 with/as 来改善我们的代码。
lock = RLock()
condi = Condition(lock)

def t1():
with condi:
for i in range(10):
print currentThread().name, i
sleep(1)
if (i == 4): condi.wait()

def t2():
with lock:
for i in range(10):
print currentThread().name, i
sleep(1)

condi.notify()

Thread(target=t1).start()
Thread(target=t2).start()

注意调用 notify() 和 notifyall() 的线程必须事先获得锁,否则会抛出异常。
RuntimeError: cannot notify on un-aquired lock

4. Semaphore

和 .NET Semaphore 一样,限制可同时访问某一资源的线程数。
lock = Semaphore(2)

def test():
with lock:
for i in range(5):
print currentThread().name, i
sleep(1)

for i in range(5):
Thread(target = test).start()

输出:
$ ./main.py

Thread-1 0 <--- Thread-1 和 Thread-2 获得锁
Thread-2 0
Thread-1 1
Thread-2 1
Thread-1 2
Thread-2 2
Thread-1 3
Thread-2 3
Thread-1 4
Thread-2 4
Thread-3 0 <--- Thread-3 和 Thread-4 获得锁
Thread-4 0
Thread-3 1
Thread-4 1
Thread-3 2
Thread-4 2
Thread-3 3
Thread-4 3
Thread-3 4
Thread-4 4
Thread-5 0 <--- Thread-5 获得锁
Thread-5 1
Thread-5 2
Thread-5 3
Thread-5 4

5. Event

这个和 .NET ManualResetEvent 相对应,用于在多个线程间进行协同操作。Event.wait() 等待事件信号以便继续执行,set() 设置信号使等待的线程得以执行,clear() 清除信号。
event1 = Event()
event2 = Event()

def test1():
for i in range(5):
event1.wait() # 等待信号
print currentThread().name, i
event1.clear() # 执行完成后,清除标记(flag),使得下次 wait() 时需再次等待通知。
event2.set() # 设置另外一个等待事件,以便给另外一个线程发送信号。

def test2():
for i in range(5):
event2.wait()
print currentThread().name, i
event2.clear()
event1.set()

Thread(target = test1).start()
Thread(target = test2).start()

event1.set() # 记得先激活一个,否则都在那 "等死" 呢。

输出:
$ ./main.py

Thread-1 0
Thread-2 0
Thread-1 1
Thread-2 1
Thread-1 2
Thread-2 2
Thread-1 3
Thread-2 3
Thread-1 4
Thread-2 4

6. Timer

使用新线程在 n 秒后执行一个函数,注意只会执行一次。如果定时器还没有执行,可以使用 cancel() 方法取消。
def test(a, b):
print currentThread().name, a, b

timer = Timer(5, test, ("abc", 1))
timer.start()

使用 cancel() 取消。
def test(a, b):
print currentThread().name, a, b

timer = Timer(5, test, ("abc", 1))
timer.start()

sleep(2)
timer.cancel()

7. Thread Local Storage

这个类似 .NET ThreadStaticAttribute 特性。threading.local() 创建的对象会利用 thread-local storage (TLS) 为每个线程保存不同的数据。
from threading import *
from time import *

o = local()

def test(start, stop):
o.range = range(start, stop)
print currentThread().name, hex(id(o)), hex(id(o.range)), o.range

for i in o.range:
print currentThread().name, i
sleep(1)

Thread(target = test, args = (1, 5)).start()
Thread(target = test, args = (10, 15)).start()

输出:
$ ./main.py
Thread-1 0xb7529740L 0xb74795ecL [1, 2, 3, 4]
Thread-1 1
Thread-2 0xb7529740L 0xb74797ccL [10, 11, 12, 13, 14]
Thread-2 10
Thread-1 2
Thread-2 11
Thread-1 3
Thread-2 12
Thread-1 4
Thread-2 13
Thread-2 14

我们注意到 o 是同一个对象,但 o.range 却不是,所持有的数据也不同。