小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

 Bladexu的文庫 2019-03-10
原文出處:http:/// 『chenssy』

作為Executor框架中最核心的類,ThreadPoolExecutor代表著鼎鼎大名的線程池,它給了我們足夠的理由來弄清楚它。

下面我們就通過源碼來一步一步弄清楚它。

內(nèi)部狀態(tài)

線程有五種狀態(tài):新建,就緒,運行,阻塞,死亡,線程池同樣有五種狀態(tài):Running, SHUTDOWN, STOP, TIDYING, TERMINATED。

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

變量ctl定義為AtomicInteger ,其功能非常強大,記錄了“線程池中的任務(wù)數(shù)量”和“線程池的狀態(tài)”兩個信息。共32位,其中高3位表示”線程池狀態(tài)”,低29位表示”線程池中的任務(wù)數(shù)量”。

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

RUNNING:處于RUNNING狀態(tài)的線程池能夠接受新任務(wù),以及對新添加的任務(wù)進行處理。

SHUTDOWN:處于SHUTDOWN狀態(tài)的線程池不可以接受新任務(wù),但是可以對已添加的任務(wù)進行處理。

STOP:處于STOP狀態(tài)的線程池不接收新任務(wù),不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)。

TIDYING:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時,進行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實現(xiàn)。

TERMINATED:線程池徹底終止的狀態(tài)。

各個狀態(tài)的轉(zhuǎn)換如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

創(chuàng)建線程池

我們可以通過ThreadPoolExecutor構(gòu)造函數(shù)來創(chuàng)建一個線程池:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

共有七個參數(shù),每個參數(shù)含義如下:

corePoolSize

線程池中核心線程的數(shù)量。當(dāng)提交一個任務(wù)時,線程池會新建一個線程來執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程。

maximumPoolSize

線程池中允許的最大線程數(shù)。線程池的阻塞隊列滿了之后,如果還有任務(wù)提交,如果當(dāng)前的線程數(shù)小于maximumPoolSize,則會新建線程來執(zhí)行任務(wù)。注意,如果使用的是無界隊列,該參數(shù)也就沒有什么效果了。

keepAliveTime

線程空閑的時間。線程的創(chuàng)建和銷毀是需要代價的。線程執(zhí)行完任務(wù)后不會立即銷毀,而是繼續(xù)存活一段時間:keepAliveTime。默認情況下,該參數(shù)只有在線程數(shù)大于corePoolSize時才會生效。

unit

keepAliveTime的單位。TimeUnit

workQueue

用來保存等待執(zhí)行的任務(wù)的阻塞隊列,等待的任務(wù)必須實現(xiàn)Runnable接口。我們可以選擇如下幾種:

  • ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,F(xiàn)IFO?!舅揽腏ava并發(fā)】—-J.U.C之阻塞隊列:ArrayBlockingQueue
  • LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的有界阻塞隊列,F(xiàn)IFO。
  • SynchronousQueue:不存儲元素的阻塞隊列,每個插入操作都必須等待一個移出操作,反之亦然?!舅揽腏ava并發(fā)】—-J.U.C之阻塞隊列:SynchronousQueue
  • PriorityBlockingQueue:具有優(yōu)先界別的阻塞隊列?!舅揽腏ava并發(fā)】—-J.U.C之阻塞隊列:PriorityBlockingQueue

threadFactory

用于設(shè)置創(chuàng)建線程的工廠。該對象可以通過Executors.defaultThreadFactory(),如下:

 public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); }

返回的是DefaultThreadFactory對象,源碼如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

ThreadFactory的左右就是提供創(chuàng)建線程的功能的線程工廠。他是通過newThread()方法提供創(chuàng)建線程的功能,newThread()方法創(chuàng)建的線程都是“非守護線程”而且“線程優(yōu)先級都是Thread.NORM_PRIORITY”。

handler

RejectedExecutionHandler,線程池的拒絕策略。所謂拒絕策略,是指將任務(wù)添加到線程池中時,線程池拒絕該任務(wù)所采取的相應(yīng)策略。當(dāng)向線程池中提交任務(wù)時,如果此時線程池中的線程已經(jīng)飽和了,而且阻塞隊列也已經(jīng)滿了,則線程池會選擇一種拒絕策略來處理該任務(wù)。

