class kafka.KafkaAdminClient(**configs)[source]

A class for administering the Kafka cluster.

Warning

This is an unstable interface that was recently added and is subject to change without warning. In particular, many methods currently return raw protocol tuples. In future releases, we plan to make these into nicer, more pythonic objects. Unfortunately, this will likely break those interfaces.

The KafkaAdminClient class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the Kafka broker. Usage of optional fields from protocol versions that are not supported by the broker will result in IncompatibleBrokerVersion exceptions.

Use of this class requires a minimum broker version >= 0.10.0.0.

Keyword Arguments:
  • bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.

  • client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’

  • reconnect_backoff_ms (int) – The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50.

  • reconnect_backoff_max_ms (int) – The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 30000.

  • request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.

  • connections_max_idle_ms – Close idle connections after the number of milliseconds specified by this config. The broker closes idle connections after connections.max.idle.ms, so this avoids hitting unexpected socket disconnected errors on the client. Default: 540000

  • retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.

  • max_in_flight_requests_per_connection (int) – Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5.

  • receive_buffer_bytes (int) – The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.

  • send_buffer_bytes (int) – The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.

  • socket_options (list) – List of tuple-arguments to socket.setsockopt to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]

  • metadata_max_age_ms (int) – The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000

  • security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.

  • ssl_context (ssl.SSLContext) – Pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.

  • ssl_check_hostname (bool) – Flag to configure whether SSL handshake should verify that the certificate matches the broker’s hostname. Default: True.

  • ssl_cafile (str) – Optional filename of CA file to use in certificate verification. Default: None.

  • ssl_certfile (str) – Optional filename of file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate’s authenticity. Default: None.

  • ssl_keyfile (str) – Optional filename containing the client private key. Default: None.

  • ssl_password (str) – Optional password to be used when loading the certificate chain. Default: None.

  • ssl_crlfile (str) – Optional filename containing the CRL to check for certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. Default: None.

  • api_version (tuple) – Specify which Kafka API version to use. If set to None, KafkaConnectionManager will attempt to infer the broker version by probing various APIs. Example: (0, 10, 2). Default: None

  • bootstrap_timeout_ms (int) – number of milliseconds to throw a timeout exception from the constructor when bootstrapping. Default: 2000.

  • selector (selectors.BaseSelector) – Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector

  • metrics (kafka.metrics.Metrics) – Optionally provide a metrics instance for capturing network IO stats. Default: None.

  • metric_group_prefix (str) – Prefix for metric names. Default: ‘’

  • sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.

  • sasl_plain_username (str) – username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.

  • sasl_plain_password (str) – password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.

  • sasl_kerberos_name (str or gssapi.Name) – Constructed gssapi.Name for use with sasl mechanism handshake. If provided, sasl_kerberos_service_name and sasl_kerberos_domain name are ignored. Default: None.

  • sasl_kerberos_service_name (str) – Service name to include in GSSAPI sasl mechanism handshake. Default: ‘kafka’

  • sasl_kerberos_domain_name (str) – kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers

  • sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider) – OAuthBearer token provider instance. Default: None

  • socks5_proxy (str) – Socks5 proxy url. Default: None

  • kafka_client (callable) – Custom class / callable for creating KafkaNetClient instances

alter_configs(config_resources, validate_only=False, raise_on_unknown=True, incremental=None)

Alter configuration parameters of one or more Kafka resources.

Parameters:
  • config_resources – A list of ConfigResource objects. Each resource’s configs must be a dict mapping config key to either (op, value) (where op is an AlterConfigOp, its name, or its int value) or a bare value (interpreted as SET). For DELETE operations the value is ignored and sent as null. APPEND/SUBTRACT require broker >= 2.3. On older brokers only SET is supported; non-SET ops raise ValueError. On older brokers the client also fills in all other modified dynamic keys before submitting, since AlterConfigsRequest resets any omitted key to its default (be aware of the inherent race in that approach).

  • validate_only (bool, optional) – If True, changes are sent to broker for validation only. Changes will not be applied. Default: False

  • raise_on_unknown (bool, optional) – If True, raises ValueError if any config key is not recognized as a dynamic config for the resource.

  • incremental (bool, optional) – Set to True/False to force use of IncrementalAlterConfigs (True) or AlterConfigs (False). By Default, the admin client will use IncrementalAlterConfigs if supported by the broker, otherwise AlterConfigs.

