小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

python線程筆記

 qweertt4747 2018-04-14
原文出處: goodspeed   

引言&動(dòng)機(jī)

考慮一下這個(gè)場景,我們有10000條數(shù)據(jù)需要處理,處理每條數(shù)據(jù)需要花費(fèi)1秒,但讀取數(shù)據(jù)只需要0.1秒,每條數(shù)據(jù)互不干擾。該如何執(zhí)行才能花費(fèi)時(shí)間最短呢?

在多線程(MT)編程出現(xiàn)之前,電腦程序的運(yùn)行由一個(gè)執(zhí)行序列組成,執(zhí)行序列按順序在主機(jī)的中央處理器(CPU)中運(yùn)行。無論是任務(wù)本身要求順序執(zhí)行還是整個(gè)程序是由多個(gè)子任務(wù)組成,程序都是按這種方式執(zhí)行的。即使子任務(wù)相互獨(dú)立,互相無關(guān)(即,一個(gè)子任務(wù)的結(jié)果不影響其它子 任務(wù)的結(jié)果)時(shí)也是這樣。

對(duì)于上邊的問題,如果使用一個(gè)執(zhí)行序列來完成,我們大約需要花費(fèi) 10000*0.1 + 10000 = 11000 秒。這個(gè)時(shí)間顯然是太長了。

那我們有沒有可能在執(zhí)行計(jì)算的同時(shí)取數(shù)據(jù)呢?或者是同時(shí)處理幾條數(shù)據(jù)呢?如果可以,這樣就能大幅提高任務(wù)的效率。這就是多線程編程的目的。

對(duì)于本質(zhì)上就是異步的, 需要有多個(gè)并發(fā)事務(wù),各個(gè)事務(wù)的運(yùn)行順序可以是不確定的,隨機(jī)的,不可預(yù)測的問題,多線程是最理想的解決方案。這樣的任務(wù)可以被分成多個(gè)執(zhí)行流,每個(gè)流都有一個(gè)要完成的目標(biāo),然后將得到的結(jié)果合并,得到最終的結(jié)果。

線程和進(jìn)程

什么是進(jìn)程

進(jìn)程(有時(shí)被稱為重量級(jí)進(jìn)程)是程序的一次 執(zhí)行。每個(gè)進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。操作系 統(tǒng)管理在其上運(yùn)行的所有進(jìn)程,并為這些進(jìn)程公平地分配時(shí)間。進(jìn)程也可以通過 fork 和 spawn 操作 來完成其它的任務(wù)。不過各個(gè)進(jìn)程有自己的內(nèi)存空間,數(shù)據(jù)棧等,所以只能使用進(jìn)程間通訊(IPC), 而不能直接共享信息。

什么是線程

