小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

深入理解Java之線程池(中)

 月冷星河 2016-04-24

接上文


下面我們看一下Worker類的實(shí)現(xiàn):


private final class Worker implements Runnable {

    private final ReentrantLock runLock = new ReentrantLock();

    private Runnable firstTask;

    volatile long completedTasks;

    Thread thread;

    Worker(Runnable firstTask) {

        this.firstTask = firstTask;

    }

    boolean isActive() {

        return runLock.isLocked();

    }

    void interruptIfIdle() {

        final ReentrantLock runLock = this.runLock;

        if (runLock.tryLock()) {

            try {

        if (thread != Thread.currentThread())

        thread.interrupt();

            } finally {

                runLock.unlock();

            }

        }

    }

    void interruptNow() {

        thread.interrupt();

    }

 

    private void runTask(Runnable task) {

        final ReentrantLock runLock = this.runLock;

        runLock.lock();

        try {

            if (runState < STOP &&

                Thread.interrupted() &&

                runState >= STOP)

            boolean ran = false;

            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個(gè)方法,沒(méi)有具體實(shí)現(xiàn),用戶可以根據(jù)

            //自己需要重載這個(gè)方法和后面的afterExecute方法來(lái)進(jìn)行一些統(tǒng)計(jì)信息,比如某個(gè)任務(wù)的執(zhí)行時(shí)間等          

            try {

                task.run();

                ran = true;

                afterExecute(task, null);

                ++completedTasks;

            } catch (RuntimeException ex) {

                if (!ran)

                    afterExecute(task, ex);

                throw ex;

            }

        } finally {

            runLock.unlock();

        }

    }

 

    public void run() {

        try {

            Runnable task = firstTask;

            firstTask = null;

            while (task != null || (task = getTask()) != null) {

                runTask(task);

                task = null;

            }

        } finally {

            workerDone(this);   //當(dāng)任務(wù)隊(duì)列中沒(méi)有任務(wù)時(shí),進(jìn)行清理工作      

        }

    }

}


它實(shí)際上實(shí)現(xiàn)了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本一樣:


Thread t = new Thread(w);


相當(dāng)于傳進(jìn)去了一個(gè)Runnable任務(wù),在線程t中執(zhí)行這個(gè)Runnable。


既然Worker實(shí)現(xiàn)了Runnable接口,那么自然最核心的方法便是run()方法了:


public void run() {

    try {

        Runnable task = firstTask;

        firstTask = null;

        while (task != null || (task = getTask()) != null) {

            runTask(task);

            task = null;

        }

    } finally {

        workerDone(this);

    }

}


從run方法的實(shí)現(xiàn)可以看出,它首先執(zhí)行的是通過(guò)構(gòu)造器傳進(jìn)來(lái)的任務(wù)firstTask,在調(diào)用runTask()執(zhí)行完firstTask之后,在while循環(huán)里面不斷通過(guò)getTask()去取新的任務(wù)來(lái)執(zhí)行,那么去哪里取呢?自然是從任務(wù)緩存隊(duì)列里面去取,getTask是ThreadPoolExecutor類中的方法,并不是Worker類中的方法,下面是getTask方法的實(shí)現(xiàn):


Runnable getTask() {

    for (;;) {

        try {

            int state = runState;

            if (state > SHUTDOWN)

                return null;

            Runnable r;

            if (state == SHUTDOWN)  // Help drain queue

                r = workQueue.poll();

            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果線程數(shù)大于核心池大小或者允許為核心池線程設(shè)置空閑時(shí)間,

                //則通過(guò)poll取任務(wù),若等待一定的時(shí)間取不到任務(wù),則返回null

                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);

            else

                r = workQueue.take();

            if (r != null)

                return r;

            if (workerCanExit()) {    //如果沒(méi)取到任務(wù),即r為null,則判斷當(dāng)前的worker是否可以退出

                if (runState >= SHUTDOWN) // Wake up others

                    interruptIdleWorkers();   //中斷處于空閑狀態(tài)的worker

                return null;

            }

            // Else retry

        } catch (InterruptedException ie) {

            // On interruption, re-check runState

        }

    }

}


