前言 線程池可以說是 Java 進(jìn)階必備的知識(shí)點(diǎn)了,也是面試中必備的考點(diǎn),可能不少人看了這篇文章 后能對(duì)線程池工作原理說上一二,但這還遠(yuǎn)遠(yuǎn)不夠,如果碰到比較有經(jīng)驗(yàn)的面試官再繼續(xù)追問,很可能會(huì)被吊打,考慮如下問題:
Tomcat 的線程池和 JDK 的線程池實(shí)現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實(shí)現(xiàn)嗎? 我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個(gè)問題:壓測(cè)時(shí)接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊(duì)列為 5000,你能從這個(gè)設(shè)置中發(fā)現(xiàn)出一些問題并對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)嗎? 線程池被 shutdown 后,還能產(chǎn)生新的線程? 線程池里的線程異常后會(huì)再次新增線程嗎,如何捕獲這些線程拋出的異常? 線程池的大小如何設(shè)置,如何動(dòng)態(tài)設(shè)置 線程池的參數(shù) 阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池? 使用線程池應(yīng)該避免哪些問題,能否簡(jiǎn)單說下線程池的最佳實(shí)踐? 如何對(duì)線程池進(jìn)行監(jiān)控 相信不少人看了這些問題會(huì)有些懵逼
其實(shí)這些問題的答案大多數(shù)都藏在線程池的源碼里,所以深入了解線程池的源碼非常重要,本章我們將會(huì)來學(xué)習(xí)一下線程池的源碼,相信看完之后,以上的問題大部分都能回答,另外一些問題我們也會(huì)在文中與大家一起探討。
本文將會(huì)從以下幾個(gè)方面來介紹線程池的原理。
相信大家看完對(duì)線程池的理解會(huì)更進(jìn)一步,肝文不易,看完別完了三連哦。
為什么要用線程池 在上文 也提到過,創(chuàng)建線程有三大開銷,如下:
1、其實(shí) Java 中的線程模型是基于操作系統(tǒng)原生線程模型實(shí)現(xiàn)的,也就是說 Java 中的線程其實(shí)是基于內(nèi)核線程實(shí)現(xiàn)的,線程的創(chuàng)建,析構(gòu)與同步都需要進(jìn)行系統(tǒng)調(diào)用,而系統(tǒng)調(diào)用需要在用戶態(tài)與內(nèi)核中來回切換,代價(jià)相對(duì)較高,線程的生命周期包括「線程創(chuàng)建時(shí)間」,「線程執(zhí)行任務(wù)時(shí)間」,「線程銷毀時(shí)間」,創(chuàng)建和銷毀都需要導(dǎo)致系統(tǒng)調(diào)用。2、每個(gè) Thread 都需要有一個(gè)內(nèi)核線程的支持,也就意味著每個(gè) Thread 都需要消耗一定的內(nèi)核資源(如內(nèi)核線程的??臻g),因此能創(chuàng)建的 Thread 是有限的,默認(rèn)一個(gè)線程的線程棧大小是 1 M,有圖有真相
圖中所示,在 Java 8 下,創(chuàng)建 19 個(gè)線程(thread #19)需要?jiǎng)?chuàng)建 19535 KB,即 1 M 左右,reserved 代表如果創(chuàng)建 19 個(gè)線程,操作系統(tǒng)保證會(huì)為其分配這么多空間(實(shí)際上并不一定分配),committed 則表示實(shí)際已分配的空間大小。
畫外音:注意,這是在 Java 8 下的線程占用空間情況,但在 Java 11 中,對(duì)線程作了很大的優(yōu)化,創(chuàng)建一個(gè)線程大概只需要 40 KB,空間消耗大大減少
3、線程多了,導(dǎo)致不可忽視的上下文切換開銷。
由此可見,線程的創(chuàng)建是昂貴的,所以必須以線程池的形式來管理這些線程,在線程池中合理設(shè)置線程大小和管理線程,以達(dá)到以合理的創(chuàng)建線程大小以達(dá)到最大化收益,最小化風(fēng)險(xiǎn)的目的 ,對(duì)于開發(fā)人員來說,要完成任務(wù)不用關(guān)心線程如何創(chuàng)建,如何銷毀,如何協(xié)作,只需要關(guān)心提交的任務(wù)何時(shí)完成即可,對(duì)線程的調(diào)優(yōu),監(jiān)控等這些細(xì)枝末節(jié)的工作通通交給線程池來實(shí)現(xiàn),所以也讓開發(fā)人員得到極大的解脫!
類似線程池的這種池化思想應(yīng)用在很多地方,比如數(shù)據(jù)庫連接池,Http 連接池等,避免了昂貴資源的創(chuàng)建,提升了性能,也解放了開發(fā)人員。
ThreadPoolExecutor 設(shè)計(jì)架構(gòu)圖 首先我們來看看 Executor 框架的設(shè)計(jì)圖
Executor: 最頂層的 Executor 接口只提供了一個(gè) execute 接口,實(shí)現(xiàn)了提交任務(wù)與執(zhí)行任務(wù)的解藕,這個(gè)方法是最核心的,也是我們?cè)创a剖析的重點(diǎn),此方法最終是由 ThreadPoolExecutor 實(shí)現(xiàn)的, ExecutorService 擴(kuò)展了 Executor 接口,實(shí)現(xiàn)了終止執(zhí)行器,單個(gè)/批量提交任務(wù)等方法 AbstractExecutorService 實(shí)現(xiàn)了 ExecutorService 接口,實(shí)現(xiàn)了除 execute 以外的所有方法,只將一個(gè)最重要的 execute 方法交給 ThreadPoolExecutor 實(shí)現(xiàn)。 這樣的分層設(shè)計(jì)雖然層次看起來挺多,但每一層每司其職,邏輯清晰,值得借鑒。
線程池是如何工作的 首先我們來看下如何創(chuàng)建一個(gè)線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10 , 20 , 600L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(4096 ), new NamedThreadFactory('common-work-thread' ));// 設(shè)置拒絕策略,默認(rèn)為 AbortPolicy threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
看下其構(gòu)造方法簽名如下
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 省略代碼若干 }
要理解這些參數(shù)具體代表的意義,必須清楚線程池提交任務(wù)與執(zhí)行任務(wù)流程,如下
圖片來自美團(tuán)技術(shù)團(tuán)隊(duì)
步驟如下
1、corePoolSize:如果提交任務(wù)后線程還在運(yùn)行,當(dāng)線程數(shù)小于 corePoolSize 值時(shí),無論線程池中的線程是否忙碌 ,都會(huì)創(chuàng)建線程,并把任務(wù)交給此新創(chuàng)建的線程進(jìn)行處理,如果線程數(shù)少于等于 corePoolSize,那么這些線程不會(huì)回收,除非將 allowCoreThreadTimeOut 設(shè)置為 true,但一般不這么干 ,因?yàn)轭l繁地創(chuàng)建銷毀線程會(huì)極大地增加系統(tǒng)調(diào)用的開銷。
2、workQueue:如果線程數(shù)大于核心數(shù)(corePoolSize)且小于最大線程數(shù)(maximumPoolSize),則會(huì)將任務(wù)先丟到阻塞隊(duì)列里,然后線程自己去阻塞隊(duì)列中拉取任務(wù)執(zhí)行。
3、maximumPoolSize: 線程池中最大可創(chuàng)建的線程數(shù),如果提交任務(wù)時(shí)隊(duì)列滿了且線程數(shù)未到達(dá)這個(gè)設(shè)定值,則會(huì)創(chuàng)建線程并執(zhí)行此次提交的任務(wù),如果提交任務(wù)時(shí)隊(duì)列滿了但線池?cái)?shù)已經(jīng)到達(dá)了這個(gè)值,此時(shí)說明已經(jīng)超出了線池程的負(fù)載能力,就會(huì)執(zhí)行拒絕策略,這也好理解,總不能讓源源不斷地任務(wù)進(jìn)來把線程池給壓垮了吧,我們首先要保證線程池能正常工作。
4、RejectedExecutionHandler:一共有以下四種拒絕策略
AbortPolicy:丟棄任務(wù)并拋出異常,這也是默認(rèn)策略; CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù),所以開頭的問題「線程把任務(wù)丟給線程池后肯定就馬上返回了?」我們可以回答了,如果用的是 CallerRunsPolicy 策略,提交任務(wù)的線程(比如主線程)提交任務(wù)后并不能保證馬上就返回,當(dāng)觸發(fā)了這個(gè) reject 策略不得不親自來處理這個(gè)任務(wù)。 DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。 DiscardPolicy:直接丟棄任務(wù),不拋出任何異常,這種策略只適用于不重要的任務(wù)。 5、keepAliveTime: 線程存活時(shí)間,如果在此時(shí)間內(nèi)超出 corePoolSize 大小的線程處于 idle 狀態(tài),這些線程會(huì)被回收
6、threadFactory:可以用此參數(shù)設(shè)置線程池的命名,指定 defaultUncaughtExceptionHandler(有啥用,后文闡述),甚至可以設(shè)定線程為守護(hù)線程。
現(xiàn)在問題來了,該如何合理設(shè)置這些參數(shù)呢。
首先來看線程大小設(shè)置 <<Java 并發(fā)編程實(shí)戰(zhàn)>>告訴我們應(yīng)該分兩種情況
針對(duì) CPU 密集型的任務(wù),在有 Ncpu 個(gè)處理器的系統(tǒng)上,當(dāng)線程池的大小為 Ncpu + 1 時(shí),通常能實(shí)現(xiàn)最優(yōu)的利用率,+1 是因?yàn)楫?dāng)計(jì)算密集型線程偶爾由于缺頁故障或其他原因而暫停工作時(shí),這個(gè)'額外'的線程也能確保 CPU 的時(shí)鐘周期不會(huì)被浪費(fèi),所謂 CPU 密集,就是線程一直在忙碌,這樣將線程池的大小設(shè)置為 Ncpu + 1 避免了線程的上下文切換,讓線程時(shí)刻處于忙碌狀態(tài),將 CPU 的利用率最大化。 針對(duì) IO 密集型的任務(wù),它也給出了如下計(jì)算公式 這些公式看看就好,實(shí)際的業(yè)務(wù)場(chǎng)景中基本用不上,這些公式太過理論化了,脫離業(yè)務(wù)場(chǎng)景,僅可作個(gè)理論參考,舉個(gè)例子,你說 CPU 密集型任務(wù)設(shè)置線程池大小為 N + 1個(gè),但實(shí)際上在業(yè)務(wù)中往往不只設(shè)置一個(gè)線程池,這種情況套用的公式就懵逼了
再來看 workQueue 的大小設(shè)置 由上文可知,如果最大線程大于核心線程數(shù),當(dāng)且僅當(dāng)核心線程滿了且 workQueue 也滿的情況下,才會(huì)新增新的線程,也就是說如果 workQueue 是無界隊(duì)列,那么當(dāng)線程數(shù)增加到 corePoolSize 后,永遠(yuǎn)不會(huì)再新增新的線程了,也就是說此時(shí) maximumPoolSize 的設(shè)置就無效了,也無法觸發(fā) RejectedExecutionHandler 拒絕策略,任務(wù)只會(huì)源源不斷地填充到 workQueue,直到 OOM。
所以 workQueue 應(yīng)該為有界隊(duì)列,至少保證在任務(wù)過載的情況下線程池還能正常工作,那么哪些是有有界隊(duì)列,哪些是無界隊(duì)列呢。
有界隊(duì)列我們常用的以下兩個(gè)
LinkedBlockingQueue: 鏈表構(gòu)成的有界隊(duì)列,按先進(jìn)先出(FIFO)的順序?qū)υ剡M(jìn)行排列,但注意在創(chuàng)建時(shí)需指定其大小,否則其大小默認(rèn)為 Integer.MAX_VALUE,相當(dāng)于無界隊(duì)列了 ArrayBlockingQueue: 數(shù)組實(shí)現(xiàn)的有界隊(duì)列,按先進(jìn)先出(FIFO)的順序?qū)υ剡M(jìn)行排列。 無界隊(duì)列我們常用 PriorityBlockingQueue 這個(gè)優(yōu)先級(jí)隊(duì)列,任務(wù)插入的時(shí)候可以指定其權(quán)重以讓這些任務(wù)優(yōu)先執(zhí)行,但這個(gè)隊(duì)列很少用,原因很簡(jiǎn)單,線程池里的任務(wù)執(zhí)行順序一般是平等的,如果真有必須某些類型的任務(wù)需要優(yōu)先執(zhí)行,大不了再開個(gè)線程池好了,將不同的任務(wù)類型用不同的線程池隔離開來,也是合理利用線程池的一種實(shí)踐。
說到這我相信大家應(yīng)該能回答開頭的問題「阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?」,最常見的是以下兩種創(chuàng)建方式
image-20201109002227476 newCachedThreadPool 方法的最大線程數(shù)設(shè)置成了 Integer.MAX_VALUE,而 newSingleThreadExecutor 方法創(chuàng)建 workQueue 時(shí) LinkedBlockingQueue 未聲明大小,相當(dāng)于創(chuàng)建了無界隊(duì)列,一不小心就會(huì)導(dǎo)致 OOM。
threadFactory 如何設(shè)置 一般業(yè)務(wù)中會(huì)有多個(gè)線程池,如果某個(gè)線程池出現(xiàn)了問題,定位是哪一個(gè)線程出問題很重要,所以為每個(gè)線程池取一個(gè)名字就很有必要了,我司用的 dubbo 的 NamedThreadFactory 來生成 threadFactory,創(chuàng)建很簡(jiǎn)單
new NamedThreadFactory('demo-work' )
它的實(shí)現(xiàn)還是很巧妙的,有興趣地可以看看它的源碼,每調(diào)用一次,底層有個(gè)計(jì)數(shù)器會(huì)加一,會(huì)依次命名為 「demo-work-thread-1」, 「demo-work-thread-2」, 「demo-work-thread-3」這樣遞增的字符串。
在實(shí)際的業(yè)務(wù)場(chǎng)景中,一般很難確定 corePoolSize, workQueue,maximumPoolSize 的大小,如果出問題了,一般來說只能重新設(shè)置一下這些參數(shù)再發(fā)布,這樣往往需要耗費(fèi)一些時(shí)間,美團(tuán)的這篇文章 給出了讓人眼前一亮的解決方案,當(dāng)發(fā)現(xiàn)問題(線程池監(jiān)控告警)時(shí),動(dòng)態(tài)調(diào)整這些參數(shù),可以讓這些參數(shù)實(shí)時(shí)生效,能在發(fā)現(xiàn)問題時(shí)及時(shí)解決,確實(shí)是個(gè)很好的思路。
線程池提交任務(wù)的兩種方式 線程池創(chuàng)建好了,該怎么給它提交任務(wù),有兩種方式,調(diào)用 execute 和 submit 方法,來看下這兩個(gè)方法的方法簽名
// 方式一:execute 方法 public void execute (Runnable command) { }// 方式二:ExecutorService 中 submit 的三個(gè)方法 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
區(qū)別在于調(diào)用 execute 無返回值,而調(diào)用 submit 可以返回 Future,那么這個(gè) Future 能到底能干啥呢,看它的接口
public interface Future <V > { /** * 取消正在執(zhí)行的任務(wù),如果任務(wù)已執(zhí)行或已被取消,或者由于某些原因不能取消則返回 false * 如果任務(wù)未開始或者任務(wù)已開始但可以中斷(mayInterruptIfRunning 為 true),則 * 可以取消/中斷此任務(wù) */ boolean cancel (boolean mayInterruptIfRunning) ; /** * 任務(wù)在完成前是否已被取消 */ boolean isCancelled () ; /** * 正常的執(zhí)行完流程流程,或拋出異常,或取消導(dǎo)致的任務(wù)完成都會(huì)返回 true */ boolean isDone () ; /** * 阻塞等待任務(wù)的執(zhí)行結(jié)果 */ V get () throws InterruptedException, ExecutionException ; /** * 阻塞等待任務(wù)的執(zhí)行結(jié)果,不過這里指定了時(shí)間,如果在 timeout 時(shí)間內(nèi)任務(wù)還未執(zhí)行完成, * 則拋出 TimeoutException 異常 */ V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException ; }
可以用 Future 取消任務(wù),判斷任務(wù)是否已取消/完成,甚至可以阻塞等待結(jié)果。
submit 為啥能提交任務(wù)(Runnable)的同時(shí)也能返回任務(wù)(Future)的執(zhí)行結(jié)果呢
原來在最后執(zhí)行 execute 前用 newTaskFor 將 task 封裝成了 RunnableFuture,newTaskFor 返回了 FutureTask 這個(gè)類,結(jié)構(gòu)圖如下
可以看到 FutureTask 這個(gè)接口既實(shí)現(xiàn)了 Runnable 接口,也實(shí)現(xiàn) Future 接口,所以在提交任務(wù)的同時(shí)也能利用 Future 接口來執(zhí)行任務(wù)的取消,獲取任務(wù)的狀態(tài),等待執(zhí)行結(jié)果這些操作。
execute 與 submit 除了是否能返回執(zhí)行結(jié)果這一區(qū)別外,還有一個(gè)重要區(qū)別,那就是使用 execute 執(zhí)行如果發(fā)生了異常,是捕獲不到的,默認(rèn)會(huì)執(zhí)行 ThreadGroup 的 uncaughtException 方法(下圖數(shù)字 2 對(duì)應(yīng)的邏輯)
所以如果你想監(jiān)控執(zhí)行 execute 方法時(shí)發(fā)生的異常,需要通過 threadFactory 來指定一個(gè) UncaughtExceptionHandler,這樣就會(huì)執(zhí)行上圖中的 1,進(jìn)而執(zhí)行 UncaughtExceptionHandler 中的邏輯,如下所示:
//1.實(shí)現(xiàn)一個(gè)自己的線程池工廠 ThreadFactory factory = (Runnable r) -> { //創(chuàng)建一個(gè)線程 Thread t = new Thread(r); //給創(chuàng)建的線程設(shè)置UncaughtExceptionHandler對(duì)象 里面實(shí)現(xiàn)異常的默認(rèn)邏輯 t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) -> { // 在此設(shè)置統(tǒng)計(jì)監(jiān)控邏輯 System.out.println('線程工廠設(shè)置的exceptionHandler' + e.getMessage()); }); return t; };// 2.創(chuàng)建一個(gè)自己定義的線程池,使用自己定義的線程工廠 ExecutorService service = new ThreadPoolExecutor(1 , 1 , 0 , TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10 ),factory);//3.提交任務(wù) service.execute(()->{ int i=1 /0 ; });
執(zhí)行以上邏輯最終會(huì)輸出「線程工廠設(shè)置的exceptionHandler/ by zero」,通過這樣的方式就能通過設(shè)定的 defaultUncaughtExceptionHandler 來執(zhí)行我們的監(jiān)控邏輯了。
如果用 submit ,如何捕獲異常呢,當(dāng)我們調(diào)用 future.get 就可以捕獲
Callable testCallable = xxx; Future future = executor.submit(myCallable);try { future1.get(3 )); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }
那么 future 為啥在 get 的時(shí)候才捕獲異步呢,因?yàn)樵趫?zhí)行 submit 時(shí)拋出異常后此異常被保存了起來,而在 get 的時(shí)候才被拋出
關(guān)于 execute 和 submit 的執(zhí)行流程 why 神的這篇文章 寫得非常透徹,我就不拾人牙慧了,建議大家好好品品,收獲會(huì)很大!
ThreadPoolExecutor 源碼剖析 前面鋪墊了這么多,終于到了最核心的源碼剖析環(huán)節(jié)了。
對(duì)于線程池來說,我們最關(guān)心的是它的「狀態(tài)」和「可運(yùn)行的線程數(shù)量」,一般來說我們可以選擇用兩個(gè)變量來記錄,不過 Doug Lea 只用了一個(gè)變量(ctl)就達(dá)成目的了,我們知道變量越多,代碼的可維護(hù)性就越差,也越容易出 bug, 所以只用一個(gè)變量就達(dá)成了兩個(gè)變量的效果,這讓代碼的可維護(hù)性大大提高,那么他是怎么設(shè)計(jì)的呢
// ThreadPoolExecutor.java public class ThreadPoolExecutor extends AbstractExecutorService { private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0 )); private static final int COUNT_BITS = Integer.SIZE - 3 ; private static final int CAPACITY = (1 << COUNT_BITS) - 1 ; // 結(jié)果:111 00000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS; // 結(jié)果: 000 00000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 結(jié)果: 001 00000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS; // 結(jié)果: 010 00000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS; // 結(jié)果: 011 00000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS; // 獲取線程池的狀態(tài) private static int runStateOf (int c) { return c & ~CAPACITY; } // 獲取線程數(shù)量 private static int workerCountOf (int c) { return c & CAPACITY; } }
可以看到,ctl 是一個(gè) 原子類的 Integer 變量,有 32 位,低 29 位表示線程數(shù)量, 29 位最大可以表示 (2^29)-1 (大概 5 億多),足夠記錄線程大小了,如果未來還是不夠,可以把 ctl 聲明為 AtomicLong,高 3 位用來表示線程池的狀態(tài),3 位可以表示 8 個(gè)線程池的狀態(tài),由于線程池總共只有五個(gè)狀態(tài),所以 3 位也是足夠了,線程池的五個(gè)狀態(tài)如下
RUNNING: 接收新的任務(wù),并能繼續(xù)處理 workQueue 中的任務(wù) SHUTDOWN: 不再接收新的任務(wù),不過能繼續(xù)處理 workQueue 中的任務(wù) STOP: 不再接收新的任務(wù),也不再處理 workQueue 中的任務(wù),并且會(huì)中斷正在處理任務(wù)的線程 TIDYING: 所有的任務(wù)都完結(jié)了,并且線程數(shù)量(workCount)為 0 時(shí)即為此狀態(tài),進(jìn)入此狀態(tài)后會(huì)調(diào)用 terminated() 這個(gè)鉤子方法進(jìn)入 TERMINATED 狀態(tài) TERMINATED: 調(diào)用 terminated() 方法后即為此狀態(tài) 線程池的狀態(tài)流轉(zhuǎn)及觸發(fā)條件如下
有了這些基礎(chǔ),我們來分析下 execute 的源碼
public void execute (Runnable command) { if (command == null ) throw new NullPointerException(); int c = ctl.get(); // 如果當(dāng)前線程數(shù)少于核心線程數(shù)(corePoolSize),無論核心線程是否忙碌,都創(chuàng)建線程,直到達(dá)到 corePoolSize 為止 if (workerCountOf(c) < corePoolSize) { // 創(chuàng)建線程并將此任務(wù)交給 worker 處理(此時(shí)此任務(wù)即 worker 中的 firstTask) if (addWorker(command, true )) return ; c = ctl.get(); } // 如果線程池處于 RUNNING 狀態(tài),并且線程數(shù)大于 corePoolSize 或者 // 線程數(shù)少于 corePoolSize 但創(chuàng)建線程失敗了,則將任務(wù)丟進(jìn) workQueue 中 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 這里需要再次檢查線程池是否處于 RUNNING 狀態(tài),因?yàn)樵谌蝿?wù)入隊(duì)后可能線程池狀態(tài)會(huì)發(fā)生變化,(比如調(diào)用了 shutdown 方法等),如果線程狀態(tài)發(fā)生變化了,則移除此任務(wù),執(zhí)行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池在 RUNNING 狀態(tài)下,線程數(shù)為 0,則新建線程加速處理 workQueue 中的任務(wù) else if (workerCountOf(recheck) == 0 ) addWorker(null , false ); } // 這段邏輯說明線程數(shù)大于 corePoolSize 且任務(wù)入隊(duì)失敗了,此時(shí)會(huì)以最大線程數(shù)(maximumPoolSize)為界來創(chuàng)建線程,如果失敗,說明線程數(shù)超過了 maximumPoolSize,則執(zhí)行拒絕策略 else if (!addWorker(command, false )) reject(command); }
從這段代碼中可以看到,創(chuàng)建線程是調(diào)用 addWorker 實(shí)現(xiàn)的,在分析 addWorker 之前,有必要簡(jiǎn)單提一下 Worker,線程池把每一個(gè)執(zhí)行任務(wù)的線程都封裝為 Worker 的形式,取名為 Worker 很形象,線程池的本質(zhì)是生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者不斷地往 workQueue 中丟 task, workQueue 就像流水線一樣不斷地輸送著任務(wù),而 worker(工人) 不斷地取任務(wù)來執(zhí)行
那么問題來了,為啥要把線程封裝到 worker 中呢,線程池拿到 task 后直接丟給線程處理或者讓線程自己去 workQueue 中處理不就完了? 將線程封裝為 worker 主要是為了更好地管理線程的中斷
來看下 Worker 的定義
// 此處可以看出 worker 既是一個(gè) Runnable 任務(wù),也實(shí)現(xiàn)了 AQS(實(shí)際上是用 AQS 實(shí)現(xiàn)了一個(gè)獨(dú)占鎖,這樣由于 worker 運(yùn)行時(shí)會(huì)上鎖,執(zhí)行 shutdown,setCorePoolSize,setMaximumPoolSize等方法時(shí)會(huì)試著中斷線程(interruptIdleWorkers) ,在這個(gè)方法中斷方法中會(huì)先嘗試獲取 worker 的鎖,如果不成功,說明 worker 在運(yùn)行中,此時(shí)會(huì)先讓 worker 執(zhí)行完任務(wù)再關(guān)閉 worker 的線程,實(shí)現(xiàn)優(yōu)雅關(guān)閉線程的目的) private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L ; // 實(shí)際執(zhí)行任務(wù)的線程 final Thread thread; // 上文提到,如果當(dāng)前線程數(shù)少于核心線程數(shù),創(chuàng)建線程并將提交的任務(wù)交給 worker 處理處理,此時(shí) firstTask 即為此提交的任務(wù),如果 worker 從 workQueue 中獲取任務(wù),則 firstTask 為空 Runnable firstTask; // 統(tǒng)計(jì)完成的任務(wù)數(shù) volatile long completedTasks; Worker(Runnable firstTask) { // 初始化為 -1,這樣在線程運(yùn)行前(調(diào)用runWorker)禁止中斷,在 interruptIfStarted() 方法中會(huì)判斷 getState()>=0 setState(-1 ); this .firstTask = firstTask; // 根據(jù)線程池的 threadFactory 創(chuàng)建一個(gè)線程,將 worker 本身傳給線程(因?yàn)?nbsp;worker 實(shí)現(xiàn)了 Runnable 接口) this .thread = getThreadFactory().newThread(this ); } public void run () { // thread 啟動(dòng)后會(huì)調(diào)用此方法 runWorker(this ); } // 1 代表被鎖住了,0 代表未鎖 protected boolean isHeldExclusively () { return getState() != 0 ; } // 嘗試獲取鎖 protected boolean tryAcquire (int unused) { // 從這里可以看出它是一個(gè)獨(dú)占鎖,因?yàn)楫?dāng)獲取鎖后,cas 設(shè)置 state 不可能成功,這里我們也能明白上文中將 state 設(shè)置為 -1 的作用,這種情況下永遠(yuǎn)不可能獲取得鎖,而 worker 要被中斷首先必須獲取鎖 if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } // 嘗試釋放鎖 protected boolean tryRelease (int unused) { setExclusiveOwnerThread(null ); setState(0 ); return true ; } public void lock () { acquire(1 ); } public boolean tryLock () { return tryAcquire(1 ); } public void unlock () { release(1 ); } public boolean isLocked () { return isHeldExclusively(); } // 中斷線程,這個(gè)方法會(huì)被 shutdowNow 調(diào)用,從中可以看出 shutdownNow 要中斷線程不需要獲取鎖,也就是說如果線程正在運(yùn)行,照樣會(huì)給你中斷掉,所以一般來說我們不用 shutdowNow 來中斷線程,太粗暴了,中斷時(shí)線程很可能在執(zhí)行任務(wù),影響任務(wù)執(zhí)行 void interruptIfStarted () { Thread t; // 中斷也是有條件的,必須是 state >= 0 且 t != null 且線程未被中斷 // 如果 state == -1 ,不執(zhí)行中斷,再次明白了為啥上文中 setState(-1) 的意義 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
通過上文對(duì) Worker 類的分析,相信大家不難理解 將線程封裝為 worker 主要是為了更好地管理線程的中斷 這句話。
理解了 Worker 的意義,我們?cè)賮砜?addWorker 的方法
private boolean addWorker (Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 獲取線程池的狀態(tài) int rs = runStateOf(c); // 如果線程池的狀態(tài) >= SHUTDOWN,即為 SHUTDOWN,STOP,TIDYING,TERMINATED 這四個(gè)狀態(tài),只有一種情況有可能創(chuàng)建線程,即線程狀態(tài)為 SHUTDOWN, 且隊(duì)列非空時(shí),firstTask == null 代表創(chuàng)建一個(gè)不接收新任務(wù)的線程(此線程會(huì)從 workQueue 中獲取任務(wù)再執(zhí)行),這種情況下創(chuàng)建線程是為了加速處理完 workQueue 中的任務(wù) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false ; for (;;) { // 獲取線程數(shù) int wc = workerCountOf(c); // 如果超過了線程池的最大 CAPACITY(5 億多,基本不可能) // 或者 超過了 corePoolSize(core 為 true) 或者 maximumPoolSize(core 為 false) 時(shí) // 則返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false ; // 否則 CAS 增加線程的數(shù)量,如果成功跳出雙重循環(huán) if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl // 如果線程運(yùn)行狀態(tài)發(fā)生變化,跳到外層循環(huán)繼續(xù)執(zhí)行 if (runStateOf(c) != rs) continue retry; // 說明是因?yàn)?nbsp;CAS 增加線程數(shù)量失敗所致,繼續(xù)執(zhí)行 retry 的內(nèi)層循環(huán) } } boolean workerStarted = false ; boolean workerAdded = false ; Worker w = null ; try { // 能執(zhí)行到這里,說明滿足增加 worker 的條件了,所以創(chuàng)建 worker,準(zhǔn)備添加進(jìn)線程池中執(zhí)行任務(wù) w = new Worker(firstTask); final Thread t = w.thread; if (t != null ) { // 加鎖,是因?yàn)橄挛囊?nbsp;w 添加進(jìn) workers 中, workers 是 HashSet,不是線程安全的,所以需要加鎖予以保證 final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { // 再次 check 線程池的狀態(tài)以防執(zhí)行到此步時(shí)發(fā)生中斷等 int rs = runStateOf(ctl.get()); // 如果線程池狀態(tài)小于 SHUTDOWN(即為 RUNNING), // 或者狀態(tài)為 SHUTDOWN 但 firstTask == null(代表不接收任務(wù),只是創(chuàng)建線程處理 workQueue 中的任務(wù)),則滿足添加 worker 的條件 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null )) { // 如果線程已啟動(dòng),顯然有問題(因?yàn)閯?chuàng)建 worker 后,還沒啟動(dòng)線程呢),拋出異常 if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); // 記錄最大的線程池大小以作監(jiān)控之用 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true ; } } finally { mainLock.unlock(); } // 說明往 workers 中添加 worker 成功,此時(shí)啟動(dòng)線程 if (workerAdded) { t.start(); workerStarted = true ; } } } finally { // 添加線程失敗,執(zhí)行 addWorkerFailed 方法,主要做了將 worker 從 workers 中移除,減少線程數(shù),并嘗試著關(guān)閉線程池這樣的操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
從這段代碼我們可以看到多線程下情況的不可預(yù)料性,我們發(fā)現(xiàn)在滿足條件情況下,又對(duì)線程狀態(tài)重新進(jìn)行了 check,以防期間出現(xiàn)中斷等線程池狀態(tài)發(fā)生變更的操作,這也給我們以啟發(fā):多線程環(huán)境下的各種臨界條件一定要考慮到位。
執(zhí)行 addWorker 創(chuàng)建 worker 成功后,線程開始執(zhí)行了(t.start()),由于在創(chuàng)建 Worker 時(shí),將 Worker 自己傳給了此線程,所以啟動(dòng)線程后,會(huì)調(diào)用 Worker 的 run 方法
public void run () { runWorker(this ); }
可以看到最終會(huì)調(diào)用 runWorker 方法,接下來我們來分析下 runWorker 方法
final void runWorker (Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null ; // unlock 會(huì)調(diào)用 tryRelease 方法將 state 設(shè)置成 0,代表允許中斷,允許中斷的條件上文我們?cè)?nbsp;interruptIfStarted() 中有提過,即 state >= 0 w.unlock(); boolean completedAbruptly = true ; try { // 如果在提交任務(wù)時(shí)創(chuàng)建了線程,并把任務(wù)丟給此線程,則會(huì)先執(zhí)行此 task // 否則從任務(wù)隊(duì)列中獲取 task 來執(zhí)行(即 getTask() 方法) while (task != null || (task = getTask()) != null ) { w.lock(); // 如果線程池狀態(tài)為 >= STOP(即 STOP,TIDYING,TERMINATED )時(shí),則線程應(yīng)該中斷 // 如果線程池狀態(tài) < STOP, 線程不應(yīng)該中斷,如果中斷了(Thread.interrupted() 返回 true,并清除標(biāo)志位),再次判斷線程池狀態(tài)(防止在清除標(biāo)志位時(shí)執(zhí)行了 shutdownNow() 這樣的方法),如果此時(shí)線程池為 STOP,執(zhí)行線程中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 執(zhí)行任務(wù)前,子類可實(shí)現(xiàn)此鉤子方法作為統(tǒng)計(jì)之用 beforeExecute(wt, task); Throwable thrown = null ; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 執(zhí)行任務(wù)后,子類可實(shí)現(xiàn)此鉤子方法作為統(tǒng)計(jì)之用 afterExecute(task, thrown); } } finally { task = null ; w.completedTasks++; w.unlock(); } } completedAbruptly = false ; } finally { // 如果執(zhí)行到這只有兩種可能,一種是執(zhí)行過程中異常中斷了,一種是隊(duì)列里沒有任務(wù)了,從這里可以看出線程沒有核心線程與非核心線程之分,哪個(gè)任務(wù)異常了或者正常退出了都會(huì)執(zhí)行此方法,此方法會(huì)根據(jù)情況將線程數(shù)-1 processWorkerExit(w, completedAbruptly); } }
來看看 processWorkerExit 方法是咋樣的
private void processWorkerExit (Worker w, boolean completedAbruptly) { // 如果異常退出,cas 執(zhí)行線程池減 1 操作 if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; // 加鎖確保線程安全地移除 worker workers.remove(w); } finally { mainLock.unlock(); } // woker 既然異常退出,可能線程池狀態(tài)變了(如執(zhí)行 shutdown 等),嘗試著關(guān)閉線程池 tryTerminate(); int c = ctl.get(); // 如果線程池處于 STOP 狀態(tài),則如果 woker 是異常退出的,重新新增一個(gè) woker, 如果是正常退出的,在 wokerQueue 為非空的條件下,確保至少有一個(gè)線程在運(yùn)行以執(zhí)行 wokerQueue 中的任務(wù)
if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1 ; if (workerCountOf(c) >= min) return ; // replacement not needed } addWorker(null , false ); } }
接下來我們分析 woker 從 workQueue 中取任務(wù)的方法 getTask
private Runnable getTask () { boolean timedOut = false ; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果線程池狀態(tài)至少為 STOP 或者 // 線程池狀態(tài) == SHUTDOWN 并且任務(wù)隊(duì)列是空的 // 則減少線程數(shù)量,返回 null,這種情況下上文分析的 runWorker 會(huì)執(zhí)行 processWorkerExit 從而讓獲取此 Task 的 woker 退出 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); // 如果 allowCoreThreadTimeOut 為 true,代表任何線程在 keepAliveTime 時(shí)間內(nèi)處于 idle 狀態(tài)都會(huì)被回收,如果線程數(shù)大于 corePoolSize,本身在 keepAliveTime 時(shí)間內(nèi)處于 idle 狀態(tài)就會(huì)被回收 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // worker 應(yīng)該被回收的幾個(gè)條件,這個(gè)比較簡(jiǎn)單,就此略過 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { // 阻塞獲取 task,如果在 keepAliveTime 時(shí)間內(nèi)未獲取任務(wù),說明超時(shí)了,此時(shí) timedOut 為 true Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
經(jīng)過以上源碼剖析,相信我們對(duì)線程池的工作原理了解得八九不離十了,再來簡(jiǎn)單過一下其他一些比較有用的方法,開頭我們提到線程池的監(jiān)控問題,我們看一下可以監(jiān)控哪些指標(biāo)
int getCorePoolSize():獲取核心線程數(shù)。 int getLargestPoolSize():歷史峰值線程數(shù)。 int getMaximumPoolSize():最大線程數(shù)(線程池線程容量)。 int getActiveCount():當(dāng)前活躍線程數(shù) int getPoolSize():當(dāng)前線程池中的線程總數(shù) BlockingQueuegetQueue() 當(dāng)前線程池的任務(wù)隊(duì)列,據(jù)此可以獲取積壓任務(wù)的總數(shù),getQueue.size() 監(jiān)控思路也很簡(jiǎn)單,開啟一個(gè)定時(shí)線程 ScheduledThreadPoolExecutor,定期對(duì)這些線程池指標(biāo)進(jìn)行采集,一般會(huì)采用一些開源工具如 Grafana + Prometheus + MicroMeter 來實(shí)現(xiàn)。
如何實(shí)現(xiàn)核心線程池的預(yù)熱
使用 prestartAllCoreThreads() 方法,這個(gè)方法會(huì)一次性創(chuàng)建 corePoolSize 個(gè)線程,無需等到提交任務(wù)時(shí)才創(chuàng)建,提交創(chuàng)建好線程的話,一有任務(wù)提交過來,這些線程就可以立即處理。
如何實(shí)現(xiàn)動(dòng)態(tài)調(diào)整線程池參數(shù)
setCorePoolSize(int corePoolSize) 調(diào)整核心線程池大小 setMaximumPoolSize(int maximumPoolSize) setKeepAliveTime() 設(shè)置線程的存活時(shí)間 解答開篇的問題 其它問題基本都在源碼剖析環(huán)節(jié)回答了,這里簡(jiǎn)單說下其他問題
1、Tomcat 的線程池和 JDK 的線程池實(shí)現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實(shí)現(xiàn)嗎?Dubbo 中一個(gè)叫 EagerThreadPool 的東西,可以看看它的使用說明
從注釋里可以看出,如果核心線程都處于 busy 狀態(tài),如果有新的請(qǐng)求進(jìn)來,EagerThreadPool 會(huì)選擇先創(chuàng)建線程,而不是將其放入任務(wù)隊(duì)列中,這樣可以更快地響應(yīng)這些請(qǐng)求。
Tomcat 實(shí)現(xiàn)也是與此類似的,只不過稍微有所不同,當(dāng) Tomcat 啟動(dòng)時(shí),會(huì)先創(chuàng)建 minSpareThreads 個(gè)線程,如果經(jīng)過一段時(shí)間收到請(qǐng)求時(shí)這些線程都處于忙碌狀態(tài),每次都會(huì)以 minSpareThreads 的步長(zhǎng)創(chuàng)建線程,本質(zhì)上也是為了更快地響應(yīng)處理請(qǐng)求。具體的源碼可以看它的 ThreadPool 實(shí)現(xiàn),這里就不展開了。
2、我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個(gè)問題:壓測(cè)時(shí)接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊(duì)列為 5000,你能從這個(gè)設(shè)置中發(fā)現(xiàn)出一些問題并對(duì)這些參數(shù)進(jìn)行調(diào)優(yōu)嗎?這個(gè)參數(shù)明顯能看出問題來,首先任務(wù)隊(duì)列設(shè)置過大,任務(wù)達(dá)到核心線程后,如果再有請(qǐng)求進(jìn)來會(huì)先進(jìn)入任務(wù)隊(duì)列,隊(duì)列滿了之后才創(chuàng)建線程,創(chuàng)建線程也是需要不少開銷的,所以我們后來把核心線程設(shè)置成了與最大線程一樣,并且調(diào)用 prestartAllCoreThreads() 來預(yù)熱核心線程,就不用等請(qǐng)求來時(shí)再創(chuàng)建線程了。
線程池的幾個(gè)最佳實(shí)踐 1、線程池執(zhí)行的任務(wù)應(yīng)該是互相獨(dú)立的,如果互相依賴的話,可能導(dǎo)致死鎖,比如下面這樣的代碼
ExecutorService pool = Executors .newSingleThreadExecutor(); pool.submit(() -> { try { String qq=pool.submit(()->'QQ' ).get(); System.out.println(qq); } catch (Exception e) { } });
2、核心任務(wù)與非核心任務(wù)最好能用多個(gè)線程池隔離開來
曾經(jīng)我們業(yè)務(wù)上就出現(xiàn)這樣的一個(gè)故障:突然很多用戶反饋短信收不到了,排查才發(fā)現(xiàn)發(fā)短信是在一個(gè)線程池里,而另外的定時(shí)腳本也是用的這個(gè)線程池來執(zhí)行任務(wù),這個(gè)腳本一分鐘可能產(chǎn)生幾百上千條任務(wù),導(dǎo)致發(fā)短信的方法在線程池里基本沒機(jī)會(huì)執(zhí)行,后來我們用了兩個(gè)線程池把發(fā)短信和執(zhí)行腳本隔離開來解決了問題。
3、添加線程池監(jiān)控,動(dòng)態(tài)設(shè)置線程池
如前文所述,線程池的各個(gè)參數(shù)很難一次性確定,既然難以確定,又要保證發(fā)現(xiàn)問題后及時(shí)解決,我們就需要為線程池增加監(jiān)控,監(jiān)控隊(duì)列大小,線程數(shù)量等,我們可以設(shè)置 3 分鐘內(nèi)比如隊(duì)列任務(wù)一直都是滿了的話,就觸發(fā)告警,這樣可以提前預(yù)警,如果線上因?yàn)榫€程池參數(shù)設(shè)置不合理而觸發(fā)了降級(jí)等操作,可以通過動(dòng)態(tài)設(shè)置線程池的方式來實(shí)時(shí)修改核心線程數(shù),最大線程數(shù)等,將問題及時(shí)修復(fù)。
總結(jié) 本文詳細(xì)剖析了線程池的工作原理,相信大家對(duì)其工作機(jī)制應(yīng)該有了較深入的了解,也對(duì)開頭的幾個(gè)問題有了較清楚的認(rèn)識(shí),本質(zhì)上設(shè)置線程池的目的是為了利用有效的資源最大化性能,最小化風(fēng)險(xiǎn),同時(shí)線程池的使用本質(zhì)上是為了更好地為用戶服務(wù),據(jù)此也不難明白 Tomcat, Dubbo 要另起爐灶來設(shè)置自己的線程池了。
最后歡迎大家加我私人微信,一起討論,共同進(jìn)步,拉你進(jìn)讀者群,2020 難過,我們一起抱團(tuán)取暖!
巨人的肩膀
https:///articles/how-much-memory-does-a-java-thread-take https://segmentfault.com/a/1190000021047279 https://www.cnblogs.com/trust-freedom/p/6681948.html 深入理解線程池 https:///y675j928 有的線程它死了,于是它變成一道面試題 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQ Java 并發(fā)編程實(shí)戰(zhàn) Java線程池實(shí)現(xiàn)原理及其在美團(tuán)業(yè)務(wù)中的實(shí)踐: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww 最后歡迎大家加我好友,拉你進(jìn)技術(shù)交流群,群里有很多 BAT 的大咖,可以提問,互相交流,內(nèi)推等,進(jìn)群一起抱團(tuán)取暖