Source code for kafka.producer.future
import collections
import threading
from kafka import errors as Errors
from kafka.future import Future
from kafka.util import Timer
class FutureProduceResult(Future):
__slots__ = ('topic_partition', '_latch')
def __init__(self, topic_partition):
super().__init__()
self.topic_partition = topic_partition
self._latch = threading.Event()
def success(self, value):
ret = super().success(value)
self._latch.set()
return ret
def failure(self, error):
ret = super().failure(error)
self._latch.set()
return ret
def wait(self, timeout=None):
# wait() on python2.6 returns None instead of the flag value
return self._latch.wait(timeout) or self._latch.is_set()
[docs]
class FutureRecordMetadata(Future):
"""An asynchronous handle to the result of a single :meth:`~kafka.KafkaProducer.send`.
:meth:`~kafka.KafkaProducer.send` returns one of these immediately,
before the record has been transmitted to the broker. Call :meth:`get`
to block until the record is acknowledged and obtain its
:class:`RecordMetadata`, or register callbacks via
:meth:`~kafka.future.Future.add_callback` /
:meth:`~kafka.future.Future.add_errback` to be notified without
blocking. The future resolves successfully once the containing batch is
acknowledged according to the producer's ``acks`` configuration, or
fails with the relevant exception (for example
:class:`~kafka.errors.KafkaTimeoutError`).
"""
__slots__ = ('_produce_future', 'args')
def __init__(self, produce_future, batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
super().__init__()
self._produce_future = produce_future
# packing args as a tuple is a minor speed optimization
self.args = (batch_index, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
produce_future._add_cb_eb(self._produce_success, self.failure)
def _produce_success(self, result):
offset, produce_timestamp_ms, record_exceptions_fn = result
# Unpacking from args tuple is minor speed optimization
(batch_index, timestamp_ms, checksum,
serialized_key_size, serialized_value_size, serialized_header_size) = self.args
if record_exceptions_fn is not None:
self.failure(record_exceptions_fn(batch_index))
else:
# None is when Broker does not support the API (<0.10) and
# -1 is when the broker is configured for CREATE_TIME timestamps
if produce_timestamp_ms is not None and produce_timestamp_ms != -1:
timestamp_ms = produce_timestamp_ms
if offset != -1 and batch_index is not None:
offset += batch_index
tp = self._produce_future.topic_partition
metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
checksum, serialized_key_size,
serialized_value_size, serialized_header_size)
self.success(metadata)
def rebind(self, new_produce_future, new_batch_index):
"""Rebind this future to a new produce future with a new batch index.
Used when a batch is split due to MESSAGE_TOO_LARGE. The original
FutureRecordMetadata is rebound to the new (smaller) batch's future.
This must be called from the sender thread while the old produce_future
has not been completed. Any user thread blocked in get() on the old
produce_future's latch will be woken and will re-wait on the new one.
"""
old_produce_future = self._produce_future
self._produce_future = new_produce_future
_, timestamp_ms, checksum, sk, sv, sh = self.args
self.args = (new_batch_index, timestamp_ms, checksum, sk, sv, sh)
new_produce_future._add_cb_eb(self._produce_success, self.failure)
# Wake any thread blocked in get() so it re-waits on the new future.
# The old produce_future is never completed, so its stale callbacks
# (registered in __init__) will never fire.
old_produce_future._latch.set()
[docs]
def get(self, timeout=None):
"""Wait for up to timeout seconds for future to complete."""
# Loop because rebind() may wake us from the old produce_future's
# latch before the record is actually done. A batch may be split
# multiple times, so each rebind wakes us and we re-wait on the
# (possibly new) _produce_future.
timer = Timer(timeout * 1000 if timeout is not None else None)
while not self.is_done and not timer.expired:
if not self._produce_future.wait(timer.timeout_secs):
raise Errors.KafkaTimeoutError(
"Timeout after waiting for %s secs." % (timeout,))
if self.failed():
raise self.exception # pylint: disable-msg=raising-bad-type
return self.value
RecordMetadata = collections.namedtuple(
'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
RecordMetadata.__doc__ = """Metadata about a record that has been acknowledged by the broker.
Returned by :meth:`FutureRecordMetadata.get`, which resolves once the
batch containing the record has been acknowledged according to the
producer's ``acks`` configuration.
Keyword Arguments:
topic (str): The topic the record was appended to.
partition (int): The partition the record was appended to.
topic_partition (TopicPartition): The ``(topic, partition)`` the record
was appended to.
offset (int): The offset of the record in the topic partition, or -1 if
the broker did not assign one (e.g. ``acks=0``).
timestamp (int): The timestamp of the record, in milliseconds since the
epoch (UTC). For CreateTime this is the producer-supplied timestamp;
for LogAppendTime it is the broker-assigned timestamp.
checksum (int): Deprecated. The CRC32 checksum of the record, or None.
Removed in message format v2 (Kafka 0.11+).
serialized_key_size (int): The size of the serialized, uncompressed key
in bytes, or -1 if the key is None.
serialized_value_size (int): The size of the serialized, uncompressed
value in bytes, or -1 if the value is None.
serialized_header_size (int): The size of the serialized, uncompressed
headers in bytes, or -1 if there are no headers.
"""