小男孩‘自慰网亚洲一区二区,亚洲一级在线播放毛片,亚洲中文字幕av每天更新,黄aⅴ永久免费无码,91成人午夜在线精品,色网站免费在线观看,亚洲欧洲wwwww在线观看

分享

rocketmq消峰之流量控制詳解

 HUC王子 2021-09-03

在這里插入圖片描述
rocketMq消費(fèi)端消費(fèi)分?jǐn)?shù)以上三個(gè)步驟:
第一: 消費(fèi)端從rocketMq服務(wù)端pull消息,到本地。
第二: 消費(fèi)端消費(fèi)pull到的消息。
第三: 消費(fèi)消費(fèi)結(jié)束后,回復(fù)Ack到rocketMq,偏移消費(fèi)位置。

代碼:

 /**
     * 測試mq 并發(fā) 接受
     */
    @Component
    @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
    class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{

        @SneakyThrows
        @Override
        public void onMessage(LikeWritingParams params) {
            System.out.println("睡上10s");
            //Thread.sleep(10000);

            long begin = System.currentTimeMillis();
            System.out.println("mq消費(fèi)速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
            //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
            long end = System.currentTimeMillis();
          //  System.out.println("消費(fèi):: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
        }

        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            defaultMQPushConsumer.setConsumeThreadMin(2); //消費(fèi)端拉去到消息以后分配線索去消費(fèi)
            defaultMQPushConsumer.setConsumeThreadMax(10);//最大消費(fèi)線程,一般情況下,默認(rèn)隊(duì)列沒有塞滿,是不會(huì)啟用新的線程的
            defaultMQPushConsumer.setPullInterval(1000);//消費(fèi)端多久一次去rocketMq 拉去消息
            defaultMQPushConsumer.setPullBatchSize(32);     //消費(fèi)端每個(gè)隊(duì)列一次拉去多少個(gè)消息,若該消費(fèi)端分賠了N個(gè)監(jiān)控隊(duì)列,那么消費(fèi)端每次去rocketMq拉去消息說為N*1
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
        }
    }

第一:從上圖分析可得知影響消費(fèi)速度的幾種情況:

1.PullInterval: 設(shè)置消費(fèi)端,拉取mq消息的間隔時(shí)間。
注意:該時(shí)間算起時(shí)間是rocketMq收到Ack后算起。
例如:PullInterval=10s,上次rocketMq服務(wù)收到Ack消息后開始算起10s后,在拉去消息,不包含消費(fèi)端消費(fèi)占用的時(shí)間。

2.PullBatchSize: 設(shè)置每次pull消息的數(shù)量,該參數(shù)設(shè)置是針對邏輯消息隊(duì)列,并不是每次pull消息拉到的總消息數(shù),若上圖右邊消費(fèi)端分配了兩個(gè)消費(fèi)隊(duì)列來監(jiān)聽。那么PullBatchSize 設(shè)置為32,那么該消費(fèi)端每次pull到 64個(gè)消息。

消費(fèi)端每次pull到消息總數(shù)=PullBatchSize*監(jiān)聽隊(duì)列數(shù)

3.ThreadMin和ThreadMax: 消費(fèi)端消費(fèi)pull到的消息需要的線程數(shù)量。

以上三種情況:是針對參數(shù)配置,來調(diào)整消費(fèi)速度。

除了這三種情況外還有兩種服務(wù)部署情況,可以調(diào)整消費(fèi)速度:

4.rocketMq 邏輯消費(fèi)隊(duì)列配置數(shù)量 有消費(fèi)端每次pull到消息總數(shù)=PullBatchSize*監(jiān)聽隊(duì)列數(shù)
可知rocketMq 邏輯消費(fèi)隊(duì)列配置數(shù)量即上圖中的 queue1 ,queue2,配置數(shù)量越多每次pull到的消息總數(shù)也就越多。如果下邊配置讀隊(duì)列數(shù)量:
在這里插入圖片描述

