Kafka's auto.offset.reset explained and tested

vnxy001 · published 2024-05-13, Zhejiang

1. Possible values and definitions

auto.offset.reset accepts the following three values:

  • latest (default)
  • earliest
  • none

All three share a common rule:
For the same consumer group, if offsets have already been committed, consumption resumes from the committed offset.

In other words, once a consumer group has consumed the topic, it makes no difference what auto.offset.reset is set to: every time the consumer starts, it picks up from the group's latest committed offset and continues from there.
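
If you want to verify whether a group already has committed offsets, one way (a minimal sketch, not part of the original test; the group name group1 and the broker address simply match the tests later in this post) is to query them through Kafka's AdminClient API. An empty result means auto.offset.reset will decide where consumption starts:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CheckCommittedOffsets {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            // fetch the committed offsets of the consumer group "group1"
            Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets("group1")
                    .partitionsToOffsetAndMetadata()
                    .get();
            if (offsets.isEmpty()) {
                System.out.println("No committed offsets, auto.offset.reset will apply");
            } else {
                offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
            }
        }
    }
}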


Where they differ:

  • latest (default): for the same consumer group, if no offsets have been committed, only messages produced after the consumer connects to the topic are consumed.

That is, if the topic already has historical messages and a brand-new consumer group is started with auto.offset.reset=latest, those existing messages cannot be consumed. Keep the consumer group running, and only messages that arrive afterwards will be consumed; and once anything is consumed, offsets will be committed.
If the consumer group then goes offline unexpectedly while messages keep arriving, and later comes back online, it resumes from the offset it had when it went offline; at that point the common rule above applies.

  • earliest: for the same consumer group, if no offsets have been committed, consumption starts from the beginning of the topic.

That is, if the topic already has historical messages and a brand-new consumer group is started with auto.offset.reset=earliest, it consumes from the very beginning; this is where it differs from latest.
Once the group has consumed the topic, it has committed offsets, so even with auto.offset.reset=earliest a restart of the group behaves exactly like latest: the common rule above applies.

  • none: for the same consumer group, if no offsets have been committed, an exception is thrown.

This value is rarely needed in production.


2. Create a brand-new topic

./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --partitions 1 --replication-factor 1 --create
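
For reference, the same topic can also be created from Java with the AdminClient API. A minimal sketch, assuming the same broker address as the command above:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTestTopic {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            // 1 partition, replication factor 1, same as the CLI command
            NewTopic topic = new NewTopic("TestOffsetResetTopic", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("Topic created: TestOffsetResetTopic");
        }
    }
}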

3. Send messages to the new topic

To keep the test simple, send 5 messages from Java:

import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        String topic = "TestOffsetResetTopic";

        for (int i = 0; i < 5; i++) {
            String value = "message_" + i + "_" + LocalDateTime.now();
            System.out.println("Send value: " + value);
            producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
                if (exception == null) {
                    String str = MessageFormat.format("Send success! topic: {0}, partition: {1}, offset: {2}", metadata.topic(), metadata.partition(), metadata.offset());
                    System.out.println(str);
                }
            });
            Thread.sleep(500);
        }

        producer.close();
    }
}

The messages are sent successfully:

Send value: message_0_2022-09-16T18:26:15.943749600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 0
Send value: message_1_2022-09-16T18:26:17.066608900
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 1
Send value: message_2_2022-09-16T18:26:17.568667200
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 2
Send value: message_3_2022-09-16T18:26:18.069093600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 3
Send value: message_4_2022-09-16T18:26:18.583288100
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 4

The topic TestOffsetResetTopic now holds 5 messages, and no consumer group has consumed it yet, i.e. there are no committed offsets at all.
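
This can be double-checked without consuming anything: a consumer created without a group.id can look up the partition's beginning and end offsets. A minimal sketch (not part of the original test); with 5 messages in the single partition it should print beginning=0, end=5:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CheckTopicOffsets {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // no group.id is required just to look up log offsets
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            List<TopicPartition> partitions =
                    Collections.singletonList(new TopicPartition("TestOffsetResetTopic", 0));
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            partitions.forEach(tp -> System.out.println(
                    tp + ": beginning=" + begin.get(tp) + ", end=" + end.get(tp)));
        }
    }
}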


4. Testing latest

The topic is known to contain 5 historical messages; now start a consumer:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumerLatest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // specify the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // set auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        String topic = "TestOffsetResetTopic";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        // consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

    }
}

As described above, the 5 existing historical messages are not consumed; the consumer shows no activity at all. Keep the consumer online.

Run TestProducer again to send 5 more messages, and these newly produced messages, starting at offset 5, are consumed:

ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1663329725731, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_0_2022-09-16T20:02:05.523581500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1663329726251, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_1_2022-09-16T20:02:06.251399400)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1663329726764, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_2_2022-09-16T20:02:06.764186200)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1663329727264, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_3_2022-09-16T20:02:07.264268500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1663329727778, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_4_2022-09-16T20:02:07.778469700)

At this point the consumer group's offset on this topic has reached 9. Now stop the consumer (take it offline), run TestProducer to send 5 more messages, then start TestConsumerLatest again: it continues right after the previous position, i.e. it consumes from offset 10 onward.

If nothing seems to happen during the test, just wait a little longer; the machine is probably just slow...
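
As an aside (not something the original test does): if a group that already has committed offsets, or that uses latest, really does need to re-read the history, the usual approach is to seek explicitly rather than change auto.offset.reset. A minimal sketch based on TestConsumerLatest; seekToBeginning overrides both the committed offset and the reset policy for this run, and the re-read records will be committed again afterwards:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumerSeekToBeginning {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("TestOffsetResetTopic"));

        // poll until partitions are assigned, then rewind to the beginning;
        // this takes precedence over committed offsets and auto.offset.reset
        while (consumer.assignment().isEmpty()) {
            consumer.poll(Duration.ofMillis(200));
        }
        consumer.seekToBeginning(consumer.assignment());

        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}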


5. Testing earliest

Create a new test consumer with auto.offset.reset set to earliest. Note that the group.id is a new one, group2, meaning the group is brand new as far as the topic is concerned:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumerEarliest {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // specify the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // set auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        String topic = "TestOffsetResetTopic";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        // consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

    }
}

As soon as it runs, the 10 existing messages (the original 5 plus the 5 sent during the previous test) are consumed. After that, group2 has committed offsets for the topic, so no matter how often the consumer is stopped and restarted, as long as the group.id stays the same it will continue from the latest committed offset.
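
A related note (a minimal sketch, not part of the original test): if you later want an existing group such as group2 to re-read the topic from the beginning, you don't need a new group.id; its committed offsets can be overwritten through AdminClient.alterConsumerGroupOffsets, provided the group has no active members at that moment:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetGroupOffsets {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            // overwrite group2's committed offset for partition 0 with 0,
            // so its next start re-reads the topic from the beginning
            Map<TopicPartition, OffsetAndMetadata> newOffsets = Collections.singletonMap(
                    new TopicPartition("TestOffsetResetTopic", 0), new OffsetAndMetadata(0L));
            admin.alterConsumerGroupOffsets("group2", newOffsets).all().get();
            System.out.println("group2 offsets reset to 0");
        }
    }
}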


6. Testing none

Create a new test consumer with auto.offset.reset set to none. Note that the group.id is a new one, group3, again a brand-new consumer group as far as the topic is concerned:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TestConsumerNone {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // specify the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group3");
        // set auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        String topic = "TestOffsetResetTopic";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        // consume records
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

    }
}

As soon as it runs, the program fails: the consumer group is brand new for this topic and auto.offset.reset=none is specified, so an exception is thrown and the program exits:

Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [TestOffsetResetTopic-0]
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:706)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2434)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1266)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at kakfa.TestConsumerNone.main(TestConsumerNone.java:31)
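
Where none does get used is when the application wants to detect the "no committed offset" situation itself and pick a starting position explicitly, instead of letting the broker-side policy decide. A minimal sketch of that pattern, assuming the setup of TestConsumerNone plus an extra import of org.apache.kafka.clients.consumer.NoOffsetForPartitionException; it replaces the poll loop:

        // consume records, choosing the starting position ourselves when
        // the group has no committed offset yet
        while (true) {
            ConsumerRecords<String, String> consumerRecords;
            try {
                consumerRecords = consumer.poll(Duration.ofSeconds(1));
            } catch (NoOffsetForPartitionException e) {
                // e.partitions() lists the partitions without a committed offset;
                // here we explicitly decide to start from the beginning
                consumer.seekToBeginning(e.partitions());
                continue;
            }
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }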

7. Summary

  • If the topic already has historical messages and you need to consume them, you must use a consumer group that has never consumed the topic and set auto.offset.reset=earliest; only then can the historical data be consumed, after which offsets get committed. Once offsets exist, earliest and latest behave identically.
  • If the topic has no historical messages, or you don't need to process them, the default latest is fine.