線程(有時(shí)被稱為輕量級(jí)進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個(gè)進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。

線程狀態(tài)如圖

線程有開始,順序執(zhí)行和結(jié)束三部分。它有一個(gè)自己的指令指針,記錄自己運(yùn)行到什么地方。 線程的運(yùn)行可能被搶占(中斷),或暫時(shí)的被掛起(也叫睡眠),讓其它的線程運(yùn)行,這叫做讓步。 一個(gè)進(jìn)程中的各個(gè)線程之間共享同一片數(shù)據(jù)空間,所以線程之間可以比進(jìn)程之間更方便地共享數(shù)據(jù)以及相互通訊。

當(dāng)然,這樣的共享并不是完全沒有危險(xiǎn)的。如果多個(gè)線程共同訪問同一片數(shù)據(jù),則由于數(shù)據(jù)訪 問的順序不一樣,有可能導(dǎo)致數(shù)據(jù)結(jié)果的不一致的問題。這叫做競態(tài)條件(race condition)。

線程一般都是并發(fā)執(zhí)行的,不過在單 CPU 的系統(tǒng)中,真正的并發(fā)是不可能的,每個(gè)線程會(huì)被安排成每次只運(yùn)行一小會(huì),然后就把 CPU 讓出來,讓其它的線程去運(yùn)行。由于有的函數(shù)會(huì)在完成之前阻塞住,在沒有特別為多線程做修改的情 況下,這種“貪婪”的函數(shù)會(huì)讓 CPU 的時(shí)間分配有所傾斜。導(dǎo)致各個(gè)線程分配到的運(yùn)行時(shí)間可能不 盡相同,不盡公平。

Python、線程和全局解釋器鎖

全局解釋器鎖(GIL)

首先需要明確的一點(diǎn)是GIL并不是Python的特性,它是在實(shí)現(xiàn)Python解析器(CPython)時(shí)所引入的一個(gè)概念。就好比C++是一套語言(語法)標(biāo)準(zhǔn),但是可以用不同的編譯器來編譯成可執(zhí)行代碼。同樣一段代碼可以通過CPython,PyPy,Psyco等不同的Python執(zhí)行環(huán)境來執(zhí)行(其中的JPython就沒有GIL)。

那么CPython實(shí)現(xiàn)中的GIL又是什么呢?GIL全稱Global Interpreter Lock為了避免誤導(dǎo),我們還是來看一下官方給出的解釋:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

盡管Python完全支持多線程編程, 但是解釋器的C語言實(shí)現(xiàn)部分在完全并行執(zhí)行時(shí)并不是線程安全的。 實(shí)際上,解釋器被一個(gè)全局解釋器鎖保護(hù)著,它確保任何時(shí)候都只有一個(gè)Python線程執(zhí)行。

在多線程環(huán)境中,Python 虛擬機(jī)按以下方式執(zhí)行:

  1. 設(shè)置GIL
  2. 切換到一個(gè)線程去執(zhí)行
  3. 運(yùn)行
    • 指定數(shù)量的字節(jié)碼指令
    • 線程主動(dòng)讓出控制(可以調(diào)用time.sleep(0))
  4. 把線程設(shè)置完睡眠狀態(tài)
  5. 解鎖GIL
  6. 再次重復(fù)以上步驟

對(duì)所有面向 I/O 的(會(huì)調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會(huì)在這個(gè) I/O 調(diào)用之 前被釋放,以允許其它的線程在這個(gè)線程等待 I/O 的時(shí)候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會(huì)在自己的時(shí)間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計(jì)算密集 型的程序更能充分利用多線程環(huán)境的好處。

退出線程

當(dāng)一個(gè)線程結(jié)束計(jì)算,它就退出了。線程可以調(diào)用 thread.exit()之類的退出函數(shù),也可以使用 Python 退出進(jìn)程的標(biāo)準(zhǔn)方法,如 sys.exit()或拋出一個(gè) SystemExit 異常等。不過,你不可以直接 “殺掉”(“kill”)一個(gè)線程。

在 Python 中使用線程

在 Win32 和 Linux, Solaris, MacOS, *BSD 等大多數(shù)類 Unix 系統(tǒng)上運(yùn)行時(shí),Python 支持多線程 編程。Python 使用 POSIX 兼容的線程,即 pthreads。

默認(rèn)情況下,只要在解釋器中

1
>> import thread

如果沒有報(bào)錯(cuò),則說明線程可用。

Python 的 threading 模塊

Python 供了幾個(gè)用于多線程編程的模塊,包括 thread, threading 和 Queue 等。thread 和 threading 模塊允許程序員創(chuàng)建和管理線程。thread 模塊 供了基本的線程和鎖的支持,而 threading 供了更高級(jí)別,功能更強(qiáng)的線程管理的功能。Queue 模塊允許用戶創(chuàng)建一個(gè)可以用于多個(gè)線程之間 共享數(shù)據(jù)的隊(duì)列數(shù)據(jù)結(jié)構(gòu)。

核心 示:避免使用 thread 模塊

出于以下幾點(diǎn)考慮,我們不建議您使用 thread 模塊。

  1. 更高級(jí)別的 threading 模塊更為先 進(jìn),對(duì)線程的支持更為完善,而且使用 thread 模塊里的屬性有可能會(huì)與 threading 出現(xiàn)沖突。其次, 低級(jí)別的 thread 模塊的同步原語很少(實(shí)際上只有一個(gè)),而 threading 模塊則有很多。
  2. 對(duì)于你的進(jìn)程什么時(shí)候應(yīng)該結(jié)束完全沒有控制,當(dāng)主線程結(jié)束 時(shí),所有的線程都會(huì)被強(qiáng)制結(jié)束掉,沒有警告也不會(huì)有正常的清除工作。我們之前說過,至少 threading 模塊能確保重要的子線程退出后進(jìn)程才退出。

thread 模塊

除了產(chǎn)生線程外,thread 模塊也提供了基本的同步數(shù) 據(jù)結(jié)構(gòu)鎖對(duì)象(lock object,也叫原語鎖,簡單鎖,互斥鎖,互斥量,二值信號(hào)量)。

thread 模塊函數(shù)

  • start_new_thread(function, args, kwargs=None):產(chǎn)生一個(gè)新的線程,在新線程中用指定的參數(shù)和可選的 kwargs 來調(diào)用這個(gè)函數(shù)。
  • allocate_lock():分配一個(gè) LockType 類型的鎖對(duì)象
  • exit():讓線程退出
  • acquire(wait=None):嘗試獲取鎖對(duì)象
  • locked():如果獲取了鎖對(duì)象返回 True,否則返回 False
  • release():釋放鎖

下面是一個(gè)使用 thread 的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import thread
from time import sleep, time
def loop(num):
    print('start loop at:', time())
    sleep(num)
    print('loop done at:', time())
def loop1(num):
    print('start loop 1 at:', time())
    sleep(num)
    print('loop 1 done at:', time())
def main():
    print('starting at:', time())
    thread.start_new_thread(loop, (4,))
    thread.start_new_thread(loop1, (5,))
    sleep(6)
    print('all DONE at:', time())
if __name__ == '__main__':
    main()
('starting at:', 1489387024.886667)
('start loop at:', 1489387024.88705)
('start loop 1 at:', 1489387024.887277)
('loop done at:', 1489387028.888182)
('loop 1 done at:', 1489387029.888904)
('all DONE at:', 1489387030.889918)

start_new_thread()要求一定要有前兩個(gè)參數(shù)。所以,就算我們想要運(yùn)行的函數(shù)不要參數(shù),也要傳一個(gè)空的元組。
為什么要加上sleep(6)這一句呢? 因?yàn)椋绻覀儧]有讓主線程停下來,那主線程就會(huì)運(yùn)行下一條語句,顯示 “all done”,然后就關(guān)閉運(yùn)行著 loop()和 loop1()的兩個(gè)線程,退出了。

