BrokerConnection

class kafka.BrokerConnection(host, port, afi, **configs)[source]

Initialize a Kafka broker connection

Keyword Arguments:
 
  • client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
  • reconnect_backoff_ms (int) – The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50.
  • reconnect_backoff_max_ms (int) – The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000.
  • request_timeout_ms (int) – Client request timeout in milliseconds. Default: 40000.
  • max_in_flight_requests_per_connection (int) – Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5.
  • receive_buffer_bytes (int) – The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
  • send_buffer_bytes (int) – The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
  • socket_options (list) – List of tuple-arguments to socket.setsockopt to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
  • security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
  • ssl_context (ssl.SSLContext) – pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
  • ssl_check_hostname (bool) – flag to configure whether ssl handshake should verify that the certificate matches the brokers hostname. default: True.
  • ssl_cafile (str) – optional filename of ca file to use in certificate veriication. default: None.
  • ssl_certfile (str) – optional filename of file in pem format containing the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity. default: None.
  • ssl_keyfile (str) – optional filename containing the client private key. default: None.
  • ssl_password (callable, str, bytes, bytearray) – optional password or callable function that returns a password, for decrypting the client private key. Default: None.
  • ssl_crlfile (str) – optional filename containing the CRL to check for certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. default: None.
  • api_version (tuple) – Specify which Kafka API version to use. Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)
  • api_version_auto_timeout_ms (int) – number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version is None
  • selector (selectors.BaseSelector) – Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
  • state_change_callback (callable) – function to be called when the connection state changes from CONNECTING to CONNECTED etc.
  • metrics (kafka.metrics.Metrics) – Optionally provide a metrics instance for capturing network IO stats. Default: None.
  • metric_group_prefix (str) – Prefix for metric names. Default: ‘’
  • sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI. Default: PLAIN
  • sasl_plain_username (str) – username for sasl PLAIN authentication. Default: None
  • sasl_plain_password (str) – password for sasl PLAIN authentication. Default: None
  • sasl_kerberos_service_name (str) – Service name to include in GSSAPI sasl mechanism handshake. Default: ‘kafka’
blacked_out()[source]

Return true if we are disconnected from the given node and can’t re-establish a connection yet

can_send_more()[source]

Return True unless there are max_in_flight_requests_per_connection.

check_version(timeout=2, strict=False)[source]

Attempt to guess the broker version.

Note: This is a blocking call.

Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), …

close(error=None)[source]

Close socket and fail all in-flight-requests.

Parameters:error (Exception, optional) – pending in-flight-requests will be failed with this exception. Default: kafka.errors.ConnectionError.
connect()[source]

Attempt to connect and return ConnectionState

connected()[source]

Return True iff socket is connected.

connecting()[source]

Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).

connection_delay()[source]

Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting, returns 0 to allow non-blocking connect to finish. When connected, returns a very large number to handle slow/stalled connections.

disconnected()[source]

Return True iff socket is closed

recv()[source]

Non-blocking network receive.

Return list of (response, future) tuples

send(request)[source]

send request, return Future()

Can block on network if request is larger than send_buffer_bytes