At-most-once Kafka Consumer (Zero or One Delivery)
1) Set enable.auto.commit to true.
2) Set auto.commit.interval.ms to a small value.
3) Don't call consumer.commitSync().
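The at-most-once settings can be sketched as consumer configuration. This sketch uses plain java.util.Properties so it runs without the Kafka client on the classpath. The broker address, the group id "at-most-once-group", and the 100 ms interval are illustrative assumptions. In an application, these properties would be passed to the KafkaConsumer constructor, and the poll loop would contain no commitSync() call.

```java
import java.util.Properties;

class AtMostOnceConfig {
    // Consumer settings for the at-most-once recipe: auto commit on and a short
    // commit interval. The poll loop itself never calls commitSync().
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.setProperty("group.id", "at-most-once-group");      // hypothetical group id
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "100");      // "small" value; 100 ms is an assumption
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

For reference, the default auto.commit.interval.ms is 5000 ms, so "small" here means well below that.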
At-least-once Kafka Consumer (One or More Message Deliveries, Duplicates Possible)
1) Set enable.auto.commit to false, OR set it to true with a high value for auto.commit.interval.ms.
2) Call consumer.commitSync() only after the message has been processed, so the offset is never committed ahead of the work.
3) Implement ‘idempotent’ behavior within the consumer so that duplicate messages are detected and not processed twice.
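The commit-after-processing rule and the idempotent safeguard can be illustrated with a small in-memory model rather than the Kafka API. The class AtLeastOnceModel, its fields, and the crash switch are hypothetical: a list stands in for one partition, a long for the committed offset, and a set of processed record ids (which would need to be durable in practice) provides the idempotence.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// In-memory model of an at-least-once consumer: the offset is committed only
// after a record is processed, so a crash between the two replays the record.
// A set of already-processed record ids makes the processing idempotent.
class AtLeastOnceModel {
    final List<String> log;                             // stands in for one partition
    long committedOffset = 0;                           // stands in for the committed offset
    int deliveries = 0;                                 // times a record reached the consumer
    final Set<String> processedIds = new HashSet<>();   // assumed to be durable in practice
    final List<String> sideEffects = new ArrayList<>(); // work actually performed

    AtLeastOnceModel(List<String> log) {
        this.log = log;
    }

    // Reads from the committed offset; if crashAfter >= 0, "crashes" after
    // processing that offset but before committing it.
    void run(long crashAfter) {
        for (long offset = committedOffset; offset < log.size(); offset++) {
            String recordId = log.get((int) offset);
            deliveries++;
            if (processedIds.add(recordId)) { // add() returns false for a duplicate: skip the work
                sideEffects.add(recordId);
            }
            if (offset == crashAfter) {
                return; // crash: the commit for this offset never happens
            }
            committedOffset = offset + 1; // plays the role of commitSync() after processing
        }
    }
}
```

Calling run(1) and then run(-1) over three records produces four deliveries, because the second record is replayed after the simulated crash, but sideEffects still holds each record once.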
Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)
1) Set enable.auto.commit = false.
2) Don't call consumer.commitSync() after processing the message.
3) Subscribe with a ConsumerRebalanceListener, and in the listener call consumer.seek(topicPartition, offset) to start reading from the stored offset.
4) Store the processed message and its offset as a single atomic transaction (RDBMS). With a NoSQL store, store the offset along with the message.
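A minimal in-memory sketch of the atomic-store idea, assuming a single immutable snapshot can play the role of one database transaction: the processed result and the next offset are swapped in together, so a crash can never leave one saved without the other. AtomicOffsetStore and ExactlyOnceModel are hypothetical names, and the upper-casing stands in for real processing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicReference;

// In-memory model of the exactly-once recipe: each processed result and the
// next offset to read are written in one atomic step, standing in for a single
// RDBMS transaction. On restart, the consumer resumes from the stored offset.
class AtomicOffsetStore {
    // Immutable snapshot of everything the external store holds.
    static final class Snapshot {
        final List<String> results;
        final long nextOffset;

        Snapshot(List<String> results, long nextOffset) {
            this.results = Collections.unmodifiableList(results);
            this.nextOffset = nextOffset;
        }
    }

    private final AtomicReference<Snapshot> state =
            new AtomicReference<>(new Snapshot(new ArrayList<>(), 0));

    Snapshot read() {
        return state.get();
    }

    // Appends one result and advances the offset in a single atomic update,
    // so the store never holds a result without its offset or vice versa.
    void commit(String result, long offset) {
        state.updateAndGet(old -> {
            List<String> results = new ArrayList<>(old.results);
            results.add(result);
            return new Snapshot(results, offset + 1);
        });
    }
}

class ExactlyOnceModel {
    // Processes records from the stored offset (the role consumer.seek() plays);
    // if crashAt >= 0, "crashes" after processing that offset but before the write.
    static void run(List<String> log, AtomicOffsetStore store, long crashAt) {
        for (long offset = store.read().nextOffset; offset < log.size(); offset++) {
            String result = log.get((int) offset).toUpperCase(Locale.ROOT); // the "processing"
            if (offset == crashAt) {
                return; // crash: neither the result nor the offset is saved
            }
            store.commit(result, offset);
        }
    }
}
```

If a run stops before a record's write, restarting from read().nextOffset processes that record again, but since nothing was saved for it the store still ends with exactly one result per record.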
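// Methods of a ConsumerRebalanceListener passed to consumer.subscribe(topics, listener);
// consumer and offsetManager (the external offset store) are fields of the enclosing class.
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Save the next offset to read before each partition is reassigned.
    for (TopicPartition partition : partitions) {
        offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
    }
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Resume each newly assigned partition from its externally stored offset.
    for (TopicPartition partition : partitions) {
        consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
    }
}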
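The listener code relies on an offsetManager that is not shown. One possible shape is sketched below as an in-memory class. The class name OffsetManager and the default of offset 0 for a partition with no saved offset are assumptions; a real implementation would write to the same RDBMS or NoSQL store as the processed messages, so that the offset and the data can be updated together.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for offsetManager. A real one would persist
// offsets in the same external store as the processed messages.
class OffsetManager {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();

    // Partition numbers contain no '-', so "topic-partition" keys are unique.
    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        offsets.put(key(topic, partition), offset);
    }

    // Assumption: a partition with no stored offset starts from offset 0.
    long readOffsetFromExternalStore(String topic, int partition) {
        return offsets.getOrDefault(key(topic, partition), 0L);
    }
}
```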
Exactly-once Kafka Static Consumer via Assign (One and Only One Message Delivery): the consumer registers with Kafka through the assign() method call.
1) Set enable.auto.commit to false.
2) Don't call consumer.commitSync().
3) Register the consumer to a specific partition using the assign() call.
4) On startup of the consumer, seek to the specific message offset by calling consumer.seek(topicPartition, offset).
5) Store the offset atomically together with the processed message (RDBMS or NoSQL).
6) Implement idempotent processing as an extra safety net.
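// Register the consumer to one specific partition; this helper (not shown)
// wraps the consumer.assign(...) call.
TopicPartition topicPartition = registerConsumerToSpecificPartition(consumer, topic, partition);
// Read the offset for the topic and partition from external storage.
long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
// Use seek to go to the exact offset for that topic and partition.
consumer.seek(topicPartition, offset);
// Poll and process (helper not shown); offsets go to the external store, not to Kafka.
processRecords(consumer);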
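For the NoSQL option, where the offset is stored along with each message, the offset that the static consumer passes to consumer.seek() on startup can be recovered from the stored messages themselves. The sketch assumes a hypothetical StoredMessage document carrying its topic, partition, and offset, and returns one past the highest stored offset for the partition, matching the next-offset-to-read meaning of consumer.position().

```java
import java.util.List;

// Hypothetical document shape for the NoSQL option: each processed message is
// stored together with the topic, partition, and offset it came from.
class StoredMessage {
    final String topic;
    final int partition;
    final long offset;
    final String payload;

    StoredMessage(String topic, int partition, long offset, String payload) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.payload = payload;
    }
}

class StartupOffset {
    // Offset to pass to consumer.seek() for one partition on startup: one past the
    // highest stored offset, or 0 when nothing is stored yet (an assumption).
    static long nextOffset(List<StoredMessage> stored, String topic, int partition) {
        long next = 0;
        for (StoredMessage m : stored) {
            if (m.topic.equals(topic) && m.partition == partition) {
                next = Math.max(next, m.offset + 1);
            }
        }
        return next;
    }
}
```

If the stored documents are keyed by topic, partition, and offset and written with an upsert, a replayed message overwrites its earlier copy instead of duplicating it, which also serves as the idempotent safety net.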