UseCase: Given topic with 100 messages in kafka topic, I want to read messaged from offset 10 to offset 20. I could able to fetch from beginning offset. when i reach end offset, I have written code to stop the container.Even after execution of code, Consumer can consume further messages(from offset 21).It only stops after reading all messages in the topic
@Service
public class Consumer1 implements MessageListener<String, GenericRecord> {
@Override
public void onMessage(ConsumerRecord<String, GenericRecord> data) {
log.info("feed record {}", data);
if (data.offset() == 20) {
feedService.stopConsumer();
}
}
}
@Service
public class FeedService{
// start logic here
public void stopConsumer() {
kafkaMessageListenerContainer.stop();
}
}
Note: I am using spring-kafka latest version(2.6.4). One observation is container stop method is being executed but consumer is not getting closed.And no errors on output