原文出處:http:/// 『chenssy』 作為Executor框架中最核心的類,ThreadPoolExecutor代表著鼎鼎大名的線程池,它給了我們足夠的理由來弄清楚它。 下面我們就通過源碼來一步一步弄清楚它。 內(nèi)部狀態(tài) 線程有五種狀態(tài):新建,就緒,運行,阻塞,死亡,線程池同樣有五種狀態(tài):Running, SHUTDOWN, STOP, TIDYING, TERMINATED。 變量ctl定義為AtomicInteger ,其功能非常強大,記錄了“線程池中的任務(wù)數(shù)量”和“線程池的狀態(tài)”兩個信息。共32位,其中高3位表示”線程池狀態(tài)”,低29位表示”線程池中的任務(wù)數(shù)量”。 RUNNING:處于RUNNING狀態(tài)的線程池能夠接受新任務(wù),以及對新添加的任務(wù)進行處理。 SHUTDOWN:處于SHUTDOWN狀態(tài)的線程池不可以接受新任務(wù),但是可以對已添加的任務(wù)進行處理。 STOP:處于STOP狀態(tài)的線程池不接收新任務(wù),不處理已添加的任務(wù),并且會中斷正在處理的任務(wù)。 TIDYING:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)門IDYING狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時,進行相應(yīng)的處理;可以通過重載terminated()函數(shù)來實現(xiàn)。 TERMINATED:線程池徹底終止的狀態(tài)。 各個狀態(tài)的轉(zhuǎn)換如下: 創(chuàng)建線程池 我們可以通過ThreadPoolExecutor構(gòu)造函數(shù)來創(chuàng)建一個線程池: 共有七個參數(shù),每個參數(shù)含義如下: corePoolSize 線程池中核心線程的數(shù)量。當(dāng)提交一個任務(wù)時,線程池會新建一個線程來執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有基本線程。 maximumPoolSize 線程池中允許的最大線程數(shù)。線程池的阻塞隊列滿了之后,如果還有任務(wù)提交,如果當(dāng)前的線程數(shù)小于maximumPoolSize,則會新建線程來執(zhí)行任務(wù)。注意,如果使用的是無界隊列,該參數(shù)也就沒有什么效果了。 keepAliveTime 線程空閑的時間。線程的創(chuàng)建和銷毀是需要代價的。線程執(zhí)行完任務(wù)后不會立即銷毀,而是繼續(xù)存活一段時間:keepAliveTime。默認情況下,該參數(shù)只有在線程數(shù)大于corePoolSize時才會生效。 unit keepAliveTime的單位。TimeUnit workQueue 用來保存等待執(zhí)行的任務(wù)的阻塞隊列,等待的任務(wù)必須實現(xiàn)Runnable接口。我們可以選擇如下幾種:
threadFactory 用于設(shè)置創(chuàng)建線程的工廠。該對象可以通過Executors.defaultThreadFactory(),如下: public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } 返回的是DefaultThreadFactory對象,源碼如下: ThreadFactory的左右就是提供創(chuàng)建線程的功能的線程工廠。他是通過newThread()方法提供創(chuàng)建線程的功能,newThread()方法創(chuàng)建的線程都是“非守護線程”而且“線程優(yōu)先級都是Thread.NORM_PRIORITY”。 handler RejectedExecutionHandler,線程池的拒絕策略。所謂拒絕策略,是指將任務(wù)添加到線程池中時,線程池拒絕該任務(wù)所采取的相應(yīng)策略。當(dāng)向線程池中提交任務(wù)時,如果此時線程池中的線程已經(jīng)飽和了,而且阻塞隊列也已經(jīng)滿了,則線程池會選擇一種拒絕策略來處理該任務(wù)。 線程池提供了四種拒絕策略:
當(dāng)然我們也可以實現(xiàn)自己的拒絕策略,例如記錄日志等等,實現(xiàn)RejectedExecutionHandler接口即可。 線程池 Executor框架提供了三種線程池,他們都可以通過工具類Executors來創(chuàng)建。 FixedThreadPool FixedThreadPool,可重用固定線程數(shù)的線程池,其定義如下: corePoolSize 和 maximumPoolSize都設(shè)置為創(chuàng)建FixedThreadPool時指定的參數(shù)nThreads,意味著當(dāng)線程池滿時且阻塞隊列也已經(jīng)滿時,如果繼續(xù)提交任務(wù),則會直接走拒絕策略,該線程池不會再新建線程來執(zhí)行任務(wù),而是直接走拒絕策略。FixedThreadPool使用的是默認的拒絕策略,即AbortPolicy,則直接拋出異常。 keepAliveTime設(shè)置為0L,表示空閑的線程會立刻終止。 workQueue則是使用LinkedBlockingQueue,但是沒有設(shè)置范圍,那么則是最大值(Integer.MAX_VALUE),這基本就相當(dāng)于一個無界隊列了。使用該“無界隊列”則會帶來哪些影響呢?當(dāng)線程池中的線程數(shù)量等于corePoolSize 時,如果繼續(xù)提交任務(wù),該任務(wù)會被添加到阻塞隊列workQueue中,當(dāng)阻塞隊列也滿了之后,則線程池會新建線程執(zhí)行任務(wù)直到maximumPoolSize。由于FixedThreadPool使用的是“無界隊列”LinkedBlockingQueue,那么maximumPoolSize參數(shù)無效,同時指定的拒絕策略AbortPolicy也將無效。而且該線程池也不會拒絕提交的任務(wù),如果客戶端提交任務(wù)的速度快于任務(wù)的執(zhí)行,那么keepAliveTime也是一個無效參數(shù)。 其運行圖如下(參考《Java并發(fā)編程的藝術(shù)》): SingleThreadExecutor SingleThreadExecutor是使用單個worker線程的Executor,定義如下: 作為單一worker線程的線程池,SingleThreadExecutor把corePool和maximumPoolSize均被設(shè)置為1,和FixedThreadPool一樣使用的是無界隊列LinkedBlockingQueue,所以帶來的影響和FixedThreadPool一樣。 CachedThreadPool CachedThreadPool是一個會根據(jù)需要創(chuàng)建新線程的線程池 ,他定義如下: CachedThreadPool的corePool為0,maximumPoolSize為Integer.MAX_VALUE,這就意味著所有的任務(wù)一提交就會加入到阻塞隊列中。keepAliveTime這是為60L,unit設(shè)置為TimeUnit.SECONDS,意味著空閑線程等待新任務(wù)的最長時間為60秒,空閑線程超過60秒后將會被終止。阻塞隊列采用的SynchronousQueue,而我們在【死磕Java并發(fā)】—-J.U.C之阻塞隊列:SynchronousQueue中了解到SynchronousQueue是一個沒有元素的阻塞隊列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,這樣就會存在一個問題,如果主線程提交任務(wù)的速度遠遠大于CachedThreadPool的處理速度,則CachedThreadPool會不斷地創(chuàng)建新線程來執(zhí)行任務(wù),這樣有可能會導(dǎo)致系統(tǒng)耗盡CPU和內(nèi)存資源,所以在使用該線程池是,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴重的性能問題。 任務(wù)提交 線程池根據(jù)業(yè)務(wù)不同的需求提供了兩種方式提交任務(wù):Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以獲取該任務(wù)執(zhí)行的Future。 我們以Executor.execute()為例,來看看線程池的任務(wù)提交經(jīng)歷了那些過程。 定義: public interface Executor { void execute(Runnable command);} ThreadPoolExecutor提供實現(xiàn): 執(zhí)行流程如下:
在步驟2中如果加入阻塞隊列成功了,則會進行一個Double Check的過程。Double Check過程的主要目的是判斷加入到阻塞隊里中的線程是否可以被執(zhí)行。如果線程池不是RUNNING狀態(tài),則調(diào)用remove()方法從阻塞隊列中刪除該任務(wù),然后調(diào)用reject()方法處理任務(wù)。否則需要確保還有線程執(zhí)行。 addWorker 當(dāng)線程中的當(dāng)前線程數(shù)小于corePoolSize,則調(diào)用addWorker()創(chuàng)建新線程執(zhí)行任務(wù),當(dāng)前線程數(shù)則是根據(jù)ctl變量來獲取的,調(diào)用workerCountOf(ctl)獲取低29位即可: private static int workerCountOf(int c) { return c & CAPACITY; } addWorker(Runnable firstTask, boolean core)方法用于創(chuàng)建線程執(zhí)行任務(wù),源碼如下:
在這里需要好好理論addWorker中的參數(shù),在execute()方法中,有三處調(diào)用了該方法:
在新建線程執(zhí)行任務(wù)時,將講Runnable包裝成一個Worker,Woker為ThreadPoolExecutor的內(nèi)部類 Woker內(nèi)部類 Woker的源碼如下: 從Worker的源碼中我們可以看到Woker繼承AQS,實現(xiàn)Runnable接口,所以可以認為Worker既是一個可以執(zhí)行的任務(wù),也可以達到獲取鎖釋放鎖的效果。這里繼承AQS主要是為了方便線程的中斷處理。這里注意兩個地方:構(gòu)造函數(shù)、run()。構(gòu)造函數(shù)主要是做三件事:1.設(shè)置同步狀態(tài)state為-1,同步狀態(tài)大于0表示就已經(jīng)獲取了鎖,2.設(shè)置將當(dāng)前任務(wù)task設(shè)置為firstTask,3.利用Worker本身對象this和ThreadFactory創(chuàng)建線程對象。 當(dāng)線程thread啟動(調(diào)用start()方法)時,其實就是執(zhí)行Worker的run()方法,內(nèi)部調(diào)用runWorker()。 runWorker 運行流程
getTask() timed == true,調(diào)用poll()方法,如果在keepAliveTime時間內(nèi)還沒有獲取task的話,則返回null,繼續(xù)循環(huán)。timed == false,則調(diào)用take()方法,該方法為一個阻塞方法,沒有任務(wù)時會一直阻塞掛起,直到有任務(wù)加入時對該線程喚醒,返回任務(wù)。 在runWorker()方法中,無論最終結(jié)果如何,都會執(zhí)行processWorkerExit()方法對worker進行退出處理。 processWorkerExit() 首先completedAbruptly的值來判斷是否需要對線程數(shù)-1處理,如果completedAbruptly == true,說明在任務(wù)運行過程中出現(xiàn)了異常,那么需要進行減1處理,否則不需要,因為減1處理在getTask()方法中處理了。然后從HashSet中移出該worker,過程需要獲取mainlock。然后調(diào)用tryTerminate()方法處理,該方法是對最后一個線程退出做終止線程池動作。如果線程池沒有終止,那么線程池需要保持一定數(shù)量的線程,則通過addWorker(null,false)新增一個空的線程。 addWorkerFailed() 在addWorker()方法中,如果線程t==null,或者在add過程出現(xiàn)異常,會導(dǎo)致workerStarted == false,那么在最后會調(diào)用addWorkerFailed()方法: 整個邏輯顯得比較簡單。 tryTerminate() 當(dāng)線程池涉及到要移除worker時候都會調(diào)用tryTerminate(),該方法主要用于判斷線程池中的線程是否已經(jīng)全部移除了,如果是的話則關(guān)閉線程池。 在關(guān)閉線程池的過程中,如果線程池處于STOP狀態(tài)或者處于SHUDOWN狀態(tài)且阻塞隊列為null,則線程池會調(diào)用interruptIdleWorkers()方法中斷所有線程,注意ONLY_ONE== true,表示僅中斷一個線程。 interruptIdleWorkers onlyOne==true僅終止一個線程,否則終止所有線程。 線程終止 線程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于關(guān)閉線程池。 shutdown():按過去執(zhí)行已提交任務(wù)的順序發(fā)起一個有序的關(guān)閉,但是不接受新任務(wù)。 shutdownNow() :嘗試停止所有的活動執(zhí)行任務(wù)、暫停等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。 shutdown shutdownNow 與shutdown不同,shutdownNow會調(diào)用interruptWorkers()方法中斷所有線程。 |
|
來自: Bladexu的文庫 > 《技術(shù)文摘》