小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

ThreadPoolExecutor的任務提交、任務處理、線程復用和維護相關源碼分析 | 三石·道

 飛起來的感覺 2014-03-07

前面一篇文章從Executors中的工廠方法入手,已經(jīng)對ThreadPoolExecutor的構造和使用做了一些整理。而這篇文章,我們將接著前面的介紹,從源碼實現(xiàn)上對ThreadPoolExecutor在任務的提交、執(zhí)行,線程重用和線程數(shù)維護等方面做下分析。

0.    ThreadPoolExecutor類的聲明屬性變量分析

1
public class ThreadPoolExecutor extends AbstractExecutorService

從這個類聲明中我們可以看到java.util.ThreadPoolExecutor是繼承于AbstractExecutorService的,而之前的文章我也提到過,AbstractExecutorService已經(jīng)實現(xiàn)了一些任務提交處理的方法,如submit()方法都是在這個抽象類中實現(xiàn)的。但submit()方法,最后也是會調(diào)用ThreadPoolExecutor的execute()方法。

打開SunJDK中的ThreadPoolExecutor類源碼,除了上篇文章提到的一些和構造方法中參數(shù)對應的屬性之外,讓我們看看還有什么:

  • mainLock 對整個ThreadPoolExecutor對象的鎖
  • workers  存儲工作線程對應Worker對象的HashSet
  • termination 線程池ThreadPoolExecutor對象的生命周期終止條件,和mainLock相關
  • largestPoolSize 線程池跑過的最大線程數(shù)
  • completedTaskCount 完成任務數(shù)
  • ctl 執(zhí)行器ThreadPoolExecutor的生命周期狀態(tài)和活動狀態(tài)的worker數(shù)封裝

稍微需要說一下最后一個, ctl是一個AtomicInteger對象,以位運算的方式打包封裝了當前線程池ThreadPoolExecutor對象的狀態(tài)和活動線程數(shù)兩個數(shù)據(jù)

1.    執(zhí)行器狀態(tài)

ExecutorService中已經(jīng)指定了這個接口對應的類要實現(xiàn)的方法,其中就包括shutdown()和shutdownNow()等方法。在ThreadPoolExecutor中指明了狀態(tài)的含義,并包含其于ctl屬性中。

ThreadPoolExecutor對象有五種狀態(tài),如下:

  • RUNNING 在ThreadPoolExecutor被實例化的時候就是這個狀態(tài)
  • SHUTDOWN 通常是已經(jīng)執(zhí)行過shutdown()方法,不再接受新任務,等待線程池中和隊列中任務完成
  • STOP 通常是已經(jīng)執(zhí)行過shutdownNow()方法,不接受新任務,隊列中的任務也不再執(zhí)行,并嘗試終止線程池中的線程
  • TIDYING 線程池為空,就會到達這個狀態(tài),執(zhí)行terminated()方法
  • TERMINATED terminated()執(zhí)行完畢,就會到達這個狀態(tài),ThreadPoolExecutor終結

2.    Worker內(nèi)部類

它既實現(xiàn)了Runnable,同時也是一個AQS ( AbstractQueuedSynchronizer )。

1
2
3
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable

封裝了3樣東西,Runnable類的首個任務對象,執(zhí)行的線程thread和完成的任務數(shù)(volatile)completedTasks。

1
2
3
final Thread thread;
Runnable firstTask;
volatile long completedTasks;

這個類還提供了interruptIfStarted()這樣一個方法,里面做了(getState()>= 0)的判斷。與此呼應,Worker的構造方法里對state設置了-1,避免在線程執(zhí)行前被停掉。

1
2
3
4
5
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

3. 提交任務

上篇文章已經(jīng)提到了,提交新任務的時候,如果沒達到核心線程數(shù)corePoolSize,則開辟新線程執(zhí)行。如果達到核心線程數(shù)corePoolSize, 而隊列未滿,則放入隊列,否則開新線程處理任務,直到maximumPoolSize,超出則丟棄處理。

這段源碼邏輯如下,不細說了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

4. addWorker()的實現(xiàn)

在上面提交任務的時候,會出現(xiàn)開辟新的線程來執(zhí)行,這會調(diào)用addWorker()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

代碼較長,我們可以分兩大部分看:

第一段從第3行到第26行,是雙層無限循環(huán),嘗試增加線程數(shù)到ctl變量,并且做一些比較判斷,如果超出線程數(shù)限定或者ThreadPoolExecutor的狀態(tài)不符合要求,則直接返回false,增加worker失敗。

