來源:albenw.github.io/posts/f5cda8d1/ 概要線程池,大家都很熟悉了,我們在平時(shí)應(yīng)用中也用的很多。對線程池,ThreadPoolExecutor 的實(shí)現(xiàn)原理有一定理解后,我們可以把它用的更好,對它的參數(shù)有更加深刻的理解,甚至我們可以擴(kuò)展,監(jiān)控自己的線程池。 ThreadPoolExecutor實(shí)現(xiàn)原理本文代碼基于JDK1.8 線程池相關(guān)的類的關(guān)系我們先看看主要的類ThreadPoolExecutor的繼承關(guān)系 平時(shí)可能還有用到的 Executors 類,這是一個(gè)工具類,提供newFixedThreadPoolnewCachedThreadPoolnewScheduledThreadPool等靜態(tài)方法方便我們創(chuàng)建線程池,最終還是調(diào)用 ThreadPoolExecutor 來創(chuàng)建的,一般規(guī)范不建議直接使用 Executors 來創(chuàng)建線程池。 線程池創(chuàng)建的主流程線程池的狀態(tài)先看看線程池的狀態(tài)有哪些,對它有初步的理解 線程池的狀態(tài)和運(yùn)行的線程數(shù)只用了一個(gè) int,其中高3位表示狀態(tài),低29位表示線程數(shù)。狀態(tài)表示的意思和狀態(tài)之間的轉(zhuǎn)換:RUNNING - 可以接受新的任務(wù),及執(zhí)行隊(duì)列中的任務(wù) SHUTDOWN - 不接受新的任務(wù),但會繼續(xù)執(zhí)行隊(duì)列中的任務(wù) STOP - 不接受新的任務(wù),既不會執(zhí)行隊(duì)列中的任務(wù),還會中斷執(zhí)行中的任務(wù) TIDYING - 全部任務(wù)已經(jīng)終止,且線程數(shù)為0 TERMINATED - 線程池完全終止 RUNNING -> SHUTDOWN - 執(zhí)行了 shutdown() (RUNNING or SHUTDOWN) -> STOP - 執(zhí)行了shutdownNow() SHUTDOWN -> TIDYING - 隊(duì)列和線程為空 STOP -> TIDYING - 線程為空 TIDYING -> TERMINATED - terminated() 這個(gè)勾子方法執(zhí)行完畢 線程池的創(chuàng)建ThreadPoolExecutor 的構(gòu)造函數(shù) 線程池的創(chuàng)建只是初始化了一些參數(shù),但理解好這些參數(shù)對我們使用線程池很有幫助。corePoolSize - 核心線程數(shù),除非 allowCoreThreadTimeOut 為 true(默認(rèn)false),否則即使沒有任務(wù),也會維持這么多線程。 maximumPoolSize - 最大線程數(shù),corePoolSize 滿了的話,會把任務(wù)放到隊(duì)列,如果隊(duì)列滿了的話(假設(shè)隊(duì)列有界),就會繼續(xù)創(chuàng)建線程直到 maximumPoolSize,如果超過 maximumPoolSize 則會執(zhí)行 reject 策略。 workQueue - 用來存放任務(wù)的隊(duì)列,是一個(gè) BlockingQueue,常用的有 LinkedBlockingQueue,ArrayBlockingQueue,SynchronousQueue。 keepAliveTime - 空閑線程的存活時(shí)間,如果當(dāng)前線程數(shù) > corePoolSize,且某個(gè)線程空閑了這么多時(shí)間(沒有獲取到任務(wù)并運(yùn)行),那么這個(gè)線程會被 remove 掉。 unit - keepAliveTime 的單位,內(nèi)部會統(tǒng)一轉(zhuǎn)換成 nanos threadFactory - 用來創(chuàng)建線程的 ThreadFactory,主要用來給線程命名(方便查看日志),設(shè)置 daemon,優(yōu)先級和 group 等,Executors 有 DefaultThreadFactory 這個(gè)默認(rèn)實(shí)現(xiàn)。 handler - reject 具體執(zhí)行策略,reject 的條件上面已經(jīng)說了,一般內(nèi)置有以下幾個(gè),也可以自己實(shí)現(xiàn) CallerRunsPolicy - 不使用線程池的線程執(zhí)行該任務(wù),直接用當(dāng)前執(zhí)行任務(wù)(例如 main 線程) AbortPolicy - 直接拋異常 DiscardPolicy - 無視不做處理,相當(dāng)于拋棄掉 DiscardOldestPolicy - 將隊(duì)列頭的任務(wù)取出來拋棄掉,然后運(yùn)行當(dāng)前任務(wù) 線程池的執(zhí)行一般我們使用 ExecutorService 的 submit 方法來使用線程池執(zhí)行一個(gè)任務(wù),這個(gè)方法調(diào)用到 AbstractExecutorService 這里我們看到所有 task 無論是 Callable 還是 Runnable 的都會包裝成一個(gè) RunnableFuture 也就是 FutureTask,返回給我們。 execute方法重點(diǎn)看 execute 方法,調(diào)用了 ThreadPoolExecutor 的 execute 我們分三種情形來看,每個(gè)是一個(gè) if 條件:第一,當(dāng) workCount < corePoolSize 時(shí),直接創(chuàng)建 worker 線程; 第二,如果上面創(chuàng)建失?。赡苁蔷€程池正在處于關(guān)閉狀態(tài),可能是 workCount > corePoolSize 了 - 并發(fā)場景),那么這時(shí)把任務(wù)放入 workQueue 隊(duì)列;下面的判斷是用來防止,線程池不在運(yùn)行了,就把任務(wù)刪掉;如果沒有線程了就加一個(gè); 第三,來到這步說明上面放隊(duì)列不成功(可能是隊(duì)列是有界的,滿了),那么就繼續(xù)創(chuàng)建線程來滿足,如果這都創(chuàng)建失?。赡苁?> maximumPoolSize)就 reject 了; addWorker方法繼續(xù)看看重點(diǎn)的 addWorker 方法,addWorker 分開兩部分來看。 這一步是判斷是否可以增加 worker 的重點(diǎn): 第一,首先開始的判斷有點(diǎn)奇怪,我也不是很明白,先認(rèn)為它是如果狀態(tài)是 SHUTDOWN 則不允許創(chuàng)建線程; 第二,下面有個(gè) core 參數(shù),true 使用 corePoolSize,false 使用 maximumPoolSize,我們上面說的 execute 方法第一次就是傳 true 的,第二次就傳 false。所以這里就是對 size 做判斷, 如果 >= size 則返回 false,否則 cas 一下,成功了就 break 執(zhí)行下面的代碼; 第三,如果 cas 失敗,說明有其他并發(fā)競爭,則 cintinue 循環(huán)上面的步驟判斷。 來到這一步說明可以創(chuàng)建 worker 了,這里用了一個(gè)全局 lock,來控制創(chuàng)建線程和關(guān)閉線程的不能同時(shí)做。可以看到步驟就是 new 一個(gè) worker,add 到 workers 里,workers 是一個(gè) HashSet。 largestPoolSize 來用記錄最大的線程數(shù)。如果 workerStarted == false(線程池在關(guān)閉或創(chuàng)建 worker 時(shí)異常),則會 addWorkerFailed 方法。 主要就是 remove 掉 worker,扣減計(jì)數(shù),這里還會調(diào)用 tryTerminate 。這個(gè)方法會在很多地方用到,它的作用就是防止線程池在終止?fàn)顟B(tài)這種情形,去終止線程。 Worker是什么我們剛剛一直說 worker,那到底 Worker 究竟是什么?我們現(xiàn)在來看看 我們可以看到 Worker 是一個(gè) AQS 和 Runnable。為什么是一個(gè) AQS ?我們可以結(jié)合注釋和代碼可以得到,worker 在跑任務(wù)的時(shí)候會 lock 住,在被中斷時(shí)會 tryLock,利用上鎖這一點(diǎn)來區(qū)分這個(gè) worker 是否空閑。Worker 中重寫 AQS 的方法。(感概:AQS 真是個(gè)簡單易用,用于并發(fā)控制的好工具!) 為什么是一個(gè) Runnable ?我們看看 Worker 的構(gòu)造函數(shù),在創(chuàng)建 Thread 時(shí)把自己 this 傳到 thread 的參數(shù),說明 worker 的 thread 跑的是自己,這時(shí)我們就知道 worker 的入口了。 Worker 的 run 方法 runWorker方法重點(diǎn)的 runWorker 方法 task 可能是傳進(jìn)來的 firstTask 或者 getTask() 獲取到的,getTask 也是重點(diǎn)方法,等下講到;運(yùn)行 task 時(shí)會上鎖,鎖的作用我剛剛已經(jīng)說了;如果線程池狀態(tài)是 STOP 則中斷線程;這里放了兩個(gè)勾子 beforeExecute 和 afterExecute 方法來提供給子類做點(diǎn)事情,一般用于監(jiān)控或統(tǒng)計(jì)線程池的執(zhí)行情況;執(zhí)行任務(wù)就直接 task.run() 了,還記得我說過這個(gè) task 是一個(gè) FutureTask,如果run 的時(shí)候拋出異常,F(xiàn)utureTask 會 catch 掉不會再 throw(如果大家對 FutureTask 不熟悉就先這樣理解),所以這里不會進(jìn)入 catch,也就是不會 throw x 了。如果 task 不像 FutureTask 一樣自己處理掉異常,那就會 throw x 了,那么 worker 的線程就會跳出 while 循環(huán),完成使命,結(jié)束自己;獲取不到 task (task 為null)或者循環(huán)過程中異常,最后都會執(zhí)行 processWorkerExit。 processWorkerExit 的作用主要就是 remove 掉 worker,那么扣減 workCount 呢?好像沒有看到。這里用了 completedAbruptly 這一變量來控制是否在 processWorkerExit 扣減 workCount,因?yàn)橛锌赡苁窃?getTask 已經(jīng)扣減了,那么在 processWorkerExit 就不用重復(fù)扣減。我們結(jié)合 runWorker 來看看,分兩種情況: 1、如果 firstTask 為 null,那么會走到 getTask 方法,如果 getTask 返回 null,那么說明已經(jīng)是扣減了,這時(shí)退出循環(huán),completedAbruptly = false,不用重復(fù)扣減。 2、如果 firstTask 不為 null(1)執(zhí)行 firstTask 正常結(jié)束,然后循環(huán),走到 getTask,如果返回 task 為 null,那么 completedAbruptly = false,不用重復(fù)扣減。 (2)執(zhí)行 firstTask 執(zhí)行異常,這時(shí) completedAbruptly = true,需要扣減這里我們又看到 tryTerminate 了;下面的判斷主要是嘗試去增加一個(gè) worker,因?yàn)槟?remove 掉了一個(gè),如果條件允許,那就加回一個(gè)唄。 getTask方法看看重點(diǎn)的 getTask 方法 在 getTask 時(shí)如果發(fā)現(xiàn)時(shí)線程池在關(guān)閉狀態(tài),那么就需要停止獲取任務(wù)了; 如果 wc > maximumPoolSize,超過了最大 size 了,就去 cas 扣減 workCount 一下,成功就返回 null; 如果 wc > corePoolSize(小于 maximumPoolSize),且 timedOut 的話,說明這個(gè) worker 也有點(diǎn)“多余”,也去扣減 workCount。注意這里對 timedOut 的判斷,通過 queue 的定時(shí) poll 方法,時(shí)間是線程池的構(gòu)造參數(shù) keepAliveTime,如果過了這么久都還沒獲取 task,說明 queue 是空的,有空閑的線程,那就把這個(gè) worker remove 掉吧; 如果 wc < corePoolSize 的話,那就調(diào)用 queue 的 take 方法一直阻塞獲取 task; 還有 allowCoreThreadTimeOut 參數(shù),它的意思是忽略對 corePoolSize 的判斷。 關(guān)閉線程池上面已經(jīng)把線程的創(chuàng)建和執(zhí)行基本說得7788了,我們看看關(guān)閉線程池是如何做的,其實(shí)在上面的很多方法中,都看到很多對如 SHUTDOWN 的判斷了。主要有 shutdown 和 shutdownNow 這兩個(gè)方法。 這兩個(gè)方法很相似,從名字來看 shutdown 顯得更加的柔性,實(shí)際也是。shutdown –不接受新的 task,在運(yùn)行和已經(jīng)在隊(duì)列的 task 可以繼續(xù)運(yùn)行;把狀態(tài)改為 SHUTDOWN;中斷空閑 worker,這個(gè)在上面已經(jīng)提到過了,用鎖對是否空閑做判斷。 interruptIdleWorkers 打斷空閑的線程 這里還有個(gè) onShutdown 勾子方法。 shutdownNow –不接受新的 task,中斷已經(jīng)在運(yùn)行的線程,清空隊(duì)列;把狀態(tài)改為 STOP;強(qiáng)制中斷所有在運(yùn)行 worker 的線程;drainQueue,相當(dāng)于把隊(duì)列的 task 丟棄掉; 總結(jié)線程池ThreadPoolExecutor實(shí)現(xiàn)的原理,就是用 Worker 線程不停得取出隊(duì)列中的任務(wù)來執(zhí)行,根據(jù)參數(shù)對任務(wù)隊(duì)列和 Workers 做限制,回收,調(diào)整。 參考資料END推薦好文強(qiáng)大,10k+點(diǎn)贊的 SpringBoot 后臺管理系統(tǒng)竟然出了詳細(xì)教程! 為什么建議大家使用 Linux 開發(fā)?爽(外加七個(gè)感嘆號) IntelliJ IDEA 15款 神級超級牛逼插件推薦(自用,真的超級牛逼) 炫酷,SpringBoot+Echarts實(shí)現(xiàn)用戶訪問地圖可視化(附源碼) 十分鐘學(xué)會使用 Elasticsearch 優(yōu)雅搭建自己的搜索系統(tǒng)(附源碼) |
|