from __future__ import absolute_import
from struct import pack, unpack, error
from kafka.protocol.abstract import AbstractType
def _pack(f, value):
try:
return pack(f, value)
except error as e:
raise ValueError("Error encountered when attempting to convert value: "
"{!r} to struct format: '{}', hit error: {}"
.format(value, f, e))
def _unpack(f, data):
try:
(value,) = unpack(f, data)
return value
except error as e:
raise ValueError("Error encountered when attempting to convert value: "
"{!r} to struct format: '{}', hit error: {}"
.format(data, f, e))
class Int8(AbstractType):
    """Signed 8-bit integer, packed with struct format ``'>b'``."""

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into a single byte.

        Raises ``ValueError`` (via ``_pack``) if ``value`` cannot be packed.
        """
        packed = _pack('>b', value)
        return packed

    @classmethod
    def decode(cls, data):
        """Read one byte from ``data`` (via ``data.read``) and unpack it.

        Raises ``ValueError`` (via ``_unpack``) if the bytes read cannot be
        unpacked, e.g. when fewer bytes than requested were returned.
        """
        raw = data.read(1)
        return _unpack('>b', raw)
class Int16(AbstractType):
    """Signed 16-bit big-endian integer, packed with struct format ``'>h'``."""

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into two bytes.

        Raises ``ValueError`` (via ``_pack``) if ``value`` cannot be packed.
        """
        packed = _pack('>h', value)
        return packed

    @classmethod
    def decode(cls, data):
        """Read two bytes from ``data`` (via ``data.read``) and unpack them.

        Raises ``ValueError`` (via ``_unpack``) if the bytes read cannot be
        unpacked, e.g. when fewer bytes than requested were returned.
        """
        raw = data.read(2)
        return _unpack('>h', raw)
class Int32(AbstractType):
    """Signed 32-bit big-endian integer, packed with struct format ``'>i'``."""

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into four bytes.

        Raises ``ValueError`` (via ``_pack``) if ``value`` cannot be packed.
        """
        packed = _pack('>i', value)
        return packed

    @classmethod
    def decode(cls, data):
        """Read four bytes from ``data`` (via ``data.read``) and unpack them.

        Raises ``ValueError`` (via ``_unpack``) if the bytes read cannot be
        unpacked, e.g. when fewer bytes than requested were returned.
        """
        raw = data.read(4)
        return _unpack('>i', raw)
class Int64(AbstractType):
    """Signed 64-bit big-endian integer, packed with struct format ``'>q'``."""

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into eight bytes.

        Raises ``ValueError`` (via ``_pack``) if ``value`` cannot be packed.
        """
        packed = _pack('>q', value)
        return packed

    @classmethod
    def decode(cls, data):
        """Read eight bytes from ``data`` (via ``data.read``) and unpack them.

        Raises ``ValueError`` (via ``_unpack``) if the bytes read cannot be
        unpacked, e.g. when fewer bytes than requested were returned.
        """
        raw = data.read(8)
        return _unpack('>q', raw)
class String(AbstractType):
    """Length-prefixed text field.

    The wire form is an ``Int16`` byte count followed by that many bytes in
    ``self.encoding``. A negative count stands for ``None``.
    """

    def __init__(self, encoding='utf-8'):
        # Codec name used by both encode() and decode().
        self.encoding = encoding

    def encode(self, value):
        """Return the wire form of ``value``.

        ``None`` is written as a length of -1 with no payload. ``bytes`` and
        ``bytearray`` values are treated as already encoded and written
        as-is. Anything else is converted with ``str()`` and encoded with
        ``self.encoding``.

        Raises ``ValueError`` (via ``Int16.encode``) when the encoded length
        does not fit the 16-bit length prefix.
        """
        if value is None:
            return Int16.encode(-1)
        if isinstance(value, (bytes, bytearray)):
            # On Python 3, str(b'foo') yields the text "b'foo'", so routing
            # bytes through str() would put the repr on the wire instead of
            # the payload. Pass pre-encoded input through untouched.
            encoded = bytes(value)
        else:
            encoded = str(value).encode(self.encoding)
        return Int16.encode(len(encoded)) + encoded

    def decode(self, data):
        """Read one length-prefixed string from ``data``.

        Returns ``None`` when the length prefix is negative, otherwise the
        payload decoded with ``self.encoding``.

        Raises ``ValueError`` if ``data.read`` returns fewer bytes than the
        length prefix announced.
        """
        length = Int16.decode(data)
        if length < 0:
            return None
        value = data.read(length)
        if len(value) != length:
            raise ValueError('Buffer underrun decoding string')
        return value.decode(self.encoding)