線程池提供了四種拒絕策略:

  1. AbortPolicy:直接拋出異常,默認策略;
  2. CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
  3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
  4. DiscardPolicy:直接丟棄任務(wù);

當(dāng)然我們也可以實現(xiàn)自己的拒絕策略,例如記錄日志等等,實現(xiàn)RejectedExecutionHandler接口即可。

線程池

Executor框架提供了三種線程池,他們都可以通過工具類Executors來創(chuàng)建。

FixedThreadPool

FixedThreadPool,可重用固定線程數(shù)的線程池,其定義如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

corePoolSize 和 maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads,意味著當(dāng)線程池滿時且阻塞隊列也已經(jīng)滿時,如果繼續(xù)提交任務(wù),則會直接走拒絕策略,該線程池不會再新建線程來執(zhí)行任務(wù),而是直接走拒絕策略。FixedThreadPool使用的是默認的拒絕策略,即AbortPolicy,則直接拋出異常。

keepAliveTime設(shè)置為0L,表示空閑的線程會立刻終止。

workQueue則是使用LinkedBlockingQueue,但是沒有設(shè)置范圍,那么則是最大值(Integer.MAX_VALUE),這基本就相當(dāng)于一個無界隊列了。使用該“無界隊列”則會帶來哪些影響呢?當(dāng)線程池中的線程數(shù)量等于corePoolSize 時,如果繼續(xù)提交任務(wù),該任務(wù)會被添加到阻塞隊列workQueue中,當(dāng)阻塞隊列也滿了之后,則線程池會新建線程執(zhí)行任務(wù)直到maximumPoolSize。由于FixedThreadPool使用的是“無界隊列”LinkedBlockingQueue,那么maximumPoolSize參數(shù)無效,同時指定的拒絕策略AbortPolicy也將無效。而且該線程池也不會拒絕提交的任務(wù),如果客戶端提交任務(wù)的速度快于任務(wù)的執(zhí)行,那么keepAliveTime也是一個無效參數(shù)。

其運行圖如下(參考《Java并發(fā)編程的藝術(shù)》):

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

SingleThreadExecutor

SingleThreadExecutor是使用單個worker線程的Executor,定義如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

作為單一worker線程的線程池,SingleThreadExecutor把corePool和maximumPoolSize均被設(shè)置為1,和FixedThreadPool一樣使用的是無界隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

CachedThreadPool

CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池 ,他定義如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

CachedThreadPool的corePool為0,maximumPoolSize為Integer.MAX_VALUE,這就意味著所有的任務(wù)一提交就會加入到阻塞隊列中。keepAliveTime這是為60L,unit設(shè)置為TimeUnit.SECONDS,意味著空閑線程等待新任務(wù)的最長時間為60秒,空閑線程超過60秒后將會被終止。阻塞隊列采用的SynchronousQueue,而我們在【死磕Java并發(fā)】—-J.U.C之阻塞隊列:SynchronousQueue中了解到SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,這樣就會存在一個問題,如果主線程提交任務(wù)的速度遠遠大于CachedThreadPool的處理速度,則CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù),這樣有可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源,所以在使用該線程池是,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴重的性能問題。

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

任務(wù)提交

線程池根據(jù)業(yè)務(wù)不同的需求提供了兩種方式提交任務(wù):Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以獲取該任務(wù)執(zhí)行的Future。

我們以Executor.execute()為例,來看看線程池的任務(wù)提交經(jīng)歷了那些過程。

定義:

public interface Executor { void execute(Runnable command);}

ThreadPoolExecutor提供實現(xiàn):

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

