小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

基于redis的分布式鎖實現(xiàn) | Aoho''s Blog

 Gtwo 2018-08-30

關于分布式鎖

很久之前有講過并發(fā)編程中的鎖并發(fā)編程的鎖機制:synchronized和lock。在單進程的系統(tǒng)中,當存在多個線程可以同時改變某個變量時,就需要對變量或代碼塊做同步,使其在修改這種變量時能夠線性執(zhí)行消除并發(fā)修改變量。而同步的本質是通過鎖來實現(xiàn)的。為了實現(xiàn)多個線程在一個時刻同一個代碼塊只能有一個線程可執(zhí)行,那么需要在某個地方做個標記,這個標記必須每個線程都能看到,當標記不存在時可以設置該標記,其余后續(xù)線程發(fā)現(xiàn)已經(jīng)有標記了則等待擁有標記的線程結束同步代碼塊取消標記后再去嘗試設置標記。

分布式環(huán)境下,數(shù)據(jù)一致性問題一直是一個比較重要的話題,而又不同于單進程的情況。分布式與單機情況下最大的不同在于其不是多線程而是多進程。多線程由于可以共享堆內存,因此可以簡單的采取內存作為標記存儲位置。而進程之間甚至可能都不在同一臺物理機上,因此需要將標記存儲在一個所有進程都能看到的地方。

常見的是秒殺場景,訂單服務部署了多個實例。如秒殺商品有4個,第一個用戶購買3個,第二個用戶購買2個,理想狀態(tài)下第一個用戶能購買成功,第二個用戶提示購買失敗,反之亦可。而實際可能出現(xiàn)的情況是,兩個用戶都得到庫存為4,第一個用戶買到了3個,更新庫存之前,第二個用戶下了2個商品的訂單,更新庫存為2,導致出錯。

在上面的場景中,商品的庫存是共享變量,面對高并發(fā)情形,需要保證對資源的訪問互斥。在單機環(huán)境中,Java中其實提供了很多并發(fā)處理相關的API,但是這些API在分布式場景中就無能為力了。也就是說單純的Java Api并不能提供分布式鎖的能力。分布式系統(tǒng)中,由于分布式系統(tǒng)的分布性,即多線程和多進程并且分布在不同機器中,synchronized和lock這兩種鎖將失去原有鎖的效果,需要我們自己實現(xiàn)分布式鎖。

常見的鎖方案如下:

  • 基于數(shù)據(jù)庫實現(xiàn)分布式鎖
  • 基于緩存,實現(xiàn)分布式鎖,如redis
  • 基于Zookeeper實現(xiàn)分布式鎖

下面我們簡單介紹下這幾種鎖的實現(xiàn)。

基于數(shù)據(jù)庫

基于數(shù)據(jù)庫的鎖實現(xiàn)也有兩種方式,一是基于數(shù)據(jù)庫表,另一種是基于數(shù)據(jù)庫排他鎖。

基于數(shù)據(jù)庫表的增刪

基于數(shù)據(jù)庫表增刪是最簡單的方式,首先創(chuàng)建一張鎖的表主要包含下列字段:方法名,時間戳等字段。

具體使用的方法,當需要鎖住某個方法時,往該表中插入一條相關的記錄。這邊需要注意,方法名是有唯一性約束的,如果有多個請求同時提交到數(shù)據(jù)庫的話,數(shù)據(jù)庫會保證只有一個操作可以成功,那么我們就可以認為操作成功的那個線程獲得了該方法的鎖,可以執(zhí)行方法體內容。

執(zhí)行完畢,需要delete該記錄。

當然,筆者這邊只是簡單介紹一下。對于上述方案可以進行優(yōu)化,如應用主從數(shù)據(jù)庫,數(shù)據(jù)之間雙向同步。一旦掛掉快速切換到備庫上;做一個定時任務,每隔一定時間把數(shù)據(jù)庫中的超時數(shù)據(jù)清理一遍;使用while循環(huán),直到insert成功再返回成功,雖然并不推薦這樣做;還可以記錄當前獲得鎖的機器的主機信息和線程信息,那么下次再獲取鎖的時候先查詢數(shù)據(jù)庫,如果當前機器的主機信息和線程信息在數(shù)據(jù)庫可以查到的話,直接把鎖分配給他就可以了,實現(xiàn)可重入鎖。

