小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

深入分析java線程池的實(shí)現(xiàn)原理

 SheldonDemo 2017-04-26

簡書 占小狼
轉(zhuǎn)載請注明原創(chuàng)出處,謝謝!

2017/04/23 于復(fù)興中路裸心社
回頭看看之前寫的這篇文章,印象中讀源碼的興趣源頭似乎來自于Java線程池,當(dāng)山頭被一座一座攻克時,你會發(fā)現(xiàn)掉到一個大坑中,因?yàn)椴欢念I(lǐng)域的實(shí)在太多。

前言

線程是稀缺資源,如果被無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,合理的使用線程池對線程進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控,有以下好處:
1、降低資源消耗;
2、提高響應(yīng)速度;
3、提高線程的可管理性。

Java1.5中引入的Executor框架把任務(wù)的提交和執(zhí)行進(jìn)行解耦,只需要定義好任務(wù),然后提交給線程池,而不用關(guān)心該任務(wù)是如何執(zhí)行、被哪個線程執(zhí)行,以及什么時候執(zhí)行。

demo


1、Executors.newFixedThreadPool(10)初始化一個包含10個線程的線程池executor;
2、通過executor.execute方法提交20個任務(wù),每個任務(wù)打印當(dāng)前的線程名;
3、負(fù)責(zé)執(zhí)行任務(wù)的線程的生命周期都由Executor框架進(jìn)行管理;

ThreadPoolExecutor

Executors是java線程池的工廠類,通過它可以快速初始化一個符合業(yè)務(wù)需求的線程池,如Executors.newFixedThreadPool方法可以生成一個擁有固定線程數(shù)的線程池。


其本質(zhì)是通過不同的參數(shù)初始化一個ThreadPoolExecutor對象,具體參數(shù)描述如下:

corePoolSize

線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊(duì)列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。

maximumPoolSize

線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊(duì)列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;

keepAliveTime

線程空閑時的存活時間,即當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間;默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時才有用;

unit

keepAliveTime的單位;

workQueue

用來保存等待被執(zhí)行的任務(wù)的阻塞隊(duì)列,且任務(wù)必須實(shí)現(xiàn)Runable接口,在JDK中提供了如下阻塞隊(duì)列:
1、ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,按FIFO排序任務(wù);
2、LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;
3、SynchronousQuene:一個不存儲元素的阻塞隊(duì)列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockingQuene;
4、priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊(duì)列;

threadFactory

創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設(shè)置一個具有識別度的線程名。


handler

線程池的飽和策略,當(dāng)阻塞隊(duì)列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:
1、AbortPolicy:直接拋出異常,默認(rèn)策略;
2、CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
3、DiscardOldestPolicy:丟棄阻塞隊(duì)列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
4、DiscardPolicy:直接丟棄任務(wù);
當(dāng)然也可以根據(jù)應(yīng)用場景實(shí)現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。

Exectors

Exectors工廠類提供了線程池的初始化接口,主要有如下幾種:

newFixedThreadPool

初始化一個指定線程數(shù)的線程池,其中corePoolSize == maximumPoolSize,使用LinkedBlockingQuene作為阻塞隊(duì)列,不過當(dāng)線程池沒有可執(zhí)行任務(wù)時,也不會釋放線程。

newCachedThreadPool

1、初始化一個可以緩存線程的線程池,默認(rèn)緩存60s,線程池的線程數(shù)可達(dá)到Integer.MAX_VALUE,即2147483647,內(nèi)部使用SynchronousQueue作為阻塞隊(duì)列;
2、和newFixedThreadPool創(chuàng)建的線程池不同,newCachedThreadPool在沒有任務(wù)執(zhí)行時,當(dāng)線程的空閑時間超過keepAliveTime,會自動釋放線程資源,當(dāng)提交新任務(wù)時,如果沒有空閑線程,則創(chuàng)建新線程執(zhí)行任務(wù),會導(dǎo)致一定的系統(tǒng)開銷;

所以,使用該線程池時,一定要注意控制并發(fā)的任務(wù)數(shù),否則創(chuàng)建大量的線程可能導(dǎo)致嚴(yán)重的性能問題。

newSingleThreadExecutor

初始化的線程池中只有一個線程,如果該線程異常結(jié)束,會重新創(chuàng)建一個新的線程繼續(xù)執(zhí)行任務(wù),唯一的線程可以保證所提交任務(wù)的順序執(zhí)行,內(nèi)部使用LinkedBlockingQueue作為阻塞隊(duì)列。