我們有沒有更好的辦法替換使用sleep() 這種不靠譜的同步方式呢?答案是使用鎖,使用了鎖,我們就可以在兩個(gè)線程都退出之后馬上退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#! -*- coding: utf-8 -*-
import thread
from time import sleep, time
loops = [4, 2]
def loop(nloop, nsec, lock):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))
    # 每個(gè)線程都會(huì)被分配一個(gè)事先已經(jīng)獲得的鎖,在 sleep()的時(shí)間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個(gè)線程已經(jīng)結(jié)束了。
    lock.release()
def main():
    print('starting at:', time())
    locks = []
    nloops = range(len(loops))
    for i in nloops:
        # 調(diào)用 thread.allocate_lock()函數(shù)創(chuàng)建一個(gè)鎖的列表
        lock = thread.allocate_lock()
        # 分別調(diào)用各個(gè)鎖的 acquire()函數(shù)獲得, 獲得鎖表示“把鎖鎖上”
        lock.acquire()
        locks.append(lock)
    for i in nloops:
        # 創(chuàng)建線程,每個(gè)線程都用各自的循環(huán)號(hào),睡眠時(shí)間和鎖為參數(shù)去調(diào)用 loop()函數(shù)
        thread.start_new_thread(loop, (i, loops[i], locks[i]))
    for i in nloops:
        # 在線程結(jié)束的時(shí)候,線程要自己去做解鎖操作
        # 當(dāng)前循環(huán)只是坐在那一直等(達(dá)到暫停主 線程的目的),直到兩個(gè)鎖都被解鎖為止才繼續(xù)運(yùn)行。
        while locks[i].locked(): pass
    print('all DONE at:', time())
if __name__ == '__main__':
    main()

為什么我們不在創(chuàng)建鎖的循環(huán)里創(chuàng)建線程呢?有以下幾個(gè)原因:

  1. 我們想到實(shí)現(xiàn)線程的同步,所以要讓“所有的馬同時(shí)沖出柵欄”。
  2. 獲取鎖要花一些時(shí)間,如果你的 線程退出得“太快”,可能會(huì)導(dǎo)致還沒有獲得鎖,線程就已經(jīng)結(jié)束了的情況。

threading 模塊

threading 模塊不僅提供了 Thread 類,還 供了各 種非常好用的同步機(jī)制。

