Monday, June 29, 2020

Kafka Consumer - types - at-most-once, at-least-once, exactly-once

·          At-most-once Kafka Consumer (Zero or One Delivery)

1)           Set ‘enable.auto.commit’ to true

2)           Set ‘auto.commit.interval.ms’ to a small value

3)           Don’t call consumer.commitSync()
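As a minimal sketch of the at-most-once settings, the properties could be built as shown here. The property names ‘enable.auto.commit’ and ‘auto.commit.interval.ms’ are standard Kafka consumer settings; the broker address, the group id and the 100 ms interval are assumptions chosen only for illustration.

```java
import java.util.Properties;

// Sketch: builds consumer settings for an at-most-once setup.
class AtMostOnceConfig {
    static Properties build() {
        Properties props = new Properties();
        // Hypothetical broker address and group id, for illustration only.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        // Offsets are committed automatically in the background...
        props.put("enable.auto.commit", "true");
        // ...and frequently; 100 ms is an arbitrary "small" value.
        props.put("auto.commit.interval.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings that would be handed to the consumer.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would then be passed to the consumer's constructor, together with the key and value deserializer settings that a real consumer also needs.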

·          At-least-once Kafka Consumer (One or More Message Deliveries, Duplicates Possible)

1)           (Set ‘enable.auto.commit’ to true with a high value for ‘auto.commit.interval.ms’

2)           OR set ‘enable.auto.commit’ to false)

3)           AND call consumer.commitSync() after the messages have been processed

Implement ‘idempotent’ behavior within the consumer to avoid reprocessing duplicate messages.
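One possible way to get idempotent behavior is to remember the highest offset already handled per partition and skip anything at or below it. The sketch below is only an illustration: the class name, the method names and the in-memory map are assumptions, and a real consumer would have to keep this state in durable storage so that it survives a restart.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: skips records whose offset was already processed for a partition.
class IdempotentProcessor {
    // Highest offset already processed, keyed by "topic-partition".
    private final Map<String, Long> lastProcessed = new HashMap<>();
    private int appliedCount = 0;

    /** Returns true if the record was applied, false if it was skipped as a duplicate. */
    boolean process(String topic, int partition, long offset, String value) {
        String key = topic + "-" + partition;
        Long last = lastProcessed.get(key);
        if (last != null && offset <= last) {
            return false; // redelivered record, e.g. after a restart or rebalance
        }
        appliedCount++; // stand-in for the real side effect of handling 'value'
        lastProcessed.put(key, offset);
        return true;
    }

    int appliedCount() {
        return appliedCount;
    }
}
```

This relies on offsets within a single partition arriving in increasing order. When records carry their own unique business key, deduplicating on that key is another option.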

·          Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)

1)           Set ‘enable.auto.commit’ = false.

2)           Don't call consumer.commitSync(); after processing the message.

3)           Subscribe with a listener - in ConsumerRebalanceListener perform consumer.seek(topicPartition, offset); to start reading from a specific offset.

4)           Store the processing result and the processed message's offset together in a single atomic transaction (RDBMS). If NoSQL - store the offset along with the message.

public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      // Save the current position of each partition before it is taken away.
      for (TopicPartition partition : partitions) {
         offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
      }
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
      // Resume each newly assigned partition from the offset held in the external store.
      for (TopicPartition partition : partitions) {
         consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
      }
}
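The offset manager used by the listener is not shown in the post. A minimal in-memory stand-in might look like the sketch below. The method names saveOffsetInExternalStore and readOffsetFromExternalStore come from the snippet above; the class name, the saveResultAndOffset helper and the in-memory maps are assumptions. A real implementation would write the result and the offset inside one database transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: in-memory stand-in for an external offset store.
class InMemoryOffsetManager {
    private final Map<String, Long> offsets = new HashMap<>();
    private final List<String> results = new ArrayList<>();

    // Hypothetical helper: stores the result and the next offset to read as one unit.
    // 'synchronized' stands in for the atomicity a database transaction would provide.
    synchronized void saveResultAndOffset(String topic, int partition, long recordOffset, String result) {
        results.add(result);
        // Store recordOffset + 1 so that a later seek starts at the next unprocessed record.
        offsets.put(key(topic, partition), recordOffset + 1);
    }

    synchronized void saveOffsetInExternalStore(String topic, int partition, long offset) {
        offsets.put(key(topic, partition), offset);
    }

    // Returns 0 when nothing is stored yet, i.e. start from the beginning.
    synchronized long readOffsetFromExternalStore(String topic, int partition) {
        return offsets.getOrDefault(key(topic, partition), 0L);
    }

    synchronized List<String> results() {
        return new ArrayList<>(results);
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }
}
```

Storing the offset of the last processed record plus one matches what consumer.position(partition) reports, which is the offset of the next record to be fetched.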

·          Exactly-once Kafka Static Consumer via Assign (One and Only One Message Delivery) - the consumer registers with Kafka via an ‘assign’ registration method call.

1)           Set ‘enable.auto.commit’ to false

2)           Don’t call consumer.commitSync()

3)           Register the consumer to a specific partition using the ‘assign’ call.

4)           On start-up of the consumer, seek to a specific message offset by calling consumer.seek(topicPartition, offset);

5)           Store the offset in an atomic way (RDBMS or NoSQL)

6)           Implement idempotent processing as an extra safety measure

TopicPartition topicPartition =
      registerConsumerToSpecificPartition(consumer, topic, partition);
// Read the offset for the topic and partition from external storage.
long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
// Use seek and go to exact offset for that topic and partition.
consumer.seek(topicPartition, offset);
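To see why seeking to a stored offset avoids both loss and duplication, the restart behavior can be simulated without a broker. The sketch below is a toy model, not Kafka code: a List stands in for the partition, a field stands in for the external offset store, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: simulates a consumer that resumes from an externally stored offset.
class ResumeSimulation {
    private long storedOffset = 0; // stand-in for the external offset store
    private final List<String> processed = new ArrayList<>();

    // Processes up to maxRecords entries, starting where the previous run stopped.
    void run(List<String> partitionLog, int maxRecords) {
        long position = storedOffset; // plays the role of consumer.seek(...)
        for (int i = 0; i < maxRecords && position < partitionLog.size(); i++) {
            processed.add(partitionLog.get((int) position));
            position++;
            storedOffset = position; // result and offset are updated together
        }
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("m0", "m1", "m2", "m3", "m4");
        ResumeSimulation sim = new ResumeSimulation();
        sim.run(log, 2);  // first run stops after two records
        sim.run(log, 10); // "restart" resumes at offset 2
        System.out.println(sim.processed()); // prints [m0, m1, m2, m3, m4]
    }
}
```

Each record appears exactly once in the output because the second run starts at the stored offset instead of at the beginning or at a committed offset that may lag behind.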