執(zhí)行流程如下:

  1. 如果線程池當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker創(chuàng)建新線程執(zhí)行任務(wù),成功返回true,失敗執(zhí)行步驟2。
  2. 如果線程池處于RUNNING狀態(tài),則嘗試加入阻塞隊列,如果加入阻塞隊列成功,則嘗試進行Double Check,如果加入失敗,則執(zhí)行步驟3。
  3. 如果線程池不是RUNNING狀態(tài)或者加入阻塞隊列失敗,則嘗試創(chuàng)建新線程直到maxPoolSize,如果失敗,則調(diào)用reject()方法運行相應(yīng)的拒絕策略。

在步驟2中如果加入阻塞隊列成功了,則會進行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊里中的線程是否可以被執(zhí)行。如果線程池不是RUNNING狀態(tài),則調(diào)用remove()方法從阻塞隊列中刪除該任務(wù),然后調(diào)用reject()方法處理任務(wù)。否則需要確保還有線程執(zhí)行。

addWorker

當(dāng)線程中的當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker()創(chuàng)建新線程執(zhí)行任務(wù),當(dāng)前線程數(shù)則是根據(jù)ctl變量來獲取的,調(diào)用workerCountOf(ctl)獲取低29位即可:

 private static int workerCountOf(int c) { return c & CAPACITY; }

addWorker(Runnable firstTask, boolean core)方法用于創(chuàng)建線程執(zhí)行任務(wù),源碼如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

  1. 判斷當(dāng)前線程是否可以添加任務(wù),如果可以則進行下一步,否則return false;
  2. rs >= SHUTDOWN ,表示當(dāng)前線程處于SHUTDOWN ,STOP、TIDYING、TERMINATED狀態(tài)
  3. rs == SHUTDOWN , firstTask != null時不允許添加線程,因為線程處于SHUTDOWN 狀態(tài),不允許添加任務(wù)
  4. rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允許添加線程,因為firstTask == null是為了添加一個沒有任務(wù)的線程然后再從workQueue中獲取任務(wù)的,如果workQueue == null,則說明添加的任務(wù)沒有任何意義。
  5. 內(nèi)嵌循環(huán),通過CAS worker + 1
  6. 獲取主鎖mailLock,如果線程池處于RUNNING狀態(tài)獲取處于SHUTDOWN狀態(tài)且 firstTask == null,則將任務(wù)添加到workers Queue中,然后釋放主鎖mainLock,然后啟動線程,然后return true,如果中途失敗導(dǎo)致workerStarted= false,則調(diào)用addWorkerFailed()方法進行處理。

在這里需要好好理論addWorker中的參數(shù),在execute()方法中,有三處調(diào)用了該方法:

  • 第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true),這個很好理解,當(dāng)然線程池的線程數(shù)量小于 corePoolSize ,則新建線程執(zhí)行任務(wù)即可,在執(zhí)行過程core == true,內(nèi)部與corePoolSize比較即可。
  • 第二次:加入阻塞隊列進行Double Check時,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)。如果線程池中的線程==0,按照道理應(yīng)該該任務(wù)應(yīng)該新建線程執(zhí)行任務(wù),但是由于已經(jīng)該任務(wù)已經(jīng)添加到了阻塞隊列,那么就在線程池中新建一個空線程,然后從阻塞隊列中取線程即可。
  • 第三次:線程池不是RUNNING狀態(tài)或者加入阻塞隊列失?。篹lse if (!addWorker(command, false)),這里core == fase,則意味著是與maximumPoolSize比較。

在新建線程執(zhí)行任務(wù)時,將講Runnable包裝成一個Worker,Woker為ThreadPoolExecutor的內(nèi)部類

Woker內(nèi)部類

Woker的源碼如下:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

從Worker的源碼中我們可以看到Woker繼承AQS,實現(xiàn)Runnable接口,所以可以認為Worker既是一個可以執(zhí)行的任務(wù),也可以達到獲取鎖釋放鎖的效果。這里繼承AQS主要是為了方便線程的中斷處理。這里注意兩個地方:構(gòu)造函數(shù)、run()。構(gòu)造函數(shù)主要是做三件事:1.設(shè)置同步狀態(tài)state為-1,同步狀態(tài)大于0表示就已經(jīng)獲取了鎖,2.設(shè)置將當(dāng)前任務(wù)task設(shè)置為firstTask,3.利用Worker本身對象this和ThreadFactory創(chuàng)建線程對象。