Returns:

{resource_name (str): Error/Result}}

Return type:

dict of {resource_type (str)

alter_group_offsets(group_id, offsets, group_coordinator_id=None)

Alter committed offsets for a consumer group.

The group must have no active members (i.e. be empty or dead) for the commit to succeed; otherwise individual partitions may return UNKNOWN_MEMBER_ID or similar errors.

Parameters:
Keyword Arguments:

group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.

Returns:

A dict mapping TopicPartition to the partition-level KafkaError class (NoError on success).

Return type:

dict

alter_partition_reassignments(reassignments, timeout_ms=None, raise_errors=True)

Alter the replica sets for the given partitions.

Parameters:

reassignments (dict) – A dict mapping TopicPartition to a list of broker IDs for the new replica set, or None to cancel a pending reassignment for that partition.

Keyword Arguments:
  • timeout_ms (numeric, optional) – The time in ms to wait for the request to complete.

  • raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.

Returns:

Decoded AlterPartitionReassignmentsResponse (as a dict).

alter_replica_log_dirs(replica_assignments)

Move replicas between log directories on their hosting brokers.

Each entry instructs the targeted broker to move (or place) the replica for a given partition into the specified absolute log directory path. Requests are sent to each broker in parallel; a broker will only act on replicas it currently hosts.

Parameters:

replica_assignments – A dict mapping TopicPartitionReplica (topic, partition, broker_id) to the destination log directory path (absolute string). Tuples of (topic, partition, broker_id) are also accepted.

Returns:

dict mapping TopicPartitionReplica to the corresponding error class (kafka.errors.NoError on success).

alter_user_scram_credentials(alterations)

Alter SCRAM credentials for one or more users.

Parameters:

alterations – A list of UserScramCredentialDeletion and/or UserScramCredentialUpsertion objects describing the credentials to delete and/or insert/update.

Returns:

A dict mapping user name -> error message (or None on success).

close()[source]

Close the KafkaAdminClient connection to the Kafka broker.

create_acls(acls)

Create a list of ACLs

This endpoint only accepts a list of concrete ACL objects, no ACLFilters. Throws TopicAlreadyExistsError if topic is already present.

Parameters:

acls – a list of ACL objects

Returns:

dict of successes and failures

create_partitions(topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True)

Create additional partitions for an existing topic.

Parameters:

topic_partitions – A dict of topic name strings to total partition count (int), or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} if manual assignment is desired. dict of {topic_name: NewPartitions} is deprecated.

Keyword Arguments:
  • timeout_ms (numeric, optional) – Milliseconds to wait for new partitions to be created before the broker returns.

  • validate_only (bool, optional) – If True, don’t actually create new partitions. Default: False

  • raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.

Returns:

Appropriate version of CreatePartitionsResponse class.

create_topics(new_topics, timeout_ms=None, validate_only=False, raise_errors=True, wait_for_metadata=False)

Create new topics in the cluster.

Parameters:

new_topics

A list of topic names, or a dict of {topic_name: {num_partitions: int (default -1),

replication_factor: int (default -1), assignments: {partition: [broker_ids]}, configs: {key: value}}}

All keys are optional.

List of NewTopic objects is deprecated. Note: for brokers < 2.4, num_partitions and replication_factor are required and must be provided via dict or [NewTopic].

Keyword Arguments:
  • timeout_ms (numeric, optional) – Milliseconds to wait for new topics to be created before the broker returns.

  • validate_only (bool, optional) – If True, don’t actually create new topics. Not supported by all versions. Default: False

  • raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.

  • wait_for_metadata (bool, optional) – If True, block until each new topic is visible in broker metadata with a leader assigned for every partition. Default: False

Returns:

dict of CreateTopicResponse key/vals

delete_acls(acl_filters)

Delete a set of ACLs

Deletes all ACLs matching the list of input ACLFilter

Parameters:

acl_filters – a list of ACLFilter

Returns:

a list of 3-tuples corresponding to the list of input filters.

The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)

delete_group_offsets(group_id, partitions, group_coordinator_id=None)

Delete committed offsets for a consumer group.

The group must have no active members subscribed to the given topics; otherwise partitions may fail with GROUP_SUBSCRIBED_TO_TOPIC.