5.消費(fèi)端節(jié)點(diǎn)部署數(shù)量 :部署數(shù)量無論一個(gè)節(jié)點(diǎn)監(jiān)聽所有隊(duì)列,還是多個(gè)節(jié)點(diǎn)按照分配策略分配監(jiān)聽隊(duì)列數(shù)量,理論上每秒pull到的數(shù)量都一樣的,但是多節(jié)點(diǎn)消費(fèi)端消費(fèi)線程數(shù)量要比單節(jié)點(diǎn)消費(fèi)線程數(shù)量多,也就是多節(jié)點(diǎn)消費(fèi)速度大于單節(jié)點(diǎn)。

第二:從上邊分析可得知,控制消費(fèi)速度可以分為并發(fā)和延遲消費(fèi)兩種方案控制流量。

前提:rocketMq 主題topic 配置的消費(fèi)隊(duì)列數(shù)量一定的情況下討論,這里設(shè)置為queueNum=4個(gè)消費(fèi)隊(duì)列
1.并發(fā)控流:
1.1 部署節(jié)點(diǎn)數(shù)量 nodeNum
1.2 每個(gè)節(jié)點(diǎn)消費(fèi)端線程數(shù)量 threadNum
注釋:一般情況最小線程和最大線程設(shè)置為一樣,并且一般情況最大線程不會(huì)用到原因見線程池,觸發(fā)線程次隊(duì)列塞滿才會(huì)啟動(dòng)新的線程,
1.3 每次pull的消息間隔時(shí)間 pullTime
1.4 批量拉去的數(shù)量PullBatchSize

列如上圖配置: 部署節(jié)點(diǎn)數(shù)量為2 ,每個(gè)節(jié)點(diǎn)消費(fèi)端線程數(shù)量為2 ,每次pull的消息間隔時(shí)間為1s,
PullBatchSize 為32

我們先分析一個(gè)節(jié)點(diǎn)每秒拉取消息數(shù)量:

pullTime*PullBatchSize *queueNum=TotalNum

1*32*2=64

此時(shí)具體消費(fèi)速度就看消費(fèi)端線程數(shù)量和接口執(zhí)行時(shí)間來決定64個(gè)消息需要多久消費(fèi)完。

若接口每次執(zhí)行速度為50毫秒,那么1s兩個(gè)線程可以執(zhí)行40個(gè)消息,那么該節(jié)點(diǎn)處理完64個(gè)需要時(shí)間為:64/40 s 大約就是1.6s

若配置消費(fèi)端開啟三個(gè)線程,那么就是每秒消費(fèi)64個(gè)消息時(shí)綽綽有余。

一般情況下pullTime默認(rèn)為0,即消費(fèi)速度完全就有消費(fèi)端線程數(shù)量和接口執(zhí)行速度(其他外在因素不考慮)兩個(gè)因素來決定。

那么我們來分析下pullTime為0的情況:
一個(gè)接口執(zhí)行時(shí)間為50ms,那么1秒一個(gè)線程處理20個(gè)消息,那么開啟10個(gè)線程該節(jié)點(diǎn)每秒能處理200個(gè)線程,開啟100個(gè)線程那就是2000個(gè)消息…

兩個(gè)節(jié)點(diǎn)就是4000個(gè)消息這已經(jīng)超出了mysql能承受的壓力,其實(shí)這里達(dá)到mysql的連接不一定有4000個(gè),畢竟mysql前 邊還有一個(gè)數(shù)據(jù)庫連接池在控制連接數(shù)量間接控制執(zhí)行速度。

每秒處理消息數(shù)量=1s/接口執(zhí)行時(shí)間毫秒*線程數(shù)量。

結(jié)論:pullTime=0 的情況下,并發(fā)控制實(shí)際就是調(diào)整節(jié)點(diǎn)部署數(shù)量和消費(fèi)端消費(fèi)線程數(shù)量,并且要預(yù)估每條消息業(yè)務(wù)處理時(shí)間(預(yù)估接口執(zhí)行時(shí)間)

2.消費(fèi)延時(shí)控流:

針對消息訂閱者的消費(fèi)延時(shí)流控的基本原理是,每次消費(fèi)時(shí)在客戶端增加一個(gè)延時(shí)來控制消費(fèi)速度,此時(shí)理論上消費(fèi)并發(fā)最快速度為:

