class kafka.BrokerConnection(host, port, afi, **configs)[source]

Initialize a Kafka broker connection

Keyword Arguments:
  • client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
  • reconnect_backoff_ms (int) – The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50.
  • reconnect_backoff_max_ms (int) – The maximum amount of time in milliseconds to back off when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host increases exponentially with each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts continue periodically at this fixed rate. To avoid connection storms, a randomization factor of 0.2 is applied to the backoff, yielding a random value between 20% below and 20% above the computed value. Default: 1000.
  • request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.
  • max_in_flight_requests_per_connection (int) – The maximum number of requests that may be pipelined to a Kafka broker over a single connection. Default: 5.
  • receive_buffer_bytes (int) – The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
  • send_buffer_bytes (int) – The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
  • socket_options (list) – List of tuple-arguments to socket.setsockopt to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
  • security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
  • ssl_context (ssl.SSLContext) – pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
  • ssl_check_hostname (bool) – Flag to configure whether the SSL handshake should verify that the certificate matches the broker's hostname. Default: True.
  • ssl_cafile (str) – Optional filename of the CA file to use in certificate verification. Default: None.
  • ssl_certfile (str) – Optional filename of a file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate’s authenticity. Default: None.
  • ssl_keyfile (str) – Optional filename containing the client private key. Default: None.
  • ssl_password (callable, str, bytes, bytearray) – optional password or callable function that returns a password, for decrypting the client private key. Default: None.
  • ssl_crlfile (str) – Optional filename containing the CRL (certificate revocation list) used to check whether a certificate has been revoked. By default, no CRL check is done. When a file is provided, only the leaf certificate is checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. Default: None.
  • ssl_ciphers (str) – Optionally set the available ciphers for SSL connections. It should be a string in the OpenSSL cipher list format. If no cipher can be selected (because compile-time options or other configuration forbids use of all the specified ciphers), an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers.
  • api_version (tuple) – Specify which Kafka API version to use. Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)
  • api_version_auto_timeout_ms (int) – Number of milliseconds to wait before raising a timeout exception from the constructor when checking the broker API version. Only applies if api_version is None.
  • selector (selectors.BaseSelector) – Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
  • state_change_callback (callable) – Function to be called whenever the connection state changes, for example from CONNECTING to CONNECTED.
  • metrics (kafka.metrics.Metrics) – Optionally provide a metrics instance for capturing network IO stats. Default: None.
  • metric_group_prefix (str) – Prefix for metric names. Default: ‘’
  • sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
  • sasl_plain_username (str) – Username for SASL PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
  • sasl_plain_password (str) – Password for SASL PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
  • sasl_kerberos_service_name (str) – Service name to include in the GSSAPI SASL mechanism handshake. Default: ‘kafka’
  • sasl_kerberos_domain_name (str) – Kerberos domain name to use in the GSSAPI SASL mechanism handshake. Default: one of the bootstrap servers
  • sasl_oauth_token_provider (AbstractTokenProvider) – OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None
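
The interaction between reconnect_backoff_ms and reconnect_backoff_max_ms described above can be illustrated with a small standalone sketch. This is not the library's implementation: the helper name backoff_with_jitter_ms and the doubling factor are assumptions made for illustration; only the defaults (50 and 1000) and the 0.2 randomization factor come from the parameter descriptions.

```python
import random


def backoff_with_jitter_ms(failures, base_ms=50, max_ms=1000, jitter=0.2, rng=random):
    """Hypothetical helper: backoff in ms after `failures` consecutive failures.

    Assumes the backoff doubles on each failure (the exact growth factor is
    not stated in the parameter description).
    """
    # Exponential growth from the base value, capped at the configured maximum.
    computed = min(base_ms * 2 ** max(failures - 1, 0), max_ms)
    # Randomize to between 20% below and 20% above the computed value.
    return computed * rng.uniform(1 - jitter, 1 + jitter)


if __name__ == "__main__":
    for n in range(1, 8):
        print(n, round(backoff_with_jitter_ms(n), 1))
```

With the defaults, the computed value reaches the 1000 ms cap after a handful of failures, after which only the random jitter varies between attempts.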

Return True if we are disconnected from the given node and cannot re-establish a connection yet.


Return True unless max_in_flight_requests_per_connection requests are already in flight.
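
The in-flight limit check can be pictured with a minimal sketch. The class InFlightLimiter and its track and complete methods are hypothetical names used only for illustration; they are not part of the documented class.

```python
from collections import deque


class InFlightLimiter:
    """Hypothetical stand-in that tracks requests awaiting a response."""

    def __init__(self, max_in_flight=5):
        self.max_in_flight = max_in_flight
        self.in_flight = deque()

    def can_send_more(self):
        # True while fewer than the configured maximum are outstanding.
        return len(self.in_flight) < self.max_in_flight

    def track(self, request):
        # Record a request that has been sent but not yet answered.
        self.in_flight.append(request)

    def complete(self):
        # Responses are assumed to arrive in send order, so pop the oldest.
        return self.in_flight.popleft()
```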

check_version(timeout=2, strict=False, topics=[])[source]

Attempt to guess the broker version.

Note: This is a blocking call.

Returns: version tuple, e.g. (0, 10), (0, 9), (0, 8, 2), …
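
Because the result is a plain tuple, callers can compare it against a minimum version using Python's built-in lexicographic tuple ordering. The helper at_least below is a hypothetical convenience for illustration, not something the library provides.

```python
def at_least(version, minimum):
    """Hypothetical helper: True if `version` is at or above `minimum`."""
    # Tuples compare element by element, left to right.
    return tuple(version) >= tuple(minimum)


if __name__ == "__main__":
    print(at_least((0, 10), (0, 9)))    # newer broker
    print(at_least((0, 8, 2), (0, 9)))  # older broker
```

Note that a shorter tuple sorts before a longer one sharing the same prefix, so (0, 9) compares as less than (0, 9, 0); compare against tuples of the same shape as those returned.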


Close socket and fail all in-flight-requests.

Parameters: error (Exception, optional) – pending in-flight-requests will be failed with this exception. Default: kafka.errors.KafkaConnectionError.

Attempt to connect and return ConnectionState


Return True iff socket is connected.


Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).


Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting or connected, returns a very large number to handle slow/stalled connections.
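
The rule above can be sketched as a small pure function. The name send_delay_ms, its parameters, and the string state labels are assumptions for illustration, and float("inf") stands in for the "very large number" the description mentions.

```python
def send_delay_ms(state, last_attempt_ms, backoff_ms, now_ms):
    """Hypothetical sketch: how long to wait before trying to send data.

    `state` is one of "disconnected", "connecting", "connected"
    (illustrative labels, not the library's state constants).
    """
    if state == "disconnected":
        # Respect the reconnect backoff: wait out whatever remains of it.
        elapsed_ms = now_ms - last_attempt_ms
        return max(backoff_ms - elapsed_ms, 0)
    # Connecting or connected: report an effectively unbounded delay so the
    # caller relies on I/O readiness rather than a timer.
    return float("inf")
```

For example, 20 ms after a failed attempt with a 50 ms backoff, the function reports 30 ms remaining; once the backoff has fully elapsed it reports 0.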


Return True iff socket is closed


Non-blocking network receive.

Return list of (response, future) tuples

send(request, blocking=True)[source]

Queue request for async network send, return Future()


Attempts to send pending request messages via blocking IO. If all requests have been sent, return True. Otherwise, if the socket is blocked and there are more bytes to send, return False.


Attempts to send pending request messages via non-blocking IO. If all requests have been sent, return True. Otherwise, if the socket is blocked and there are more bytes to send, return False.
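
The True/False contract for non-blocking sends can be demonstrated with the standard socket module alone. This is a generic sketch of the pattern, not the library's code; the function flush_pending and the bytearray buffer are assumptions for illustration.

```python
import socket


def flush_pending(sock, buffer):
    """Hypothetical sketch: drain `buffer` (a bytearray) over a non-blocking socket.

    Returns True if every byte was sent, or False if the socket would block
    while bytes remain (the unsent bytes stay in `buffer` for a later retry).
    """
    while buffer:
        try:
            sent = sock.send(buffer)
        except BlockingIOError:
            # Kernel send buffer is full; try again when the socket is writable.
            return False
        # Drop the bytes that were accepted; a send may be partial.
        del buffer[:sent]
    return True


if __name__ == "__main__":
    left, right = socket.socketpair()
    left.setblocking(False)
    pending = bytearray(b"hello")
    print(flush_pending(left, pending), right.recv(16))
    left.close()
    right.close()
```

A caller would typically register the socket for write readiness with a selector when False is returned and call the function again once the socket becomes writable.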