在getTask中,先判斷當(dāng)前線程池狀態(tài),如果runState大于SHUTDOWN(即為STOP或者TERMINATED),則直接返回null。


如果runState為SHUTDOWN或者RUNNING,則從任務(wù)緩存隊(duì)列取任務(wù)。


如果當(dāng)前線程池的線程數(shù)大于核心池大小corePoolSize或者允許為核心池中的線程設(shè)置空閑存活時(shí)間,則調(diào)用poll(time,timeUnit)來(lái)取任務(wù),這個(gè)方法會(huì)等待一定的時(shí)間,如果取不到任務(wù)就返回null。


然后判斷取到的任務(wù)r是否為null,為null則通過(guò)調(diào)用workerCanExit()方法來(lái)判斷當(dāng)前worker是否可以退出,我們看一下workerCanExit()的實(shí)現(xiàn):


private boolean workerCanExit() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    boolean canExit;

    //如果runState大于等于STOP,或者任務(wù)緩存隊(duì)列為空了

    //或者  允許為核心池線程設(shè)置空閑存活時(shí)間并且線程池中的線程數(shù)目大于1

    try {

        canExit = runState >= STOP ||

            workQueue.isEmpty() ||

            (allowCoreThreadTimeOut &&

             poolSize > Math.max(1, corePoolSize));

    } finally {

        mainLock.unlock();

    }

    return canExit;

}


也就是說(shuō)如果線程池處于STOP狀態(tài)、或者任務(wù)隊(duì)列已為空或者允許為核心池線程設(shè)置空閑存活時(shí)間并且線程數(shù)大于1時(shí),允許worker退出。如果允許worker退出,則調(diào)用interruptIdleWorkers()中斷處于空閑狀態(tài)的worker,我們看一下interruptIdleWorkers()的實(shí)現(xiàn):


void interruptIdleWorkers() {

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        for (Worker w : workers)  //實(shí)際上調(diào)用的是worker的interruptIfIdle()方法

            w.interruptIfIdle();

    } finally {

        mainLock.unlock();

    }

}


從實(shí)現(xiàn)可以看出,它實(shí)際上調(diào)用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:


void interruptIfIdle() {

    final ReentrantLock runLock = this.runLock;

    if (runLock.tryLock()) {    //注意這里,是調(diào)用tryLock()來(lái)獲取鎖的,因?yàn)槿绻?dāng)前worker正在執(zhí)行任務(wù),鎖已經(jīng)被獲取了,是無(wú)法獲取到鎖的

                                //如果成功獲取了鎖,說(shuō)明當(dāng)前worker處于空閑狀態(tài)

        try {

    if (thread != Thread.currentThread()) 

    thread.interrupt();

        } finally {

            runLock.unlock();

        }

    }

}


這里有一個(gè)非常巧妙的設(shè)計(jì)方式,假如我們來(lái)設(shè)計(jì)線程池,可能會(huì)有一個(gè)任務(wù)分派線程,當(dāng)發(fā)現(xiàn)有線程空閑時(shí),就從任務(wù)緩存隊(duì)列中取一個(gè)任務(wù)交給空閑線程執(zhí)行。但是在這里,并沒(méi)有采用這樣的方式,因?yàn)檫@樣會(huì)要額外地對(duì)任務(wù)分派線程進(jìn)行管理,無(wú)形地會(huì)增加難度和復(fù)雜度,這里直接讓執(zhí)行完任務(wù)的線程去任務(wù)緩存隊(duì)列里面取任務(wù)來(lái)執(zhí)行。