單節(jié)點(diǎn)部署:
ConsumInterval :延時(shí)時(shí)間單位毫秒
ConcurrentThreadNumber:消費(fèi)端線程數(shù)量
MaxRate :理論每秒處理數(shù)量
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber

如果消息并發(fā)消費(fèi)線程(ConcurrentThreadNumber)為 20,延時(shí)(ConsumInterval)為 100 ms,代入上述公式可得

200 = 1 / 0.1 * 20

由上可知,理論上可以將并發(fā)消費(fèi)控制在 200 以下

如果是多個(gè)節(jié)點(diǎn)部署如兩個(gè)節(jié)點(diǎn),理論消費(fèi)速度最高為每秒處理400個(gè)消息。

如下延時(shí)流控代碼:

 /**
     * 測試mq 并發(fā) 接受
     */
    @Component
    @RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
    class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{

        @SneakyThrows
        @Override
        public void onMessage(LikeWritingParams params) {
            System.out.println("睡上0.1秒");
            Thread.sleep(100);

            long begin = System.currentTimeMillis();
            System.out.println("mq消費(fèi)速度"+Thread.currentThread().getName()+"  "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
            //writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
            long end = System.currentTimeMillis();
          //  System.out.println("消費(fèi):: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
        }

        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            defaultMQPushConsumer.setConsumeThreadMin(20); //消費(fèi)端拉去到消息以后分配線索去消費(fèi)
            defaultMQPushConsumer.setConsumeThreadMax(50);//最大消費(fèi)線程,一般情況下,默認(rèn)隊(duì)列沒有塞滿,是不會(huì)啟用新的線程的
            defaultMQPushConsumer.setPullInterval(0);//消費(fèi)端多久一次去rocketMq 拉去消息
            defaultMQPushConsumer.setPullBatchSize(32);     //消費(fèi)端每個(gè)隊(duì)列一次拉去多少個(gè)消息,若該消費(fèi)端分賠了N個(gè)監(jiān)控隊(duì)列,那么消費(fèi)端每次去rocketMq拉去消息說為N*1
            defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
        }
    }

注釋:如上消費(fèi)端,單節(jié)點(diǎn)每秒處理速度也就是最高200個(gè)消息,實(shí)際上要小于200,業(yè)務(wù)代碼執(zhí)行也是需要時(shí)間。

但是要注意實(shí)際操作中并發(fā)流控實(shí)際是默認(rèn)存在的,
spring boot 消費(fèi)端默認(rèn)配置
this.consumeThreadMin = 20;
this.consumeThreadMax = 20;
this.pullInterval = 0L;
this.pullBatchSize = 32;

若業(yè)務(wù)邏輯執(zhí)行需要20ms,那么單節(jié)點(diǎn)處理速度就是:1/0.02*20=1000
這里默認(rèn)拉去的速度1s內(nèi)遠(yuǎn)大于1000

注意: 這里雖然pullInterval 等于0 當(dāng)時(shí)受限于每次拉去64個(gè),處理完也是需要一端時(shí)間才能回復(fù)ack,才能再次拉取,所以消費(fèi)速度應(yīng)該小于1000

所以并發(fā)流控要消費(fèi)速度大于消費(fèi)延時(shí)流控 ,那么消費(fèi)延時(shí)流控才有意義

總結(jié):rocketMq 肖鋒流控兩種方式:
并發(fā)流控:就是根據(jù)業(yè)務(wù)流控速率要求,來調(diào)整topic 消費(fèi)隊(duì)列數(shù)量(read queue),消費(fèi)端部署節(jié)點(diǎn),消費(fèi)端拉去間隔時(shí)間,消費(fèi)端消費(fèi)線程數(shù)量等,來達(dá)到要求的速率內(nèi)

延時(shí)消費(fèi)流控:就是在消費(fèi)端延時(shí)消費(fèi)消息(sleep),具體延時(shí)多少要根據(jù)業(yè)務(wù)要求速率,和消費(fèi)端線程數(shù)量,和節(jié)點(diǎn)部署數(shù)量來控制

阿里實(shí)例:

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多