Monday, May 18, 2020

Kafka


Basic Kafka Setup guide

### Run your local zookeeper and Kafka cluster from command line ###
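#Note1: open a separate command prompt for each command below (ZooKeeper, broker, producer and consumer each keep running).
#Note2: install Kafka near the root of the C: drive (e.g. c:/kafka) to keep the path/classpath short; long paths can make the Windows .bat scripts fail.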


#Run Zookeeper

cd c:/kafka

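zkserver
#(zkserver assumes a standalone ZooKeeper install on the PATH; to use the ZooKeeper bundled with Kafka instead, run: .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties)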

#Run broker

cd c:/kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties
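Before creating the topic, it can help to confirm that both processes are actually listening. Below is a minimal Java sketch (the class name PortProbe is hypothetical, and it assumes the default ports 2181 and 9092 used in this post) that simply attempts a TCP connection to each port. An open port only shows that some process is listening, not that ZooKeeper or the broker is fully healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical readiness probe: checks whether something accepts TCP
 * connections on the ZooKeeper (2181) and Kafka broker (9092) ports.
 * An open port does not prove the service behind it is healthy.
 */
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // Default ports from the commands in this post
        System.out.println("ZooKeeper listening: " + isListening("localhost", 2181, 1000));
        System.out.println("Broker listening:    " + isListening("localhost", 9092, 1000));
    }
}
```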

#Create topic

cd c:/kafka/bin/windows

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic sreesTopic1

#Create a producer & consumer to test the Kafka setup

##Producer

cd c:/kafka/bin/windows

kafka-console-producer.bat --broker-list localhost:9092 --topic sreesTopic1

##Consumer

cd c:/kafka/bin/windows

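#Older syntax, no longer supported (the --zookeeper option was removed from the console consumer in Kafka 2.0):
#kafka-console-consumer.bat --zookeeper localhost:2181 --topic sreesTopic1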

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic sreesTopic1 

Troubleshooting my Spring Boot Kafka consumer settings (timeouts) in local vs. test to fix "missingTopicsFatal is true"


With the local config below, my JUnit tests (run with @RunWith(SpringRunner.class)) intermittently failed with this error:

Local config (used during JUnit testing)
auto.commit.interval.ms=1000 
session.timeout.ms=6000 
request.timeout.ms=6000 
heartbeat.interval.ms=1000
Error received on most attempts:


Caused by: java.lang.IllegalStateException: Topic(s) [MY_TOPIC_NAME] is/are not present and missingTopicsFatal is true
   at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:351)
   at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:136)
   at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:308)
   at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:309)
   at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:256)
   at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)

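Fixed by increasing auto.commit.interval.ms (1000 to 2000) and session.timeout.ms (6000 to 8000) in the local config; request.timeout.ms and heartbeat.interval.ms stayed the same. My guess is that the session was timing out before request processing (6 sec) completed, but I still need to confirm the actual cause.
Local config (fixed):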
auto.commit.interval.ms=2000 
session.timeout.ms=8000
request.timeout.ms=6000 
heartbeat.interval.ms=1000
Test config (which has worked fine so far):
auto.commit.interval.ms=500 
session.timeout.ms=6001 
request.timeout.ms=3000 
heartbeat.interval.ms=120

Explanation of each of these timeout settings

auto.commit.interval.ms=How often (in milliseconds) consumer offsets are auto-committed when enable.auto.commit is true.

session.timeout.ms=The timeout used to detect consumer failures when using Kafka's group management. If the broker receives no heartbeat from the consumer within this time, it removes the consumer from the group and triggers a rebalance. (Default: 10000 in Kafka 2.x)

request.timeout.ms=The maximum amount of time the client will wait for the response to a request. If the response is not received before the timeout elapses, the client will resend the request if necessary, or fail the request if retries are exhausted. (Use ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)

heartbeat.interval.ms=The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. It must be lower than session.timeout.ms, and typically should be no higher than 1/3 of that value.
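A cheap way to sanity-check these values against each other is the rule from the Kafka consumer docs that heartbeat.interval.ms must be lower than session.timeout.ms and typically no higher than 1/3 of it. The sketch below is plain Java (hypothetical class TimeoutCheck; property names are written as strings rather than ConsumerConfig constants so it runs without kafka-clients) and checks only that one rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical sanity check for consumer timeout settings, based on the
 * Kafka consumer docs:
 *  - heartbeat.interval.ms must be lower than session.timeout.ms
 *  - heartbeat.interval.ms should typically be <= 1/3 of session.timeout.ms
 */
public class TimeoutCheck {

    /** Returns a list of problems; an empty list means the check passed. */
    public static List<String> check(Properties props) {
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        List<String> problems = new ArrayList<>();
        if (heartbeat >= session) {
            problems.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeat * 3 > session) {
            problems.add("heartbeat.interval.ms is above 1/3 of session.timeout.ms");
        }
        return problems;
    }

    /** Builds a Properties object with the four settings discussed above. */
    public static Properties props(long autoCommit, long session, long request, long heartbeat) {
        Properties p = new Properties();
        p.setProperty("auto.commit.interval.ms", Long.toString(autoCommit));
        p.setProperty("session.timeout.ms", Long.toString(session));
        p.setProperty("request.timeout.ms", Long.toString(request));
        p.setProperty("heartbeat.interval.ms", Long.toString(heartbeat));
        return p;
    }

    public static void main(String[] args) {
        System.out.println("local (original): " + check(props(1000, 6000, 6000, 1000)));
        System.out.println("local (fixed):    " + check(props(2000, 8000, 6000, 1000)));
        System.out.println("test:             " + check(props(500, 6001, 3000, 120)));
    }
}
```

All three configs in this post pass this check, so the heartbeat ratio alone does not explain the missingTopicsFatal error; it is only a guard against an obviously inconsistent config.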


Additional Notes:
<properties>   
  <maven.compiler.target>1.8</maven.compiler.target>
  <maven.compiler.source>1.8</maven.compiler.source>
  <spring.boot.version>2.2.0.M5</spring.boot.version>
  <spring.kafka.version>2.3.0.M4</spring.kafka.version>
  <spring-kafka-test-version>2.4.1.RELEASE</spring-kafka-test-version>
  <spring.boot.test.version>2.2.4.RELEASE</spring.boot.test.version>
  <junit.version>4.13</junit.version>
</properties>

Further reading: https://docs.confluent.io/current/installation/configuration/consumer-configs.html

--------------------------------------------------
SpringBoot-Kafka Integration - key notes

ConcurrentKafkaListenerContainerFactory


ConcurrentKafkaListenerContainerFactory is used to create the containers for methods annotated with @KafkaListener.
There are two MessageListenerContainer implementations in Spring for Apache Kafka:
1) KafkaMessageListenerContainer
2) ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all messages from all topics or partitions on a single thread.
The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);                             // 3 child KafkaMessageListenerContainer instances
factory.getContainerProperties().setPollTimeout(3000); // max ms each poll() blocks waiting for records

Here, factory.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
If six TopicPartition instances are provided and the concurrency is 3, each container gets two partitions. For five TopicPartition instances, two containers get two partitions and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down so that each container gets one partition.
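The distribution rule above is simple integer arithmetic. This plain-Java sketch (hypothetical class ConcurrencySplit; it reproduces the counts described in the text, not Spring's actual assignment code) computes how many partitions each child container gets when TopicPartitions are assigned explicitly. When the container subscribes to topics by name instead, Kafka's group assignor decides, and consumers beyond the partition count simply sit idle.

```java
import java.util.Arrays;

/**
 * Hypothetical illustration of spreading explicitly assigned partitions
 * over the child containers of a ConcurrentMessageListenerContainer:
 * as even as possible, with concurrency reduced if it exceeds the
 * number of partitions.
 */
public class ConcurrencySplit {

    /** Returns how many partitions each child container receives. */
    public static int[] partitionsPerContainer(int partitions, int concurrency) {
        if (partitions <= 0 || concurrency <= 0) {
            throw new IllegalArgumentException("partitions and concurrency must be positive");
        }
        int containers = Math.min(concurrency, partitions); // concurrency adjusted down
        int[] counts = new int[containers];
        int base = partitions / containers;
        int extra = partitions % containers; // the first `extra` containers get one more
        for (int i = 0; i < containers; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(partitionsPerContainer(6, 3))); // [2, 2, 2]
        System.out.println(Arrays.toString(partitionsPerContainer(5, 3))); // [2, 2, 1]
        System.out.println(Arrays.toString(partitionsPerContainer(2, 3))); // [1, 1]
    }
}
```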
----------------------------------------------------------


# Error seen while debugging (long delay between polls)

Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
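The error boils down to a time budget: every record returned by one poll() has to be processed before max.poll.interval.ms runs out. The plain-Java sketch below (hypothetical class PollBudget; the 1 s per-record cost is an assumed figure standing in for a slow handler or a debugger pause) uses the Kafka consumer defaults max.poll.interval.ms=300000 and max.poll.records=500 to show when a batch overruns.

```java
/**
 * Hypothetical poll-loop budget: a full batch from poll() must be processed
 * within max.poll.interval.ms, otherwise the group rebalances and the next
 * commit fails with CommitFailedException.
 */
public class PollBudget {

    /** Worst-case time (ms) to process one full batch. */
    public static long batchTimeMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    /** Largest max.poll.records that still fits within maxPollIntervalMs (at least 1). */
    public static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs) {
        if (perRecordMs <= 0) {
            throw new IllegalArgumentException("perRecordMs must be positive");
        }
        return (int) Math.max(1, maxPollIntervalMs / perRecordMs);
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // Kafka consumer default (5 minutes)
        int maxPollRecords = 500;         // Kafka consumer default
        long perRecordMs = 1_000;         // assumed: 1 s per record (slow handler / debugger)

        long batch = batchTimeMs(maxPollRecords, perRecordMs);
        System.out.println("batch needs " + batch + " ms, limit is " + maxPollIntervalMs + " ms");
        System.out.println("fits: " + (batch <= maxPollIntervalMs));
        System.out.println("safe max.poll.records: " + safeMaxPollRecords(maxPollIntervalMs, perRecordMs));
    }
}
```

With these numbers a full batch needs 500,000 ms against a 300,000 ms limit, so either max.poll.records has to drop to 300 or max.poll.interval.ms has to rise. While stepping through code in a debugger the per-record time is effectively unbounded, so raising max.poll.interval.ms for a debug profile is probably the more practical knob.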

# OffsetCommitCallback
factory.getContainerProperties().setCommitCallback((offsets, exception) -> {
  // offsets   --> Map<TopicPartition, OffsetAndMetadata> that were committed
  // exception --> null if the commit succeeded, otherwise the failure
});

Adding Message Filter for Listeners

Listeners can be configured to consume only specific messages by adding a custom filter. This is done by setting a RecordFilterStrategy on the KafkaListenerContainerFactory; its filter method returns true for records that should be discarded:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
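    // filter() returns true for records to DISCARD, so any record whose
    // value contains this word never reaches the @KafkaListener method
    factory.setRecordFilterStrategy(
      record -> record.value().contains("filterOutIfItHasThisWord"));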
    return factory;
}
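The easy thing to get backwards here is the return value: true means "drop this record". This plain-Java sketch (hypothetical class FilterDemo, no Kafka or Spring classes) applies the same predicate to a few String values to show which ones would reach the listener.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Hypothetical illustration of RecordFilterStrategy semantics using plain
 * Strings: the predicate returns true for values to DISCARD, and only the
 * remaining values would be delivered to the listener.
 */
public class FilterDemo {

    /** Same condition as the lambda in the factory bean: true means discard. */
    public static final Predicate<String> DISCARD =
            value -> value.contains("filterOutIfItHasThisWord");

    /** Returns the values that would actually reach the listener. */
    public static List<String> delivered(List<String> values) {
        return values.stream()
                .filter(DISCARD.negate()) // keep only what the strategy does not discard
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> incoming = Arrays.asList("hello", "please filterOutIfItHasThisWord", "world");
        System.out.println(delivered(incoming)); // [hello, world]
    }
}
```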

Kafka Message Key (to aid Partitioning)

Apache Kafka uses the message key for two main purposes:

  • Partitioning: Kafka guarantees ordering only within a partition, so it is important to route correlated messages to the same partition. To do so, specify a key for each message; with the default partitioner, Kafka puts all messages with the same key in the same partition.
  • Compacted topics: a topic can be configured with cleanup.policy=compact to instruct Kafka to keep only the latest message for a given object, identified by the message key. In other words, once compaction has run, Kafka retains at least the latest message for each key value; older messages with the same key become eligible for removal.
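Both uses of the key can be illustrated without a broker. The plain-Java sketch below (hypothetical class KeyDemo) hashes a String key the way I understand Kafka's default partitioner handles keyed records (murmur2 over the serialized key bytes, made non-negative, modulo the partition count), and simulates the end state of compaction as "latest value per key". Treat it as an illustration, not a byte-for-byte reimplementation; real compaction runs in the background and never touches the active segment, so older values can stay readable for a while.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical illustration of message keys:
 *  1) key -> partition, modeled on Kafka's default partitioner for keyed records;
 *  2) compaction, reduced to "keep the latest value per key".
 */
public class KeyDemo {

    /** murmur2 hash, written to follow the variant used by Kafka's client utilities. */
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        switch (length % 4) { // mix in the remaining 1 to 3 bytes (intentional fall-through)
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition for a String key, using its UTF-8 bytes (what StringSerializer produces by default). */
    public static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions; // non-negative, then modulo
    }

    /** End state of compaction: a later value for a key replaces the earlier one. */
    public static Map<String, String> compact(List<String[]> keyValueLog) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] kv : keyValueLog) {
            latest.put(kv[0], kv[1]);
        }
        return latest;
    }

    public static void main(String[] args) {
        // sreesTopic1 in this post was created with 2 partitions
        for (String key : new String[] {"customer-1", "customer-2", "customer-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 2));
        }
        System.out.println(compact(Arrays.asList(
                new String[] {"customer-1", "v1"},
                new String[] {"customer-2", "v1"},
                new String[] {"customer-1", "v2"})));
    }
}
```

Because the partition depends on numPartitions, adding partitions to a topic later can move a key to a different partition, so per-key ordering is not preserved across that change.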


Kafka Message KEY - for message ordering - same partition