我們?cè)倏碼ddIfUnderMaximumPoolSize方法的實(shí)現(xiàn),這個(gè)方法的實(shí)現(xiàn)思想和addIfUnderCorePoolSize方法的實(shí)現(xiàn)思想非常相似,唯一的區(qū)別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數(shù)達(dá)到了核心池大小并且往任務(wù)隊(duì)列中添加任務(wù)失敗的情況下執(zhí)行的:


private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

    Thread t = null;

    final ReentrantLock mainLock = this.mainLock;

    mainLock.lock();

    try {

        if (poolSize < maximumPoolSize && runState == RUNNING)

            t = addThread(firstTask);

    } finally {

        mainLock.unlock();

    }

    if (t == null)

        return false;

    t.start();

    return true;

}


看到?jīng)]有,其實(shí)它和addIfUnderCorePoolSize方法的實(shí)現(xiàn)基本一模一樣,只是if語(yǔ)句判斷條件中的poolSize < maximumPoolSize不同而已。


到這里,大部分朋友應(yīng)該對(duì)任務(wù)提交給線程池之后到被執(zhí)行的整個(gè)過(guò)程有了一個(gè)基本的了解,下面總結(jié)一下:


1)首先,要清楚corePoolSize和maximumPoolSize的含義;


2)其次,要知道Worker是用來(lái)起到什么作用的;


3)要知道任務(wù)提交給線程池之后的處理策略,這里總結(jié)一下主要有4點(diǎn):


  • 如果當(dāng)前線程池中的線程數(shù)目小于corePoolSize,則每來(lái)一個(gè)任務(wù),就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行這個(gè)任務(wù);


  • 如果當(dāng)前線程池中的線程數(shù)目>=corePoolSize,則每來(lái)一個(gè)任務(wù),會(huì)嘗試將其添加到任務(wù)緩存隊(duì)列當(dāng)中,若添加成功,則該任務(wù)會(huì)等待空閑線程將其取出去執(zhí)行;若添加失?。ㄒ话銇?lái)說(shuō)是任務(wù)緩存隊(duì)列已滿),則會(huì)嘗試創(chuàng)建新的線程去執(zhí)行這個(gè)任務(wù);


  • 如果當(dāng)前線程池中的線程數(shù)目達(dá)到maximumPoolSize,則會(huì)采取任務(wù)拒絕策略進(jìn)行處理;


  • 如果線程池中的線程數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;如果允許為核心池中的線程設(shè)置存活時(shí)間,那么核心池中的線程空閑時(shí)間超過(guò)keepAliveTime,線程也會(huì)被終止。


3.線程池中的線程初始化


默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒(méi)有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。


在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過(guò)以下兩個(gè)方法辦到:


  • prestartCoreThread():初始化一個(gè)核心線程;

  • prestartAllCoreThreads():初始化所有核心線程


下面是這2個(gè)方法的實(shí)現(xiàn):


public boolean prestartCoreThread() {

    return addIfUnderCorePoolSize(null); //注意傳進(jìn)去的參數(shù)是null

}

 

public int prestartAllCoreThreads() {

    int n = 0;

    while (addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null

        ++n;

    return n;

}


注意上面?zhèn)鬟M(jìn)去的參數(shù)是null,根據(jù)第2小節(jié)的分析可知如果傳進(jìn)去的參數(shù)為null,則最后執(zhí)行線程會(huì)阻塞在getTask方法中的


r = workQueue.take();


即等待任務(wù)隊(duì)列中有任務(wù)。


4.任務(wù)緩存隊(duì)列及排隊(duì)策略


在前面我們多次提到了任務(wù)緩存隊(duì)列,即workQueue,它用來(lái)存放等待執(zhí)行的任務(wù)。


workQueue的類型為BlockingQueue,通??梢匀∠旅嫒N類型:


1)ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時(shí)必須指定大小;


2)LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒(méi)有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;


3)synchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來(lái)執(zhí)行新來(lái)的任務(wù)。


5.任務(wù)拒絕策略


當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來(lái)就會(huì)采取任務(wù)拒絕策略,通常有以下四種策略:


ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。

ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。

ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過(guò)程)

ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)