下面是threading 模塊里所有的對(duì)象:

  1. Thread: 表示一個(gè)線程的執(zhí)行的對(duì)象
  2. Lock: 鎖原語對(duì)象(跟 thread 模塊里的鎖對(duì)象相同)
  3. RLock: 可重入鎖對(duì)象。使單線程可以再次獲得已經(jīng)獲得了的鎖(遞歸鎖定)。
  4. Condition: 條件變量對(duì)象能讓一個(gè)線程停下來,等待其它線程滿足了某個(gè)“條件”。 如,狀態(tài)的改變或值的改變。
  5. Event: 通用的條件變量。多個(gè)線程可以等待某個(gè)事件的發(fā)生,在事件發(fā)生后, 所有的線程都會(huì)被激活。
  6. Semaphore: 為等待鎖的線程 供一個(gè)類似“等候室”的結(jié)構(gòu)
  7. BoundedSemaphore: 與 Semaphore 類似,只是它不允許超過初始值
  8. Timer: 與 Thread 相似,只是,它要等待一段時(shí)間后才開始運(yùn)行。
守護(hù)線程

另一個(gè)避免使用 thread 模塊的原因是,它不支持守護(hù)線程。當(dāng)主線程退出時(shí),所有的子線程不 論它們是否還在工作,都會(huì)被強(qiáng)行退出。有時(shí),我們并不期望這種行為,這時(shí),就引入了守護(hù)線程 的概念
threading 模塊支持守護(hù)線程,它們是這樣工作的:守護(hù)線程一般是一個(gè)等待客戶請(qǐng)求的服務(wù)器, 如果沒有客戶 出請(qǐng)求,它就在那等著。如果你設(shè)定一個(gè)線程為守護(hù)線程,就表示你在說這個(gè)線程 是不重要的,在進(jìn)程退出的時(shí)候,不用等待這個(gè)線程退出。
如果你的主線程要退出的時(shí)候,不用等待那些子線程完成,那就設(shè)定這些線程的 daemon 屬性。 即,在線程開始(調(diào)用 thread.start())之前,調(diào)用 setDaemon()函數(shù)設(shè)定線程的 daemon 標(biāo)志 (thread.setDaemon(True))就表示這個(gè)線程“不重要”
如果你想要等待子線程完成再退出,那就什么都不用做,或者顯式地調(diào)用 thread.setDaemon(False)以保證其 daemon 標(biāo)志為 False。你可以調(diào)用 thread.isDaemon()函數(shù)來判 斷其 daemon 標(biāo)志的值。新的子線程會(huì)繼承其父線程的 daemon 標(biāo)志。整個(gè) Python 會(huì)在所有的非守護(hù) 線程退出后才會(huì)結(jié)束,即進(jìn)程中沒有非守護(hù)線程存在的時(shí)候才結(jié)束。

Thread 類

Thread類提供了以下方法:

  • run(): 用以表示線程活動(dòng)的方法。
  • start():啟動(dòng)線程活動(dòng)。
  • join([time]): 等待至線程中止。這阻塞調(diào)用線程直至線程的join() 方法被調(diào)用中止-正常退出或者拋出未處理的異常-或者是可選的超時(shí)發(fā)生。
  • is_alive(): 返回線程是否活動(dòng)的。
  • name(): 設(shè)置/返回線程名。
  • daemon(): 返回/設(shè)置線程的 daemon 標(biāo)志,一定要在調(diào)用 start()函數(shù)前設(shè)置

用 Thread 類,你可以用多種方法來創(chuàng)建線程。我們在這里介紹三種比較相像的方法。

  • 創(chuàng)建一個(gè)Thread的實(shí)例,傳給它一個(gè)函數(shù)
  • 創(chuàng)建一個(gè)Thread的實(shí)例,傳給它一個(gè)可調(diào)用的類對(duì)象
  • 從Thread派生出一個(gè)子類,創(chuàng)建一個(gè)這個(gè)子類的實(shí)例

下邊是三種不同方式的創(chuàng)建線程的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#! -*- coding: utf-8 -*-
# 創(chuàng)建一個(gè)Thread的實(shí)例,傳給它一個(gè)函數(shù)
import threading
from time import sleep, time
loops = [4, 2]
def loop(nloop, nsec, lock):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))
    # 每個(gè)線程都會(huì)被分配一個(gè)事先已經(jīng)獲得的鎖,在 sleep()的時(shí)間到了之后就釋放 相應(yīng)的鎖以通知主線程,這個(gè)線程已經(jīng)結(jié)束了。