newScheduledThreadPool

初始化的線程池可以在指定的時間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實(shí)際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)。

實(shí)現(xiàn)原理

除了newScheduledThreadPool的內(nèi)部實(shí)現(xiàn)特殊一點(diǎn)之外,其它幾個線程池都是基于ThreadPoolExecutor類實(shí)現(xiàn)的。

線程池內(nèi)部狀態(tài)


其中AtomicInteger變量ctl的功能非常強(qiáng)大:利用低29位表示線程池中線程數(shù),通過高3位表示線程池的運(yùn)行狀態(tài):
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態(tài)的線程池會接收新任務(wù),并處理阻塞隊(duì)列中的任務(wù);
2、SHUTDOWN: 0 << COUNT_BITS,即高3位為000,該狀態(tài)的線程池不會接收新任務(wù),但會處理阻塞隊(duì)列中的任務(wù);
3、STOP : 1 << COUNT_BITS,即高3位為001,該狀態(tài)的線程不會接收新任務(wù),也不會處理阻塞隊(duì)列中的任務(wù),而且會中斷正在運(yùn)行的任務(wù);
4、TIDYING : 2 << COUNT_BITS,即高3位為010;
5、TERMINATED: 3 << COUNT_BITS,即高3位為011;

任務(wù)提交

線程池框架提供了兩種方式提交任務(wù),根據(jù)不同的業(yè)務(wù)需求選擇不同的方式。

Executor.execute()

通過Executor.execute()方法提交的任務(wù),必須實(shí)現(xiàn)Runnable接口,該方式提交的任務(wù)不能獲取返回值,因此無法判斷任務(wù)是否執(zhí)行成功。

ExecutorService.submit()

通過ExecutorService.submit()方法提交的任務(wù),可以獲取任務(wù)執(zhí)行完的返回值。

任務(wù)執(zhí)行

當(dāng)向線程池中提交一個任務(wù),線程池會如何處理該任務(wù)?

execute實(shí)現(xiàn)

具體的執(zhí)行流程如下:
1、workerCountOf方法根據(jù)ctl的低29位,得到線程池的當(dāng)前線程數(shù),如果線程數(shù)小于corePoolSize,則執(zhí)行addWorker方法創(chuàng)建新的線程執(zhí)行任務(wù);否則執(zhí)行步驟(2);
2、如果線程池處于RUNNING狀態(tài),且把提交的任務(wù)成功放入阻塞隊(duì)列中,則執(zhí)行步驟(3),否則執(zhí)行步驟(4);
3、再次檢查線程池的狀態(tài),如果線程池沒有RUNNING,且成功從阻塞隊(duì)列中刪除任務(wù),則執(zhí)行reject方法處理任務(wù);
4、執(zhí)行addWorker方法創(chuàng)建新的線程執(zhí)行任務(wù),如果addWoker執(zhí)行失敗,則執(zhí)行reject方法處理任務(wù);


addWorker實(shí)現(xiàn)

從方法execute的實(shí)現(xiàn)可以看出:addWorker主要負(fù)責(zé)創(chuàng)建新的線程并執(zhí)行任務(wù),代碼實(shí)現(xiàn)如下:


這只是addWoker方法實(shí)現(xiàn)的前半部分:
1、判斷線程池的狀態(tài),如果線程池的狀態(tài)值大于或等SHUTDOWN,則不處理提交的任務(wù),直接返回;
2、通過參數(shù)core判斷當(dāng)前需要創(chuàng)建的線程是否為核心線程,如果core為true,且當(dāng)前線程數(shù)小于corePoolSize,則跳出循環(huán),開始創(chuàng)建新的線程,具體實(shí)現(xiàn)如下:


線程池的工作線程通過Woker類實(shí)現(xiàn),在ReentrantLock鎖的保證下,把Woker實(shí)例插入到HashSet后,并啟動Woker中的線程,其中Worker類設(shè)計(jì)如下:
1、繼承了AQS類,可以方便的實(shí)現(xiàn)工作線程的中止操作;
2、實(shí)現(xiàn)了Runnable接口,可以將自身作為一個任務(wù)在工作線程中執(zhí)行;
3、當(dāng)前提交的任務(wù)firstTask作為參數(shù)傳入Worker的構(gòu)造方法;


從Woker類的構(gòu)造方法實(shí)現(xiàn)可以發(fā)現(xiàn):線程工廠在創(chuàng)建線程thread時,將Woker實(shí)例本身this作為參數(shù)傳入,當(dāng)執(zhí)行start方法啟動線程thread時,本質(zhì)是執(zhí)行了Worker的runWorker方法。

