Python Library: Threading 1. Thread 我们可以选择继承 Thread 来实现自己的线程类,或者直接像 C# Thread 那样传个函数进去。 from threading import * def t(a, b): print currentThread().name, a, b Thread(target = t, args = (1, 2)).start() 输出: $ ./main.py Thread-1 1 2 要实现自己的线程类,可以重写 __init__() 和 run() 就行了。不过一旦我们定义了 run(),我们传进去的 target 就不会自动执行了。 class MyThread(Thread): def __init__(self, name, x): Thread.__init__(self, name=name) self.x = x def run(self): print currentThread().name, self.x MyThread("My", 1234).start() 输出: $ ./main.py My 1234 Thread 有个重要的属性 daemon,和 .NET Thread.IsBackground 是一个意思,一旦设置为 Daemon Thread,就表示是个 "后台线程"。 def test(): for i in range(10): print currentThread().name, i sleep(1) t = Thread(target = test) #t.daemon = True t.start() print "main over!" 输出: $ ./main.py # 非 Daemon 效果,进程等待所有前台线程退出。 Thread-1 0 main over! Thread-1 1 Thread-1 2 Thread-1 3 Thread-1 4 Thread-1 5 Thread-1 6 Thread-1 7 Thread-1 8 Thread-1 9 $ ./main.py # IsDaemon,进程不等待后台线程。 Thread-1 0 main over! 2. Lock Lock/RLock 和 C# lock 关键字差不多意思。不同的是,Lock/RLock 只需 "锁定" 自己,而 C# lock 还得另外找个引用类型对象。Lock 有个问题就是同一个线程内部也不能多次 "锁定",否则会死锁。RLock 没有这个问题,它会处理 "owning thread" 和 "recursion level" 状态,对于同一线程的多次请求锁行为,只累加计数器。每次调用 release() 将递减该计数器,直到 0 时释放锁,因此记住 acquire() 和 release() 要成对出现。 直接用 RLock,忘了 Lock 吧。 非锁定版本: def test(): for i in range(3): print currentThread().name, i sleep(1) for i in range(2): Thread(target = test).start() 输出: $ ./main.py Thread-1 0 Thread-2 0 Thread-1 1 Thread-2 1 Thread-1 2 Thread-2 2 锁定版本: lock = RLock() def test(): lock.acquire() try: for i in range(3): print currentThread().name, i sleep(1) finally: lock.release() for i in range(2): Thread(target = test).start() 输出: $ ./main.py Thread-1 0 Thread-1 1 Thread-1 2 Thread-2 0 Thread-2 1 Thread-2 2 RLock 实现了 Context Management Protocol,会自动调用 acquire() 和 release() 函数,因此直接用 with/as 来实现 C# "lock(o) { ... }"。 lock = RLock() def test(): with lock: for i in range(3): print currentThread().name, i sleep(1) for i in range(2): Thread(target = test).start() 3. Condition Condition 算是 Lock 和 Event 的杂交版本,除了作为 Lock 的基本功能外,还提供了 wait() 和 notify() 作为线程间 "消息通知"。 from threading import * from time import * condi = Condition() def t1(): condi.acquire() try: for i in range(10): print currentThread().name, i sleep(1) if (i == 4): condi.wait() # wait() 释放锁,并进入等待状态。直到接收到 notify() 发送的消息后再次试图获取锁,继续后续代码执行。 finally: condi.release() def t2(): condi.acquire() try: for i in range(10): print currentThread().name, i sleep(1) finally: condi.notify() # 在释放锁前通知等待的线程准备起床。 condi.release() Thread(target=t1).start() Thread(target=t2).start() 输出: $ ./main.py Thread-1 0 Thread-1 1 Thread-1 2 Thread-1 3 Thread-1 4 <--- Thread1 释放锁,开始等待。 Thread-2 0 <--- Thread2 获得锁,开始执行。 Thread-2 1 Thread-2 2 Thread-2 3 Thread-2 4 Thread-2 5 Thread-2 6 Thread-2 7 Thread-2 8 Thread-2 9 <--- Thread2 发送通知,并释放锁。 Thread-1 5 <--- Thread1 收到消息,再次拿到锁,开始未完成的工作。 Thread-1 6 Thread-1 7 Thread-1 8 Thread-1 9 wait() 实际可以分解为 "condi.release(); ... acquire()" 这两个动作。我们可以使用 Condition 包装已有的锁,当然也可以用 with/as 来改善我们的代码。 lock = RLock() condi = Condition(lock) def t1(): with condi: for i in range(10): print currentThread().name, i sleep(1) if (i == 4): condi.wait() def t2(): with lock: for i in range(10): print currentThread().name, i sleep(1) condi.notify() Thread(target=t1).start() Thread(target=t2).start() 注意调用 notify() 和 notifyall() 的线程必须事先获得锁,否则会抛出异常。 RuntimeError: cannot notify on un-aquired lock 4. Semaphore 和 .NET Semaphore 一样,限制可同时访问某一资源的线程数。 lock = Semaphore(2) def test(): with lock: for i in range(5): print currentThread().name, i sleep(1) for i in range(5): Thread(target = test).start() 输出: $ ./main.py Thread-1 0 <--- Thread-1 和 Thread-2 获得锁 Thread-2 0 Thread-1 1 Thread-2 1 Thread-1 2 Thread-2 2 Thread-1 3 Thread-2 3 Thread-1 4 Thread-2 4 Thread-3 0 <--- Thread-3 和 Thread-4 获得锁 Thread-4 0 Thread-3 1 Thread-4 1 Thread-3 2 Thread-4 2 Thread-3 3 Thread-4 3 Thread-3 4 Thread-4 4 Thread-5 0 <--- Thread-5 获得锁 Thread-5 1 Thread-5 2 Thread-5 3 Thread-5 4 5. Event 这个和 .NET ManualResetEvent 相对应,用于在多个线程间进行协同操作。Event.wait() 等待事件信号以便继续执行,set() 设置信号使等待的线程得以执行,clear() 清除信号。 event1 = Event() event2 = Event() def test1(): for i in range(5): event1.wait() # 等待信号 print currentThread().name, i event1.clear() # 执行完成后,清除标记(flag),使得下次 wait() 时需再次等待通知。 event2.set() # 设置另外一个等待事件,以便给另外一个线程发送信号。 def test2(): for i in range(5): event2.wait() print currentThread().name, i event2.clear() event1.set() Thread(target = test1).start() Thread(target = test2).start() event1.set() # 记得先激活一个,否则都在那 "等死" 呢。 输出: $ ./main.py Thread-1 0 Thread-2 0 Thread-1 1 Thread-2 1 Thread-1 2 Thread-2 2 Thread-1 3 Thread-2 3 Thread-1 4 Thread-2 4 6. Timer 使用新线程在 n 秒后执行一个函数,注意只会执行一次。如果定时器还没有执行,可以使用 cancel() 方法取消。 def test(a, b): print currentThread().name, a, b timer = Timer(5, test, ("abc", 1)) timer.start() 使用 cancel() 取消。 def test(a, b): print currentThread().name, a, b timer = Timer(5, test, ("abc", 1)) timer.start() sleep(2) timer.cancel() 7. Thread Local Storage 这个类似 .NET ThreadStaticAttribute 特性。threading.local() 创建的对象会利用 thread-local storage (TLS) 为每个线程保存不同的数据。 from threading import * from time import * o = local() def test(start, stop): o.range = range(start, stop) print currentThread().name, hex(id(o)), hex(id(o.range)), o.range for i in o.range: print currentThread().name, i sleep(1) Thread(target = test, args = (1, 5)).start() Thread(target = test, args = (10, 15)).start() 输出: $ ./main.py Thread-1 0xb7529740L 0xb74795ecL [1, 2, 3, 4] Thread-1 1 Thread-2 0xb7529740L 0xb74797ccL [10, 11, 12, 13, 14] Thread-2 10 Thread-1 2 Thread-2 11 Thread-1 3 Thread-2 12 Thread-1 4 Thread-2 13 Thread-2 14 我们注意到 o 是同一个对象,但 o.range 却不是,所持有的数据也不同。