BrokerConnection¶
-
class
kafka.
BrokerConnection
(host, port, afi, **configs)[source]¶ Initialize a Kafka broker connection
Keyword Arguments: - client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
- reconnect_backoff_ms (int) – The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50.
- reconnect_backoff_max_ms (int) – The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 1000.
- request_timeout_ms (int) – Client request timeout in milliseconds. Default: 40000.
- max_in_flight_requests_per_connection (int) – Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5.
- receive_buffer_bytes (int) – The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
- send_buffer_bytes (int) – The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
- socket_options (list) – List of tuple-arguments to socket.setsockopt to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
- security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
- ssl_context (ssl.SSLContext) – pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
- ssl_check_hostname (bool) – flag to configure whether ssl handshake should verify that the certificate matches the brokers hostname. default: True.
- ssl_cafile (str) – optional filename of ca file to use in certificate veriication. default: None.
- ssl_certfile (str) – optional filename of file in pem format containing the client certificate, as well as any ca certificates needed to establish the certificate’s authenticity. default: None.
- ssl_keyfile (str) – optional filename containing the client private key. default: None.
- ssl_password (callable, str, bytes, bytearray) – optional password or callable function that returns a password, for decrypting the client private key. Default: None.
- ssl_crlfile (str) – optional filename containing the CRL to check for certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. default: None.
- api_version (tuple) – Specify which Kafka API version to use. Accepted values are: (0, 8, 0), (0, 8, 1), (0, 8, 2), (0, 9), (0, 10). Default: (0, 8, 2)
- api_version_auto_timeout_ms (int) – number of milliseconds to throw a timeout exception from the constructor when checking the broker api version. Only applies if api_version is None
- selector (selectors.BaseSelector) – Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
- state_change_callback (callable) – function to be called when the connection state changes from CONNECTING to CONNECTED etc.
- metrics (kafka.metrics.Metrics) – Optionally provide a metrics instance for capturing network IO stats. Default: None.
- metric_group_prefix (str) – Prefix for metric names. Default: ‘’
- sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI. Default: PLAIN
- sasl_plain_username (str) – username for sasl PLAIN authentication. Default: None
- sasl_plain_password (str) – password for sasl PLAIN authentication. Default: None
- sasl_kerberos_service_name (str) – Service name to include in GSSAPI sasl mechanism handshake. Default: ‘kafka’
-
blacked_out
()[source]¶ Return true if we are disconnected from the given node and can’t re-establish a connection yet
-
check_version
(timeout=2, strict=False)[source]¶ Attempt to guess the broker version.
Note: This is a blocking call.
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), …
-
close
(error=None)[source]¶ Close socket and fail all in-flight-requests.
Parameters: error (Exception, optional) – pending in-flight-requests will be failed with this exception. Default: kafka.errors.ConnectionError.
-
connecting
()[source]¶ Returns True if still connecting (this may encompass several different states, such as SSL handshake, authorization, etc).
-
connection_delay
()[source]¶ Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects the reconnect backoff time. When connecting, returns 0 to allow non-blocking connect to finish. When connected, returns a very large number to handle slow/stalled connections.