當(dāng)線程thread啟動(調(diào)用start()方法)時,其實就是執(zhí)行Worker的run()方法,內(nèi)部調(diào)用runWorker()。

runWorker

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

運行流程

  1. 根據(jù)worker獲取要執(zhí)行的任務(wù)task,然后調(diào)用unlock()方法釋放鎖,這里釋放鎖的主要目的在于中斷,因為在new Worker時,設(shè)置的state為-1,調(diào)用unlock()方法可以將state設(shè)置為0,這里主要原因就在于interruptWorkers()方法只有在state >= 0時才會執(zhí)行;
  2. 通過getTask()獲取執(zhí)行的任務(wù),調(diào)用task.run()執(zhí)行,當(dāng)然在執(zhí)行之前會調(diào)用worker.lock()上鎖,執(zhí)行之后調(diào)用worker.unlock()放鎖;
  3. 在任務(wù)執(zhí)行前后,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute() 和 afterExecute()方法,則兩個方法在ThreadPoolExecutor中是空實現(xiàn);
  4. 如果線程執(zhí)行完成,則會調(diào)用getTask()方法從阻塞隊列中獲取新任務(wù),如果阻塞隊列為空,則根據(jù)是否超時來判斷是否需要阻塞;
  5. task == null或者拋出異常(beforeExecute()、task.run()、afterExecute()均有可能)導(dǎo)致worker線程終止,則調(diào)用processWorkerExit()方法處理worker退出流程。

getTask()

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

timed == true,調(diào)用poll()方法,如果在keepAliveTime時間內(nèi)還沒有獲取task的話,則返回null,繼續(xù)循環(huán)。timed == false,則調(diào)用take()方法,該方法為一個阻塞方法,沒有任務(wù)時會一直阻塞掛起,直到有任務(wù)加入時對該線程喚醒,返回任務(wù)。

在runWorker()方法中,無論最終結(jié)果如何,都會執(zhí)行processWorkerExit()方法對worker進行退出處理。

processWorkerExit()

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

首先completedAbruptly的值來判斷是否需要對線程數(shù)-1處理,如果completedAbruptly == true,說明在任務(wù)運行過程中出現(xiàn)了異常,那么需要進行減1處理,否則不需要,因為減1處理在getTask()方法中處理了。然后從HashSet中移出該worker,過程需要獲取mainlock。然后調(diào)用tryTerminate()方法處理,該方法是對最后一個線程退出做終止線程池動作。如果線程池沒有終止,那么線程池需要保持一定數(shù)量的線程,則通過addWorker(null,false)新增一個空的線程。

addWorkerFailed()

在addWorker()方法中,如果線程t==null,或者在add過程出現(xiàn)異常,會導(dǎo)致workerStarted == false,那么在最后會調(diào)用addWorkerFailed()方法:

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

整個邏輯顯得比較簡單。

tryTerminate()

當(dāng)線程池涉及到要移除worker時候都會調(diào)用tryTerminate(),該方法主要用于判斷線程池中的線程是否已經(jīng)全部移除了,如果是的話則關(guān)閉線程池。

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

在關(guān)閉線程池的過程中,如果線程池處于STOP狀態(tài)或者處于SHUDOWN狀態(tài)且阻塞隊列為null,則線程池會調(diào)用interruptIdleWorkers()方法中斷所有線程,注意ONLY_ONE== true,表示僅中斷一個線程。

interruptIdleWorkers

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

onlyOne==true僅終止一個線程,否則終止所有線程。

線程終止

線程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于關(guān)閉線程池。

shutdown():按過去執(zhí)行已提交任務(wù)的順序發(fā)起一個有序的關(guān)閉,但是不接受新任務(wù)。

shutdownNow() :嘗試停止所有的活動執(zhí)行任務(wù)、暫停等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。

shutdown

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

shutdownNow

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

與shutdown不同,shutdownNow會調(diào)用interruptWorkers()方法中斷所有線程。

【死磕Java并發(fā)】—–J.U.C之線程池:ThreadPoolExecutor

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多