Parameters:
  • group_id (str) – The consumer group id.

  • partitions – An iterable of TopicPartition whose committed offsets should be deleted.

Keyword Arguments:

group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.

Returns:

A dict mapping TopicPartition to the partition-level KafkaError class (NoError on success).

Return type:

dict

Raises:

KafkaError – If the response contains a top-level error (e.g. GroupIdNotFoundError, NonEmptyGroupError).

delete_groups(group_ids, group_coordinator_id=None)

Delete Group Offsets for given consumer groups.

Note: This does not verify that the group ids actually exist and group_coordinator_id is the correct coordinator for all these groups.

The result needs checking for potential errors.

Parameters:

group_ids ([str]) – The consumer group ids of the groups which are to be deleted.

Keyword Arguments:

group_coordinator_id (int, optional) – The node_id of the broker which is the coordinator for all the groups. Default: None.

Returns:

A list of tuples (group_id, KafkaError)

delete_records(records_to_delete, timeout_ms=None, partition_leader_id=None)

Delete records whose offset is smaller than the given offset of the corresponding partition.

Partitions whose response is NotLeaderForPartitionError are retried with refreshed metadata, bounded by timeout_ms (or the admin client’s request_timeout_ms when None). When partition_leader_id is supplied no retry is attempted; the caller is asserting routing and any error is reported as-is.

Parameters:

({TopicPartition (records_to_delete) – int}): The earliest available offsets for the given partitions.

Keyword Arguments:
  • timeout_ms (numeric, optional) – Timeout in milliseconds. Also caps the total time spent retrying NotLeaderForPartitionError.

  • partition_leader_id (node_id / int, optional) – If specified, all deletion requests will be sent to this node.

Returns:

dict {topicPartition -> metadata}

delete_topics(topics, timeout_ms=None, raise_errors=True)

Delete topics from the cluster.

Parameters:

topics ([str]) – A list of topic name strings or uuid.UUID ids.

Keyword Arguments:
  • timeout_ms (numeric, optional) – Milliseconds to wait for topics to be deleted before the broker returns.

  • raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.

Returns:

dict of DeleteTopicsResponse key/vals (version-dependent)

describe_acls(acl_filter)

Describe a set of ACLs

Used to return a set of ACLs matching the supplied ACLFilter. The cluster must be configured with an authorizer for this to work, or you will get a SecurityDisabledError

Parameters:

acl_filter – an ACLFilter object

Returns:

tuple of a list of matching ACL objects and a KafkaError (NoError if successful)

describe_cluster()

Fetch cluster-wide metadata such as the list of brokers, the controller ID, and the cluster ID.

Returns:

A dict with cluster-wide metadata, excluding topic details.

describe_configs(config_resources, include_synonyms=False, config_filter='modified')

Fetch configuration parameters for one or more Kafka resources.

Parameters:

config_resources – An list of ConfigResource objects. Any keys in ConfigResource.configs dict will be used to filter the result. Setting the configs dict to None will get all values. An empty dict will get zero values (as per Kafka protocol).

Keyword Arguments:
  • include_synonyms (bool, optional) – If True, return synonyms in response. Not supported by all versions. Default: False.

  • config_filter (ConfigFilterType or str) – Modified returns only keys that have non-default values; Dynamic returns all keys that can be modified with alter_configs; All returns all available keys. Default: Modified.

Returns:

{resource_name (str): {config_key: {config data}}}}

Return type:

dict of {resource_type (str)

describe_features(send_request_to_controller=False)

Fetch the cluster’s supported and finalized feature flags.

Features are broker-level capabilities (e.g. metadata.version) that can be finalized cluster-wide via update_features (KIP-584). Requires broker >= 2.4.

Keyword Arguments:

send_request_to_controller (bool, optional) – If True, route the request to the active controller. By default the request is sent to any available broker. Default: False.

Returns:

  • supported_features: dict of {feature_name: (min_version, max_version)}

  • finalized_features: dict of {feature_name: (min_version_level, max_version_level)}

  • finalized_features_epoch: int, or None if unknown (broker did not report an epoch, or reported -1)

Return type:

dict with keys

describe_groups(group_ids, group_coordinator_id=None, include_authorized_operations=False)

Describe a set of consumer groups.

Any errors are immediately raised.

Parameters:

group_ids – A list of consumer group IDs. These are typically the group names as strings.

Keyword Arguments:

group_coordinator_id (int, optional) – The node_id of the groups’ coordinator broker. If set to None, it will query the cluster for each group to find that group’s coordinator. Explicitly specifying this can be useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None.

Returns:

{key: val}}. key/vals are simple to_dict translations