def main():
    print('starting at:', time())
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = threading.Thread(target=loop, args=(i, loops[i]))
        threads.append(t)
    for i in nloops:
        # start threads
        threads[i].start()
    for i in nloops:
        # wait for all
        # join()會(huì)等到線程結(jié)束,或者在給了 timeout 參數(shù)的時(shí)候,等到超時(shí)為止。
        # 使用 join()看上去 會(huì)比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        threads[i].join()  # threads to finish
    print('all DONE at:', time())
if __name__ == '__main__':
    main()

與傳一個(gè)函數(shù)很相似的另一個(gè)方法是在創(chuàng)建線程的時(shí)候,傳一個(gè)可調(diào)用的類的實(shí)例供線程啟動(dòng) 的時(shí)候執(zhí)行——這是多線程編程的一個(gè)更為面向?qū)ο蟮姆椒āO鄬?duì)于一個(gè)或幾個(gè)函數(shù)來說,由于類 對(duì)象里可以使用類的強(qiáng)大的功能,可以保存更多的信息,這種方法更為靈活

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#! -*- coding: utf-8 -*-
# 創(chuàng)建一個(gè) Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類對(duì)象
from threading import Thread
from time import sleep, time
loops = [4, 2]
class ThreadFunc(object):
    def __init__(self, func, args, name=""):
        self.name = name
        self.func = func
        self.args = args
    def __call__(self):
        # 創(chuàng)建新線程的時(shí)候,Thread 對(duì)象會(huì)調(diào)用我們的 ThreadFunc 對(duì)象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。
        self.func(*self.args)
def loop(nloop, nsec):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))
def main():
    print('starting at:', time())
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))
        threads.append(t)
    for i in nloops:
        # start threads
        threads[i].start()
    for i in nloops:
        # wait for all
        # join()會(huì)等到線程結(jié)束,或者在給了 timeout 參數(shù)的時(shí)候,等到超時(shí)為止。
        # 使用 join()看上去 會(huì)比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        threads[i].join()  # threads to finish
    print('all DONE at:', time())
if __name__ == '__main__':
    main()

最后一個(gè)例子介紹如何子類化 Thread 類,這與上一個(gè)例子中的創(chuàng)建一個(gè)可調(diào)用的類非常像。使 用子類化創(chuàng)建線程(第 29-30 行)使代碼看上去更清晰明了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#! -*- coding: utf-8 -*-
# 創(chuàng)建一個(gè) Thread 的實(shí)例,傳給它一個(gè)可調(diào)用的類對(duì)象
from threading import Thread
from time import sleep, time
loops = [4, 2]
class MyThread(Thread):
    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args
    def getResult(self):
        return self.res
    def run(self):
        # 創(chuàng)建新線程的時(shí)候,Thread 對(duì)象會(huì)調(diào)用我們的 ThreadFunc 對(duì)象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。
        print 'starting', self.name, 'at:', time()
        self.res = self.func(*self.args)
        print self.name, 'finished at:', time()
def loop(nloop, nsec):
    print('start loop %s at: %s' % (nloop, time()))
    sleep(nsec)
    print('loop %s done at: %s' % (nloop, time()))
def main():
    print('starting at:', time())
    threads = []
    nloops = range(len(loops))
    for i in nloops:
        t = MyThread(loop, (i, loops[i]), loop.__name__)
        threads.append(t)
    for i in nloops:
        # start threads
        threads[i].start()
    for i in nloops:
        # wait for all
        # join()會(huì)等到線程結(jié)束,或者在給了 timeout 參數(shù)的時(shí)候,等到超時(shí)為止。
        # 使用 join()看上去 會(huì)比使用一個(gè)等待鎖釋放的無限循環(huán)清楚一些(這種鎖也被稱為"spinlock")
        threads[i].join()  # threads to finish
    print('all DONE at:', time())
if __name__ == '__main__':
    main()

除了各種同步對(duì)象和線程對(duì)象外,threading 模塊還 供了一些函數(shù)。

  • active_count(): 當(dāng)前活動(dòng)的線程對(duì)象的數(shù)量
  • current_thread(): 返回當(dāng)前線程對(duì)象
  • enumerate(): 返回當(dāng)前活動(dòng)線程的列表
  • settrace(func): 為所有線程設(shè)置一個(gè)跟蹤函數(shù)
  • setprofile(func): 為所有線程設(shè)置一個(gè) profile 函數(shù)

