引言&動(dòng)機(jī)
考慮一下這個(gè)場景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,每條數(shù)據(jù)互不干擾。該如何執(zhí)行才能花費(fèi)時(shí)間最短呢?
在多線程(MT)編程出現(xiàn)之前,電腦程序的運(yùn)行由一個(gè)執(zhí)行序列組成,執(zhí)行序列按順序在主機(jī)的中央處理器(CPU)中運(yùn)行。無論是任務(wù)本身要求順序執(zhí)行還是整個(gè)程序是由多個(gè)子任務(wù)組成,程序都是按這種方式執(zhí)行的。即使子任務(wù)相互獨(dú)立,互相無關(guān)(即,一個(gè)子任務(wù)的結(jié)果不影響其它子 任務(wù)的結(jié)果)時(shí)也是這樣。
對(duì)于上邊的問題,如果使用一個(gè)執(zhí)行序列來完成,我們大約需要花費(fèi) 10000*0.1 + 10000 = 11000 秒。這個(gè)時(shí)間顯然是太長了。
那我們有沒有可能在執(zhí)行計(jì)算的同時(shí)取數(shù)據(jù)呢?或者是同時(shí)處理幾條數(shù)據(jù)呢?如果可以,這樣就能大幅提高任務(wù)的效率。這就是多線程編程的目的。
對(duì)于本質(zhì)上就是異步的, 需要有多個(gè)并發(fā)事務(wù),各個(gè)事務(wù)的運(yùn)行順序可以是不確定的,隨機(jī)的,不可預(yù)測的問題,多線程是最理想的解決方案。這樣的任務(wù)可以被分成多個(gè)執(zhí)行流,每個(gè)流都有一個(gè)要完成的目標(biāo),然后將得到的結(jié)果合并,得到最終的結(jié)果。
線程和進(jìn)程
什么是進(jìn)程
進(jìn)程(有時(shí)被稱為重量級(jí)進(jìn)程)是程序的一次 執(zhí)行。每個(gè)進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。操作系 統(tǒng)管理在其上運(yùn)行的所有進(jìn)程,并為這些進(jìn)程公平地分配時(shí)間。進(jìn)程也可以通過 fork 和 spawn 操作 來完成其它的任務(wù)。不過各個(gè)進(jìn)程有自己的內(nèi)存空間,數(shù)據(jù)棧等,所以只能使用進(jìn)程間通訊(IPC), 而不能直接共享信息。
什么是線程
線程(有時(shí)被稱為輕量級(jí)進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個(gè)進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。
線程狀態(tài)如圖
線程有開始,順序執(zhí)行和結(jié)束三部分。它有一個(gè)自己的指令指針,記錄自己運(yùn)行到什么地方。 線程的運(yùn)行可能被搶占(中斷),或暫時(shí)的被掛起(也叫睡眠),讓其它的線程運(yùn)行,這叫做讓步。 一個(gè)進(jìn)程中的各個(gè)線程之間共享同一片數(shù)據(jù)空間,所以線程之間可以比進(jìn)程之間更方便地共享數(shù)據(jù)以及相互通訊。
當(dāng)然,這樣的共享并不是完全沒有危險(xiǎn)的。如果多個(gè)線程共同訪問同一片數(shù)據(jù),則由于數(shù)據(jù)訪 問的順序不一樣,有可能導(dǎo)致數(shù)據(jù)結(jié)果的不一致的問題。這叫做競態(tài)條件(race condition)。
線程一般都是并發(fā)執(zhí)行的,不過在單 CPU 的系統(tǒng)中,真正的并發(fā)是不可能的,每個(gè)線程會(huì)被安排成每次只運(yùn)行一小會(huì),然后就把 CPU 讓出來,讓其它的線程去運(yùn)行。由于有的函數(shù)會(huì)在完成之前阻塞住,在沒有特別為多線程做修改的情 況下,這種“貪婪”的函數(shù)會(huì)讓 CPU 的時(shí)間分配有所傾斜。導(dǎo)致各個(gè)線程分配到的運(yùn)行時(shí)間可能不 盡相同,不盡公平。
Python、線程和全局解釋器鎖
全局解釋器鎖(GIL)
首先需要明確的一點(diǎn)是GIL并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行(其中的JPython就沒有GIL)。
那么CPython實(shí)現(xiàn)中的GIL又是什么呢?GIL全稱Global Interpreter Lock為了避免誤導(dǎo),我們還是來看一下官方給出的解釋:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
盡管Python完全支持多線程編程, 但是解釋器的C語言實(shí)現(xiàn)部分在完全并行執(zhí)行時(shí)并不是線程安全的。 實(shí)際上,解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè)Python線程執(zhí)行。
在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:
- 設(shè)置GIL
- 切換到一個(gè)線程去執(zhí)行
- 運(yùn)行
- 指定數(shù)量的字節(jié)碼指令
- 線程主動(dòng)讓出控制(可以調(diào)用time.sleep(0))
- 把線程設(shè)置完睡眠狀態(tài)
- 解鎖GIL
- 再次重復(fù)以上步驟
對(duì)所有面向 I/O 的(會(huì)調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會(huì)在這個(gè) I/O 調(diào)用之 前被釋放,以允許其它的線程在這個(gè)線程等待 I/O 的時(shí)候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會(huì)在自己的時(shí)間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計(jì)算密集 型的程序更能充分利用多線程環(huán)境的好處。
退出線程
當(dāng)一個(gè)線程結(jié)束計(jì)算,它就退出了。線程可以調(diào)用 thread.exit()之類的退出函數(shù),也可以使用 Python 退出進(jìn)程的標(biāo)準(zhǔn)方法,如 sys.exit()或拋出一個(gè) SystemExit 異常等。不過,你不可以直接 “殺掉”(“kill”)一個(gè)線程。
在 Python 中使用線程
在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多數(shù)類 Unix 系統(tǒng)上運(yùn)行時(shí),Python 支持多線程 編程。Python 使用 POSIX 兼容的線程,即 pthreads。
默認(rèn)情況下,只要在解釋器中
如果沒有報(bào)錯(cuò),則說明線程可用。
Python 的 threading 模塊
Python 供了幾個(gè)用于多線程編程的模塊,包括 thread, threading 和 Queue 等。thread 和 threading 模塊允許程序員創(chuàng)建和管理線程。thread 模塊 供了基本的線程和鎖的支持,而 threading 供了更高級(jí)別,功能更強(qiáng)的線程管理的功能。Queue 模塊允許用戶創(chuàng)建一個(gè)可以用于多個(gè)線程之間 共享數(shù)據(jù)的隊(duì)列數(shù)據(jù)結(jié)構(gòu)。
核心 示:避免使用 thread 模塊
出于以下幾點(diǎn)考慮,我們不建議您使用 thread 模塊。
- 更高級(jí)別的 threading 模塊更為先 進(jìn),對(duì)線程的支持更為完善,而且使用 thread 模塊里的屬性有可能會(huì)與 threading 出現(xiàn)沖突。其次, 低級(jí)別的 thread 模塊的同步原語很少(實(shí)際上只有一個(gè)),而 threading 模塊則有很多。
- 對(duì)于你的進(jìn)程什么時(shí)候應(yīng)該結(jié)束完全沒有控制,當(dāng)主線程結(jié)束 時(shí),所有的線程都會(huì)被強(qiáng)制結(jié)束掉,沒有警告也不會(huì)有正常的清除工作。我們之前說過,至少 threading 模塊能確保重要的子線程退出后進(jìn)程才退出。
thread 模塊
除了產(chǎn)生線程外,thread 模塊也提供了基本的同步數(shù) 據(jù)結(jié)構(gòu)鎖對(duì)象(lock object,也叫原語鎖,簡單鎖,互斥鎖,互斥量,二值信號(hào)量)。
thread 模塊函數(shù)
- start_new_thread(function, args, kwargs=None):產(chǎn)生一個(gè)新的線程,在新線程中用指定的參數(shù)和可選的 kwargs 來調(diào)用這個(gè)函數(shù)。
- allocate_lock():分配一個(gè) LockType 類型的鎖對(duì)象
- exit():讓線程退出
- acquire(wait=None):嘗試獲取鎖對(duì)象
- locked():如果獲取了鎖對(duì)象返回 True,否則返回 False
- release():釋放鎖
下面是一個(gè)使用 thread 的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
import thread from time import sleep, time def loop(num): print('start loop at:', time()) sleep(num) print('loop done at:', time()) def loop1(num): print('start loop 1 at:', time()) sleep(num) print('loop 1 done at:', time()) def main(): print('starting at:', time()) thread.start_new_thread(loop, (4,)) thread.start_new_thread(loop1, (5,)) sleep(6) print('all DONE at:', time()) if __name__ == '__main__': main() ('starting at:', 1489387024.886667) ('start loop at:', 1489387024.88705) ('start loop 1 at:', 1489387024.887277) ('loop done at:', 1489387028.888182) ('loop 1 done at:', 1489387029.888904) ('all DONE at:', 1489387030.889918) |
start_new_thread()要求一定要有前兩個(gè)參數(shù)。所以,就算我們想要運(yùn)行的函數(shù)不要參數(shù),也要傳一個(gè)空的元組。
為什么要加上sleep(6)這一句呢? 因?yàn)椋绻覀儧]有讓主線程停下來,那主線程就會(huì)運(yùn)行下一條語句,顯示 “all done”,然后就關(guān)閉運(yùn)行著 loop()和 loop1()的兩個(gè)線程,退出了。
我們有沒有更好的辦法替換使用sleep() 這種不靠譜的同步方式呢?答案是使用鎖,使用了鎖,我們就可以在兩個(gè)線程都退出之后馬上退出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
#! -*- coding: utf-8 -*- import thread from time import sleep, time loops = [4, 2] def loop(nloop, nsec, lock): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) # 每個(gè)線程都會(huì)被分配一個(gè)事先已經(jīng)獲得的鎖,在 sleep()的時(shí)間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個(gè)線程已經(jīng)結(jié)束了。 lock.release() def main(): print('starting at:', time()) locks = [] nloops = range(len(loops)) for i in nloops: # 調(diào)用 thread.allocate_lock()函數(shù)創(chuàng)建一個(gè)鎖的列表 lock = thread.allocate_lock() # 分別調(diào)用各個(gè)鎖的 acquire()函數(shù)獲得, 獲得鎖表示“把鎖鎖上” lock.acquire() locks.append(lock) for i in nloops: # 創(chuàng)建線程,每個(gè)線程都用各自的循環(huán)號(hào),睡眠時(shí)間和鎖為參數(shù)去調(diào)用 loop()函數(shù) thread.start_new_thread(loop, (i, loops[i], locks[i])) for i in nloops: # 在線程結(jié)束的時(shí)候,線程要自己去做解鎖操作 # 當(dāng)前循環(huán)只是坐在那一直等(達(dá)到暫停主 線程的目的),直到兩個(gè)鎖都被解鎖為止才繼續(xù)運(yùn)行。 while locks[i].locked(): pass print('all DONE at:', time()) if __name__ == '__main__': main() |
為什么我們不在創(chuàng)建鎖的循環(huán)里創(chuàng)建線程呢?有以下幾個(gè)原因:
- 我們想到實(shí)現(xiàn)線程的同步,所以要讓“所有的馬同時(shí)沖出柵欄”。
- 獲取鎖要花一些時(shí)間,如果你的 線程退出得“太快”,可能會(huì)導(dǎo)致還沒有獲得鎖,線程就已經(jīng)結(jié)束了的情況。
threading 模塊
threading 模塊不僅提供了 Thread 類,還 供了各 種非常好用的同步機(jī)制。
下面是threading 模塊里所有的對(duì)象:
- Thread: 表示一個(gè)線程的執(zhí)行的對(duì)象
- Lock: 鎖原語對(duì)象(跟 thread 模塊里的鎖對(duì)象相同)
- RLock: 可重入鎖對(duì)象。使單線程可以再次獲得已經(jīng)獲得了的鎖(遞歸鎖定)。
- Condition: 條件變量對(duì)象能讓一個(gè)線程停下來,等待其它線程滿足了某個(gè)“條件”。 如,狀態(tài)的改變或值的改變。
- Event: 通用的條件變量。多個(gè)線程可以等待某個(gè)事件的發(fā)生,在事件發(fā)生后, 所有的線程都會(huì)被激活。
- Semaphore: 為等待鎖的線程 供一個(gè)類似“等候室”的結(jié)構(gòu)
- BoundedSemaphore: 與 Semaphore 類似,只是它不允許超過初始值
- Timer: 與 Thread 相似,只是,它要等待一段時(shí)間后才開始運(yùn)行。
守護(hù)線程
另一個(gè)避免使用 thread 模塊的原因是,它不支持守護(hù)線程。當(dāng)主線程退出時(shí),所有的子線程不 論它們是否還在工作,都會(huì)被強(qiáng)行退出。有時(shí),我們并不期望這種行為,這時(shí),就引入了守護(hù)線程 的概念
threading 模塊支持守護(hù)線程,它們是這樣工作的:守護(hù)線程一般是一個(gè)等待客戶請(qǐng)求的服務(wù)器, 如果沒有客戶 出請(qǐng)求,它就在那等著。如果你設(shè)定一個(gè)線程為守護(hù)線程,就表示你在說這個(gè)線程 是不重要的,在進(jìn)程退出的時(shí)候,不用等待這個(gè)線程退出。
如果你的主線程要退出的時(shí)候,不用等待那些子線程完成,那就設(shè)定這些線程的 daemon 屬性。 即,在線程開始(調(diào)用 thread.start())之前,調(diào)用 setDaemon()函數(shù)設(shè)定線程的 daemon 標(biāo)志 (thread.setDaemon(True))就表示這個(gè)線程“不重要”
如果你想要等待子線程完成再退出,那就什么都不用做,或者顯式地調(diào)用 thread.setDaemon(False)以保證其 daemon 標(biāo)志為 False。你可以調(diào)用 thread.isDaemon()函數(shù)來判 斷其 daemon 標(biāo)志的值。新的子線程會(huì)繼承其父線程的 daemon 標(biāo)志。整個(gè) Python 會(huì)在所有的非守護(hù) 線程退出后才會(huì)結(jié)束,即進(jìn)程中沒有非守護(hù)線程存在的時(shí)候才結(jié)束。
Thread 類
Thread類提供了以下方法:
- run(): 用以表示線程活動(dòng)的方法。
- start():啟動(dòng)線程活動(dòng)。
- join([time]): 等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時(shí)發(fā)生。
- is_alive(): 返回線程是否活動(dòng)的。
- name(): 設(shè)置/返回線程名。
- daemon(): 返回/設(shè)置線程的 daemon 標(biāo)志,一定要在調(diào)用 start()函數(shù)前設(shè)置
用 Thread 類,你可以用多種方法來創(chuàng)建線程。我們在這里介紹三種比較相像的方法。
- 創(chuàng)建一個(gè)Thread的實(shí)例,傳給它一個(gè)函數(shù)
- 創(chuàng)建一個(gè)Thread的實(shí)例,傳給它一個(gè)可調(diào)用的類對(duì)象
- 從Thread派生出一個(gè)子類,創(chuàng)建一個(gè)這個(gè)子類的實(shí)例
下邊是三種不同方式的創(chuàng)建線程的示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
#! -*- coding: utf-8 -*- # 創(chuàng)建一個(gè)Thread的實(shí)例,傳給它一個(gè)函數(shù) import threading from time import sleep, time loops = [4, 2] def loop(nloop, nsec, lock): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) # 每個(gè)線程都會(huì)被分配一個(gè)事先已經(jīng)獲得的鎖,在 sleep()的時(shí)間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個(gè)線程已經(jīng)結(jié)束了。 def main(): print('starting at:', time()) threads = [] nloops = range(len(loops)) for i in nloops: t = threading.Thread(target=loop, args=(i, loops[i])) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()會(huì)等到線程結(jié)束,或者在給了 timeout 參數(shù)的時(shí)候,等到超時(shí)為止。 # 使用 join()看上去 會(huì)比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") threads[i].join() # threads to finish print('all DONE at:', time()) if __name__ == '__main__': main() |
與傳一個(gè)函數(shù)很相似的另一個(gè)方法是在創(chuàng)建線程的時(shí)候,傳一個(gè)可調(diào)用的類的實(shí)例供線程啟動(dòng) 的時(shí)候執(zhí)行——這是多線程編程的一個(gè)更為面向?qū)ο蟮姆椒āO鄬?duì)于一個(gè)或幾個(gè)函數(shù)來說,由于類 對(duì)象里可以使用類的強(qiáng)大的功能,可以保存更多的信息,這種方法更為靈活
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
|
#! -*- coding: utf-8 -*- # 創(chuàng)建一個(gè) Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類對(duì)象 from threading import Thread from time import sleep, time loops = [4, 2] class ThreadFunc(object): def __init__(self, func, args, name=""): self.name = name self.func = func self.args = args def __call__(self): # 創(chuàng)建新線程的時(shí)候,Thread 對(duì)象會(huì)調(diào)用我們的 ThreadFunc 對(duì)象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。 self.func(*self.args) def loop(nloop, nsec): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) def main(): print('starting at:', time()) threads = [] nloops = range(len(loops)) for i in nloops: t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__)) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()會(huì)等到線程結(jié)束,或者在給了 timeout 參數(shù)的時(shí)候,等到超時(shí)為止。 # 使用 join()看上去 會(huì)比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") threads[i].join() # threads to finish print('all DONE at:', time()) if __name__ == '__main__': main() |
最后一個(gè)例子介紹如何子類化 Thread 類,這與上一個(gè)例子中的創(chuàng)建一個(gè)可調(diào)用的類非常像。使 用子類化創(chuàng)建線程(第 29-30 行)使代碼看上去更清晰明了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
|
#! -*- coding: utf-8 -*- # 創(chuàng)建一個(gè) Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類對(duì)象 from threading import Thread from time import sleep, time loops = [4, 2] class MyThread(Thread): def __init__(self, func, args, name=""): super(MyThread, self).__init__() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 創(chuàng)建新線程的時(shí)候,Thread 對(duì)象會(huì)調(diào)用我們的 ThreadFunc 對(duì)象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。 print 'starting', self.name, 'at:', time() self.res = self.func(*self.args) print self.name, 'finished at:', time() def loop(nloop, nsec): print('start loop %s at: %s' % (nloop, time())) sleep(nsec) print('loop %s done at: %s' % (nloop, time())) def main(): print('starting at:', time()) threads = [] nloops = range(len(loops)) for i in nloops: t = MyThread(loop, (i, loops[i]), loop.__name__) threads.append(t) for i in nloops: # start threads threads[i].start() for i in nloops: # wait for all # join()會(huì)等到線程結(jié)束,或者在給了 timeout 參數(shù)的時(shí)候,等到超時(shí)為止。 # 使用 join()看上去 會(huì)比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock") threads[i].join() # threads to finish print('all DONE at:', time()) if __name__ == '__main__': main() |
除了各種同步對(duì)象和線程對(duì)象外,threading 模塊還 供了一些函數(shù)。
- active_count(): 當(dāng)前活動(dòng)的線程對(duì)象的數(shù)量
- current_thread(): 返回當(dāng)前線程對(duì)象
- enumerate(): 返回當(dāng)前活動(dòng)線程的列表
- settrace(func): 為所有線程設(shè)置一個(gè)跟蹤函數(shù)
- setprofile(func): 為所有線程設(shè)置一個(gè) profile 函數(shù)
Lock & RLock
原語鎖定是一個(gè)同步原語,狀態(tài)是鎖定或未鎖定。兩個(gè)方法acquire()和release() 用于加鎖和釋放鎖。
RLock 可重入鎖是一個(gè)類似于Lock對(duì)象的同步原語,但同一個(gè)線程可以多次調(diào)用。
Lock 不支持遞歸加鎖,也就是說即便在同 線程中,也必須等待鎖釋放。通常建議改 RLock, 它會(huì)處理 “owning thread” 和 “recursion level” 狀態(tài),對(duì)于同 線程的多次請(qǐng)求鎖 為,只累加
計(jì)數(shù)器。每次調(diào) release() 將遞減該計(jì)數(shù)器,直到 0 時(shí)釋放鎖,因此 acquire() 和 release() 必須 要成對(duì)出現(xiàn)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
from time import sleep from threading import current_thread, Thread lock = Rlock() def show(): with lock: print current_thread().name, i sleep(0.1) def test(): with lock: for i in range(3): show(i) for i in range(2): Thread(target=test).start() |
Event
事件用于在線程間通信。一個(gè)線程發(fā)出一個(gè)信號(hào),其他一個(gè)或多個(gè)線程等待。
Event 通過通過 個(gè)內(nèi)部標(biāo)記來協(xié)調(diào)多線程運(yùn) 。 法 wait() 阻塞線程執(zhí) ,直到標(biāo)記為 True。 set() 將標(biāo)記設(shè)為 True,clear() 更改標(biāo)記為 False。isSet() 用于判斷標(biāo)記狀態(tài)。
|
from threading import Event def test_event(): e = Event() def test(): for i in range(5): print 'start wait' e.wait() e.clear() # 如果不調(diào)用clear(),那么標(biāo)記一直為 True,wait()就不會(huì)發(fā)生阻塞行為 print i Thread(target=test).start() return e e = test_event() |
Condition
條件變量和 Lock 參數(shù)一樣,也是一個(gè),也是一個(gè)同步原語,當(dāng)需要線程關(guān)注特定的狀態(tài)變化或事件的發(fā)生時(shí)使用這個(gè)鎖定。
可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個(gè)等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個(gè)線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。
構(gòu)造方法:
Condition([lock/rlock])
Condition 有以下這些方法:
- acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。
- wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
- notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
- notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
from threading import Condition, current_thread, Thread con = Condition() def tc1(): with con: for i in range(5): print current_thread().name, i sleep(0.3) if i == 3: con.wait() def tc2(): with con: for i in range(5): print current_thread().name, i sleep(0.1) con.notify() Thread(target=tc1).start() Thread(target=tc2).start() Thread-1 0 Thread-1 1 Thread-1 2 Thread-1 3 # 讓出鎖 Thread-2 0 Thread-2 1 Thread-2 2 Thread-2 3 Thread-2 4 Thread-1 4 # 重新獲取鎖,繼續(xù)執(zhí) |
只有獲取鎖的線程才能調(diào)用 wait() 和 notify(),因此必須在鎖釋放前調(diào)用。
當(dāng) wait() 釋放鎖后,其他線程也可進(jìn)入 wait 狀態(tài)。notifyAll() 激活所有等待線程,讓它們?nèi)屾i然后完成后續(xù)執(zhí)行。
生產(chǎn)者-消費(fèi)者問題和 Queue 模塊
現(xiàn)在我們用一個(gè)經(jīng)典的(生產(chǎn)者消費(fèi)者)例子來介紹一下 Queue模塊。
生產(chǎn)者消費(fèi)者的場景是: 生產(chǎn)者生產(chǎn)貨物,然后把貨物放到一個(gè)隊(duì)列之類的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)貨物所要花費(fèi)的時(shí)間無法預(yù)先確定。消費(fèi)者消耗生產(chǎn)者生產(chǎn)的貨物的時(shí)間也是不確定的。
常用的 Queue 模塊的屬性:
- queue(size): 創(chuàng)建一個(gè)大小為size的Queue對(duì)象。
- qsize(): 返回隊(duì)列的大小(由于在返回的時(shí)候,隊(duì)列可能會(huì)被其它線程修改,所以這個(gè)值是近似值)
- empty(): 如果隊(duì)列為空返回 True,否則返回 False
- full(): 如果隊(duì)列已滿返回 True,否則返回 False
- put(item,block=0): 把item放到隊(duì)列中,如果給了block(不為0),函數(shù)會(huì)一直阻塞到隊(duì)列中有空間為止
- get(block=0): 從隊(duì)列中取一個(gè)對(duì)象,如果給了 block(不為 0),函數(shù)會(huì)一直阻塞到隊(duì)列中有對(duì)象為止
Queue 模塊可以用來進(jìn)行線程間通訊,讓各個(gè)線程之間共享數(shù)據(jù)。
現(xiàn)在,我們創(chuàng)建一個(gè)隊(duì)列,讓 生產(chǎn)者(線程)把新生產(chǎn)的貨物放進(jìn)去供消費(fèi)者(線程)使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
#! -*- coding: utf-8 -*- from Queue import Queue from random import randint from time import sleep, time from threading import Thread class MyThread(Thread): def __init__(self, func, args, name=""): super(MyThread, self).__init__() self.name = name self.func = func self.args = args def getResult(self): return self.res def run(self): # 創(chuàng)建新線程的時(shí)候,Thread 對(duì)象會(huì)調(diào)用我們的 ThreadFunc 對(duì)象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。 print 'starting', self.name, 'at:', time() self.res = self.func(*self.args) print self.name, 'finished at:', time() # writeQ()和 readQ()函數(shù)分別用來把對(duì)象放入隊(duì)列和消耗隊(duì)列中的一個(gè)對(duì)象。在這里我們使用 字符串'xxx'來表示隊(duì)列中的對(duì)象。 def writeQ(queue): print 'producing object for Q...' queue.put('xxx', 1) print "size now", queue.qsize() def readQ(queue): queue.get(1) print("consumed object from Q... size now", queue.qsize()) def writer(queue, loops): # writer()函數(shù)只做一件事,就是一次往隊(duì)列中放入一個(gè)對(duì)象,等待一會(huì),然后再做同樣的事 for i in range(loops): writeQ(queue) sleep(1) def reader(queue, loops): # reader()函數(shù)只做一件事,就是一次從隊(duì)列中取出一個(gè)對(duì)象,等待一會(huì),然后再做同樣的事 for i in range(loops): readQ(queue) sleep(randint(2, 5)) # 設(shè)置有多少個(gè)線程要被運(yùn)行 funcs = [writer, reader] nfuncs = range(len(funcs)) def main(): nloops = randint(10, 20) q = Queue(32) threads = [] for i in nfuncs: t = MyThread(funcs[i], (q, nloops), funcs[i].__name__) threads.append(t) for i in nfuncs: threads[i].start() for i in nfuncs: threads[i].join() print threads[i].getResult() print 'all DONE' if __name__ == '__main__': main() |
FAQ
進(jìn)程與線程。線程與進(jìn)程的區(qū)別是什么?
進(jìn)程(有時(shí)被稱為重量級(jí)進(jìn)程)是程序的一次 執(zhí)行。每個(gè)進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。
線程(有時(shí)被稱為輕量級(jí)進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個(gè)進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。
這篇文章很好的解釋了 線程和進(jìn)程的區(qū)別,推薦閱讀: http://www./blo…
Python 的線程。在 Python 中,哪一種多線程的程序表現(xiàn)得更好,I/O 密集型的還是計(jì)算 密集型的?
由于GIL的緣故,對(duì)所有面向 I/O 的(會(huì)調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會(huì)在這個(gè) I/O 調(diào)用之 前被釋放,以允許其它的線程在這個(gè)線程等待 I/O 的時(shí)候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會(huì)在自己的時(shí)間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計(jì)算密集 型的程序更能充分利用多線程環(huán)境的好處。
線程。你認(rèn)為,多 CPU 的系統(tǒng)與一般的系統(tǒng)有什么大的不同?多線程的程序在這種系統(tǒng)上的表現(xiàn)會(huì)怎么樣?
Python的線程就是C語言的一個(gè)pthread,并通過操作系統(tǒng)調(diào)度算法進(jìn)行調(diào)度(例如linux是CFS)。為了讓各個(gè)線程能夠平均利用CPU時(shí)間,python會(huì)計(jì)算當(dāng)前已執(zhí)行的微代碼數(shù)量,達(dá)到一定閾值后就強(qiáng)制釋放GIL。而這時(shí)也會(huì)觸發(fā)一次操作系統(tǒng)的線程調(diào)度(當(dāng)然是否真正進(jìn)行上下文切換由操作系統(tǒng)自主決定)。
偽代碼
|
while True: acquire GIL for i in 1000: do something release GIL /* Give Operating System a chance to do thread scheduling */ |
這種模式在只有一個(gè)CPU核心的情況下毫無問題。任何一個(gè)線程被喚起時(shí)都能成功獲得到GIL(因?yàn)橹挥嗅尫帕薌IL才會(huì)引發(fā)線程調(diào)度)。
但當(dāng)CPU有多個(gè)核心的時(shí)候,問題就來了。從偽代碼可以看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。所以當(dāng)其他在其他核心上的線程被喚醒時(shí),大部分情況下主線程已經(jīng)又再一次獲取到GIL了。這個(gè)時(shí)候被喚醒執(zhí)行的線程只能白白的浪費(fèi)CPU時(shí)間,看著另一個(gè)線程拿著GIL歡快的執(zhí)行著。然后達(dá)到切換時(shí)間后進(jìn)入待調(diào)度狀態(tài),再被喚醒,再等待,以此往復(fù)惡性循環(huán)。
簡單的總結(jié)下就是:Python的多線程在多核CPU上,只對(duì)于IO密集型計(jì)算產(chǎn)生正面效果;而當(dāng)有至少有一個(gè)CPU密集型線程存在,那么多線程效率會(huì)由于GIL而大幅下降。
線程池。修改 生成者消費(fèi)者 的代碼,不再是一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者,而是可以有任意個(gè) 消費(fèi)者線程(一個(gè)線程池),每個(gè)線程可以在任意時(shí)刻處理或消耗任意多個(gè)產(chǎn)品。
參考文章
|