1. Values and definitions#
auto.offset.reset has the following three possible values:
- latest (default)
- earliest
- none
All three share a common rule:
For the same consumer group, if a committed offset already exists, consumption resumes from that committed offset.
In other words, once a consumer group has consumed the topic, the value of auto.offset.reset makes no difference: every restart resumes from the group's latest committed offset.
They differ as follows (the logic is condensed into a sketch after this list):
- latest (default): for the same consumer group, if no offset has been committed, only messages produced after the consumer connects to the topic are consumed.
That is, if the topic already has historical messages and a brand-new consumer group is started with auto.offset.reset=latest, the existing historical messages cannot be consumed. Keep the group running, and only messages that arrive on the topic afterwards will be consumed. And once anything is consumed, an offset is committed.
If the group then goes offline unexpectedly while messages keep arriving on the topic, and later comes back online, it can still resume from the offset it had when it went offline; at that point the common rule applies.
- earliest: for the same consumer group, if no offset has been committed, consumption starts from the beginning.
That is, if the topic has historical messages and a brand-new consumer group is started with auto.offset.reset=earliest, it will consume from the very beginning; this is the difference from latest.
Once the group has consumed the topic, it has a committed offset; from then on, even with auto.offset.reset=earliest, restarting the group behaves exactly like latest, i.e., the common rule applies.
- none: for the same consumer group, if no offset has been committed, an exception is thrown.
This value is rarely needed in production.
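The rule can be condensed into a short sketch. This is illustrative pseudologic only, not the actual client source; the class, method, and parameters are hypothetical names made up for the example:
import java.util.Optional;

public class OffsetResetSketch {
    // Hypothetical helper: decide the starting position for one partition
    // when a consumer group starts up.
    static long resolveStartingOffset(Optional<Long> committedOffset, String autoOffsetReset,
                                      long beginningOffset, long endOffset) {
        if (committedOffset.isPresent()) {
            // Common rule for all three values: a committed offset always wins
            return committedOffset.get();
        }
        switch (autoOffsetReset) {
            case "earliest": return beginningOffset; // start of the log: historical messages included
            case "latest":   return endOffset;       // end of the log: only newly produced messages
            case "none":     throw new IllegalStateException("no committed offset and no reset policy");
            default:         throw new IllegalArgumentException(autoOffsetReset);
        }
    }
}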
2. Create a brand-new topic#
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --partitions 1 --replication-factor 1 --create
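Optionally, verify that the topic was created as expected by describing it with the same tool; the output should show 1 partition and 1 replica:
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic TestOffsetResetTopic --describe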
3. Send messages to the new topic#
For ease of testing, use Java code to send 5 messages:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Properties;

public class TestProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topic = "TestOffsetResetTopic";
        for (int i = 0; i < 5; i++) {
            String value = "message_" + i + "_" + LocalDateTime.now();
            System.out.println("Send value: " + value);
            // Send asynchronously; the callback prints the assigned partition and offset
            producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
                if (exception == null) {
                    String str = MessageFormat.format("Send success! topic: {0}, partition: {1}, offset: {2}",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    System.out.println(str);
                }
            });
            Thread.sleep(500);
        }
        producer.close();
    }
}
The messages are sent successfully:
Send value: message_0_2022-09-16T18:26:15.943749600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 0
Send value: message_1_2022-09-16T18:26:17.066608900
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 1
Send value: message_2_2022-09-16T18:26:17.568667200
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 2
Send value: message_3_2022-09-16T18:26:18.069093600
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 3
Send value: message_4_2022-09-16T18:26:18.583288100
Send success! topic: TestOffsetResetTopic, partition: 0, offset: 4
The topic TestOffsetResetTopic now has 5 messages, and no consumer group has consumed it yet, i.e., there are no committed offsets at all.
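This can be double-checked with the kafka-consumer-groups.sh tool that ships with Kafka (assuming the same broker as above); at this point the listing should contain no group that has consumed this topic:
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list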
4. Testing latest#
The topic already holds 5 historical messages. Now start a consumer:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TestConsumerLatest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Specify the consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // Set auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        String topic = "TestOffsetResetTopic";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
As described above, the 5 existing historical messages are not consumed and the consumer shows no activity at all. Keep the consumer online.
Run TestProducer again to send 5 more messages; the newly sent messages, starting at offset 5, are consumed:
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1663329725731, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_0_2022-09-16T20:02:05.523581500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 6, CreateTime = 1663329726251, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_1_2022-09-16T20:02:06.251399400)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 7, CreateTime = 1663329726764, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_2_2022-09-16T20:02:06.764186200)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 8, CreateTime = 1663329727264, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_3_2022-09-16T20:02:07.264268500)
ConsumerRecord(topic = TestOffsetResetTopic, partition = 0, leaderEpoch = 0, offset = 9, CreateTime = 1663329727778, serialized key size = -1, serialized value size = 39, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = message_4_2022-09-16T20:02:07.778469700)
The group's offset on this topic has now reached 9. Stop this consumer (take it offline), run TestProducer to send 5 more messages, then start TestConsumerLatest again: it picks up right after the previous offset, i.e., continues consuming from 10.
If nothing seems to happen during the test, wait a bit longer; the machine is probably just slow...
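The committed position can also be inspected from the command line (same broker assumption as before); in the describe output, the CURRENT-OFFSET column shows where group1 will resume:
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group group1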
5. Testing earliest#
Create a new test consumer with auto.offset.reset set to earliest. Note that the group id is a new one, group2, i.e., a brand-new consumer group as far as the topic is concerned:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TestConsumerEarliest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Specify a brand-new consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");
        // Set auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        String topic = "TestOffsetResetTopic";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
As soon as it runs, all 10 existing messages (the initial 5 plus the 5 sent in the previous test) are consumed, and afterwards the topic has a committed offset for group2. No matter how often the consumer stops and restarts, as long as the group id stays the same it will continue from the latest committed offset.
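The offsets get committed here because the Java client auto-commits by default. To make that behavior explicit, the following two properties (shown with their client default values) could be added to the consumer's Properties:
// enable.auto.commit defaults to true: offsets are committed in the background
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// auto.commit.interval.ms defaults to 5000: commit roughly every 5 seconds
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");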
6. Testing none#
Create a new test consumer with auto.offset.reset set to none. Note that the group id is a new one, group3, i.e., a brand-new consumer group as far as the topic is concerned:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TestConsumerNone {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.123.124:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Specify a brand-new consumer group
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group3");
        // Set auto.offset.reset
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
        String topic = "TestOffsetResetTopic";
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        // Poll for records and print them
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}
As soon as it runs, the program errors out: the consumer group is brand-new for this topic and auto.offset.reset is set to none, so an exception is thrown and the program exits:
Exception in thread "main" org.apache.kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partitions: [TestOffsetResetTopic-0]
at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetInitializingPositions(SubscriptionState.java:706)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2434)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1266)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at kakfa.TestConsumerNone.main(TestConsumerNone.java:31)
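When none is chosen deliberately, the usual intent is to catch this exception and decide the starting position in application code rather than letting the client pick one. A minimal sketch, replacing the poll loop of TestConsumerNone (it additionally needs import org.apache.kafka.clients.consumer.NoOffsetForPartitionException, and seekToBeginning is just one possible choice):
while (true) {
    try {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println(consumerRecord);
        }
    } catch (NoOffsetForPartitionException e) {
        // e.partitions() reports the partitions that have no committed offset;
        // here we choose to start them from the beginning of the log
        consumer.seekToBeginning(e.partitions());
    }
}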
7. Summary#
- If the topic already has historical messages and you need to consume them, you must use a consumer group that has never consumed the topic and set auto.offset.reset to earliest; only then is the historical data consumed, after which offsets get committed. Once an offset exists, earliest and latest behave exactly the same (for an existing group there is also a CLI alternative, sketched after this list).
- If the topic has no historical messages, or you don't need to process them, the default latest is fine.
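On the first point: besides starting a brand-new consumer group, an existing group can also be forced to re-read history by resetting its committed offsets with the kafka-consumer-groups.sh tool; note that the group must be offline while the reset runs:
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group group1 --topic TestOffsetResetTopic --reset-offsets --to-earliest --execute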