### Run your local zookeeper and Kafka cluster from command line ###
#Note1: open separate commandlines to run each command
#Note2: make sure that Kafka is installed on C drive to reduce path/classpath length.
#Run Zookeeper
cd c:/kafka
zkserver
#Run broker
cd c:/kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties
#Create topic
cd c:/kafka/bin/windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic sreesTopic1
#Create a producer & consumer to test the Kafka setup
##Producer
cd c:/kafka/bin/windows
kafka-console-producer.bat --broker-list localhost:9092 --topic sreesTopic1
##Consumer
cd c:/kafka/bin/windows
#kafka-console-consumer.bat --zookeeper localhost:2181 --topic sreesTopic1
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic sreesTopic1
Troubleshooting my SpringBoot-Kafka Consumer settings (timeouts) in local vs test to fix missingTopicsFatal is true
I had below local config and when i did JUnit unit testing (@SpringRunner), I was getting occasional error saying -
Local config (used during JUnit testing)
auto.commit.interval.ms=1000
session.timeout.ms=6000
request.timeout.ms=6000
heartbeat.interval.ms=1000Error which I receive on most attempts
IllegalStateException: Topic(s) [MY_TOPIC_NAME] is/are not present and missingTopicsFatal is true
Caused by: java.lang.IllegalStateException: Topic(s) [MY_TOPIC_NAME] is/are not present and missingTopicsFatal is true at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:351) at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:136) at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:308) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:309) at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:256) at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
Fixed by adjusting the auto.commit.ms & request.timeout.ms in Local (seems session was timing out by the time request processing (6sec) completes) - need to gather actual info.
auto.commit.interval.ms=2000
session.timeout.ms=8000
request.timeout.ms=6000
heartbeat.interval.ms=1000Test config (which worked fine so far)
auto.commit.interval.ms=500
session.timeout.ms=6001
request.timeout.ms=3000
heartbeat.interval.ms=120
Explanation on each of these time settings
auto.commit.interval.ms= How often (in milliseconds) consumer offsets should be auto-committed when enable.auto.commit is enabled
session.timeout.ms=The timeout used to detect worker failures (Default: 10000)
request.timeout.ms=The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. (Use ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)
heartbeat.interval.ms=The expected time between heartbeats to the group coordinator when using Kafka’s group management facilities
Additional Notes:
<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.0.M5</spring.boot.version>
<spring.kafka.version>2.3.0.M4</spring.kafka.version>
<spring-kafka-test-version>2.4.1.RELEASE</spring-kafka-test-version>
<spring.boot.test.version>2.2.4.RELEASE</spring.boot.test.version>
<junit.version>4.13</junit.version>
</properties>
Futher reads/References to learn more: https://docs.confluent.io/current/installation/configuration/consumer-configs.html
--------------------------------------------------
SpringBoot-Kafka Integration - key notes
ConcurrentKafkaListenerContainerFactory
ConcurrentKafkaListenerContainerFactory
is used to create containers for annotated methods with @KafkaListener
MessageListenerContainer
in spring Kafkafactory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
Here, container.setConcurrency(3) creates three KafkaMessageListenerContainer instances.
If you have six
TopicPartition
instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.Listeners can be configured to consume specific types of messages by adding a custom filter. This can be done by setting a RecordFilterStrategy to the KafkaListenerContainerFactory:
1
2
3
4
5
6
7
8
9
10
11
| @Bean public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains( "filterOutIfItHasThisWord" )); return factory; } Kafka Message Key (to aid Partitioning)Apache Kafka uses message key for different purposes
Kafka Message KEY - for message ordering - same partition |