第二段從第28行開始到結尾,把firstTask這個Runnable對象傳給Worker構造方法,賦值給Worker對象的task屬性。Worker對象把自身(也是一個Runnable)封裝成一個Thread對象賦予Worker對象的thread屬性。鎖住整個線程池并實際增加worker到workers的HashSet對象當中。成功增加后開始執(zhí)行t.start(),就是worker的thread屬性開始運行,實際上就是運行Worker對象的run方法。Worker的run()方法實際上調(diào)用了ThreadPoolExecutor的runWorker()方法。

5. 任務的執(zhí)行runWorker()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這段代碼實際上就是執(zhí)行提交給線程池執(zhí)行的Runnable任務的實際內(nèi)容。其中,值得注意的有以下幾點:

  • 線程開始執(zhí)行前,需要對worker加鎖,完成一個任務后執(zhí)行unlock()
  • 在任務執(zhí)行前后,執(zhí)行beforeExecute()和afterExecute()方法
  • 記錄任務執(zhí)行中的異常后,繼續(xù)拋出
  • 每個任務完成后,會記錄當前線程完成的任務數(shù)
  • 當worker執(zhí)行完一個任務的時候,包括初始任務firstTask,會調(diào)用getTask()繼續(xù)獲取任務,這個方法調(diào)用是可以阻塞的
  • 線程退出,執(zhí)行processWorkerExit(w, completedAbruptly)處理

5. Worker線程的復用和任務的獲取getTask()

在上一段代碼中,也就是runWorker()方法,任務的執(zhí)行過程是嵌套在while循環(huán)語句塊中的。每當一個任務執(zhí)行完畢,會從頭開始做下一次循環(huán)執(zhí)行,實現(xiàn)了空閑線程的復用。而要執(zhí)行的任務則是來自于getTask()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            boolean timed;      // Are workers subject to culling?
            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                     break;
                if (compareAndDecrementWorkerCount(c))
                     return null;
                c = ctl.get();
                // Re-read ctl
                if (runStateOf(c) != rs)
                     continue retry;
                // else CAS failed due to workerCount change; retry inner loop
             }
             try {
                 Runnable r = timed ?
                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                     workQueue.take();
                 if (r != null)
                     return r;
                 timedOut = true;
             } catch (InterruptedException retry) {
                 timedOut = false;
             }
         }
     }

getTask()實際上是從工作隊列(workQueue)中取提交進來的任務。這個workQueue是一個BlockingQueue,通常當隊列中沒有新任務的時候,則getTask()會阻塞。另外,還有定時阻塞這樣一段邏輯:如果從隊列中取任務是計時的,則用poll()方法,并設置等待時間為keepAlive,否則調(diào)用阻塞方法take()。當poll()超時,則獲取到的任務為null,timeOut設置為 true。這段代碼也是放在一個for(;;)循環(huán)中,前面有判斷超時的語句,如果超時,則return null。這意味著runWorker()方法的while循環(huán)結束,線程將退出,執(zhí)行processWorkerExit()方法。

回頭看看是否計時是如何確定的。

1
2
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;

即判斷當前線程池的線程數(shù)是否超出corePoolSize,如果超出這個值并且空閑時間多于keepAlive則當前線程退出。

另外一種情況就是allowCoreThreadTimeOut為true,就是允許核心在空閑超時的情況下停掉。

6. 線程池線程數(shù)的維護和線程的退出處理

剛剛也提到了,我們再看下processWorkerExit()方法。這個方法最主要就是從workers的Set中remove掉一個多余的線程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void processWorkerExit(Worker w, boolean completedAbruptly) {
         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
             decrementWorkerCount();
         final ReentrantLock mainLock = this.mainLock;
         mainLock.lock();
         try {
             completedTaskCount += w.completedTasks;
             workers.remove(w);
         } finally {
             mainLock.unlock();
         }
         tryTerminate();
         int c = ctl.get();
         if (runStateLessThan(c, STOP)) {
             if (!completedAbruptly) {
                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                 if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

這個方法的第二個參數(shù)是判斷是否在runWorker()中正常退出了循環(huán)向下執(zhí)行,如果不是,說明在執(zhí)行任務的過程中出現(xiàn)了異常,completedAbruptly為true,線程直接退出,需要直接對活動線程數(shù)減1 。

之后,加鎖統(tǒng)計完成的任務數(shù),并從workers這個集合中移除當前worker。

執(zhí)行tryTerminate(),這個方法后面會詳細說,主要就是嘗試將線程池推向TERMINATED狀態(tài)。

最后比較當前線程數(shù)是不是已經(jīng)低于應有的線程數(shù),如果這個情況發(fā)生,則添加無任務的空Worker到線程池中待命。

以上,增加新的線程和剔除多余的線程的過程大概就是如此,這樣線程池能保持額定的線程數(shù),并彈性伸縮,保證系統(tǒng)的資源不至于過度消耗。

0

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多