of the raw results from DescribeGroupsResponse (with inline decoding of ConsumerSubscription and ConsumerAssignment metadata, and conversion of acl set ints to semantic enums).

Return type:

A dict of {group_id

describe_log_dirs(topic_partitions=None, brokers=None)

Fetch broker log directory and topic/partition stats

Keyword Arguments:
  • topic_partitions (dict, list, optional) – Either: dict of {topic_name: [partition ids]}. Or: list of [topic_name], to query all partitions for topic. Or: None, to query all topics / all partitions. Default: None

  • brokers (list, optional) – List of [node_id] for brokers to query. If None, query is sent to all brokers. Default: None

Returns:

list of dicts, containing per-broker log-dir data

describe_metadata_quorum()

Describe the KRaft quorum state for the cluster metadata log.

Returns quorum info for the __cluster_metadata topic (partition 0), including the current leader, leader epoch, high watermark, voters, and observers. On broker version >= 3.8 (KIP-853), the response also reports controller node endpoints in nodes. Requires a KRaft cluster.

Returns:

dict matching the DescribeQuorumResponse shape.

describe_topic_partitions(topics, response_partition_limit=2000, cursor=None)

Describe topics with fine-grained partition-level control (KIP-966).

Unlike describe_topics(), this uses the DescribeTopicPartitions API (apiKey 75, broker 3.7+) which supports pagination via a cursor and partition-level ELR (Eligible Leader Replicas) information.

Parameters:

topics ([str]) – A list of topic names.

Keyword Arguments:
  • response_partition_limit (int, optional) – Maximum number of partitions to include in the response. Default: 2000.

  • cursor (dict, optional) – Dict with 'topic_name' and 'partition_index' keys to start pagination from. Default: None.

Returns:

{'topics': [...], 'next_cursor': None | {...}}. topics is a list of dicts (one per topic) with keys error_code, name, topic_id, is_internal, partitions, and topic_authorized_operations. next_cursor is None if pagination is complete, otherwise a dict with the next page’s topic_name and partition_index.

Return type:

dict

describe_topics(topics=None)

Fetch metadata for the specified topics or all topics if None.

Keyword Arguments:

topics ([str], optional) – topics is retrieved.

Returns:

A list of dicts describing each topic (including partition info).

describe_user_scram_credentials(users=None)

Describe SCRAM credentials for one or more users.

Parameters:

users (list of str, optional) – User names to describe. If None, describe all users with SCRAM credentials.

Returns:

A dict mapping user name to a dict with keys 'error' (None or error message) and 'credential_infos' (list of {‘mechanism’: ScramMechanism, ‘iterations’: int}).

elect_leaders(election_type, topic_partitions=None, timeout_ms=None, raise_errors=True)

Trigger leader election for the specified topic partitions.

Parameters:

election_type – Type of election to attempt. 0 for Preferred, 1 for Unclean

Keyword Arguments:
  • topic_partitions (dict, list, optional) – Either: dict of {topic_name: [partition ids]}. Or: list of [topic_name], and election will run on all partitions for topic. Or: None, and election runs against all topics / all partitions. Default: None

  • timeout_ms (num, optional) – Milliseconds to wait for the leader election process.

  • raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.

Returns:

Appropriate version of ElectLeadersResponse class.

get_broker_version_data(broker_id)

Return BrokerVersionData for a specific broker

list_config_resources(resource_types=None)

List config resources known to the cluster.

Useful for discovering resource types that have no separate enumeration API (e.g. CLIENT_METRICS, GROUP). For TOPIC and BROKER the data is also available via Metadata / cluster descriptions.

Keyword Arguments:

resource_types (list, optional) – Filter by resource type. Each entry may be a ConfigResourceType or its name (e.g. 'TOPIC'). If None or empty, the broker returns all supported types. Requires broker >= 4.1 for anything other than CLIENT_METRICS.

Returns:

[resource_name (str)]}