Lock & RLock

原語鎖定是一個(gè)同步原語,狀態(tài)是鎖定或未鎖定。兩個(gè)方法acquire()和release() 用于加鎖和釋放鎖。
RLock 可重入鎖是一個(gè)類似于Lock對(duì)象的同步原語,但同一個(gè)線程可以多次調(diào)用。

Lock 不支持遞歸加鎖,也就是說即便在同 線程中,也必須等待鎖釋放。通常建議改 RLock, 它會(huì)處理 “owning thread” 和 “recursion level” 狀態(tài),對(duì)于同 線程的多次請(qǐng)求鎖 為,只累加
計(jì)數(shù)器。每次調(diào) release() 將遞減該計(jì)數(shù)器,直到 0 時(shí)釋放鎖,因此 acquire() 和 release() 必須 要成對(duì)出現(xiàn)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from time import sleep
from threading import current_thread, Thread
lock = Rlock()
def show():
    with lock:
        print current_thread().name, i
        sleep(0.1)
def test():
    with lock:
        for i in range(3):
            show(i)
for i in range(2):
    Thread(target=test).start()

Event

事件用于在線程間通信。一個(gè)線程發(fā)出一個(gè)信號(hào),其他一個(gè)或多個(gè)線程等待。
Event 通過通過 個(gè)內(nèi)部標(biāo)記來協(xié)調(diào)多線程運(yùn) 。 法 wait() 阻塞線程執(zhí) ,直到標(biāo)記為 True。 set() 將標(biāo)記設(shè)為 True,clear() 更改標(biāo)記為 False。isSet() 用于判斷標(biāo)記狀態(tài)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from threading import Event
def test_event():
    e = Event()
    def test():
        for i in range(5):
            print 'start wait'
            e.wait()
            e.clear()  # 如果不調(diào)用clear(),那么標(biāo)記一直為 True,wait()就不會(huì)發(fā)生阻塞行為
            print i
Thread(target=test).start()
return e
e = test_event()

Condition

條件變量和 Lock 參數(shù)一樣,也是一個(gè),也是一個(gè)同步原語,當(dāng)需要線程關(guān)注特定的狀態(tài)變化或事件的發(fā)生時(shí)使用這個(gè)鎖定。

可以認(rèn)為,除了Lock帶有的鎖定池外,Condition還包含一個(gè)等待池,池中的線程處于狀態(tài)圖中的等待阻塞狀態(tài),直到另一個(gè)線程調(diào)用notify()/notifyAll()通知;得到通知后線程進(jìn)入鎖定池等待鎖定。

構(gòu)造方法:
Condition([lock/rlock])

Condition 有以下這些方法:

  • acquire([timeout])/release(): 調(diào)用關(guān)聯(lián)的鎖的相應(yīng)方法。
  • wait([timeout]): 調(diào)用這個(gè)方法將使線程進(jìn)入Condition的等待池等待通知,并釋放鎖。使用前線程必須已獲得鎖定,否則將拋出異常。
  • notify(): 調(diào)用這個(gè)方法將從等待池挑選一個(gè)線程并通知,收到通知的線程將自動(dòng)調(diào)用acquire()嘗試獲得鎖定(進(jìn)入鎖定池);其他線程仍然在等待池中。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。
  • notifyAll(): 調(diào)用這個(gè)方法將通知等待池中所有的線程,這些線程都將進(jìn)入鎖定池嘗試獲得鎖定。調(diào)用這個(gè)方法不會(huì)釋放鎖定。使用前線程必須已獲得鎖定,否則將拋出異常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from threading import Condition, current_thread, Thread
con = Condition()
def tc1():
    with con:
        for i in range(5):
            print current_thread().name, i
            sleep(0.3)
            if i == 3:
                con.wait()
def tc2():
    with con:
        for i in range(5):
            print current_thread().name, i
            sleep(0.1)
            con.notify()
Thread(target=tc1).start()
Thread(target=tc2).start()
Thread-1 0
Thread-1 1
Thread-1 2
Thread-1 3    # 讓出鎖
Thread-2 0
Thread-2 1
Thread-2 2
Thread-2 3
Thread-2 4
Thread-1 4    # 重新獲取鎖,繼續(xù)執(zhí)