基于數(shù)據(jù)庫排他鎖

我們還可以通過數(shù)據(jù)庫的排他鎖來實現(xiàn)分布式鎖。基于MySql的InnoDB引擎,可以使用以下方法來實現(xiàn)加鎖操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void lock(){
connection.setAutoCommit(false)
int count = 0;
while(count < 4){
try{
select * from lock where lock_name=xxx for update;
if(結果不為空){
//代表獲取到鎖
return;
}
}catch(Exception e){
}
//為空或者拋異常的話都表示沒有獲取到鎖
sleep(1000);
count++;
}
throw new LockException();
}

在查詢語句后面增加for update,數(shù)據(jù)庫會在查詢過程中給數(shù)據(jù)庫表增加排他鎖。當某條記錄被加上排他鎖之后,其他線程無法再在該行記錄上增加排他鎖。其他沒有獲取到鎖的就會阻塞在上述select語句上,可能的結果有2種,在超時之前獲取到了鎖,在超時之前仍未獲取到鎖。

獲得排它鎖的線程即可獲得分布式鎖,當獲取到鎖之后,可以執(zhí)行方法的業(yè)務邏輯,執(zhí)行完方法之后,釋放鎖connection.commit()

存在的問題主要是性能不高和sql超時的異常。

基于數(shù)據(jù)庫鎖的優(yōu)缺點

上面兩種方式都是依賴數(shù)據(jù)庫的一張表,一種是通過表中的記錄的存在情況確定當前是否有鎖存在,另外一種是通過數(shù)據(jù)庫的排他鎖來實現(xiàn)分布式鎖。

  • 優(yōu)點是直接借助數(shù)據(jù)庫,簡單容易理解。
  • 缺點是操作數(shù)據(jù)庫需要一定的開銷,性能問題需要考慮。

基于Zookeeper

基于zookeeper臨時有序節(jié)點可以實現(xiàn)的分布式鎖。每個客戶端對某個方法加鎖時,在zookeeper上的與該方法對應的指定節(jié)點的目錄下,生成一個唯一的瞬時有序節(jié)點。 判斷是否獲取鎖的方式很簡單,只需要判斷有序節(jié)點中序號最小的一個。 當釋放鎖的時候,只需將這個瞬時節(jié)點刪除即可。同時,其可以避免服務宕機導致的鎖無法釋放,而產(chǎn)生的死鎖問題。

提供的第三方庫有curator,具體使用讀者可以自行去看一下。Curator提供的InterProcessMutex是分布式鎖的實現(xiàn)。acquire方法獲取鎖,release方法釋放鎖。另外,鎖釋放、阻塞鎖、可重入鎖等問題都可以有有效解決。講下阻塞鎖的實現(xiàn),客戶端可以通過在ZK中創(chuàng)建順序節(jié)點,并且在節(jié)點上綁定監(jiān)聽器,一旦節(jié)點有變化,Zookeeper會通知客戶端,客戶端可以檢查自己創(chuàng)建的節(jié)點是不是當前所有節(jié)點中序號最小的,如果是就獲取到鎖,便可以執(zhí)行業(yè)務邏輯。

最后,Zookeeper實現(xiàn)的分布式鎖其實存在一個缺點,那就是性能上可能并沒有緩存服務那么高。因為每次在創(chuàng)建鎖和釋放鎖的過程中,都要動態(tài)創(chuàng)建、銷毀瞬時節(jié)點來實現(xiàn)鎖功能。ZK中創(chuàng)建和刪除節(jié)點只能通過Leader服務器來執(zhí)行,然后將數(shù)據(jù)同不到所有的Follower機器上。并發(fā)問題,可能存在網(wǎng)絡抖動,客戶端和ZK集群的session連接斷了,zk集群以為客戶端掛了,就會刪除臨時節(jié)點,這時候其他客戶端就可以獲取到分布式鎖了。

基于緩存

