SimpleProducer

class kafka.producer.SimpleProducer(*args, **kwargs)[source]

Bases: kafka.producer.base.Producer

A simple, round-robin producer.

See Producer class for Base Arguments

Additional Arguments:
random_start (bool, optional): randomize the initial partition to which
the first message block is published. If False, the first message block is always published to partition 0 before cycling through each partition. Defaults to True.
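The effect of random_start on round-robin ordering can be illustrated with a small standalone sketch. It does not use or reproduce the library's code; round_robin_partitions and its rng parameter are hypothetical names introduced only for this example.

```python
import random
from itertools import cycle


def round_robin_partitions(partitions, random_start=True, rng=random):
    """Return an endless iterator over partition ids in round-robin order.

    Illustrative only: mirrors the documented behaviour, not the library's code.
    """
    ordered = list(partitions)
    if not ordered:
        raise ValueError("at least one partition is required")
    # With random_start, rotate the list so the cycle begins at a random offset;
    # otherwise always begin at the first partition.
    offset = rng.randrange(len(ordered)) if random_start else 0
    return cycle(ordered[offset:] + ordered[:offset])


picker = round_robin_partitions([0, 1, 2], random_start=False)
first_four = [next(picker) for _ in range(4)]
print(first_four)  # [0, 1, 2, 0]
```

With random_start=True the same cycle is produced, but it may begin at any of the partitions.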
class kafka.producer.KeyedProducer(*args, **kwargs)[source]

Bases: kafka.producer.base.Producer

A producer that distributes messages to partitions based on the message key.

See Producer class for Arguments

Additional Arguments:
partitioner: A partitioner class that will be used to get the partition
to send the message to. Must be derived from Partitioner. Defaults to HashedPartitioner.
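As a rough illustration of key-based partitioning, the sketch below maps a key to a partition by hashing it and taking the remainder. This is an assumption-laden stand-in, not HashedPartitioner itself: pick_partition is a hypothetical helper, and CRC32 is used only because it is deterministic across runs.

```python
import zlib


def pick_partition(key, partitions):
    """Map a bytes key to one of the given partitions.

    Hypothetical helper: CRC32 is a stand-in hash, not the library's algorithm.
    """
    if not partitions:
        raise ValueError("at least one partition is required")
    return partitions[zlib.crc32(key) % len(partitions)]


partitions = [0, 1, 2, 3]
first = pick_partition(b"user-42", partitions)
second = pick_partition(b"user-42", partitions)
print(first == second)  # True
```

The property that matters is that equal keys always map to the same partition while the partition list is unchanged.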
class kafka.producer.base.Producer(client, req_acks=1, ack_timeout=1000, codec=None, codec_compresslevel=None, sync_fail_on_error=True, async=False, batch_send=False, batch_send_every_n=20, batch_send_every_t=20, async_retry_limit=None, async_retry_backoff_ms=100, async_retry_on_timeouts=True, async_queue_maxsize=0, async_queue_put_timeout=0, async_log_messages_on_error=True, async_stop_timeout=30)[source]

Bases: object

Base class to be used by producers

Parameters:
  • client (kafka.SimpleClient) – instance to use for broker communications. If async=True, the background thread will use client.copy(), which is expected to return a thread-safe object.
  • codec (kafka.protocol.ALL_CODECS) – compression codec to use.
  • req_acks (int, optional) – A value indicating the acknowledgements that the server must receive before responding to the request, defaults to 1 (local ack).
  • ack_timeout (int, optional) – millisecond timeout to wait for the configured req_acks, defaults to 1000.
  • sync_fail_on_error (bool, optional) – whether sync producer should raise exceptions (True), or just return errors (False), defaults to True.
  • async (bool, optional) – send message using a background thread, defaults to False.
  • batch_send_every_n (int, optional) – If async is True, messages are sent in batches of this size, defaults to 20.
  • batch_send_every_t (int or float, optional) – If async is True, messages are sent immediately after this timeout in seconds, even if there are fewer than batch_send_every_n, defaults to 20.
  • async_retry_limit (int, optional) – number of retries for failed messages or None for unlimited, defaults to None / unlimited.
  • async_retry_backoff_ms (int, optional) – milliseconds to backoff on failed messages, defaults to 100.
  • async_retry_on_timeouts (bool, optional) – whether to retry on RequestTimedOutError, defaults to True.
  • async_queue_maxsize (int, optional) – limit to the size of the internal message queue in number of messages (not size), defaults to 0 (no limit).
  • async_queue_put_timeout (int or float, optional) – timeout seconds for queue.put in send_messages for async producers – will only apply if async_queue_maxsize > 0 and the queue is Full, defaults to 0 (fail immediately on full queue).
  • async_log_messages_on_error (bool, optional) – set to False and the async producer will only log hash() contents on failed produce requests, defaults to True (log full messages). Hash logging will not allow you to identify the specific message that failed, but it will allow you to match failures with retries.
  • async_stop_timeout (int or float, optional) – seconds to continue attempting to send queued messages after producer.stop(), defaults to 30.
Deprecated Arguments:
batch_send (bool, optional): If True, messages are sent by a background
thread in batches, defaults to False. Deprecated; use async instead.
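The interaction between batch_send_every_n and batch_send_every_t described above can be sketched with the standard library alone. This is a minimal illustration under the assumption that a batch is flushed when either limit is reached; take_batch, max_count and max_wait are hypothetical names, and the code is not the producer's implementation.

```python
import queue
import time


def take_batch(pending, max_count, max_wait):
    """Collect up to max_count items, waiting at most max_wait seconds in total."""
    batch = []
    deadline = time.monotonic() + max_wait
    while len(batch) < max_count:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time limit reached: flush whatever has been collected
        try:
            batch.append(pending.get(timeout=remaining))
        except queue.Empty:
            break  # nothing more arrived before the deadline
    return batch


pending = queue.Queue()
for i in range(5):
    pending.put(("msg-%d" % i).encode("utf-8"))

first = take_batch(pending, max_count=3, max_wait=0.2)   # size limit reached first
second = take_batch(pending, max_count=3, max_wait=0.2)  # time limit reached first
print(len(first), len(second))  # 3 2
```

The first call returns as soon as three messages are available; the second returns a partial batch once the time limit expires.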
send_messages(topic, partition, *msg)[source]

Helper method to send produce requests.

Note that each msg must be encoded to bytes by the caller. Passing a unicode message will not work; encode it before calling send_messages, for example with unicode_message.encode('utf-8'). All messages are sent with the message key set to None.

Parameters:
  • topic (str) – name of topic for produce request
  • partition (int) – partition number for produce request
  • *msg (bytes) – one or more message payloads
Returns:

ResponseRequest returned by server

Raises:
  • FailedPayloadsError – low-level connection error; can be caused by networking failures or a malformed request.
  • ConnectionError
  • KafkaUnavailableError – all known brokers are down when attempting to refresh metadata.
  • LeaderNotAvailableError – topic or partition is initializing or a broker failed and leadership election is in progress.
  • NotLeaderForPartitionError – metadata is out of sync; the broker that the request was sent to is not the leader for the topic or partition.
  • UnknownTopicOrPartitionError – the topic or partition has not been created yet and auto-creation is not available.
  • AsyncProducerQueueFull – in async mode, if too many messages are unsent and remain in the internal queue.
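Because send_messages expects bytes payloads, text usually has to be encoded first. The helper below is a minimal sketch of that step; encode_payloads is a hypothetical name, and the producer, topic name and partition in the final comment are placeholders rather than values taken from this reference.

```python
def encode_payloads(*messages, encoding="utf-8"):
    """Return every message as bytes, encoding str values and passing bytes through."""
    encoded = []
    for message in messages:
        if isinstance(message, bytes):
            encoded.append(message)
        elif isinstance(message, str):
            encoded.append(message.encode(encoding))
        else:
            raise TypeError("expected str or bytes, got %s" % type(message).__name__)
    return encoded


payloads = encode_payloads("héllo", b"raw")
print(payloads)  # [b'h\xc3\xa9llo', b'raw']
# A call on an existing producer would then look something like:
#     producer.send_messages("my-topic", 0, *payloads)
```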
stop(timeout=None)[source]

Stop the producer (async mode). Blocks until async thread completes.
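The blocking behaviour of stop can be pictured with a small worker-thread sketch. BackgroundSender is a hypothetical stand-in written for this example only; it assumes that stopping means draining the pending queue and then joining the thread, and it does not reproduce the producer's internals.

```python
import queue
import threading


class BackgroundSender:
    """Hypothetical stand-in for an async producer's worker thread."""

    def __init__(self):
        self.pending = queue.Queue()
        self.sent = []
        self._stopping = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Keep draining until stop has been requested and the queue is empty.
        while not (self._stopping.is_set() and self.pending.empty()):
            try:
                self.sent.append(self.pending.get(timeout=0.05))
            except queue.Empty:
                continue

    def stop(self, timeout=None):
        self._stopping.set()
        # Blocks until the worker exits, or until timeout seconds have passed.
        self._thread.join(timeout)
        return not self._thread.is_alive()


sender = BackgroundSender()
for i in range(3):
    sender.pending.put(i)
stopped = sender.stop(timeout=2)
print(stopped, sender.sent)  # True [0, 1, 2]
```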