KafkaAdminClient

class kafka.KafkaAdminClient(**configs)

A class for administering the Kafka cluster.

Warning

This is an unstable interface that was recently added and is subject to change without warning. In particular, many methods currently return raw protocol tuples. In future releases, we plan to make these into nicer, more pythonic objects. Unfortunately, this will likely break those interfaces.

The KafkaAdminClient class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the Kafka broker. Usage of optional fields from protocol versions that are not supported by the broker will result in IncompatibleBrokerVersion exceptions.

Use of this class requires broker version 0.10.0.0 or later.

Keyword Arguments:
 
  • bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the client should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
  • client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
  • reconnect_backoff_ms (int) – The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50.
  • reconnect_backoff_max_ms (int) – The maximum amount of time in milliseconds to back off when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically at this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff, resulting in a random range between 20% below and 20% above the computed value. Default: 1000.
  • request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.
  • connections_max_idle_ms – Close idle connections after the number of milliseconds specified by this config. The broker closes idle connections after connections.max.idle.ms, so this avoids hitting unexpected socket disconnected errors on the client. Default: 540000
  • retry_backoff_ms (int) – Milliseconds to back off when retrying on errors. Default: 100.
  • max_in_flight_requests_per_connection (int) – Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5.
  • receive_buffer_bytes (int) – The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
  • send_buffer_bytes (int) – The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
  • socket_options (list) – List of tuple-arguments to socket.setsockopt to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
  • metadata_max_age_ms (int) – The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000
  • security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
  • ssl_context (ssl.SSLContext) – Pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
  • ssl_check_hostname (bool) – Flag to configure whether SSL handshake should verify that the certificate matches the broker’s hostname. Default: True.
  • ssl_cafile (str) – Optional filename of CA file to use in certificate verification. Default: None.
  • ssl_certfile (str) – Optional filename of file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate’s authenticity. Default: None.
  • ssl_keyfile (str) – Optional filename containing the client private key. Default: None.
  • ssl_password (str) – Optional password to be used when loading the certificate chain. Default: None.
  • ssl_crlfile (str) – Optional filename containing the CRL to check for certificate revocation. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+. Default: None.
  • api_version (tuple) – Specify which Kafka API version to use. If set to None, KafkaClient will attempt to infer the broker version by probing various APIs. Example: (0, 10, 2). Default: None
  • api_version_auto_timeout_ms (int) – Number of milliseconds to wait before raising a timeout exception from the constructor while checking the broker API version. Only applies if api_version is None.
  • selector (selectors.BaseSelector) – Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
  • metrics (kafka.metrics.Metrics) – Optionally provide a metrics instance for capturing network IO stats. Default: None.
  • metric_group_prefix (str) – Prefix for metric names. Default: ‘’
  • sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
  • sasl_plain_username (str) – username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
  • sasl_plain_password (str) – password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
  • sasl_kerberos_service_name (str) – Service name to include in GSSAPI sasl mechanism handshake. Default: ‘kafka’
  • sasl_kerberos_domain_name (str) – kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers
  • sasl_oauth_token_provider (AbstractTokenProvider) – OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None
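
The interaction between reconnect_backoff_ms and reconnect_backoff_max_ms described above can be sketched as a small standalone function. This is an illustration only, not the library's implementation: the function name is hypothetical, and doubling the delay on each consecutive failure is an assumption (the text above only says the backoff increases exponentially).

```python
import random


def reconnect_backoff_ms(failures, base_ms=50, max_ms=1000, jitter=0.2, rng=random):
    """Return a randomized reconnect delay (ms) after `failures` consecutive failures.

    base_ms and max_ms mirror the documented defaults of reconnect_backoff_ms
    and reconnect_backoff_max_ms. Doubling per failure is an assumption.
    """
    # Exponential growth from the base delay, capped at the configured maximum.
    computed = min(base_ms * (2 ** max(failures - 1, 0)), max_ms)
    # Randomize between 20% below and 20% above the computed value.
    return computed * rng.uniform(1.0 - jitter, 1.0 + jitter)


if __name__ == "__main__":
    for n in range(1, 8):
        print(n, round(reconnect_backoff_ms(n), 1))
```

With the defaults, the first retry waits roughly 40 to 60 ms, and once the cap is reached every retry waits roughly 800 to 1200 ms.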

alter_configs(config_resources)

Alter configuration parameters of one or more Kafka resources.

Warning

This is currently broken for BROKER resources: requests for those must be sent to the specific broker concerned, but this method always picks the least-loaded node. See the comment in the source code for details. We would happily accept a PR fixing this.

Parameters: config_resources – A list of ConfigResource objects.
Returns: Appropriate version of AlterConfigsResponse class.

close()

Close the KafkaAdminClient connection to the Kafka broker.

create_acls(acls)

Create a list of ACLs

This endpoint only accepts a list of concrete ACL objects, no ACLFilters. Throws TopicAlreadyExistsError if topic is already present.

Parameters: acls – a list of ACL objects
Returns: dict of successes and failures

create_partitions(topic_partitions, timeout_ms=None, validate_only=False)

Create additional partitions for an existing topic.

Parameters:
  • topic_partitions – A map of topic name strings to NewPartitions objects.
  • timeout_ms – Milliseconds to wait for new partitions to be created before the broker returns.
  • validate_only – If True, don’t actually create new partitions. Default: False
Returns:

Appropriate version of CreatePartitionsResponse class.
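
A minimal sketch of preparing the topic_partitions argument follows. It rests on two assumptions that are not stated above: that each NewPartitions object carries the new total partition count rather than an increment, and that partition counts can only grow. The helper name and the sample topic names are hypothetical.

```python
def plan_partition_growth(current_counts, desired_counts):
    """Return {topic: desired_total} for every topic that needs more partitions.

    Both arguments map topic names to partition counts. Topics already at the
    desired count are skipped; a request to shrink a topic is rejected.
    """
    plan = {}
    for topic, desired in desired_counts.items():
        current = current_counts[topic]
        if desired < current:
            raise ValueError(
                "cannot shrink %s from %d to %d partitions" % (topic, current, desired)
            )
        if desired > current:
            plan[topic] = desired
    return plan


# Hypothetical usage against a live cluster (assumes kafka.admin.NewPartitions
# accepts total_count as its first argument):
#
#   from kafka.admin import NewPartitions
#   plan = plan_partition_growth({"orders": 3}, {"orders": 6})
#   admin.create_partitions({t: NewPartitions(total_count=n) for t, n in plan.items()})
```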

create_topics(new_topics, timeout_ms=None, validate_only=False)

Create new topics in the cluster.

Parameters:
  • new_topics – A list of NewTopic objects.
  • timeout_ms – Milliseconds to wait for new topics to be created before the broker returns.
  • validate_only – If True, don’t actually create new topics. Not supported by all versions. Default: False
Returns:

Appropriate version of CreateTopicsResponse class.
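
The following sketch shows one way to create topics and make sure close() is called afterwards. The helper name, broker address, client_id, and topic settings are placeholders, and the NewTopic keyword arguments shown are assumed from kafka.admin rather than taken from the text above. The kafka import is deferred into main() so the helper itself can be exercised without a broker.

```python
from contextlib import closing


def create_topics_then_close(admin, new_topics, timeout_ms=10000):
    """Create the given topics, then close the admin client even on failure."""
    with closing(admin):  # closing() calls admin.close() when the block exits
        return admin.create_topics(new_topics, timeout_ms=timeout_ms)


def main():
    # Requires kafka-python and a reachable broker; all values are placeholders.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin = KafkaAdminClient(
        bootstrap_servers="localhost:9092",
        client_id="admin-example",
    )
    topic = NewTopic(name="example-topic", num_partitions=3, replication_factor=1)
    # The return value is the raw protocol response object.
    print(create_topics_then_close(admin, [topic]))
```

Call main() manually against a running cluster. The returned response is a raw protocol object, so its layout may change between releases, as the warning at the top of this page notes.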

delete_acls(acl_filters)

Delete a set of ACLs

Deletes all ACLs matching any of the given ACLFilter objects.

Parameters: acl_filters – a list of ACLFilter objects
Returns: a list of 3-tuples corresponding to the list of input filters. The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)

delete_topics(topics, timeout_ms=None)

Delete topics from the cluster.

Parameters:
  • topics – A list of topic name strings.
  • timeout_ms – Milliseconds to wait for topics to be deleted before the broker returns.
Returns:

Appropriate version of DeleteTopicsResponse class.

describe_acls(acl_filter)

Describe a set of ACLs

Returns the set of ACLs matching the supplied ACLFilter. The cluster must be configured with an authorizer for this to work; otherwise a SecurityDisabledError is raised.

Parameters: acl_filter – an ACLFilter object
Returns: tuple of a list of matching ACL objects and a KafkaError (NoError if successful)

describe_configs(config_resources, include_synonyms=False)

Fetch configuration parameters for one or more Kafka resources.

Parameters:
  • config_resources – A list of ConfigResource objects. Any keys in the ConfigResource.configs dict will be used to filter the result. Setting the configs dict to None will get all values. An empty dict will get zero values (as per Kafka protocol).
  • include_synonyms – If True, return synonyms in response. Not supported by all versions. Default: False.
Returns:

Appropriate version of DescribeConfigsResponse class.

describe_consumer_groups(group_ids, group_coordinator_id=None, include_authorized_operations=False)

Describe a set of consumer groups.

Any errors are immediately raised.

Parameters:
  • group_ids – A list of consumer group IDs. These are typically the group names as strings.
  • group_coordinator_id – The node_id of the groups’ coordinator broker. If set to None, it will query the cluster for each group to find that group’s coordinator. Explicitly specifying this can be useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None.
  • include_authorized_operations – Whether or not to include information about the operations a group is allowed to perform. Only supported on API version >= v3. Default: False.
Returns:

A list of group descriptions. For now the group descriptions are the raw results from the DescribeGroupsResponse. Long-term, we plan to change this to return namedtuples as well as decoding the partition assignments.

list_consumer_group_offsets(group_id, group_coordinator_id=None, partitions=None)

Fetch Consumer Offsets for a single consumer group.

Note: This does not verify that the group_id or partitions actually exist in the cluster.

As soon as any error is encountered, it is immediately raised.

Parameters:
  • group_id – The ID (name) of the consumer group for which to fetch offsets.
  • group_coordinator_id – The node_id of the group’s coordinator broker. If set to None, will query the cluster to find the group coordinator. Explicitly specifying this can be useful to prevent that extra network round trip if you already know the group coordinator. Default: None.
  • partitions – A list of TopicPartitions for which to fetch offsets. On brokers >= 0.10.2, this can be set to None to fetch all known offsets for the consumer group. Default: None.
Returns:

A dictionary with TopicPartition keys and OffsetAndMetadata values. Partitions that are not specified and for which the group_id does not have a recorded offset are omitted. An offset value of -1 indicates the group_id has no offset for that TopicPartition. A -1 can only happen for partitions that are explicitly specified.
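
To illustrate the -1 convention, the sketch below separates partitions with a committed offset from explicitly requested partitions that have none. The two namedtuples are local stand-ins so the example runs without a broker; it assumes only that the real values expose an offset attribute. The function name and sample data are hypothetical.

```python
from collections import namedtuple

# Local stand-ins for kafka-python's structs, used only for this illustration.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])


def split_offsets(offsets):
    """Split a {TopicPartition: OffsetAndMetadata} dict by whether an offset exists.

    Returns (committed, missing): committed maps partitions to their offsets,
    missing is a sorted list of partitions whose offset is -1.
    """
    committed = {tp: om.offset for tp, om in offsets.items() if om.offset != -1}
    missing = sorted(tp for tp, om in offsets.items() if om.offset == -1)
    return committed, missing


if __name__ == "__main__":
    sample = {
        TopicPartition("orders", 0): OffsetAndMetadata(42, ""),
        TopicPartition("orders", 1): OffsetAndMetadata(-1, ""),
    }
    print(split_offsets(sample))
```

In real use, the dictionary returned by list_consumer_group_offsets would be passed in place of the sample data.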

list_consumer_groups(broker_ids=None)

List all consumer groups known to the cluster.

This returns a list of Consumer Group tuples. The tuples are composed of the consumer group name and the consumer group protocol type.

Only consumer groups that store their offsets in Kafka are returned. The protocol type will be an empty string for groups created using Kafka < 0.9 APIs because, although they store their offsets in Kafka, they don’t use Kafka for group coordination. For groups created using Kafka >= 0.9, the protocol type will typically be “consumer”.

As soon as any error is encountered, it is immediately raised.

Parameters:

broker_ids – A list of broker node_ids to query for consumer groups. If set to None, will query all brokers in the cluster. Explicitly specifying broker(s) can be useful for determining which consumer groups are coordinated by those broker(s). Default: None

Returns:

List of tuples of Consumer Groups.

Raises:
  • GroupCoordinatorNotAvailableError – The coordinator is not available, so cannot process requests.
  • GroupLoadInProgressError – The coordinator is loading and hence can’t process requests.
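
Because the result is a plain list of (group name, protocol type) tuples, it can be post-processed directly. The sketch below separates groups that use Kafka for coordination from those with an empty protocol type, following the description above. The function name and the sample group names are hypothetical.

```python
def split_groups_by_protocol(groups):
    """Split (name, protocol_type) tuples into coordinated and legacy group names.

    Groups with an empty protocol type store offsets in Kafka but do not use
    Kafka for group coordination; they are returned in the second list.
    """
    coordinated, legacy = [], []
    for name, protocol_type in groups:
        if protocol_type == "":
            legacy.append(name)
        else:
            coordinated.append(name)
    return coordinated, legacy


if __name__ == "__main__":
    sample = [("billing", "consumer"), ("old-app", "")]
    print(split_groups_by_protocol(sample))
```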