相對于基于數(shù)據(jù)庫實現(xiàn)分布式鎖的方案來說,基于緩存來實現(xiàn)在性能方面會表現(xiàn)的更好一點,存取速度快很多。而且很多緩存是可以集群部署的,可以解決單點問題。基于緩存的鎖有好幾種,如memcached、redis、本文下面主要講解基于redis的分布式實現(xiàn)。

基于redis的分布式鎖實現(xiàn)

SETNX

使用redis的SETNX實現(xiàn)分布式鎖,多個進程執(zhí)行以下Redis命令:

1
SETNX lock.id <current Unix time + lock timeout + 1>

SETNX是將 key 的值設為 value,當且僅當 key 不存在。若給定的 key 已經(jīng)存在,則 SETNX 不做任何動作。

  • 返回1,說明該進程獲得鎖,SETNX將鍵 lock.id 的值設置為鎖的超時時間,當前時間 +加上鎖的有效時間。
  • 返回0,說明其他進程已經(jīng)獲得了鎖,進程不能進入臨界區(qū)。進程可以在一個循環(huán)中不斷地嘗試 SETNX 操作,以獲得鎖。

存在死鎖的問題

SETNX實現(xiàn)分布式鎖,可能會存在死鎖的情況。與單機模式下的鎖相比,分布式環(huán)境下不僅需要保證進程可見,還需要考慮進程與鎖之間的網(wǎng)絡問題。某個線程獲取了鎖之后,斷開了與Redis 的連接,鎖沒有及時釋放,競爭該鎖的其他線程都會hung,產(chǎn)生死鎖的情況。

在使用 SETNX 獲得鎖時,我們將鍵 lock.id 的值設置為鎖的有效時間,線程獲得鎖后,其他線程還會不斷的檢測鎖是否已超時,如果超時,等待的線程也將有機會獲得鎖。然而,鎖超時,我們不能簡單地使用 DEL 命令刪除鍵 lock.id 以釋放鎖。

考慮以下情況:

  1. A已經(jīng)首先獲得了鎖 lock.id,然后線A斷線。B,C都在等待競爭該鎖;
  2. B,C讀取lock.id的值,比較當前時間和鍵 lock.id 的值來判斷是否超時,發(fā)現(xiàn)超時;
  3. B執(zhí)行 DEL lock.id命令,并執(zhí)行 SETNX lock.id 命令,并返回1,B獲得鎖;
  4. C由于各剛剛檢測到鎖已超時,執(zhí)行 DEL lock.id命令,將B剛剛設置的鍵 lock.id 刪除,執(zhí)行 SETNX lock.id命令,并返回1,即C獲得鎖。

上面的步驟很明顯出現(xiàn)了問題,導致B,C同時獲取了鎖。在檢測到鎖超時后,線程不能直接簡單地執(zhí)行 DEL 刪除鍵的操作以獲得鎖。

對于上面的步驟進行改進,問題是出在刪除鍵的操作上面,那么獲取鎖之后應該怎么改進呢?
首先看一下redis的GETSET這個操作,GETSET key value,將給定 key 的值設為 value ,并返回 key 的舊值(old value)。利用這個操作指令,我們改進一下上述的步驟。

  1. A已經(jīng)首先獲得了鎖 lock.id,然后線A斷線。B,C都在等待競爭該鎖;
  2. B,C讀取lock.id的值,比較當前時間和鍵 lock.id 的值來判斷是否超時,發(fā)現(xiàn)超時;
  3. B檢測到鎖已超時,即當前的時間大于鍵 lock.id 的值,B會執(zhí)行
    GETSET lock.id <current Unix timestamp + lock timeout + 1>設置時間戳,通過比較鍵 lock.id 的舊值是否小于當前時間,判斷進程是否已獲得鎖;
  4. B發(fā)現(xiàn)GETSET返回的值小于當前時間,則執(zhí)行 DEL lock.id命令,并執(zhí)行 SETNX lock.id 命令,并返回1,B獲得鎖;
  5. C執(zhí)行GETSET得到的時間大于當前時間,則繼續(xù)等待。