Return type:

dict of {resource_type (str)

list_group_offsets(group_id, group_coordinator_id=None, partitions=None)

Fetch committed offsets for a single consumer group.

Note: This does not verify that the group_id or partitions actually exist in the cluster.

As soon as any error is encountered, it is immediately raised.

Parameters:

group_id (str) – The consumer group id name for which to fetch offsets.

Keyword Arguments:
  • group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If set to None, will query the cluster to find the group coordinator. Default: None.

  • partitions – A list of TopicPartitions for which to fetch offsets. On brokers >= 0.10.2, this can be set to None to fetch all known offsets for the consumer group. Default: None.

Returns:

A dict mapping TopicPartition to

OffsetAndMetadata.

list_groups(broker_ids=None, states_filter=None, types_filter=None)

List all consumer groups known to the cluster.

This returns a list of Group dicts. The tuples are composed of the consumer group name and the consumer group protocol type.

Only consumer groups that store their offsets in Kafka are returned. The protocol type will be an empty string for groups created using Kafka < 0.9 APIs because, although they store their offsets in Kafka, they don’t use Kafka for group coordination. For groups created using Kafka >= 0.9, the protocol type will typically be “consumer”.

As soon as any error is encountered, it is immediately raised.

Keyword Arguments:
  • broker_ids ([int], optional) – A list of broker node_ids to query for consumer groups. If set to None, will query all brokers in the cluster. Explicitly specifying broker(s) can be useful for determining which consumer groups are coordinated by those broker(s). Default: None

  • states_filter (list, optional) – Filter groups by state. Values may be GroupState members, their string names (case-insensitive, hyphen or underscore), or raw protocol strings (e.g. ['Stable', 'Empty']). Requires broker >= 3.0 (KIP-518). Default: None (no filter).

  • types_filter (list, optional) – Filter groups by type. Values may be GroupType members, their string names (case-insensitive), or raw protocol strings (e.g. ['consumer', 'classic', 'share']). Requires broker >= 4.0 (KIP-848). Default: None (no filter).

Returns:

List of group data dicts, with key/vals from ListGroupsRequest

list_partition_offsets(topic_partition_specs, isolation_level='read_uncommitted', timeout_ms=None)

Look up offsets for the given partitions by spec.

Partitions are routed to their respective leader brokers via cluster metadata; one ListOffsetsRequest is sent per leader. Partitions that return NotLeaderForPartitionError are retried with refreshed metadata, bounded by timeout_ms (or the admin client’s request_timeout_ms when None).

Parameters:

topic_partition_specs – dict mapping TopicPartition to OffsetSpec (or a raw integer timestamp / wire-level sentinel).

Keyword Arguments:
  • isolation_level (str, optional) – One of 'read_uncommitted' (default) or 'read_committed'. read_committed requires broker support for ListOffsets v2+.

  • timeout_ms (int, optional) – Maximum time to spend retrying NotLeaderForPartitionError. Default: request_timeout_ms.

Returns:

A dict mapping TopicPartition to OffsetAndTimestamp

Return type:

dict

Raises:
  • KafkaError – If any partition response carries an error code.

  • NotLeaderForPartitionError – If NotLeaderForPartitionError retries do not converge within timeout_ms.

  • UnknownTopicOrPartitionError – If a requested partition is not known to the cluster.

  • UnsupportedVersionError – If the broker does not support a version of ListOffsetsRequest compatible with the requested specs.

list_partition_reassignments(topic_partitions=None, timeout_ms=None)

List the current ongoing partition reassignments.

Parameters:

topic_partitions (dict, list, optional) – Either: a dict of {topic_name: [partition_ids]}, or a list of TopicPartition, or None to list ongoing reassignments for all partitions. Default: None.

Keyword Arguments:

timeout_ms (numeric, optional) – The time in ms to wait for the request to complete.

Returns:

A dict mapping TopicPartition to a dict with keys 'replicas', 'adding_replicas', and 'removing_replicas' (each a list of broker IDs).

Return type:

dict

list_topics()

Retrieve a list of all topic names in the cluster.

Returns:

A list of topic name strings.

remove_group_members(group_id, members, group_coordinator_id=None)

Remove members from a consumer group.

On brokers supporting LeaveGroup v3+ (Kafka 2.3+), a single batched request is sent. On older brokers, falls back to one single-member LeaveGroupRequest per member (in which case group_instance_id is not supported and member_id is required).

Parameters:
  • group_id (str) – The consumer group id.

  • members – An iterable of MemberToRemove. Each entry must set at least one of member_id or, if brokers support LeaveGroup v3+, group_instance_id. reason is only sent to brokers supporting LeaveGroup v5+ (KIP-800).

Keyword Arguments:

group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.

Returns:

A dict mapping MemberToRemove to the per-member KafkaError class (NoError on success). The key’s reason is always None in the result (not echoed by the broker).

Return type:

dict

Raises:
  • KafkaError – If a batched response contains a top-level error.

  • UnsupportedVersionError – If the broker does not support batched LeaveGroupRequest and any member uses group_instance_id.

reset_configs(config_resources, validate_only=False, raise_on_unknown=True, incremental=None)

Reset configuration parameters of one or more Kafka resources to defaults.

On 2.3+ brokers, the client will submit an IncrementalAlterConfigsRequest with op DELETE for each resource/key. On older brokers, the client will use submit an AlterConfigsRequest and attempt to include all modified dynamic config values for each resource except the keys marked for reset. (AlterConfigsRequest will reset any missing config key to its default).

Parameters:

config_resources – A list of ConfigResource objects. Each resource’s configs should be a list or dict of config keys to reset. (if dict, the values are ignored).

Returns:

{resource_name (str): Error/Result}}