runWorker實(shí)現(xiàn)

runWorker方法是線程池的核心:
1、線程啟動之后,通過unlock方法釋放鎖,設(shè)置AQS的state為0,表示運(yùn)行中斷;
2、獲取第一個任務(wù)firstTask,執(zhí)行任務(wù)的run方法,不過在執(zhí)行任務(wù)之前,會進(jìn)行加鎖操作,任務(wù)執(zhí)行完會釋放鎖;
3、在執(zhí)行任務(wù)的前后,可以根據(jù)業(yè)務(wù)場景自定義beforeExecute和afterExecute方法;
4、firstTask執(zhí)行完成之后,通過getTask方法從阻塞隊(duì)列中獲取等待的任務(wù),如果隊(duì)列中沒有任務(wù),getTask方法會被阻塞并掛起,不會占用cpu資源;

getTask實(shí)現(xiàn)

整個getTask操作在自旋下完成:
1、workQueue.take:如果阻塞隊(duì)列為空,當(dāng)前線程會被掛起等待;當(dāng)隊(duì)列中有任務(wù)加入時,線程被喚醒,take方法返回任務(wù),并執(zhí)行;
2、workQueue.poll:如果在keepAliveTime時間內(nèi),阻塞隊(duì)列還是沒有任務(wù),則返回null;

所以,線程池中實(shí)現(xiàn)的線程可以一直執(zhí)行由用戶提交的任務(wù)。

Future和Callable實(shí)現(xiàn)

通過ExecutorService.submit()方法提交的任務(wù),可以獲取任務(wù)執(zhí)行完的返回值。


在實(shí)際業(yè)務(wù)場景中,F(xiàn)uture和Callable基本是成對出現(xiàn)的,Callable負(fù)責(zé)產(chǎn)生結(jié)果,F(xiàn)uture負(fù)責(zé)獲取結(jié)果。
1、Callable接口類似于Runnable,只是Runnable沒有返回值。
2、Callable任務(wù)除了返回正常結(jié)果之外,如果發(fā)生異常,該異常也會被返回,即Future可以拿到異步執(zhí)行任務(wù)各種結(jié)果;
3、Future.get方法會導(dǎo)致主線程阻塞,直到Callable任務(wù)執(zhí)行完成;

submit實(shí)現(xiàn)

通過submit方法提交的Callable任務(wù)會被封裝成了一個FutureTask對象。

FutureTask


1、FutureTask在不同階段擁有不同的狀態(tài)state,初始化為NEW;
2、FutureTask類實(shí)現(xiàn)了Runnable接口,這樣就可以通過Executor.execute方法提交FutureTask到線程池中等待被執(zhí)行,最終執(zhí)行的是FutureTask的run方法;

FutureTask.get實(shí)現(xiàn)


內(nèi)部通過awaitDone方法對主線程進(jìn)行阻塞,具體實(shí)現(xiàn)如下:


1、如果主線程被中斷,則拋出中斷異常;
2、判斷FutureTask當(dāng)前的state,如果大于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完成,則直接返回;
3、如果當(dāng)前state等于COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完,這時主線程只需通過yield方法讓出cpu資源,等待state變成NORMAL;
4、通過WaitNode類封裝當(dāng)前線程,并通過UNSAFE添加到waiters鏈表;
5、最終通過LockSupport的park或parkNanos掛起線程;

FutureTask.run實(shí)現(xiàn)


FutureTask.run方法是在線程池中被執(zhí)行的,而非主線程
1、通過執(zhí)行Callable任務(wù)的call方法;
2、如果call執(zhí)行成功,則通過set方法保存結(jié)果;
3、如果call執(zhí)行有異常,則通過setException保存異常;

set

setException

set和setException方法中,都會通過UnSAFE修改FutureTask的狀態(tài),并執(zhí)行finishCompletion方法通知主線程任務(wù)已經(jīng)執(zhí)行完成;

finishCompletion

1、執(zhí)行FutureTask類的get方法時,會把主線程封裝成WaitNode節(jié)點(diǎn)并保存在waiters鏈表中;
2、FutureTask任務(wù)執(zhí)行完成后,通過UNSAFE設(shè)置waiters的值,并通過LockSupport類unpark方法喚醒主線程;


我是占小狼
坐標(biāo)魔都,白天上班族,晚上是知識的分享者
如果讀完覺得有收獲的話,歡迎點(diǎn)贊加關(guān)注


    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多