rocketMq消費(fèi)端消費(fèi)分?jǐn)?shù)以上三個(gè)步驟: 第一: 消費(fèi)端從rocketMq服務(wù)端pull消息,到本地。 第二: 消費(fèi)端消費(fèi)pull到的消息。 第三: 消費(fèi)消費(fèi)結(jié)束后,回復(fù)Ack到rocketMq,偏移消費(fèi)位置。
代碼:
/**
* 測試mq 并發(fā) 接受
*/
@Component
@RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
@SneakyThrows
@Override
public void onMessage(LikeWritingParams params) {
System.out.println("睡上10s");
//Thread.sleep(10000);
long begin = System.currentTimeMillis();
System.out.println("mq消費(fèi)速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
//writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
long end = System.currentTimeMillis();
// System.out.println("消費(fèi):: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
defaultMQPushConsumer.setConsumeThreadMin(2); //消費(fèi)端拉去到消息以后分配線索去消費(fèi)
defaultMQPushConsumer.setConsumeThreadMax(10);//最大消費(fèi)線程,一般情況下,默認(rèn)隊(duì)列沒有塞滿,是不會(huì)啟用新的線程的
defaultMQPushConsumer.setPullInterval(1000);//消費(fèi)端多久一次去rocketMq 拉去消息
defaultMQPushConsumer.setPullBatchSize(32); //消費(fèi)端每個(gè)隊(duì)列一次拉去多少個(gè)消息,若該消費(fèi)端分賠了N個(gè)監(jiān)控隊(duì)列,那么消費(fèi)端每次去rocketMq拉去消息說為N*1
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
}
}
第一:從上圖分析可得知影響消費(fèi)速度的幾種情況:
1.PullInterval: 設(shè)置消費(fèi)端,拉取mq消息的間隔時(shí)間。 注意:該時(shí)間算起時(shí)間是rocketMq收到Ack后算起。 例如:PullInterval=10s,上次rocketMq服務(wù)收到Ack消息后開始算起10s后,在拉去消息,不包含消費(fèi)端消費(fèi)占用的時(shí)間。
2.PullBatchSize: 設(shè)置每次pull消息的數(shù)量,該參數(shù)設(shè)置是針對邏輯消息隊(duì)列,并不是每次pull消息拉到的總消息數(shù),若上圖右邊消費(fèi)端分配了兩個(gè)消費(fèi)隊(duì)列來監(jiān)聽。那么PullBatchSize 設(shè)置為32,那么該消費(fèi)端每次pull到 64個(gè)消息。
消費(fèi)端每次pull到消息總數(shù)=PullBatchSize*監(jiān)聽隊(duì)列數(shù)
3.ThreadMin和ThreadMax: 消費(fèi)端消費(fèi)pull到的消息需要的線程數(shù)量。
以上三種情況:是針對參數(shù)配置,來調(diào)整消費(fèi)速度。
除了這三種情況外還有兩種服務(wù)部署情況,可以調(diào)整消費(fèi)速度:
4.rocketMq 邏輯消費(fèi)隊(duì)列配置數(shù)量 有消費(fèi)端每次pull到消息總數(shù)=PullBatchSize*監(jiān)聽隊(duì)列數(shù) 可知rocketMq 邏輯消費(fèi)隊(duì)列配置數(shù)量即上圖中的 queue1 ,queue2,配置數(shù)量越多每次pull到的消息總數(shù)也就越多。如果下邊配置讀隊(duì)列數(shù)量:
5.消費(fèi)端節(jié)點(diǎn)部署數(shù)量 :部署數(shù)量無論一個(gè)節(jié)點(diǎn)監(jiān)聽所有隊(duì)列,還是多個(gè)節(jié)點(diǎn)按照分配策略分配監(jiān)聽隊(duì)列數(shù)量,理論上每秒pull到的數(shù)量都一樣的,但是多節(jié)點(diǎn)消費(fèi)端消費(fèi)線程數(shù)量要比單節(jié)點(diǎn)消費(fèi)線程數(shù)量多,也就是多節(jié)點(diǎn)消費(fèi)速度大于單節(jié)點(diǎn)。
第二:從上邊分析可得知,控制消費(fèi)速度可以分為并發(fā)和延遲消費(fèi)兩種方案控制流量。
前提:rocketMq 主題topic 配置的消費(fèi)隊(duì)列數(shù)量一定的情況下討論,這里設(shè)置為queueNum=4個(gè)消費(fèi)隊(duì)列 1.并發(fā)控流: 1.1 部署節(jié)點(diǎn)數(shù)量 nodeNum 1.2 每個(gè)節(jié)點(diǎn)消費(fèi)端線程數(shù)量 threadNum 注釋:一般情況最小線程和最大線程設(shè)置為一樣,并且一般情況最大線程不會(huì)用到原因見線程池,觸發(fā)線程次隊(duì)列塞滿才會(huì)啟動(dòng)新的線程, 1.3 每次pull的消息間隔時(shí)間 pullTime 1.4 批量拉去的數(shù)量PullBatchSize
列如上圖配置: 部署節(jié)點(diǎn)數(shù)量為2 ,每個(gè)節(jié)點(diǎn)消費(fèi)端線程數(shù)量為2 ,每次pull的消息間隔時(shí)間為1s, PullBatchSize 為32
我們先分析一個(gè)節(jié)點(diǎn)每秒拉取消息數(shù)量:
pullTime*PullBatchSize *queueNum=TotalNum
1*32*2=64
此時(shí)具體消費(fèi)速度就看消費(fèi)端線程數(shù)量和接口執(zhí)行時(shí)間來決定64個(gè)消息需要多久消費(fèi)完。
若接口每次執(zhí)行速度為50毫秒,那么1s兩個(gè)線程可以執(zhí)行40個(gè)消息,那么該節(jié)點(diǎn)處理完64個(gè)需要時(shí)間為:64/40 s 大約就是1.6s
若配置消費(fèi)端開啟三個(gè)線程,那么就是每秒消費(fèi)64個(gè)消息時(shí)綽綽有余。
一般情況下pullTime默認(rèn)為0,即消費(fèi)速度完全就有消費(fèi)端線程數(shù)量和接口執(zhí)行速度(其他外在因素不考慮)兩個(gè)因素來決定。
那么我們來分析下pullTime為0的情況: 一個(gè)接口執(zhí)行時(shí)間為50ms,那么1秒一個(gè)線程處理20個(gè)消息,那么開啟10個(gè)線程該節(jié)點(diǎn)每秒能處理200個(gè)線程,開啟100個(gè)線程那就是2000個(gè)消息…
兩個(gè)節(jié)點(diǎn)就是4000個(gè)消息這已經(jīng)超出了mysql能承受的壓力,其實(shí)這里達(dá)到mysql的連接不一定有4000個(gè),畢竟mysql前 邊還有一個(gè)數(shù)據(jù)庫連接池在控制連接數(shù)量間接控制執(zhí)行速度。
每秒處理消息數(shù)量=1s/接口執(zhí)行時(shí)間毫秒*線程數(shù)量。
結(jié)論:pullTime=0 的情況下,并發(fā)控制實(shí)際就是調(diào)整節(jié)點(diǎn)部署數(shù)量和消費(fèi)端消費(fèi)線程數(shù)量,并且要預(yù)估每條消息業(yè)務(wù)處理時(shí)間(預(yù)估接口執(zhí)行時(shí)間)
2.消費(fèi)延時(shí)控流:
針對消息訂閱者的消費(fèi)延時(shí)流控的基本原理是,每次消費(fèi)時(shí)在客戶端增加一個(gè)延時(shí)來控制消費(fèi)速度,此時(shí)理論上消費(fèi)并發(fā)最快速度為:
單節(jié)點(diǎn)部署: ConsumInterval :延時(shí)時(shí)間單位毫秒 ConcurrentThreadNumber:消費(fèi)端線程數(shù)量 MaxRate :理論每秒處理數(shù)量 MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果消息并發(fā)消費(fèi)線程(ConcurrentThreadNumber)為 20,延時(shí)(ConsumInterval)為 100 ms,代入上述公式可得
200 = 1 / 0.1 * 20
由上可知,理論上可以將并發(fā)消費(fèi)控制在 200 以下
如果是多個(gè)節(jié)點(diǎn)部署如兩個(gè)節(jié)點(diǎn),理論消費(fèi)速度最高為每秒處理400個(gè)消息。
如下延時(shí)流控代碼:
/**
* 測試mq 并發(fā) 接受
*/
@Component
@RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
@SneakyThrows
@Override
public void onMessage(LikeWritingParams params) {
System.out.println("睡上0.1秒");
Thread.sleep(100);
long begin = System.currentTimeMillis();
System.out.println("mq消費(fèi)速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
//writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
long end = System.currentTimeMillis();
// System.out.println("消費(fèi):: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
defaultMQPushConsumer.setConsumeThreadMin(20); //消費(fèi)端拉去到消息以后分配線索去消費(fèi)
defaultMQPushConsumer.setConsumeThreadMax(50);//最大消費(fèi)線程,一般情況下,默認(rèn)隊(duì)列沒有塞滿,是不會(huì)啟用新的線程的
defaultMQPushConsumer.setPullInterval(0);//消費(fèi)端多久一次去rocketMq 拉去消息
defaultMQPushConsumer.setPullBatchSize(32); //消費(fèi)端每個(gè)隊(duì)列一次拉去多少個(gè)消息,若該消費(fèi)端分賠了N個(gè)監(jiān)控隊(duì)列,那么消費(fèi)端每次去rocketMq拉去消息說為N*1
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
}
}
注釋:如上消費(fèi)端,單節(jié)點(diǎn)每秒處理速度也就是最高200個(gè)消息,實(shí)際上要小于200,業(yè)務(wù)代碼執(zhí)行也是需要時(shí)間。
但是要注意實(shí)際操作中并發(fā)流控實(shí)際是默認(rèn)存在的, spring boot 消費(fèi)端默認(rèn)配置 this.consumeThreadMin = 20; this.consumeThreadMax = 20; this.pullInterval = 0L; this.pullBatchSize = 32;
若業(yè)務(wù)邏輯執(zhí)行需要20ms,那么單節(jié)點(diǎn)處理速度就是:1/0.02*20=1000 這里默認(rèn)拉去的速度1s內(nèi)遠(yuǎn)大于1000
注意: 這里雖然pullInterval 等于0 當(dāng)時(shí)受限于每次拉去64個(gè),處理完也是需要一端時(shí)間才能回復(fù)ack,才能再次拉取,所以消費(fèi)速度應(yīng)該小于1000
所以并發(fā)流控要消費(fèi)速度大于消費(fèi)延時(shí)流控 ,那么消費(fèi)延時(shí)流控才有意義
總結(jié):rocketMq 肖鋒流控兩種方式: 并發(fā)流控:就是根據(jù)業(yè)務(wù)流控速率要求,來調(diào)整topic 消費(fèi)隊(duì)列數(shù)量(read queue),消費(fèi)端部署節(jié)點(diǎn),消費(fèi)端拉去間隔時(shí)間,消費(fèi)端消費(fèi)線程數(shù)量等,來達(dá)到要求的速率內(nèi)
延時(shí)消費(fèi)流控:就是在消費(fèi)端延時(shí)消費(fèi)消息(sleep),具體延時(shí)多少要根據(jù)業(yè)務(wù)要求速率,和消費(fèi)端線程數(shù)量,和節(jié)點(diǎn)部署數(shù)量來控制
阿里實(shí)例:
|