Wojtek Wajerowicz

resetting kafka offsets

If you’re using Apache Kafka, you know it persists all the messages on disk as a distributed commit log. You might sometimes want to take advantage of that and reprocess some of the messages. Assuming that you want to reprocess all the messages currently stored on your brokers and you set auto.offset.reset to smallest, you can just delete your consumers’ data from Zookeeper. After restarting, your consumers should start from the beginning. But what if you forgot or didn’t want to set auto.offset.reset in you consumers to smallest? Then you can manually set the offsets for each partition for your consumers to the smallest currently available offset.

finding earliest offset

Finding the smallest offset is pretty straightforward. Before starting you need to download or build Kafka binaries and then simply run:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list {brokerUrl} —topic {yourFancyTopic} --time -2

This command is pretty straightforward, –time -2 means that we want the smallest offset (you could use -1 if you wanted to find the largest one). Make sure that you use your Kafka broker URLs rather than Zookeeper URLs. After running the command you should get an output similar to:

yourFancyTopic:0:4851608
yourFancyTopic:1:4921219
yourFancyTopic:2:4934182
yourFancyTopic:3:4849460
yourFancyTopic:4:4920949
yourFancyTopic:5:4927682
yourFancyTopic:6:4847047
yourFancyTopic:7:4917802
yourFancyTopic:8:4929010
yourFancyTopic:9:4850859

As you have probably figured out, the format of this output is topic:partitionId:offset.

Now all you have to do is set the partition offsets for your consumers to this values.

manually setting kafka consumer offsets

Manually setting consumers is pretty straightforward. You’ll need a Zookeeper client (I used zookeeper-cli as it seems to be a bit more user friendly than the one which comes with Kafka, e.g. enables you to do recursive deletes). After connecting to your Zookeeper cluster, simply type for each partition:

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}

Now all you need to do is to use partition IDs and offsets from the previous steps. Make sure you stop all your consumers before doing that, otherwise they might overwrite the offsets you wrote.

Although this may not be the fastest way of doing this, it can easily be automated if we need it to happen more frequently. Are you aware of quicker ways of doing this? Tweet us or drop a comment below to let us know what your solution is.

blog comments powered by Disqus