Source code for kafka.partitioner.roundrobin

from __future__ import absolute_import

from kafka.partitioner.base import Partitioner


[docs]class RoundRobinPartitioner(Partitioner): def __init__(self, partitions=None): self.partitions_iterable = CachedPartitionCycler(partitions) if partitions: self._set_partitions(partitions) else: self.partitions = None def __call__(self, key, all_partitions=None, available_partitions=None): if available_partitions: cur_partitions = available_partitions else: cur_partitions = all_partitions if not self.partitions: self._set_partitions(cur_partitions) elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None: self._set_partitions(cur_partitions) return next(self.partitions_iterable) def _set_partitions(self, available_partitions): self.partitions = available_partitions self.partitions_iterable.set_partitions(available_partitions)
[docs] def partition(self, key, all_partitions=None, available_partitions=None): return self.__call__(key, all_partitions, available_partitions)
[docs]class CachedPartitionCycler(object): def __init__(self, partitions=None): self.partitions = partitions if partitions: assert type(partitions) is list self.cur_pos = None def __next__(self): return self.next() @staticmethod def _index_available(cur_pos, partitions): return cur_pos < len(partitions)
[docs] def set_partitions(self, partitions): if self.cur_pos: if not self._index_available(self.cur_pos, partitions): self.cur_pos = 0 self.partitions = partitions return None self.partitions = partitions next_item = self.partitions[self.cur_pos] if next_item in partitions: self.cur_pos = partitions.index(next_item) else: self.cur_pos = 0 return None self.partitions = partitions
[docs] def next(self): assert self.partitions is not None if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions): self.cur_pos = 1 return self.partitions[0] cur_item = self.partitions[self.cur_pos] self.cur_pos += 1 return cur_item