Monday, June 29, 2020

Kafka Consumer Types: At-Most-Once, At-Least-Once, Exactly-Once



· At-most-once Kafka Consumer (Zero or One Delivery)

1) Set enable.auto.commit to true.

2) Set a small value for auto.commit.interval.ms.

3) Don't call consumer.commitSync().
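A minimal sketch of the at-most-once settings as consumer properties. The property names are real Kafka consumer configuration keys; the broker address, group id and the 100 ms interval are placeholder assumptions. One hedge: the Java client performs these auto-commits from inside poll(), so in a simple single-threaded loop that finishes processing before the next poll() the behavior is closer to at-least-once; records are mainly lost when they are handed off to other threads and the commit fires before that work is done.

```java
import java.util.Properties;

// At-most-once consumer settings. Property names are real Kafka configs; the values for
// the broker, group id and the 100 ms interval are placeholder assumptions.
public class AtMostOnceConfig {
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("enable.auto.commit", "true");     // 1) the client commits offsets on a timer
        props.setProperty("auto.commit.interval.ms", "100"); // 2) small interval: commit often
        // 3) the poll loop that uses these settings never calls consumer.commitSync()
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "at-most-once-demo");
        System.out.println(props.getProperty("enable.auto.commit")); // true
        // With kafka-clients on the classpath: new KafkaConsumer<String, String>(props)
    }
}
```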

· At-least-once Kafka Consumer (One or More Message Deliveries, Duplicates Possible)

1) Either set enable.auto.commit to true with a high value for auto.commit.interval.ms,

2) OR set enable.auto.commit to false,

3) AND call consumer.commitSync() after the messages have been processed.
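Why committing after processing leads to duplicates can be seen in a small model. This is a hypothetical, self-contained simulation rather than the Kafka API: a List stands in for one partition, the committedOffset field stands in for the consumer's committed offset (the next offset to read), and run() commits only after a record has been processed, which is the role consumer.commitSync() plays after processing in a real poll loop. A simulated crash between processing and committing makes that record come back after the restart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model, not the Kafka API: a List plays the role of one partition and
// committedOffset plays the role of the consumer's committed offset (next offset to read).
public class AtLeastOnceSimulation {
    private final List<String> partitionLog;
    private long committedOffset = 0;
    final List<String> processed = new ArrayList<>(); // every side effect, in order

    public AtLeastOnceSimulation(List<String> partitionLog) {
        this.partitionLog = partitionLog;
    }

    // Reads from the committed offset and commits only AFTER each record is processed,
    // like calling consumer.commitSync() after processing. If crashAfterProcessingAt matches
    // an offset, the run stops after processing that record but before committing it.
    public void run(long crashAfterProcessingAt) {
        for (long offset = committedOffset; offset < partitionLog.size(); offset++) {
            processed.add(partitionLog.get((int) offset)); // "process" the record
            if (offset == crashAfterProcessingAt) {
                return; // simulated crash: processed, but the offset was never committed
            }
            committedOffset = offset + 1; // commit: points at the next record to read
        }
    }

    public static void main(String[] args) {
        AtLeastOnceSimulation sim = new AtLeastOnceSimulation(List.of("a", "b", "c"));
        sim.run(1);  // processes a and b, crashes before committing b
        sim.run(-1); // restarts from the committed offset (1), so b is processed again
        System.out.println(sim.processed); // [a, b, b, c]
    }
}
```

Nothing is lost, because the offset never moves past a record that has not been processed; the price is the duplicate "b".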

Implement idempotent behavior within the consumer so that a redelivered duplicate message has no additional effect.
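A minimal sketch of an idempotent consumer-side handler, assuming each message carries a unique ID. A business key set by the producer works for all duplicates; the topic/partition/offset triple catches consumer-side redelivery but not a message the producer itself wrote twice. IdempotentHandler, handle() and balance() are hypothetical names. The set of applied IDs is kept in memory here; a real consumer would have to persist it, ideally in the same transaction as the side effect.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical idempotent handler: remembers which message IDs it has already applied
// and skips repeats, so a redelivered message changes nothing the second time.
public class IdempotentHandler {
    private final Set<String> appliedIds = new HashSet<>(); // must be durable in a real consumer
    private long balance = 0;

    // Applies the message once; returns false when it is a duplicate and was skipped.
    public boolean handle(String messageId, long amount) {
        if (!appliedIds.add(messageId)) {
            return false; // seen before: do not apply the side effect again
        }
        balance += amount;
        return true;
    }

    public long balance() {
        return balance;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        handler.handle("order-1", 10);
        handler.handle("order-1", 10); // redelivered duplicate, ignored
        handler.handle("order-2", 5);
        System.out.println(handler.balance()); // 15
    }
}
```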

· Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)

1) Set enable.auto.commit to false.

2) Don't call consumer.commitSync() after processing the message; the offset is stored outside Kafka instead.

3) Subscribe with a ConsumerRebalanceListener, and in its onPartitionsAssigned() callback call consumer.seek(topicPartition, offset) to start reading from the stored offset.

4) Store the processed message's result and its offset in a single atomic transaction (RDBMS). With a NoSQL store, store the offset in the same record as the message.
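The atomic-transaction idea can be illustrated with a hypothetical toy model (again not the Kafka API). The Store class stands in for a database in which the processing result and the next offset to read are written by one commitTransaction() call, mirroring a single RDBMS transaction; on restart, run() resumes from the stored offset, which is what consumer.seek(topicPartition, offset) does in the listener. A simulated crash before the commit means the record is read again, but its result is stored exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical toy model, not the Kafka API: the result and the next offset to read are
// written together by commitTransaction(), standing in for one RDBMS transaction.
public class ExactlyOnceSimulation {
    static final class Store {
        final List<String> results = new ArrayList<>();
        long nextOffset = 0;

        // Both updates happen in one step with no crash point in between.
        void commitTransaction(String result, long offsetAfterRecord) {
            results.add(result);
            nextOffset = offsetAfterRecord;
        }
    }

    // On (re)start, resume from the stored offset (the role of consumer.seek), process each
    // record, and store its result together with the offset. If crashBeforeCommitAt matches,
    // the run stops after computing that record's result but before committing it.
    static void run(List<String> partitionLog, Store store, long crashBeforeCommitAt) {
        for (long offset = store.nextOffset; offset < partitionLog.size(); offset++) {
            String result = partitionLog.get((int) offset).toUpperCase(Locale.ROOT); // "processing"
            if (offset == crashBeforeCommitAt) {
                return; // simulated crash: nothing about this record was stored
            }
            store.commitTransaction(result, offset + 1);
        }
    }

    public static void main(String[] args) {
        List<String> partitionLog = List.of("a", "b", "c");
        Store store = new Store();
        run(partitionLog, store, 1);  // stores A, crashes before storing B
        run(partitionLog, store, -1); // restarts at offset 1
        System.out.println(store.results + " next=" + store.nextOffset); // [A, B, C] next=3
    }
}
```

The design choice that matters is that there is no moment at which the result is stored but the offset is not, or the other way round.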

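import java.util.Collection;
import org.apache.kafka.common.TopicPartition;

// Methods of a class that implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener
// and holds the KafkaConsumer ("consumer") and the external offset store ("offsetManager").
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    // Before losing these partitions, save the next offset to read for each one.
    for (TopicPartition partition : partitions) {
        offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
    }
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    // Resume each newly assigned partition from the offset kept in the external store.
    for (TopicPartition partition : partitions) {
        consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
    }
}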
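The listener relies on an offsetManager object. One possible shape for it is sketched below; everything except the two method names taken from the listener code is a hypothetical assumption. It keeps one small file per topic-partition and replaces that file with an atomic rename, so a crash never leaves a half-written offset. It stores only the offset: for exactly-once, the offset still has to be written in the same transaction as the processed result, as in point 4. Returning 0 when nothing is stored yet is also an assumption; if that offset has already been removed by retention, the consumer falls back to its auto.offset.reset policy.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical file-backed OffsetManager with the method names used by the listener:
// one file per topic-partition holding the next offset to read.
public class OffsetManager {
    private final Path dir;

    public OffsetManager(Path dir) {
        this.dir = dir;
    }

    private Path fileFor(String topic, int partition) {
        return dir.resolve(topic + "-" + partition + ".offset");
    }

    public void saveOffsetInExternalStore(String topic, int partition, long offset) {
        Path target = fileFor(topic, partition);
        Path tmp = dir.resolve(target.getFileName() + ".tmp");
        try {
            Files.writeString(tmp, Long.toString(offset), StandardCharsets.UTF_8);
            // Swap the new file in with one rename (some file systems may not support ATOMIC_MOVE).
            Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Assumption: 0 (start of the partition) when no offset has been stored yet.
    public long readOffsetFromExternalStore(String topic, int partition) {
        Path file = fileFor(topic, partition);
        try {
            return Files.exists(file)
                    ? Long.parseLong(Files.readString(file, StandardCharsets.UTF_8).trim())
                    : 0L;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```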

· Exactly-once Kafka Static Consumer via Assign (One and Only One Message Delivery): the consumer registers with Kafka via an assign() registration call.

1) Set enable.auto.commit to false.

2) Don't call consumer.commitSync().

3) Register the consumer to a specific partition using the assign() call.

4) On startup, seek to the stored message offset by calling consumer.seek(topicPartition, offset).

5) Store the offset atomically together with the processed result (RDBMS or NoSQL).

6) Implement idempotent processing as an extra safeguard.

// Pin the consumer to one partition; a consumer using assign() takes no part in group rebalancing.
TopicPartition topicPartition = registerConsumerToSpecificPartition(consumer, topic, partition);
// Read the offset for the topic and partition from external storage.
long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
// Use seek and go to the exact offset for that topic and partition.
consumer.seek(topicPartition, offset);
// Poll and process, storing each result together with its offset.
processRecords(consumer);