概述java.util.concurrent 包是專為 Java并發(fā)編程 而設(shè)計(jì)的包。包下的所有類可以分為如下幾大類: - locks部分:顯式鎖(互斥鎖和速寫鎖)相關(guān);
- atomic部分:原子變量類相關(guān),是構(gòu)建非阻塞算法的基礎(chǔ);
- executor部分:線程池相關(guān);
- collections部分:并發(fā)容器相關(guān);
- tools部分:同步工具相關(guān),如信號(hào)量、閉鎖、柵欄等功能;
類圖結(jié)構(gòu):腦圖地址: http://www./m/tJy5,感謝深入淺出 Java Concurrency ,此腦圖在這篇基礎(chǔ)上修改而來。 BlockingQueue此接口是一個(gè)線程安全 的 存取實(shí)例 的隊(duì)列。 使用場景BlockingQueue通常用于一個(gè)線程生產(chǎn)對(duì)象,而另外一個(gè)線程消費(fèi)這些對(duì)象的場景。 注意事項(xiàng):- 此隊(duì)列是
有限的 ,如果隊(duì)列到達(dá)臨界點(diǎn),Thread1 就會(huì)阻塞,直到Thread2 從隊(duì)列中拿走一個(gè)對(duì)象。 - 若果隊(duì)列是空的,
Thread2 會(huì)阻塞,直到Thread1 把一個(gè)對(duì)象丟進(jìn)隊(duì)列。
相關(guān)方法BlockingQueue中包含了如下操作方法: | Throws Exception | Special Value | Blocks | Times Out | Insert | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) | Remove | remove(o) | poll() | take() | poll(timeout, timeunit) | Examine | element() | peek() | | |
名詞解釋:- Throws Exception: 如果試圖的操作無法立即執(zhí)行,拋一個(gè)異常。
- Special Value: 如果試圖的操作無法立即執(zhí)行,返回一個(gè)特定的值(常常是 true / false)。
- Blocks: 如果試圖的操作無法立即執(zhí)行,該方法調(diào)用將會(huì)發(fā)生阻塞,直到能夠執(zhí)行。
- Times Out: 如果試圖的操作無法立即執(zhí)行,該方法調(diào)用將會(huì)發(fā)生阻塞,直到能夠執(zhí)行,但等待時(shí)間不會(huì)超過給定值。返回一個(gè)特定值以告知該操作是否成功(典型的是 true / false)。
注意事項(xiàng):- 無法插入 null,否則會(huì)拋出一個(gè) NullPointerException。
- 隊(duì)列這種數(shù)據(jù)結(jié)構(gòu),導(dǎo)致除了獲取開始和結(jié)尾位置的其他對(duì)象的效率都不高,雖然可通過
remove(o) 來移除任一對(duì)象。
實(shí)現(xiàn)類因?yàn)槭且粋€(gè)接口,所以我們必須使用一個(gè)實(shí)現(xiàn)類來使用它,有如下實(shí)現(xiàn)類: - ArrayBlockingQueue: 數(shù)組阻塞隊(duì)列
- DelayQueue: 延遲隊(duì)列
- LinkedBlockingQueue: 鏈阻塞隊(duì)列
- PriorityBlockingQueue: 具有優(yōu)先級(jí)的阻塞隊(duì)列
- SynchronousQueue: 同步隊(duì)列
使用示例:見: BlockingQueue ArrayBlockingQueueArrayBlockingQueue 是一個(gè)有界的阻塞隊(duì)列 - 內(nèi)部實(shí)現(xiàn)是將對(duì)象放到一個(gè)數(shù)組里。數(shù)組有個(gè)特性:一旦初始化,大小就無法修改。因此無法修改
ArrayBlockingQueue 初始化時(shí)的上限。 ArrayBlockingQueue 內(nèi)部以 FIFO(先進(jìn)先出) 的順序?qū)υ剡M(jìn)行存儲(chǔ)。隊(duì)列中的頭元素在所有元素之中是放入時(shí)間最久的那個(gè),而尾元素則是最短的那個(gè)。
DelayQueueDelayQueue 對(duì)元素進(jìn)行持有直到一個(gè)特定的延遲到期。注入其中的元素必須實(shí)現(xiàn) java.util.concurrent.Delayed 接口: public interface Delayed extends ComparableDelayed<> public long getDelay(TimeUnit timeUnit); // 返回將要延遲的時(shí)間段} - 在每個(gè)元素的 getDelay() 方法返回的值的時(shí)間段之后才釋放掉該元素。如果返回的是 0 或者負(fù)值,延遲將被認(rèn)為過期,該元素將會(huì)在 DelayQueue 的下一次 take 被調(diào)用的時(shí)候被釋放掉。
Delayed 接口也繼承了 java.lang.Comparable 接口,Delayed 對(duì)象之間可以進(jìn)行對(duì)比。這對(duì)DelayQueue 隊(duì)列中的元素進(jìn)行排序時(shí)有用,因此它們可以根據(jù)過期時(shí)間進(jìn)行有序釋放。
LinkedBlockingQueue內(nèi)部以一個(gè)鏈?zhǔn)浇Y(jié)構(gòu)(鏈接節(jié)點(diǎn))對(duì)其元素進(jìn)行存儲(chǔ) 。 - 可以選擇一個(gè)上限。如果沒有定義上限,將使用 Integer.MAX_VALUE 作為上限。
- 內(nèi)部以 FIFO(先進(jìn)先出)的順序?qū)υ剡M(jìn)行存儲(chǔ)。
PriorityBlockingQueue一個(gè)無界的并發(fā)隊(duì)列,它使用了和類 java.util.PriorityQueue 一樣的排序規(guī)則。 - 無法向這個(gè)隊(duì)列中插入 null 值。
- 插入到 其中的元素必須實(shí)現(xiàn) java.lang.Comparable 接口。
- 對(duì)于具有相等優(yōu)先級(jí)(compare() == 0)的元素并不強(qiáng)制任何特定行為。
- 從一個(gè) PriorityBlockingQueue 獲得一個(gè) Iterator 的話,該 Iterator 并不能保證它對(duì)元素的遍歷是以優(yōu)先級(jí)為序的。
SynchronousQueue一個(gè)特殊的隊(duì)列,它的內(nèi)部同時(shí)只能夠容納單個(gè)元素。 - 如果該隊(duì)列已有一元素的話,試圖向隊(duì)列中插入一個(gè)新元素的線程將會(huì)阻塞,直到另一個(gè)線程將該元素從隊(duì)列中抽走。
- 如果該隊(duì)列為空,試圖向隊(duì)列中抽取一個(gè)元素的線程將會(huì)阻塞,直到另一個(gè)線程向隊(duì)列中插入了一條新的元素。
BlockingDeque此接口表示一個(gè)線程安全 放入和提取實(shí)例的雙端隊(duì)列 。 使用場景通常用在一個(gè)線程既是生產(chǎn)者又是消費(fèi)者的時(shí)候。
注意事項(xiàng)- 如果雙端隊(duì)列已滿,插入線程將被阻塞,直到一個(gè)移除線程從該隊(duì)列中移出了一個(gè)元素。
- 如果雙端隊(duì)列為空,移除線程將被阻塞,直到一個(gè)插入線程向該隊(duì)列插入了一個(gè)新元素。
相關(guān)方法 | Throws Exception | Special Value | Blocks | Times Out | Insert | addFirst(o) | offerFirst(o) | putFirst(o) | offerFirst(o, timeout, timeunit) | Remove | removeFirst(o) | pollFirst(o) | takeFirst(o) | pollFirst(timeout, timeunit) | Examine | getFirst(o) | peekFirst(o) | | |
| Throws Exception | Special Value | Blocks | Times Out | Insert | addLast(o) | offerLast(o) | putLast(o) | offerLast(o, timeout, timeunit) | Remove | removeLast(o) | pollLast(o) | takeLast(o) | pollLast(timeout, timeunit) | Examine | getLast(o) | peekLast(o) | | |
注意事項(xiàng)- 關(guān)于方法的處理方式和上節(jié)一樣。
- BlockingDeque 接口繼承自 BlockingQueue 接口,可以用其中定義的方法。
實(shí)現(xiàn)類- LinkedBlockingDeque : 鏈阻塞雙端隊(duì)列
LinkedBlockingDequeLinkedBlockingDeque 是一個(gè)雙端隊(duì)列,可以從任意一端插入或者抽取元素的隊(duì)列。 - 在它為空的時(shí)候,一個(gè)試圖從中抽取數(shù)據(jù)的線程將會(huì)阻塞,無論該線程是試圖從哪一端抽取數(shù)據(jù)。
ConcurrentMap一個(gè)能夠?qū)e人的訪問(插入和提取)進(jìn)行并發(fā)處理的 java.util.Map接口。 ConcurrentMap 除了從其父接口 java.util.Map 繼承來的方法之外還有一些額外的原子性方法。 實(shí)現(xiàn)類因?yàn)槭墙涌?,必須用?shí)現(xiàn)類來使用它,其實(shí)現(xiàn)類為 ConcurrentHashMap與HashTable比較- 更好的并發(fā)性能,在你從中讀取對(duì)象的時(shí)候 ConcurrentHashMap 并不會(huì)把整個(gè) Map 鎖住,只是把 Map 中正在被寫入的部分進(jìn)行鎖定。
- 在被遍歷的時(shí)候,即使是 ConcurrentHashMap 被改動(dòng),它也不會(huì)拋 ConcurrentModificationException。
ConcurrentNavigableMap一個(gè)支持并發(fā)訪問的 java.util.NavigableMap,它還能讓它的子 map 具備并發(fā)訪問的能力。 headMapheadMap(T toKey) 方法返回一個(gè)包含了小于給定 toKey 的 key 的子 map。 tailMaptailMap(T fromKey) 方法返回一個(gè)包含了不小于給定 fromKey 的 key 的子 map。 subMapsubMap() 方法返回原始 map 中,鍵介于 from(包含) 和 to (不包含) 之間的子 map。 更多方法- descendingKeySet()
- descendingMap()
- navigableKeySet()
CountDownLatchCountDownLatch 是一個(gè)并發(fā)構(gòu)造,它允許一個(gè)或多個(gè)線程等待一系列指定操作的完成。 - CountDownLatch 以一個(gè)給定的數(shù)量初始化。countDown() 每被調(diào)用一次,這一數(shù)量就減一。
- 通過調(diào)用 await() 方法之一,線程可以阻塞等待這一數(shù)量到達(dá)零。
CyclicBarrierCyclicBarrier 類是一種同步機(jī)制,它能夠?qū)μ幚硪恍┧惴ǖ木€程實(shí)現(xiàn)同步。 更多實(shí)例參考: CyclicBarrier ExchangerExchanger 類表示一種兩個(gè)線程可以進(jìn)行互相交換對(duì)象的會(huì)和點(diǎn)。 更多實(shí)例參考: Exchanger SemaphoreSemaphore 類是一個(gè)計(jì)數(shù)信號(hào)量。具備兩個(gè)主要方法: - 每調(diào)用一次 acquire(),一個(gè)許可會(huì)被調(diào)用線程取走。
- 每調(diào)用一次 release(),一個(gè)許可會(huì)被返還給信號(hào)量。
Semaphore 用法- 保護(hù)一個(gè)重要(代碼)部分防止一次超過 N 個(gè)線程進(jìn)入。
- 在兩個(gè)線程之間發(fā)送信號(hào)。
保護(hù)重要部分如果你將信號(hào)量用于保護(hù)一個(gè)重要部分,試圖進(jìn)入這一部分的代碼通常會(huì)首先嘗試獲得一個(gè)許可,然后才能進(jìn)入重要部分(代碼塊),執(zhí)行完之后,再把許可釋放掉。 Semaphore semaphore = new Semaphore(1); //critical section semaphore.acquire(); ... semaphore.release(); 在線程之間發(fā)送信號(hào)如果你將一個(gè)信號(hào)量用于在兩個(gè)線程之間傳送信號(hào),通常你應(yīng)該用一個(gè)線程調(diào)用 acquire() 方法,而另一個(gè)線程調(diào)用 release() 方法。 - 如果沒有可用的許可,acquire() 調(diào)用將會(huì)阻塞,直到一個(gè)許可被另一個(gè)線程釋放出來。
- 如果無法往信號(hào)量釋放更多許可時(shí),一個(gè) release() 調(diào)用也會(huì)阻塞。
公平性無法擔(dān)保掉第一個(gè)調(diào)用 acquire() 的線程會(huì)是第一個(gè)獲得一個(gè)許可的線程。 可以通過如下來強(qiáng)制公平: Semaphore semaphore = new Semaphore(1, true); - 需要注意,強(qiáng)制公平會(huì)影響到并發(fā)性能,建議不使用。
ExecutorService這里之前有過簡單的總結(jié): Java 中幾種常用的線程池 存在于 java.util.concurrent 包里的 ExecutorService 實(shí)現(xiàn)就是一個(gè)線程池實(shí)現(xiàn)。 實(shí)現(xiàn)類此接口實(shí)現(xiàn)類包括: - ScheduledThreadPoolExecutor : 通過
Executors.newScheduledThreadPool(10) 創(chuàng)建的 - ThreadPoolExecutor: 除了第一種的
其他三種方式 創(chuàng)建的
相關(guān)方法- execute(Runnable):
無法得知被執(zhí)行的 Runnable 的執(zhí)行結(jié)果 - submit(Runnable):
返回一個(gè) Future 對(duì)象,可以知道Runnable 是否執(zhí)行完畢。 - submit(Callable):
Callable 實(shí)例除了它的 call() 方法能夠返回一個(gè)結(jié)果,通過Future可以獲取。 - invokeAny(…):
傳入一系列的 Callable 或者其子接口的實(shí)例對(duì)象,無法保證返回的是哪個(gè) Callable 的結(jié)果 ,只能表明其中一個(gè)已執(zhí)行結(jié)束。 如果其中一個(gè)任務(wù)執(zhí)行結(jié)束(或者拋了一個(gè)異常),其他 Callable 將被取消。 - invokeAll(…):
返回一系列的 Future 對(duì)象,通過它們你可以獲取每個(gè) Callable 的執(zhí)行結(jié)果。
關(guān)閉ExecutorService- shutdown() : 不會(huì)立即關(guān)閉,但它將不再接受新的任務(wù)
- shutdownNow(): 立即關(guān)閉
ThreadPoolExecutor- ThreadPoolExecutor 使用其內(nèi)部池中的線程執(zhí)行給定任務(wù)(Callable 或者 Runnable)。
ScheduledExecutorService(接口,其實(shí)現(xiàn)類為ScheduledThreadPoolExecutor)- ScheduledExecutorService能夠?qū)⑷蝿?wù)延后執(zhí)行,或者間隔固定時(shí)間多次執(zhí)行。
- ScheduledExecutorService中的 任務(wù)由一個(gè)工作者線程異步執(zhí)行,而不是由提交任務(wù)給 ScheduledExecutorService 的那個(gè)線程執(zhí)行。
相關(guān)方法- schedule (Callable task, long delay, TimeUnit timeunit):
Callable 在給定的延遲之后執(zhí)行,并返回結(jié)果。 - schedule (Runnable task, long delay, TimeUnit timeunit)
除了 Runnable 無法返回一個(gè)結(jié)果之外,和第一個(gè)方法類似。 - scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
這一方法規(guī)劃一個(gè)任務(wù)將被定期執(zhí)行。該任務(wù)將會(huì)在首個(gè) initialDelay 之后得到執(zhí)行,然后每個(gè) period 時(shí)間之后重復(fù)執(zhí)行。 period 被解釋為前一個(gè)執(zhí)行的開始和下一個(gè)執(zhí)行的開始之間的間隔時(shí)間。 - scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
和上一個(gè)方法類似,只是period 則被解釋為前一個(gè)執(zhí)行的結(jié)束和下一個(gè)執(zhí)行的結(jié)束之間的間隔。
ForkJoinPoolForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一點(diǎn)不同。ForkJoinPool 讓我們可以很方便地把任務(wù)分裂成幾個(gè)更小的任務(wù),這些分裂出來的任務(wù)也將會(huì)提交給 ForkJoinPool。 用法參考:Java Fork and Join using ForkJoinPool
LockLock 是一個(gè)類似于 synchronized 塊的線程同步機(jī)制。但是 Lock 比 synchronized 塊更加靈活、精細(xì)。 實(shí)現(xiàn)類Lock是一個(gè)接口,其實(shí)現(xiàn)類包括: 示例Lock lock = new ReentrantLock(); lock.lock(); //critical section lock.unlock(); - 調(diào)用
lock() 方法 之后,這個(gè) lock 實(shí)例就被鎖住啦。 - 當(dāng)lock示例被鎖后,任何其他再過來調(diào)用 lock() 方法的線程將會(huì)被阻塞住,直到調(diào)用了unlock() 方法。
- unlock() 被調(diào)用了,lock 對(duì)象解鎖了,其他線程可以對(duì)它進(jìn)行鎖定了。
Lock 和 synchronized區(qū)別- synchronized 代碼塊不能夠保證進(jìn)入訪問等待的線程的先后順序。
- 你不能夠傳遞任何參數(shù)給一個(gè) synchronized 代碼塊的入口。因此,對(duì)于 synchronized 代碼塊的訪問等待設(shè)置超時(shí)時(shí)間是不可能的事情。
- synchronized 塊必須被完整地包含在單個(gè)方法里。而一個(gè) Lock 對(duì)象可以把它的 lock() 和 unlock() 方法的調(diào)用放在不同的方法里。
ReadWriteLock讀寫鎖一種先進(jìn)的線程鎖機(jī)制。 - 允許多個(gè)線程在同一時(shí)間對(duì)某特定資源進(jìn)行讀取,
- 但同一時(shí)間內(nèi)只能有一個(gè)線程對(duì)其進(jìn)行寫入。
實(shí)現(xiàn)類規(guī)則- 如果沒有任何寫操作鎖定,那么可以有多個(gè)讀操作鎖定該鎖
- 如果沒有任何讀操作或者寫操作,只能有一個(gè)寫線程對(duì)該鎖進(jìn)行鎖定。
示例: ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); readWriteLock.readLock().lock(); // multiple readers can enter this section // if not locked for writing, and not writers waiting // to lock for writing. readWriteLock.readLock().unlock(); readWriteLock.writeLock().lock(); // only one writer can enter this section, // and only if no threads are currently reading. readWriteLock.writeLock().unlock(); 更多原子性包裝類位于 atomic 包下,包含一系列原子性變量。 - AtomicBoolean
- AtomicInteger
- AtomicLong
- AtomicReference
…
參考資料: Java 并發(fā)工具包 java.util.concurrent 用戶指南 java.util.concurrency - Java Concurrency Utilities 擴(kuò)展閱讀: 深入淺出 Java Concurrency : 講解的很詳細(xì)
|