- class kafka.KafkaAdminClient(**configs)[source]
A class for administering the Kafka cluster.
Warning
This is an unstable interface that was recently added and is subject to change without warning. In particular, many methods currently return raw protocol tuples. In future releases, we plan to make these into nicer, more pythonic objects. Unfortunately, this will likely break those interfaces.
The KafkaAdminClient class will negotiate for the latest version of each message protocol format supported by both the kafka-python client library and the Kafka broker. Usage of optional fields from protocol versions that are not supported by the broker will result in IncompatibleBrokerVersion exceptions.
Use of this class requires a minimum broker version >= 0.10.0.0.
- Keyword Arguments:
bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the consumer should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
client_id (str) – a name for this client. This string is passed in each request to servers and can be used to identify specific server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: ‘kafka-python-{version}’
reconnect_backoff_ms (int) – The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50.
reconnect_backoff_max_ms (int) – The maximum amount of time in milliseconds to backoff/wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. Once the maximum is reached, reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between 20% below and 20% above the computed value. Default: 30000.
request_timeout_ms (int) – Client request timeout in milliseconds. Default: 30000.
connections_max_idle_ms – Close idle connections after the number of milliseconds specified by this config. The broker closes idle connections after connections.max.idle.ms, so this avoids hitting unexpected socket disconnected errors on the client. Default: 540000
retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.
max_in_flight_requests_per_connection (int) – Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5.
receive_buffer_bytes (int) – The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). Java client defaults to 32768.
send_buffer_bytes (int) – The size of the TCP send buffer (SO_SNDBUF) to use when sending data. Default: None (relies on system defaults). Java client defaults to 131072.
socket_options (list) – List of tuple-arguments to socket.setsockopt to apply to broker connection sockets. Default: [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
metadata_max_age_ms (int) – The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000
security_protocol (str) – Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. Default: PLAINTEXT.
ssl_context (ssl.SSLContext) – Pre-configured SSLContext for wrapping socket connections. If provided, all other ssl_* configurations will be ignored. Default: None.
ssl_check_hostname (bool) – Flag to configure whether SSL handshake should verify that the certificate matches the broker’s hostname. Default: True.
ssl_cafile (str) – Optional filename of CA file to use in certificate verification. Default: None.
ssl_certfile (str) – Optional filename of file in PEM format containing the client certificate, as well as any CA certificates needed to establish the certificate’s authenticity. Default: None.
ssl_keyfile (str) – Optional filename containing the client private key. Default: None.
ssl_password (str) – Optional password to be used when loading the certificate chain. Default: None.
ssl_crlfile (str) – Optional filename containing the CRL to check for certificate expiration. By default, no CRL check is done. When providing a file, only the leaf certificate will be checked against this CRL. Default: None.
api_version (tuple) – Specify which Kafka API version to use. If set to None, KafkaConnectionManager will attempt to infer the broker version by probing various APIs. Example: (0, 10, 2). Default: None
bootstrap_timeout_ms (int) – number of milliseconds to throw a timeout exception from the constructor when bootstrapping. Default: 2000.
selector (selectors.BaseSelector) – Provide a specific selector implementation to use for I/O multiplexing. Default: selectors.DefaultSelector
metrics (kafka.metrics.Metrics) – Optionally provide a metrics instance for capturing network IO stats. Default: None.
metric_group_prefix (str) – Prefix for metric names. Default: ‘’
sasl_mechanism (str) – Authentication mechanism when security_protocol is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are: PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
sasl_plain_username (str) – username for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_plain_password (str) – password for sasl PLAIN and SCRAM authentication. Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
sasl_kerberos_name (str or gssapi.Name) – Constructed gssapi.Name for use with sasl mechanism handshake. If provided, sasl_kerberos_service_name and sasl_kerberos_domain name are ignored. Default: None.
sasl_kerberos_service_name (str) – Service name to include in GSSAPI sasl mechanism handshake. Default: ‘kafka’
sasl_kerberos_domain_name (str) – kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (kafka.sasl.oauth.AbstractTokenProvider) – OAuthBearer token provider instance. Default: None
socks5_proxy (str) – Socks5 proxy url. Default: None
kafka_client (callable) – Custom class / callable for creating KafkaNetClient instances
- alter_configs(config_resources, validate_only=False, raise_on_unknown=True, incremental=None)
Alter configuration parameters of one or more Kafka resources.
- Parameters:
config_resources – A list of ConfigResource objects. Each resource’s
configsmust be a dict mapping config key to either(op, value)(whereopis anAlterConfigOp, its name, or its int value) or a bare value (interpreted as SET). For DELETE operations the value is ignored and sent as null. APPEND/SUBTRACT require broker >= 2.3. On older brokers only SET is supported; non-SET ops raise ValueError. On older brokers the client also fills in all other modified dynamic keys before submitting, since AlterConfigsRequest resets any omitted key to its default (be aware of the inherent race in that approach).validate_only (bool, optional) – If True, changes are sent to broker for validation only. Changes will not be applied. Default: False
raise_on_unknown (bool, optional) – If True, raises ValueError if any config key is not recognized as a dynamic config for the resource.
incremental (bool, optional) – Set to True/False to force use of IncrementalAlterConfigs (True) or AlterConfigs (False). By Default, the admin client will use IncrementalAlterConfigs if supported by the broker, otherwise AlterConfigs.
- Returns:
{resource_name (str): Error/Result}}
- Return type:
dict of {resource_type (str)
- alter_group_offsets(group_id, offsets, group_coordinator_id=None)
Alter committed offsets for a consumer group.
The group must have no active members (i.e. be empty or dead) for the commit to succeed; otherwise individual partitions may return
UNKNOWN_MEMBER_IDor similar errors.- Parameters:
group_id (str) – The consumer group id.
offsets (dict) – A dict mapping
TopicPartitiontoOffsetAndMetadata.
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.
- Returns:
A dict mapping
TopicPartitionto the partition-levelKafkaErrorclass (NoErroron success).- Return type:
dict
- alter_partition_reassignments(reassignments, timeout_ms=None, raise_errors=True)
Alter the replica sets for the given partitions.
- Parameters:
reassignments (dict) – A dict mapping
TopicPartitionto a list of broker IDs for the new replica set, orNoneto cancel a pending reassignment for that partition.- Keyword Arguments:
timeout_ms (numeric, optional) – The time in ms to wait for the request to complete.
raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.
- Returns:
Decoded AlterPartitionReassignmentsResponse (as a dict).
- alter_replica_log_dirs(replica_assignments)
Move replicas between log directories on their hosting brokers.
Each entry instructs the targeted broker to move (or place) the replica for a given partition into the specified absolute log directory path. Requests are sent to each broker in parallel; a broker will only act on replicas it currently hosts.
- Parameters:
replica_assignments – A dict mapping
TopicPartitionReplica(topic,partition,broker_id) to the destination log directory path (absolute string). Tuples of(topic, partition, broker_id)are also accepted.- Returns:
dict mapping
TopicPartitionReplicato the corresponding error class (kafka.errors.NoErroron success).
- alter_user_scram_credentials(alterations)
Alter SCRAM credentials for one or more users.
- Parameters:
alterations – A list of UserScramCredentialDeletion and/or UserScramCredentialUpsertion objects describing the credentials to delete and/or insert/update.
- Returns:
A dict mapping user name -> error message (or None on success).
- create_acls(acls)
Create a list of ACLs
This endpoint only accepts a list of concrete ACL objects, no ACLFilters. Throws TopicAlreadyExistsError if topic is already present.
- Parameters:
acls – a list of ACL objects
- Returns:
dict of successes and failures
- create_partitions(topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True)
Create additional partitions for an existing topic.
- Parameters:
topic_partitions – A dict of topic name strings to total partition count (int), or a dict of {topic_name: {count: int, assignments: [[broker_ids]]}} if manual assignment is desired. dict of {topic_name: NewPartitions} is deprecated.
- Keyword Arguments:
timeout_ms (numeric, optional) – Milliseconds to wait for new partitions to be created before the broker returns.
validate_only (bool, optional) – If True, don’t actually create new partitions. Default: False
raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.
- Returns:
Appropriate version of CreatePartitionsResponse class.
- create_topics(new_topics, timeout_ms=None, validate_only=False, raise_errors=True, wait_for_metadata=False)
Create new topics in the cluster.
- Parameters:
new_topics –
A list of topic names, or a dict of {topic_name: {num_partitions: int (default -1),
replication_factor: int (default -1), assignments: {partition: [broker_ids]}, configs: {key: value}}}
All keys are optional.
List of NewTopic objects is deprecated. Note: for brokers < 2.4, num_partitions and replication_factor are required and must be provided via dict or [NewTopic].
- Keyword Arguments:
timeout_ms (numeric, optional) – Milliseconds to wait for new topics to be created before the broker returns.
validate_only (bool, optional) – If True, don’t actually create new topics. Not supported by all versions. Default: False
raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.
wait_for_metadata (bool, optional) – If True, block until each new topic is visible in broker metadata with a leader assigned for every partition. Default: False
- Returns:
dict of CreateTopicResponse key/vals
- delete_acls(acl_filters)
Delete a set of ACLs
Deletes all ACLs matching the list of input ACLFilter
- Parameters:
acl_filters – a list of ACLFilter
- Returns:
- a list of 3-tuples corresponding to the list of input filters.
The tuples hold (the input ACLFilter, list of affected ACLs, KafkaError instance)
- delete_group_offsets(group_id, partitions, group_coordinator_id=None)
Delete committed offsets for a consumer group.
The group must have no active members subscribed to the given topics; otherwise partitions may fail with
GROUP_SUBSCRIBED_TO_TOPIC.- Parameters:
group_id (str) – The consumer group id.
partitions – An iterable of
TopicPartitionwhose committed offsets should be deleted.
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.
- Returns:
A dict mapping
TopicPartitionto the partition-levelKafkaErrorclass (NoErroron success).- Return type:
dict
- Raises:
KafkaError – If the response contains a top-level error (e.g.
GroupIdNotFoundError,NonEmptyGroupError).
- delete_groups(group_ids, group_coordinator_id=None)
Delete Group Offsets for given consumer groups.
Note: This does not verify that the group ids actually exist and group_coordinator_id is the correct coordinator for all these groups.
The result needs checking for potential errors.
- Parameters:
group_ids ([str]) – The consumer group ids of the groups which are to be deleted.
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the broker which is the coordinator for all the groups. Default: None.
- Returns:
A list of tuples (group_id, KafkaError)
- delete_records(records_to_delete, timeout_ms=None, partition_leader_id=None)
Delete records whose offset is smaller than the given offset of the corresponding partition.
Partitions whose response is
NotLeaderForPartitionErrorare retried with refreshed metadata, bounded bytimeout_ms(or the admin client’srequest_timeout_mswhenNone). Whenpartition_leader_idis supplied no retry is attempted; the caller is asserting routing and any error is reported as-is.- Parameters:
({TopicPartition (records_to_delete) – int}): The earliest available offsets for the given partitions.
- Keyword Arguments:
timeout_ms (numeric, optional) – Timeout in milliseconds. Also caps the total time spent retrying NotLeaderForPartitionError.
partition_leader_id (node_id / int, optional) – If specified, all deletion requests will be sent to this node.
- Returns:
dict {topicPartition -> metadata}
- delete_topics(topics, timeout_ms=None, raise_errors=True)
Delete topics from the cluster.
- Parameters:
topics ([str]) – A list of topic name strings or uuid.UUID ids.
- Keyword Arguments:
timeout_ms (numeric, optional) – Milliseconds to wait for topics to be deleted before the broker returns.
raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.
- Returns:
dict of DeleteTopicsResponse key/vals (version-dependent)
- describe_acls(acl_filter)
Describe a set of ACLs
Used to return a set of ACLs matching the supplied ACLFilter. The cluster must be configured with an authorizer for this to work, or you will get a SecurityDisabledError
- Parameters:
acl_filter – an ACLFilter object
- Returns:
tuple of a list of matching ACL objects and a KafkaError (NoError if successful)
- describe_cluster()
Fetch cluster-wide metadata such as the list of brokers, the controller ID, and the cluster ID.
- Returns:
A dict with cluster-wide metadata, excluding topic details.
- describe_configs(config_resources, include_synonyms=False, config_filter='modified')
Fetch configuration parameters for one or more Kafka resources.
- Parameters:
config_resources – An list of ConfigResource objects. Any keys in ConfigResource.configs dict will be used to filter the result. Setting the configs dict to None will get all values. An empty dict will get zero values (as per Kafka protocol).
- Keyword Arguments:
include_synonyms (bool, optional) – If True, return synonyms in response. Not supported by all versions. Default: False.
config_filter (ConfigFilterType or str) – Modified returns only keys that have non-default values; Dynamic returns all keys that can be modified with alter_configs; All returns all available keys. Default: Modified.
- Returns:
{resource_name (str): {config_key: {config data}}}}
- Return type:
dict of {resource_type (str)
- describe_features(send_request_to_controller=False)
Fetch the cluster’s supported and finalized feature flags.
Features are broker-level capabilities (e.g.
metadata.version) that can be finalized cluster-wide viaupdate_features(KIP-584). Requires broker >= 2.4.- Keyword Arguments:
send_request_to_controller (bool, optional) – If True, route the request to the active controller. By default the request is sent to any available broker. Default: False.
- Returns:
supported_features: dict of{feature_name: (min_version, max_version)}finalized_features: dict of{feature_name: (min_version_level, max_version_level)}finalized_features_epoch: int, or None if unknown (broker did not report an epoch, or reported -1)
- Return type:
dict with keys
- describe_groups(group_ids, group_coordinator_id=None, include_authorized_operations=False)
Describe a set of consumer groups.
Any errors are immediately raised.
- Parameters:
group_ids – A list of consumer group IDs. These are typically the group names as strings.
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the groups’ coordinator broker. If set to None, it will query the cluster for each group to find that group’s coordinator. Explicitly specifying this can be useful for avoiding extra network round trips if you already know the group coordinator. This is only useful when all the group_ids have the same coordinator, otherwise it will error. Default: None.
- Returns:
- {key: val}}. key/vals are simple to_dict translations
of the raw results from DescribeGroupsResponse (with inline decoding of ConsumerSubscription and ConsumerAssignment metadata, and conversion of acl set ints to semantic enums).
- Return type:
A dict of {group_id
- describe_log_dirs(topic_partitions=None, brokers=None)
Fetch broker log directory and topic/partition stats
- Keyword Arguments:
topic_partitions (dict, list, optional) – Either: dict of {topic_name: [partition ids]}. Or: list of [topic_name], to query all partitions for topic. Or: None, to query all topics / all partitions. Default: None
brokers (list, optional) – List of [node_id] for brokers to query. If None, query is sent to all brokers. Default: None
- Returns:
list of dicts, containing per-broker log-dir data
- describe_metadata_quorum()
Describe the KRaft quorum state for the cluster metadata log.
Returns quorum info for the
__cluster_metadatatopic (partition 0), including the current leader, leader epoch, high watermark, voters, and observers. On broker version >= 3.8 (KIP-853), the response also reports controller node endpoints innodes. Requires a KRaft cluster.- Returns:
dict matching the DescribeQuorumResponse shape.
- describe_topic_partitions(topics, response_partition_limit=2000, cursor=None)
Describe topics with fine-grained partition-level control (KIP-966).
Unlike
describe_topics(), this uses the DescribeTopicPartitions API (apiKey 75, broker 3.7+) which supports pagination via a cursor and partition-level ELR (Eligible Leader Replicas) information.- Parameters:
topics ([str]) – A list of topic names.
- Keyword Arguments:
response_partition_limit (int, optional) – Maximum number of partitions to include in the response. Default: 2000.
cursor (dict, optional) – Dict with
'topic_name'and'partition_index'keys to start pagination from. Default: None.
- Returns:
{'topics': [...], 'next_cursor': None | {...}}.topicsis a list of dicts (one per topic) with keyserror_code,name,topic_id,is_internal,partitions, andtopic_authorized_operations.next_cursoris None if pagination is complete, otherwise a dict with the next page’stopic_nameandpartition_index.- Return type:
dict
- describe_topics(topics=None)
Fetch metadata for the specified topics or all topics if None.
- Keyword Arguments:
topics ([str], optional) – topics is retrieved.
- Returns:
A list of dicts describing each topic (including partition info).
- describe_user_scram_credentials(users=None)
Describe SCRAM credentials for one or more users.
- Parameters:
users (list of str, optional) – User names to describe. If None, describe all users with SCRAM credentials.
- Returns:
A dict mapping user name to a dict with keys
'error'(None or error message) and'credential_infos'(list of {‘mechanism’: ScramMechanism, ‘iterations’: int}).
- elect_leaders(election_type, topic_partitions=None, timeout_ms=None, raise_errors=True)
Trigger leader election for the specified topic partitions.
- Parameters:
election_type – Type of election to attempt. 0 for Preferred, 1 for Unclean
- Keyword Arguments:
topic_partitions (dict, list, optional) – Either: dict of {topic_name: [partition ids]}. Or: list of [topic_name], and election will run on all partitions for topic. Or: None, and election runs against all topics / all partitions. Default: None
timeout_ms (num, optional) – Milliseconds to wait for the leader election process.
raise_errors (bool, optional) – Whether to raise errors as exceptions. Default True.
- Returns:
Appropriate version of ElectLeadersResponse class.
- get_broker_version_data(broker_id)
Return BrokerVersionData for a specific broker
- list_config_resources(resource_types=None)
List config resources known to the cluster.
Useful for discovering resource types that have no separate enumeration API (e.g.
CLIENT_METRICS,GROUP). ForTOPICandBROKERthe data is also available viaMetadata/ cluster descriptions.- Keyword Arguments:
resource_types (list, optional) – Filter by resource type. Each entry may be a
ConfigResourceTypeor its name (e.g.'TOPIC'). If None or empty, the broker returns all supported types. Requires broker >= 4.1 for anything other thanCLIENT_METRICS.- Returns:
[resource_name (str)]}
- Return type:
dict of {resource_type (str)
- list_group_offsets(group_id, group_coordinator_id=None, partitions=None)
Fetch committed offsets for a single consumer group.
Note: This does not verify that the group_id or partitions actually exist in the cluster.
As soon as any error is encountered, it is immediately raised.
- Parameters:
group_id (str) – The consumer group id name for which to fetch offsets.
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If set to None, will query the cluster to find the group coordinator. Default: None.
partitions – A list of TopicPartitions for which to fetch offsets. On brokers >= 0.10.2, this can be set to None to fetch all known offsets for the consumer group. Default: None.
- Returns:
- A dict mapping
TopicPartitionto
- A dict mapping
- list_groups(broker_ids=None, states_filter=None, types_filter=None)
List all consumer groups known to the cluster.
This returns a list of Group dicts. The tuples are composed of the consumer group name and the consumer group protocol type.
Only consumer groups that store their offsets in Kafka are returned. The protocol type will be an empty string for groups created using Kafka < 0.9 APIs because, although they store their offsets in Kafka, they don’t use Kafka for group coordination. For groups created using Kafka >= 0.9, the protocol type will typically be “consumer”.
As soon as any error is encountered, it is immediately raised.
- Keyword Arguments:
broker_ids ([int], optional) – A list of broker node_ids to query for consumer groups. If set to None, will query all brokers in the cluster. Explicitly specifying broker(s) can be useful for determining which consumer groups are coordinated by those broker(s). Default: None
states_filter (list, optional) – Filter groups by state. Values may be
GroupStatemembers, their string names (case-insensitive, hyphen or underscore), or raw protocol strings (e.g.['Stable', 'Empty']). Requires broker >= 3.0 (KIP-518). Default: None (no filter).types_filter (list, optional) – Filter groups by type. Values may be
GroupTypemembers, their string names (case-insensitive), or raw protocol strings (e.g.['consumer', 'classic', 'share']). Requires broker >= 4.0 (KIP-848). Default: None (no filter).
- Returns:
List of group data dicts, with key/vals from ListGroupsRequest
- list_partition_offsets(topic_partition_specs, isolation_level='read_uncommitted', timeout_ms=None)
Look up offsets for the given partitions by spec.
Partitions are routed to their respective leader brokers via cluster metadata; one
ListOffsetsRequestis sent per leader. Partitions that returnNotLeaderForPartitionErrorare retried with refreshed metadata, bounded bytimeout_ms(or the admin client’srequest_timeout_mswhenNone).- Parameters:
topic_partition_specs – dict mapping
TopicPartitiontoOffsetSpec(or a raw integer timestamp / wire-level sentinel).- Keyword Arguments:
isolation_level (str, optional) – One of
'read_uncommitted'(default) or'read_committed'.read_committedrequires broker support for ListOffsets v2+.timeout_ms (int, optional) – Maximum time to spend retrying NotLeaderForPartitionError. Default:
request_timeout_ms.
- Returns:
A dict mapping
TopicPartitiontoOffsetAndTimestamp- Return type:
dict
- Raises:
KafkaError – If any partition response carries an error code.
NotLeaderForPartitionError – If NotLeaderForPartitionError retries do not converge within
timeout_ms.UnknownTopicOrPartitionError – If a requested partition is not known to the cluster.
UnsupportedVersionError – If the broker does not support a version of ListOffsetsRequest compatible with the requested specs.
- list_partition_reassignments(topic_partitions=None, timeout_ms=None)
List the current ongoing partition reassignments.
- Parameters:
topic_partitions (dict, list, optional) – Either: a dict of
{topic_name: [partition_ids]}, or a list ofTopicPartition, orNoneto list ongoing reassignments for all partitions. Default: None.- Keyword Arguments:
timeout_ms (numeric, optional) – The time in ms to wait for the request to complete.
- Returns:
A dict mapping
TopicPartitionto a dict with keys'replicas','adding_replicas', and'removing_replicas'(each a list of broker IDs).- Return type:
dict
- list_topics()
Retrieve a list of all topic names in the cluster.
- Returns:
A list of topic name strings.
- remove_group_members(group_id, members, group_coordinator_id=None)
Remove members from a consumer group.
On brokers supporting LeaveGroup v3+ (Kafka 2.3+), a single batched request is sent. On older brokers, falls back to one single-member LeaveGroupRequest per member (in which case
group_instance_idis not supported andmember_idis required).- Parameters:
group_id (str) – The consumer group id.
members – An iterable of
MemberToRemove. Each entry must set at least one ofmember_idor, if brokers support LeaveGroup v3+,group_instance_id.reasonis only sent to brokers supporting LeaveGroup v5+ (KIP-800).
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.
- Returns:
A dict mapping
MemberToRemoveto the per-memberKafkaErrorclass (NoErroron success). The key’sreasonis always None in the result (not echoed by the broker).- Return type:
dict
- Raises:
KafkaError – If a batched response contains a top-level error.
UnsupportedVersionError – If the broker does not support batched LeaveGroupRequest and any member uses
group_instance_id.
- reset_configs(config_resources, validate_only=False, raise_on_unknown=True, incremental=None)
Reset configuration parameters of one or more Kafka resources to defaults.
On 2.3+ brokers, the client will submit an IncrementalAlterConfigsRequest with op DELETE for each resource/key. On older brokers, the client will use submit an AlterConfigsRequest and attempt to include all modified dynamic config values for each resource except the keys marked for reset. (AlterConfigsRequest will reset any missing config key to its default).
- Parameters:
config_resources – A list of ConfigResource objects. Each resource’s
configsshould be a list or dict of config keys to reset. (if dict, the values are ignored).- Returns:
{resource_name (str): Error/Result}}
- Return type:
dict of {resource_type (str)
- reset_group_offsets(group_id, offset_specs, group_coordinator_id=None)
Reset committed offsets for a consumer group.
The group must have no active members (i.e. be empty or dead) for the reset to succeed; otherwise individual partitions may return
UNKNOWN_MEMBER_IDor similar errors.Each dict value selects how the target offset is produced. All resulting offsets are clamped to the partition’s
[earliest, latest]range; values that resolve toUNKNOWN_OFFSET(e.g. a timestamp beyond the last record) are clamped tolatest.- Parameters:
group_id (str) – The consumer group id.
offset_specs (dict) –
A dict mapping
TopicPartitionto one of:OffsetSpec(e.g.OffsetSpec.EARLIEST,OffsetSpec.LATEST,OffsetSpec.MAX_TIMESTAMP): resolved server-side via ListOffsets.OffsetTimestamp(ms since epoch): resolved server-side to the earliest offset whose timestamp is>=the given value.Plain
int: an explicit committed offset (no server-side resolution), which is still clamped to the valid range.
- Keyword Arguments:
group_coordinator_id (int, optional) – The node_id of the group’s coordinator broker. If None, the cluster will be queried to locate the coordinator. Default: None.
- Returns:
A dict mapping
TopicPartitionto dict of {‘error’:KafkaErrorclass, ‘offset’: int}. Theoffsetvalue is the post-clamp value that was committed.- Return type:
dict
- update_features(feature_updates, validate_only=False, timeout_ms=60000)
Update cluster-wide finalized feature flags.
Finalize cluster-wide feature capabilities (e.g.
metadata.version). The request is always routed to the active controller. See KIP-584. Requires broker >= 2.7.- Parameters:
feature_updates – A dict of
{feature_name: (upgrade_type, max_version_level)}or{feature_name: max_version_level}(implicit UPGRADE).upgrade_typemay be aUpdateFeatureType, its name, or int value. Amax_version_level < 1requests deletion of the finalized feature.- Keyword Arguments:
validate_only (bool, optional) – If True, validate the request but do not apply it. Default: False.
timeout_ms (int, optional) – Broker-side timeout in milliseconds. Default: 60000.
- Returns:
‘OK’ | error message}
- Return type:
dict of {feature_name
- wait_for_topics(topic_names, timeout_ms=10000)
Block until each of the given topics is ready to use.
CreateTopicsResponse only confirms that the broker accepted the create request; propagating the new topics into the broker’s metadata cache – and electing a leader for every partition – can lag behind, especially on KRaft clusters. This method polls
describe_topics()at a fixed interval until every requested topic both:is returned with
error_code == 0, andhas
error_code == 0and a leader assigned (leader_id >= 0) for every partition.
- Parameters:
topic_names ([str]) – Topic names to wait for.
- Keyword Arguments:
timeout_ms (numeric, optional) – Maximum milliseconds to wait. Default: 10000.
- Raises:
KafkaTimeoutError – if any topic is still not ready when the deadline expires.