Kafka data from topic with limit

Use case: getting the last entries from a Kafka topic, up to a given limit. How do you do that?

After several attempts at doing it properly, I've arrived at the method below, which works well and returns the latest x records from the Kafka topic you specify.

private Iterable<ConsumerRecord<String, T>> getLatestConsumerRecordsWithLimit(KafkaConsumer<String, T> consumer, String topic, long limit) {
    Iterable<ConsumerRecord<String, T>> records;
    // KafkaConsumer is not thread-safe, so only one thread may use it at a time
    synchronized (consumer) {
        consumer.subscribe(Collections.singletonList(topic));
        // dummy poll to trigger the partition assignment
        consumer.poll(0);
        // move each assigned partition's position to the very end
        consumer.seekToEnd(consumer.assignment());
        for (TopicPartition topicPartition : consumer.assignment()) {
            long actualPosition = consumer.position(topicPartition);
            long newPosition = actualPosition - limit;
            if (newPosition >= 0) {
                consumer.seek(topicPartition, newPosition);
            } else {
                // fewer records than the limit: start from the beginning of this partition
                consumer.seekToBeginning(Collections.singletonList(topicPartition));
            }
        }
        ConsumerRecords<String, T> poll = consumer.poll(TIMEOUT_MS);
        records = poll.records(topic);
        consumer.unsubscribe();
    }
    return records;
}

Let's go through the individual blocks.

Iterable<ConsumerRecord<String, T>> getLatestConsumerRecordsWithLimit(KafkaConsumer<String, T> consumer, String topic, long limit)

T represents a generic type, so the effective signature can for example be:

getLatestConsumerRecordsWithLimit(KafkaConsumer<String, Person> consumer, String topic, long limit)

or

getLatestConsumerRecordsWithLimit(KafkaConsumer<String, Employee> consumer, String topic, long limit)
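
One thing to watch: T has to be declared somewhere. If the enclosing class isn't generic, one option (my assumption, not shown in the original) is to declare the type parameter on the method itself:

private <T> Iterable<ConsumerRecord<String, T>> getLatestConsumerRecordsWithLimit(
        KafkaConsumer<String, T> consumer, String topic, long limit)
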
        synchronized (consumer) {

The Kafka consumer is not thread-safe. If multiple threads accessed it at the same time, the results would be unpredictable, and Kafka itself detects this and throws a ConcurrentModificationException ("KafkaConsumer is not safe for multi-threaded access"). Therefore we synchronize on the consumer object, which allows only one thread at a time to enter the critical section.
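
To illustrate why the lock is needed, here is a contrived sketch (not from the original post) of two threads hitting the same consumer without synchronization; one of them will typically fail fast:

ExecutorService pool = Executors.newFixedThreadPool(2);
// both tasks poll the same consumer concurrently; KafkaConsumer detects this and
// one of them throws ConcurrentModificationException
pool.submit(() -> consumer.poll(100));
pool.submit(() -> consumer.poll(100));
pool.shutdown();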

consumer.poll(0);
consumer.seekToEnd(consumer.assignment());

The dummy poll(0) triggers the partition assignment; seekToEnd then moves the position of every assigned partition to the very end, so that we can determine the total number of records.
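
A side note: poll(long) is deprecated since kafka-clients 2.0, and poll(Duration.ZERO) is not a drop-in replacement, because it may return before the group assignment has completed. On newer clients, a minimal sketch of the same trick could poll until partitions are actually assigned:

consumer.subscribe(Collections.singletonList(topic));
// keep polling briefly until the group coordinator has assigned partitions to us
while (consumer.assignment().isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
}
consumer.seekToEnd(consumer.assignment());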

long actualPosition = consumer.position(topicPartition);
long newPosition = actualPosition - limit;

Then we read the actual position at the end of each partition. The new position is the end position minus the number of records we want to return; for example, with an end position of 120 and a limit of 50 we would seek to offset 70.

if (newPosition >= 0) {
    consumer.seek(topicPartition, newPosition);
} else {
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
}

It can happen that a partition holds fewer records than the limit; in that case we seek that partition back to the very beginning. Note that the seek is applied per partition, so on a topic with more than one partition the method can return up to limit × number-of-partitions records.

ConsumerRecords<String, T> poll = consumer.poll(TIMEOUT_MS);
records = poll.records(topic);
consumer.unsubscribe();

And finally, we poll the records within the timeout and unsubscribe.
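
TIMEOUT_MS isn't defined in the snippet above; it is simply a constant you pick yourself, for example:

private static final long TIMEOUT_MS = 5_000; // how long to wait for the records at most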

In order to use this method, you need to inject the Kafka consumer into your Spring Boot service:

private final KafkaConsumer<String, Person> kafkaConsumer;
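
The consumer itself has to come from somewhere; a minimal configuration sketch (broker address, group id, and spring-kafka's JsonDeserializer are all my assumptions here, not the original author's setup):

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaConsumer<String, Person> kafkaConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "person-reader");           // assumed group id
        return new KafkaConsumer<>(props,
                new StringDeserializer(),
                new JsonDeserializer<>(Person.class));
    }
}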

Then, within some other method in your service (key below is the record key you want to filter on):

Iterable<ConsumerRecord<String, Person>> records = getLatestConsumerRecordsWithLimit(kafkaConsumer, "person_topic", limit);

List<Person> persons = StreamSupport.stream(Spliterators.spliteratorUnknownSize(records.iterator(), Spliterator.ORDERED), false)
        .filter(e -> key.equals(e.key()))
        .map(ConsumerRecord::value)
        .collect(Collectors.toList());

.filter(e -> key.equals(e.key()))

We need to make sure we return only the values whose key matches the key we are interested in. You should always produce records to a Kafka topic with a specific key, so that each record is assigned to the correct partition based on that key.
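
For completeness, a producing-side sketch (the producer instance and the Person accessor are assumptions for illustration):

// the partition is chosen by hashing the key, so all records with the same key
// land in the same partition and can later be filtered out as shown above
ProducerRecord<String, Person> record = new ProducerRecord<>("person_topic", person.getId(), person);
kafkaProducer.send(record);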