Source code for kafka.partitioner.hashed

from __future__ import absolute_import

from kafka.vendor import six

from .base import Partitioner


[docs]class Murmur2Partitioner(Partitioner): """ Implements a partitioner which selects the target partition based on the hash of the key. Attempts to apply the same hashing function as mainline java client. """
[docs] def partition(self, key, partitions=None): if not partitions: partitions = self.partitions # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69 idx = (murmur2(key) & 0x7fffffff) % len(partitions) return partitions[idx]
[docs]class LegacyPartitioner(Partitioner): """DEPRECATED -- See Issue 374 Implements a partitioner which selects the target partition based on the hash of the key """
[docs] def partition(self, key, partitions=None): if not partitions: partitions = self.partitions size = len(partitions) idx = hash(key) % size return partitions[idx]
# Default will change to Murmur2 in 0.10 release HashedPartitioner = LegacyPartitioner # https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
[docs]def murmur2(key): """Pure-python Murmur2 implementation. Based on java client, see org.apache.kafka.common.utils.Utils.murmur2 Args: key: if not a bytes type, encoded using default encoding Returns: MurmurHash2 of key bytearray """ # Convert key to bytes or bytearray if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)): data = key else: data = bytearray(str(key).encode()) length = len(data) seed = 0x9747b28c # 'm' and 'r' are mixing constants generated offline. # They're not really 'magic', they just happen to work well. m = 0x5bd1e995 r = 24 # Initialize the hash to a random value h = seed ^ length length4 = length // 4 for i in range(length4): i4 = i * 4 k = ((data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24)) k &= 0xffffffff k *= m k &= 0xffffffff k ^= (k % 0x100000000) >> r # k ^= k >>> r k &= 0xffffffff k *= m k &= 0xffffffff h *= m h &= 0xffffffff h ^= k h &= 0xffffffff # Handle the last few bytes of the input array extra_bytes = length % 4 if extra_bytes >= 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16 h &= 0xffffffff if extra_bytes >= 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8 h &= 0xffffffff if extra_bytes >= 1: h ^= (data[length & ~3] & 0xff) h &= 0xffffffff h *= m h &= 0xffffffff h ^= (h % 0x100000000) >> 13 # h >>> 13; h &= 0xffffffff h *= m h &= 0xffffffff h ^= (h % 0x100000000) >> 15 # h >>> 15; h &= 0xffffffff return h