只有獲取鎖的線程才能調(diào)用 wait() 和 notify(),因此必須在鎖釋放前調(diào)用。
當(dāng) wait() 釋放鎖后,其他線程也可進(jìn)入 wait 狀態(tài)。notifyAll() 激活所有等待線程,讓它們?nèi)屾i然后完成后續(xù)執(zhí)行。

生產(chǎn)者-消費(fèi)者問題和 Queue 模塊

現(xiàn)在我們用一個(gè)經(jīng)典的(生產(chǎn)者消費(fèi)者)例子來介紹一下 Queue模塊。

生產(chǎn)者消費(fèi)者的場景是: 生產(chǎn)者生產(chǎn)貨物,然后把貨物放到一個(gè)隊(duì)列之類的數(shù)據(jù)結(jié)構(gòu)中,生產(chǎn)貨物所要花費(fèi)的時(shí)間無法預(yù)先確定。消費(fèi)者消耗生產(chǎn)者生產(chǎn)的貨物的時(shí)間也是不確定的。

常用的 Queue 模塊的屬性:

  • queue(size): 創(chuàng)建一個(gè)大小為size的Queue對(duì)象。
  • qsize(): 返回隊(duì)列的大小(由于在返回的時(shí)候,隊(duì)列可能會(huì)被其它線程修改,所以這個(gè)值是近似值)
  • empty(): 如果隊(duì)列為空返回 True,否則返回 False
  • full(): 如果隊(duì)列已滿返回 True,否則返回 False
  • put(item,block=0): 把item放到隊(duì)列中,如果給了block(不為0),函數(shù)會(huì)一直阻塞到隊(duì)列中有空間為止
  • get(block=0): 從隊(duì)列中取一個(gè)對(duì)象,如果給了 block(不為 0),函數(shù)會(huì)一直阻塞到隊(duì)列中有對(duì)象為止

Queue 模塊可以用來進(jìn)行線程間通訊,讓各個(gè)線程之間共享數(shù)據(jù)。

現(xiàn)在,我們創(chuàng)建一個(gè)隊(duì)列,讓 生產(chǎn)者(線程)把新生產(chǎn)的貨物放進(jìn)去供消費(fèi)者(線程)使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#! -*- coding: utf-8 -*-
from Queue import Queue
from random import randint
from time import sleep, time
from threading import Thread
class MyThread(Thread):
    def __init__(self, func, args, name=""):
        super(MyThread, self).__init__()
        self.name = name
        self.func = func
        self.args = args
    def getResult(self):
        return self.res
    def run(self):
        # 創(chuàng)建新線程的時(shí)候,Thread 對(duì)象會(huì)調(diào)用我們的 ThreadFunc 對(duì)象,這時(shí)會(huì)用到一個(gè)特殊函數(shù) __call__()。
        print 'starting', self.name, 'at:', time()
        self.res = self.func(*self.args)
        print self.name, 'finished at:', time()
# writeQ()和 readQ()函數(shù)分別用來把對(duì)象放入隊(duì)列和消耗隊(duì)列中的一個(gè)對(duì)象。在這里我們使用 字符串'xxx'來表示隊(duì)列中的對(duì)象。
def writeQ(queue):
    print 'producing object for Q...'
    queue.put('xxx', 1)
    print "size now", queue.qsize()
def readQ(queue):
    queue.get(1)
    print("consumed object from Q... size now", queue.qsize())
def writer(queue, loops):
    # writer()函數(shù)只做一件事,就是一次往隊(duì)列中放入一個(gè)對(duì)象,等待一會(huì),然后再做同樣的事
    for i in range(loops):
        writeQ(queue)
        sleep(1)
def reader(queue, loops):
    # reader()函數(shù)只做一件事,就是一次從隊(duì)列中取出一個(gè)對(duì)象,等待一會(huì),然后再做同樣的事
    for i in range(loops):
        readQ(queue)
        sleep(randint(2, 5))
# 設(shè)置有多少個(gè)線程要被運(yùn)行
funcs = [writer, reader]
nfuncs = range(len(funcs))
def main():
    nloops = randint(10, 20)
    q = Queue(32)
    threads = []
    for i in nfuncs:
        t = MyThread(funcs[i], (q, nloops), funcs[i].__name__)
        threads.append(t)
    for i in nfuncs:
        threads[i].start()
    for i in nfuncs:
        threads[i].join()
        print threads[i].getResult()
    print 'all DONE'