Return type:

dict of {resource_type (str)

reset_group_offsets(group_id, offset_specs, group_coordinator_id=None)

Reset committed offsets for a consumer group.

The group must have no active members (i.e. be empty or dead) for the reset to succeed; otherwise individual partitions may return UNKNOWN_MEMBER_ID or similar errors.

Each dict value selects how the target offset is produced. All resulting offsets are clamped to the partition’s [earliest, latest] range; values that resolve to UNKNOWN_OFFSET (e.g. a timestamp beyond the last record) are clamped to latest.

Parameters:
  • group_id (str) – The consumer group id.

  • offset_specs (dict) –

    A dict mapping TopicPartition to one of:

    • OffsetSpec (e.g. OffsetSpec.EARLIEST, OffsetSpec.LATEST, OffsetSpec.MAX_TIMESTAMP): resolved server-side via ListOffsets.

    • OffsetTimestamp (ms since epoch): resolved server-side to the earliest offset whose timestamp is >= the given value.

    • Plain int: an explicit committed offset (no server-side resolution), which is still clamped to the valid range.

Keyword Arguments:

group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.

Returns:

A dict mapping TopicPartition to dict of {‘error’: KafkaError class, ‘offset’: int}. The offset value is the post-clamp value that was committed.

Return type:

dict

update_features(feature_updates, validate_only=False, timeout_ms=60000)

Update cluster-wide finalized feature flags.

Finalize cluster-wide feature capabilities (e.g. metadata.version). The request is always routed to the active controller. See KIP-584. Requires broker >= 2.7.

Parameters:

feature_updates – A dict of {feature_name: (upgrade_type, max_version_level)} or {feature_name: max_version_level} (implicit UPGRADE). upgrade_type may be a UpdateFeatureType, its name, or int value. A max_version_level < 1 requests deletion of the finalized feature.

Keyword Arguments:
  • validate_only (bool, optional) – If True, validate the request but do not apply it. Default: False.

  • timeout_ms (int, optional) – Broker-side timeout in milliseconds. Default: 60000.

Returns:

‘OK’ | error message}

Return type:

dict of {feature_name

wait_for_topics(topic_names, timeout_ms=10000)

Block until each of the given topics is ready to use.

CreateTopicsResponse only confirms that the broker accepted the create request; propagating the new topics into the broker’s metadata cache – and electing a leader for every partition – can lag behind, especially on KRaft clusters. This method polls describe_topics() at a fixed interval until every requested topic both:

  • is returned with error_code == 0, and

  • has error_code == 0 and a leader assigned (leader_id >= 0) for every partition.

Parameters:

topic_names ([str]) – Topic names to wait for.

Keyword Arguments:

timeout_ms (numeric, optional) – Maximum milliseconds to wait. Default: 10000.

Raises:

KafkaTimeoutError – if any topic is still not ready when the deadline expires.