Offset manager
Every Kafka message has an offset that increases monotonically within its partition. The Kafka consumer API provides a method to commit the offsets passed to it as arguments. Committing an offset implicitly commits every lower offset in the same partition.
The offset manager is a data structure that calculates the committable offset for each partition. To use the offset manager:
- Add message(s), with their offset and partition metadata, under a batch key. This adds the offsets to a sorted map with the committable flag set to false.
  addOffsetToBatch(Object batch, List<Message> messages)
- Mark the messages as committable once processing has finished.
  setCommittable(Object batch) sets the committable flag to true.
- Ask for the offset to commit.
  getCommittableOffset() returns the largest offset that can be committed.
Implementation
Data Structures
- OffsetNode: a combination of topic, partition, and offset, together with the isCommittable flag.
- toBeCommittableBatchOffsets: a map from batch key to the set of OffsetNodes in that batch.
- sortedOffsets: a map from topic-partition to a list of OffsetNodes sorted by offset.
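As a rough illustration, the three structures could be declared as below. This is a minimal sketch, not the actual implementation: the string topic-partition key, the topicPartition() helper, the OffsetManagerState holder class, and the use of a TreeMap keyed by offset in place of the sorted list are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// One tracked message position; isCommittable starts out false.
final class OffsetNode {
    final String topic;
    final int partition;
    final long offset;
    boolean isCommittable = false;

    OffsetNode(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    // Hypothetical helper: a string key standing in for a topic-partition.
    String topicPartition() {
        return topic + "-" + partition;
    }
}

// Hypothetical holder for the two maps described above.
final class OffsetManagerState {
    // batch key -> nodes of that batch that are not yet committable
    final Map<Object, Set<OffsetNode>> toBeCommittableBatchOffsets = new HashMap<>();
    // topic-partition -> nodes ordered by offset (TreeMap stands in for the sorted list)
    final Map<String, TreeMap<Long, OffsetNode>> sortedOffsets = new HashMap<>();
}
```

A TreeMap keeps its entries ordered by key, so the smallest tracked offset of a partition is always available as the first entry.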
Adding offsets
When addOffsetToBatch(Object batch, List<Message> messages) is called, it creates an OffsetNode for each message.
Each topic-partition has a list sorted by offset, and the OffsetNode is added to that list.
The OffsetNode is also added to the toBeCommittableBatchOffsets map under the provided batch key.
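The adding step might look roughly like the following. It is a sketch under stated assumptions: the Message class here is a hypothetical stand-in exposing only topic, partition, and offset, the AddingOffsetsSketch class name is invented, and a TreeMap keyed by offset replaces the sorted list.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-in for the real Message type.
final class Message {
    final String topic;
    final int partition;
    final long offset;

    Message(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class OffsetNode {
    final String topic;
    final int partition;
    final long offset;
    boolean isCommittable = false;

    OffsetNode(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class AddingOffsetsSketch {
    final Map<Object, Set<OffsetNode>> toBeCommittableBatchOffsets = new HashMap<>();
    final Map<String, TreeMap<Long, OffsetNode>> sortedOffsets = new HashMap<>();

    void addOffsetToBatch(Object batch, List<Message> messages) {
        for (Message m : messages) {
            OffsetNode node = new OffsetNode(m.topic, m.partition, m.offset);
            // Insert into the per-partition structure, which stays ordered by offset.
            sortedOffsets
                .computeIfAbsent(m.topic + "-" + m.partition, k -> new TreeMap<>())
                .put(node.offset, node);
            // Remember the same node under the batch key so it can be flagged later.
            toBeCommittableBatchOffsets
                .computeIfAbsent(batch, k -> new HashSet<>())
                .add(node);
        }
    }
}
```

Both maps hold references to the same OffsetNode object, so flagging a node through the batch map is visible when the sorted structure is scanned later.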
Setting a batch to be committable
setCommittable(Object batch) sets the isCommittable flag to true on each OffsetNode in the batch. It also removes the batch from the toBeCommittableBatchOffsets map.
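A minimal sketch of this step follows. The SetCommittableSketch class name is invented for the example, and treating an unknown batch key as a no-op is an assumption; the text does not say how that case is handled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class OffsetNode {
    final String topic;
    final int partition;
    final long offset;
    boolean isCommittable = false;

    OffsetNode(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class SetCommittableSketch {
    final Map<Object, Set<OffsetNode>> toBeCommittableBatchOffsets = new HashMap<>();

    void setCommittable(Object batch) {
        // remove() hands back the batch's nodes and drops the map entry in one call.
        Set<OffsetNode> nodes = toBeCommittableBatchOffsets.remove(batch);
        if (nodes == null) {
            return; // Assumption: an unknown batch key is silently ignored.
        }
        for (OffsetNode node : nodes) {
            node.isCommittable = true;
        }
    }
}
```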
Getting committable offsets
getCommittableOffset()
- For each topic-partition:
  - Look for the contiguous series of OffsetNodes, starting from the smallest offset in the sorted list, that are marked committable.
  - Return the largest offset in that series.
  - Delete the OffsetNodes with smaller offsets from the sorted list.
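The scan can be sketched as below. Several points are assumptions rather than facts from the description: the return type (a map from a string topic-partition key to the offset), the add() helper, the CommittableOffsetSketch class name, reading "contiguous" as the unbroken run of committable nodes at the head of the sorted structure, and keeping the node that holds the returned offset while deleting only the strictly smaller ones.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class OffsetNode {
    final String topic;
    final int partition;
    final long offset;
    boolean isCommittable;

    OffsetNode(String topic, int partition, long offset, boolean isCommittable) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.isCommittable = isCommittable;
    }
}

final class CommittableOffsetSketch {
    // topic-partition key -> nodes ordered by offset
    final Map<String, TreeMap<Long, OffsetNode>> sortedOffsets = new HashMap<>();

    // Hypothetical helper used only to populate the sketch.
    void add(OffsetNode node) {
        sortedOffsets
            .computeIfAbsent(node.topic + "-" + node.partition, k -> new TreeMap<>())
            .put(node.offset, node);
    }

    Map<String, Long> getCommittableOffset() {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, TreeMap<Long, OffsetNode>> entry : sortedOffsets.entrySet()) {
            TreeMap<Long, OffsetNode> nodes = entry.getValue();
            Long largest = null;
            // Walk from the smallest offset and stop at the first node still in flight.
            for (OffsetNode node : nodes.values()) {
                if (!node.isCommittable) {
                    break;
                }
                largest = node.offset;
            }
            if (largest != null) {
                result.put(entry.getKey(), largest);
                // Drop every node strictly below the returned offset.
                nodes.headMap(largest, false).clear();
            }
        }
        return result;
    }
}
```

For a partition holding offsets 5 and 6 (committable), 7 (not committable), and 8 (committable), this sketch reports 6: offset 8 cannot be committed while 7 is still being processed, because committing 8 would implicitly commit 7 as well.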