在線程釋放鎖,即執(zhí)行 DEL lock.id 操作前,需要先判斷鎖是否已超時。如果鎖已超時,那么鎖可能已由其他線程獲得,這時直接執(zhí)行 DEL lock.id 操作會導致把其他線程已獲得的鎖釋放掉。

一種實現(xiàn)方式

獲取鎖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {
acquireTimeout = timeUnit.toMillis(acquireTimeout);
long acquireTime = acquireTimeout + System.currentTimeMillis();
//使用J.U.C的ReentrantLock
threadLock.tryLock(acquireTimeout, timeUnit);
try {
//循環(huán)嘗試
while (true) {
//調用tryLock
boolean hasLock = tryLock();
if (hasLock) {
//獲取鎖成功
return true;
} else if (acquireTime < System.currentTimeMillis()) {
break;
}
Thread.sleep(sleepTime);
}
} finally {
if (threadLock.isHeldByCurrentThread()) {
threadLock.unlock();
}
}
return false;
}
public boolean tryLock() {
long currentTime = System.currentTimeMillis();
String expires = String.valueOf(timeout + currentTime);
//設置互斥量
if (redisHelper.setNx(mutex, expires) > 0) {
//獲取鎖,設置超時時間
setLockStatus(expires);
return true;
} else {
String currentLockTime = redisUtil.get(mutex);
//檢查鎖是否超時
if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {
//獲取舊的鎖時間并設置互斥量
String oldLockTime = redisHelper.getSet(mutex, expires);
//舊值與當前時間比較
if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {
//獲取鎖,設置超時時間
setLockStatus(expires);
return true;
}
}
return false;
}
}

lock調用tryLock方法,參數(shù)為獲取的超時時間與單位,線程在超時時間內,獲取鎖操作將自旋在那里,直到該自旋鎖的保持者釋放了鎖。

tryLock方法中,主要邏輯如下:

  • setnx(lockkey, 當前時間+過期超時時間) ,如果返回1,則獲取鎖成功;如果返回0則沒有獲取到鎖
  • get(lockkey)獲取值oldExpireTime ,并將這個value值與當前的系統(tǒng)時間進行比較,如果小于當前系統(tǒng)時間,則認為這個鎖已經(jīng)超時,可以允許別的請求重新獲取
  • 計算newExpireTime=當前時間+過期超時時間,然后getset(lockkey, newExpireTime) 會返回當前l(fā)ockkey的值currentExpireTime
  • 判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明當前getset設置成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那么當前請求可以直接返回失敗,或者繼續(xù)重試

釋放鎖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean unlock() {
//只有鎖的持有線程才能解鎖
if (lockHolder == Thread.currentThread()) {
//判斷鎖是否超時,沒有超時才將互斥量刪除
if (lockExpiresTime > System.currentTimeMillis()) {
redisHelper.del(mutex);
logger.info("刪除互斥量[{}]", mutex);
}
lockHolder = null;
logger.info("釋放[{}]鎖成功", mutex);
return true;
} else {
throw new IllegalMonitorStateException("沒有獲取到鎖的線程無法執(zhí)行解鎖操作");
}
}

在上面獲取鎖的實現(xiàn)下,其實此處的釋放鎖函數(shù)可以不需要了,有興趣的讀者可以結合上面的代碼看下為什么?有想法可以留言哦!

總結

本文主要講解了基于redis分布式鎖的實現(xiàn),在分布式環(huán)境下,數(shù)據(jù)一致性問題一直是一個比較重要的話題,而synchronized和lock鎖在分布式環(huán)境已經(jīng)失去了作用。常見的鎖的方案有基于數(shù)據(jù)庫實現(xiàn)分布式鎖、基于緩存實現(xiàn)分布式鎖、基于Zookeeper實現(xiàn)分布式鎖,簡單介紹了每種鎖的實現(xiàn)特點;然后,文中探索了一下redis鎖的實現(xiàn)方案;最后,本文給出了基于Java實現(xiàn)的redis分布式鎖,讀者可以自行驗證一下。

    本站是提供個人知識管理的網(wǎng)絡存儲空間,所有內容均由用戶發(fā)布,不代表本站觀點。請注意甄別內容中的聯(lián)系方式、誘導購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權內容,請點擊一鍵舉報。
    轉藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多