class Bytes(AbstractType):
    """Length-prefixed binary field.

    The wire form is an ``Int32`` byte count followed by the raw payload.
    A negative count stands for ``None``.
    """

    @classmethod
    def encode(cls, value):
        """Return ``value`` prefixed with its length, or a -1 length for ``None``."""
        if value is None:
            return Int32.encode(-1)
        size_prefix = Int32.encode(len(value))
        return size_prefix + value

    @classmethod
    def decode(cls, data):
        """Read one length-prefixed payload from ``data``.

        Returns ``None`` when the length prefix is negative.

        Raises ``ValueError`` if ``data.read`` returns fewer bytes than the
        length prefix announced.
        """
        size = Int32.decode(data)
        if size < 0:
            return None
        payload = data.read(size)
        if len(payload) != size:
            raise ValueError('Buffer underrun decoding Bytes')
        return payload

    @classmethod
    def repr(cls, value):
        """Return ``repr(value)``, truncating payloads longer than 100 items.

        Truncated payloads are shown as their first 100 items followed by
        ``b'...'``. ``None`` is rendered as-is.
        """
        if value is not None and len(value) > 100:
            return repr(value[:100] + b'...')
        return repr(value)
class Boolean(AbstractType):
    """Single-byte boolean, packed with struct format ``'>?'``."""

    @classmethod
    def encode(cls, value):
        """Return ``value`` packed into one byte.

        Raises ``ValueError`` (via ``_pack``) if ``value`` cannot be packed.
        """
        packed = _pack('>?', value)
        return packed

    @classmethod
    def decode(cls, data):
        """Read one byte from ``data`` (via ``data.read``) and unpack it.

        Raises ``ValueError`` (via ``_unpack``) if the bytes read cannot be
        unpacked, e.g. when no byte was returned.
        """
        raw = data.read(1)
        return _unpack('>?', raw)
class Schema(AbstractType):
    """Ordered sequence of named fields encoded back to back.

    Constructed from ``(name, field_type)`` pairs. The names are kept in
    ``self.names`` and the field types in ``self.fields``, in the same order.
    """

    def __init__(self, *fields):
        if not fields:
            self.names, self.fields = (), ()
            return
        # Split the (name, type) pairs into two parallel sequences.
        self.names, self.fields = zip(*fields)

    def encode(self, item):
        """Return the concatenated encodings of ``item``'s fields.

        ``item`` is indexed positionally, one entry per schema field.

        Raises ``ValueError`` if ``len(item)`` differs from the number of
        schema fields.
        """
        if len(item) != len(self.fields):
            raise ValueError('Item field count does not match Schema')
        chunks = []
        for position, field in enumerate(self.fields):
            chunks.append(field.encode(item[position]))
        return b''.join(chunks)

    def decode(self, data):
        """Decode every field from ``data`` in order and return them as a tuple."""
        decoded = []
        for field in self.fields:
            decoded.append(field.decode(data))
        return tuple(decoded)

    def __len__(self):
        return len(self.fields)

    def repr(self, value):
        """Return a ``(name=value, ...)`` rendering of ``value``.

        Each field is looked up on ``value`` by attribute name first and by
        position when that attribute is missing, then rendered with the
        field type's own ``repr``. If anything goes wrong while rendering,
        the plain ``repr(value)`` is returned instead.
        """
        rendered = []
        try:
            for position in range(len(self)):
                name = self.names[position]
                try:
                    field_val = getattr(value, name)
                except AttributeError:
                    field_val = value[position]
                field_text = self.fields[position].repr(field_val)
                rendered.append('%s=%s' % (name, field_text))
            return '(' + ', '.join(rendered) + ')'
        except Exception:
            # Best effort only: fall back to the generic representation.
            return repr(value)
class Array(AbstractType):
    """Length-prefixed sequence of homogeneous elements.

    The wire form is an ``Int32`` element count followed by each element's
    encoding. A count of -1 stands for ``None``.

    Construct with either a single ``AbstractType`` instance or subclass, or
    with several ``(name, field_type)`` pairs, which are wrapped in a
    ``Schema``.
    """

    def __init__(self, *array_of):
        """Store the element type in ``self.array_of``.

        Raises ``ValueError`` when no arguments are given or when the single
        argument is neither an ``AbstractType`` instance nor a subclass of it.
        """
        if len(array_of) > 1:
            self.array_of = Schema(*array_of)
            return
        if len(array_of) == 1:
            element = array_of[0]
            # issubclass() raises TypeError when its first argument is not a
            # class, so confirm that first; otherwise an invalid argument
            # such as Array(5) would escape as TypeError instead of the
            # ValueError below.
            is_subclass = (isinstance(element, type) and
                           issubclass(element, AbstractType))
            if isinstance(element, AbstractType) or is_subclass:
                self.array_of = element
                return
        raise ValueError('Array instantiated with no array_of type')

    def encode(self, items):
        """Return the element count followed by every encoded element.

        ``None`` is written as a count of -1 with no elements.
        """
        if items is None:
            return Int32.encode(-1)
        return b''.join(
            [Int32.encode(len(items))] +
            [self.array_of.encode(item) for item in items]
        )

    def decode(self, data):
        """Read the element count from ``data`` and decode that many elements.

        Returns ``None`` when the count is exactly -1, otherwise a list.
        """
        length = Int32.decode(data)
        if length == -1:
            return None
        # Any other negative count yields an empty list: range() of a
        # negative number is empty.
        return [self.array_of.decode(data) for _ in range(length)]

    def repr(self, list_of_items):
        """Return ``'NULL'`` for ``None``, else a bracketed, comma-separated
        rendering that uses the element type's own ``repr`` for each item.
        """
        if list_of_items is None:
            return 'NULL'
        return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'