前面一篇文章從Executors中的工廠方法入手,已經(jīng)對ThreadPoolExecutor的構造和使用做了一些整理。而這篇文章,我們將接著前面的介紹,從源碼實現(xiàn)上對ThreadPoolExecutor在任務的提交、執(zhí)行,線程重用和線程數(shù)維護等方面做下分析。
0. ThreadPoolExecutor類的聲明屬性變量分析
1 | public class ThreadPoolExecutor extends AbstractExecutorService
|
從這個類聲明中我們可以看到java.util.ThreadPoolExecutor是繼承于AbstractExecutorService的,而之前的文章我也提到過,AbstractExecutorService已經(jīng)實現(xiàn)了一些任務提交處理的方法,如submit()方法都是在這個抽象類中實現(xiàn)的。但submit()方法,最后也是會調(diào)用ThreadPoolExecutor的execute()方法。
打開SunJDK中的ThreadPoolExecutor類源碼,除了上篇文章提到的一些和構造方法中參數(shù)對應的屬性之外,讓我們看看還有什么:
- mainLock 對整個ThreadPoolExecutor對象的鎖
- workers 存儲工作線程對應Worker對象的HashSet
- termination 線程池ThreadPoolExecutor對象的生命周期終止條件,和mainLock相關
- largestPoolSize 線程池跑過的最大線程數(shù)
- completedTaskCount 完成任務數(shù)
- ctl 執(zhí)行器ThreadPoolExecutor的生命周期狀態(tài)和活動狀態(tài)的worker數(shù)封裝
稍微需要說一下最后一個, ctl是一個AtomicInteger對象,以位運算的方式打包封裝了當前線程池ThreadPoolExecutor對象的狀態(tài)和活動線程數(shù)兩個數(shù)據(jù)
1. 執(zhí)行器狀態(tài)
ExecutorService中已經(jīng)指定了這個接口對應的類要實現(xiàn)的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了狀態(tài)的含義,并包含其于ctl屬性中。
ThreadPoolExecutor對象有五種狀態(tài),如下:
- RUNNING 在ThreadPoolExecutor被實例化的時候就是這個狀態(tài)
- SHUTDOWN 通常是已經(jīng)執(zhí)行過shutdown()方法,不再接受新任務,等待線程池中和隊列中任務完成
- STOP 通常是已經(jīng)執(zhí)行過shutdownNow()方法,不接受新任務,隊列中的任務也不再執(zhí)行,并嘗試終止線程池中的線程
- TIDYING 線程池為空,就會到達這個狀態(tài),執(zhí)行terminated()方法
- TERMINATED terminated()執(zhí)行完畢,就會到達這個狀態(tài),ThreadPoolExecutor終結
2. Worker內(nèi)部類
它既實現(xiàn)了Runnable,同時也是一個AQS ( AbstractQueuedSynchronizer )。
1 2 3 | private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
|
封裝了3樣東西,Runnable類的首個任務對象,執(zhí)行的線程thread和完成的任務數(shù)(volatile)completedTasks。
1 2 3 | final Thread thread;
Runnable firstTask;
volatile long completedTasks;
|
這個類還提供了interruptIfStarted()這樣一個方法,里面做了(getState()>= 0)的判斷。與此呼應,Worker的構造方法里對state設置了-1,避免在線程執(zhí)行前被停掉。
1 2 3 4 5 | Worker(Runnable firstTask) {
setState(- 1 ); // inhibit interrupts until runWorker
this .firstTask = firstTask;
this .thread = getThreadFactory().newThread( this );
}
|
3. 提交任務
上篇文章已經(jīng)提到了,提交新任務的時候,如果沒達到核心線程數(shù)corePoolSize,則開辟新線程執(zhí)行。如果達到核心線程數(shù)corePoolSize, 而隊列未滿,則放入隊列,否則開新線程處理任務,直到maximumPoolSize,超出則丟棄處理。
這段源碼邏輯如下,不細說了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | public void execute(Runnable command) {
if (command == null )
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true ))
return ;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0 )
addWorker( null , false );
}
else if (!addWorker(command, false ))
reject(command);
}
|
4. addWorker()的實現(xiàn)
在上面提交任務的時候,會出現(xiàn)開辟新的線程來執(zhí)行,這會調(diào)用addWorker()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 | private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false ;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false ;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false ;
boolean workerAdded = false ;
Worker w = null ;
try {
final ReentrantLock mainLock = this .mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null ) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null )) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true ;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true ;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
|
代碼較長,我們可以分兩大部分看:
第一段從第3行到第26行,是雙層無限循環(huán),嘗試增加線程數(shù)到ctl變量,并且做一些比較判斷,如果超出線程數(shù)限定或者ThreadPoolExecutor的狀態(tài)不符合要求,則直接返回false,增加worker失敗。
第二段從第28行開始到結尾,把firstTask這個Runnable對象傳給Worker構造方法,賦值給Worker對象的task屬性。Worker對象把自身(也是一個Runnable)封裝成一個Thread對象賦予Worker對象的thread屬性。鎖住整個線程池并實際增加worker到workers的HashSet對象當中。成功增加后開始執(zhí)行t.start(),就是worker的thread屬性開始運行,實際上就是運行Worker對象的run方法。Worker的run()方法實際上調(diào)用了ThreadPoolExecutor的runWorker()方法。
5. 任務的執(zhí)行runWorker()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null ;
w.unlock(); // allow interrupts
boolean completedAbruptly = true ;
try {
while (task != null || (task = getTask()) != null ) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null ;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null ;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false ;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
|
這段代碼實際上就是執(zhí)行提交給線程池執(zhí)行的Runnable任務的實際內(nèi)容。其中,值得注意的有以下幾點:
- 線程開始執(zhí)行前,需要對worker加鎖,完成一個任務后執(zhí)行unlock()
- 在任務執(zhí)行前后,執(zhí)行beforeExecute()和afterExecute()方法
- 記錄任務執(zhí)行中的異常后,繼續(xù)拋出
- 每個任務完成后,會記錄當前線程完成的任務數(shù)
- 當worker執(zhí)行完一個任務的時候,包括初始任務firstTask,會調(diào)用getTask()繼續(xù)獲取任務,這個方法調(diào)用是可以阻塞的
- 線程退出,執(zhí)行processWorkerExit(w, completedAbruptly)處理
5. Worker線程的復用和任務的獲取getTask()
在上一段代碼中,也就是runWorker()方法,任務的執(zhí)行過程是嵌套在while循環(huán)語句塊中的。每當一個任務執(zhí)行完畢,會從頭開始做下一次循環(huán)執(zhí)行,實現(xiàn)了空閑線程的復用。而要執(zhí)行的任務則是來自于getTask()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | private Runnable getTask() {
boolean timedOut = false ; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null ;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break ;
if (compareAndDecrementWorkerCount(c))
return null ;
c = ctl.get();
// Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null )
return r;
timedOut = true ;
} catch (InterruptedException retry) {
timedOut = false ;
}
}
}
|
getTask()實際上是從工作隊列(workQueue)中取提交進來的任務。這個workQueue是一個BlockingQueue,通常當隊列中沒有新任務的時候,則getTask()會阻塞。另外,還有定時阻塞這樣一段邏輯:如果從隊列中取任務是計時的,則用poll()方法,并設置等待時間為keepAlive,否則調(diào)用阻塞方法take()。當poll()超時,則獲取到的任務為null,timeOut設置為 true。這段代碼也是放在一個for(;;)循環(huán)中,前面有判斷超時的語句,如果超時,則return null。這意味著runWorker()方法的while循環(huán)結束,線程將退出,執(zhí)行processWorkerExit()方法。
回頭看看是否計時是如何確定的。
1 2 | int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
|
即判斷當前線程池的線程數(shù)是否超出corePoolSize,如果超出這個值并且空閑時間多于keepAlive則當前線程退出。
另外一種情況就是allowCoreThreadTimeOut為true,就是允許核心在空閑超時的情況下停掉。
6. 線程池線程數(shù)的維護和線程的退出處理
剛剛也提到了,我們再看下processWorkerExit()方法。這個方法最主要就是從workers的Set中remove掉一個多余的線程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this .mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1 ;
if (workerCountOf(c) >= min)
return ; // replacement not needed
}
addWorker( null , false );
}
}
|
這個方法的第二個參數(shù)是判斷是否在runWorker()中正常退出了循環(huán)向下執(zhí)行,如果不是,說明在執(zhí)行任務的過程中出現(xiàn)了異常,completedAbruptly為true,線程直接退出,需要直接對活動線程數(shù)減1 。
之后,加鎖統(tǒng)計完成的任務數(shù),并從workers這個集合中移除當前worker。
執(zhí)行tryTerminate(),這個方法后面會詳細說,主要就是嘗試將線程池推向TERMINATED狀態(tài)。
最后比較當前線程數(shù)是不是已經(jīng)低于應有的線程數(shù),如果這個情況發(fā)生,則添加無任務的空Worker到線程池中待命。
以上,增加新的線程和剔除多余的線程的過程大概就是如此,這樣線程池能保持額定的線程數(shù),并彈性伸縮,保證系統(tǒng)的資源不至于過度消耗。
|