if __name__ == '__main__':
    main()

FAQ

進(jìn)程與線程。線程與進(jìn)程的區(qū)別是什么?

進(jìn)程(有時(shí)被稱為重量級(jí)進(jìn)程)是程序的一次 執(zhí)行。每個(gè)進(jìn)程都有自己的地址空間,內(nèi)存,數(shù)據(jù)棧以及其它記錄其運(yùn)行軌跡的輔助數(shù)據(jù)。
線程(有時(shí)被稱為輕量級(jí)進(jìn)程)跟進(jìn)程有些相似,不同的是,所有的線程運(yùn)行在同一個(gè)進(jìn)程中, 共享相同的運(yùn)行環(huán)境。它們可以想像成是在主進(jìn)程或“主線程”中并行運(yùn)行的“迷你進(jìn)程”。

這篇文章很好的解釋了 線程和進(jìn)程的區(qū)別,推薦閱讀: http://www./blo…

Python 的線程。在 Python 中,哪一種多線程的程序表現(xiàn)得更好,I/O 密集型的還是計(jì)算 密集型的?

由于GIL的緣故,對(duì)所有面向 I/O 的(會(huì)調(diào)用內(nèi)建的操作系統(tǒng) C 代碼的)程序來說,GIL 會(huì)在這個(gè) I/O 調(diào)用之 前被釋放,以允許其它的線程在這個(gè)線程等待 I/O 的時(shí)候運(yùn)行。如果某線程并未使用很多 I/O 操作, 它會(huì)在自己的時(shí)間片內(nèi)一直占用處理器(和 GIL)。也就是說,I/O 密集型的 Python 程序比計(jì)算密集 型的程序更能充分利用多線程環(huán)境的好處。

線程。你認(rèn)為,多 CPU 的系統(tǒng)與一般的系統(tǒng)有什么大的不同?多線程的程序在這種系統(tǒng)上的表現(xiàn)會(huì)怎么樣?

Python的線程就是C語言的一個(gè)pthread,并通過操作系統(tǒng)調(diào)度算法進(jìn)行調(diào)度(例如linux是CFS)。為了讓各個(gè)線程能夠平均利用CPU時(shí)間,python會(huì)計(jì)算當(dāng)前已執(zhí)行的微代碼數(shù)量,達(dá)到一定閾值后就強(qiáng)制釋放GIL。而這時(shí)也會(huì)觸發(fā)一次操作系統(tǒng)的線程調(diào)度(當(dāng)然是否真正進(jìn)行上下文切換由操作系統(tǒng)自主決定)。
偽代碼

1
2
3
4
5
6
while True:
    acquire GIL
    for i in 1000:
        do something
    release GIL
    /* Give Operating System a chance to do thread scheduling */

這種模式在只有一個(gè)CPU核心的情況下毫無問題。任何一個(gè)線程被喚起時(shí)都能成功獲得到GIL(因?yàn)橹挥嗅尫帕薌IL才會(huì)引發(fā)線程調(diào)度)。
但當(dāng)CPU有多個(gè)核心的時(shí)候,問題就來了。從偽代碼可以看到,從release GIL到acquire GIL之間幾乎是沒有間隙的。所以當(dāng)其他在其他核心上的線程被喚醒時(shí),大部分情況下主線程已經(jīng)又再一次獲取到GIL了。這個(gè)時(shí)候被喚醒執(zhí)行的線程只能白白的浪費(fèi)CPU時(shí)間,看著另一個(gè)線程拿著GIL歡快的執(zhí)行著。然后達(dá)到切換時(shí)間后進(jìn)入待調(diào)度狀態(tài),再被喚醒,再等待,以此往復(fù)惡性循環(huán)。
簡單的總結(jié)下就是:Python的多線程在多核CPU上,只對(duì)于IO密集型計(jì)算產(chǎn)生正面效果;而當(dāng)有至少有一個(gè)CPU密集型線程存在,那么多線程效率會(huì)由于GIL而大幅下降。

線程池。修改 生成者消費(fèi)者 的代碼,不再是一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者,而是可以有任意個(gè) 消費(fèi)者線程(一個(gè)線程池),每個(gè)線程可以在任意時(shí)刻處理或消耗任意多個(gè)產(chǎn)品。

參考文章


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多