spring kafka seek

Jackie
Oct 15, 2021

I have a Kafka consumer that, after resume, picks up from the last committed offset and consumes sequentially from there, even after several calls to seekToEnd().

Logs

//before pause
2021-10-09 15:44:18,569 [fx-kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024115]
2021-10-09 15:44:28,573 [fx-kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024116]

//paused

//called seekToEnd before resume

//after resume
//called seekToEnd several times before and after resume
2021-10-09 15:45:13,603 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024117]
2021-10-09 15:45:53,610 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024118]
2021-10-09 15:46:03,612 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024119]
2021-10-09 15:46:13,613 [kafka-consumer-0-C-1] INFO c.x.f.r.k.HarvestConsumer - check raw msg size 1 on [17] @ [38012024120]

It turns out this was caused by an issue in my own code: the subclass that extends AbstractConsumerSeekAware.

At consumer startup, I do a manual seek in onPartitionsAssigned. So far this has been working well.

However, now that the consumer needs to be paused and resumed, it turned out that my overridden method, the one doing the manual seek, skips the bookkeeping the base class normally does: it no longer records the consumer's seek callback in the callbacks field.

So when seekToEnd() was invoked, it was really iterating over an empty map, and no seek was ever issued.
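To make the failure mode concrete, here is a minimal, dependency-free sketch of the pattern. It is not the Spring Kafka source: SeekCallback, SeekAwareBase, BrokenListener and FixedListener are hypothetical stand-ins that only mimic the idea of a base class remembering a callback per assigned partition, and the startup seek is assumed to be a seek-to-end purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a seek callback; NOT the Spring Kafka interface.
interface SeekCallback {
    void seekToEnd(String partition);
}

// Simplified model of a seek-aware base class: it remembers one callback
// per assigned partition so that later seek requests can reach the consumer.
class SeekAwareBase {
    private final Map<String, SeekCallback> callbacks = new ConcurrentHashMap<>();

    public void onPartitionsAssigned(Map<String, Long> assignments, SeekCallback callback) {
        assignments.keySet().forEach(tp -> callbacks.put(tp, callback));
    }

    // Iterates the registered callbacks; a silent no-op when the map is empty.
    public void seekToEnd() {
        callbacks.forEach((tp, cb) -> cb.seekToEnd(tp));
    }
}

// Mirrors the bug: the override does its manual seek but never calls super,
// so the callbacks map stays empty.
class BrokenListener extends SeekAwareBase {
    @Override
    public void onPartitionsAssigned(Map<String, Long> assignments, SeekCallback callback) {
        assignments.keySet().forEach(callback::seekToEnd); // assumed startup seek
    }
}

// Mirrors the fix: let the base class register the callback first,
// then do the manual seek.
class FixedListener extends SeekAwareBase {
    @Override
    public void onPartitionsAssigned(Map<String, Long> assignments, SeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        assignments.keySet().forEach(callback::seekToEnd); // assumed startup seek
    }
}

class SeekDemo {
    // Counts how many seeks reach the callback when seekToEnd() is called
    // after assignment, ignoring the seek done at startup.
    static int seeksAfterResume(SeekAwareBase listener) {
        List<String> seeks = new ArrayList<>();
        SeekCallback cb = seeks::add;
        listener.onPartitionsAssigned(Map.of("partition-17", 0L), cb);
        seeks.clear();
        listener.seekToEnd();
        return seeks.size();
    }

    public static void main(String[] args) {
        System.out.println("broken: " + seeksAfterResume(new BrokenListener())); // broken: 0
        System.out.println("fixed: " + seeksAfterResume(new FixedListener()));   // fixed: 1
    }
}
```

In the real listener the equivalent change would presumably be to call the superclass onPartitionsAssigned from the override before doing the manual seek, so the base class can keep track of the callback.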

Originally published at https://lwpro2.dev on October 15, 2021.