6.線程池的關(guān)閉


ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:


  • shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)


  • shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)


7.線程池容量的動(dòng)態(tài)調(diào)整


ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),


  • setCorePoolSize:設(shè)置核心池大小


  • setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小


當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來(lái)執(zhí)行任務(wù)。


三.使用示例


前面我們討論了關(guān)于線程池的實(shí)現(xiàn)原理,這一節(jié)我們來(lái)看一下它的具體使用:


public class Test {

     public static void main(String[] args) {  

         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,

                 new ArrayBlockingQueue(5));

 

         for(int i=0;i<15;i++){

             MyTask myTask = new MyTask(i);

             executor.execute(myTask);

             System.out.println('線程池中線程數(shù)目:'+executor.getPoolSize()+',隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:'+

             executor.getQueue().size()+',已執(zhí)行玩別的任務(wù)數(shù)目:'+executor.getCompletedTaskCount());

         }

         executor.shutdown();

     }

}

 

class MyTask implements Runnable {

    private int taskNum;

 

    public MyTask(int num) {

        this.taskNum = num;

    }

 

    @Override

    public void run() {

        System.out.println('正在執(zhí)行task '+taskNum);

        try {

            Thread.currentThread().sleep(4000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        System.out.println('task '+taskNum+'執(zhí)行完畢');

    }

}


執(zhí)行結(jié)果:


正在執(zhí)行task 0

線程池中線程數(shù)目:1,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0

線程池中線程數(shù)目:2,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 1

線程池中線程數(shù)目:3,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 2

線程池中線程數(shù)目:4,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 3

線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:0,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 4

線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:1,已執(zhí)行玩別的任務(wù)數(shù)目:0

線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:2,已執(zhí)行玩別的任務(wù)數(shù)目:0

線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:3,已執(zhí)行玩別的任務(wù)數(shù)目:0

線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:4,已執(zhí)行玩別的任務(wù)數(shù)目:0

線程池中線程數(shù)目:5,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0

線程池中線程數(shù)目:6,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 10

線程池中線程數(shù)目:7,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 11

線程池中線程數(shù)目:8,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 12

線程池中線程數(shù)目:9,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 13

線程池中線程數(shù)目:10,隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:5,已執(zhí)行玩別的任務(wù)數(shù)目:0

正在執(zhí)行task 14

task 3執(zhí)行完畢

task 0執(zhí)行完畢

task 2執(zhí)行完畢

task 1執(zhí)行完畢

正在執(zhí)行task 8

正在執(zhí)行task 7

正在執(zhí)行task 6

正在執(zhí)行task 5

task 4執(zhí)行完畢

task 10執(zhí)行完畢

task 11執(zhí)行完畢

task 13執(zhí)行完畢

task 12執(zhí)行完畢

正在執(zhí)行task 9

task 14執(zhí)行完畢

task 8執(zhí)行完畢

task 5執(zhí)行完畢

task 7執(zhí)行完畢

task 6執(zhí)行完畢

task 9執(zhí)行完畢


從執(zhí)行結(jié)果可以看出,當(dāng)線程池中線程的數(shù)目大于5時(shí),便將任務(wù)放入任務(wù)緩存隊(duì)列里面,當(dāng)任務(wù)緩存隊(duì)列滿了之后,便創(chuàng)建新的線程。如果上面程序中,將for循環(huán)中改成執(zhí)行20個(gè)任務(wù),就會(huì)拋出任務(wù)拒絕異常了。


不過(guò)在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個(gè)靜態(tài)方法來(lái)創(chuàng)建線程池:


Executors.newCachedThreadPool();        //創(chuàng)建一個(gè)緩沖池,緩沖池容量大小為Integer.MAX_VALUE

Executors.newSingleThreadExecutor();   //創(chuàng)建容量為1的緩沖池

Executors.newFixedThreadPool(int);    //創(chuàng)建固定容量大小的緩沖池


接下文


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買(mǎi)等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多