- class kafka.cluster.ClusterMetadata(**configs)[source]
A class to manage kafka cluster metadata.
- Keyword Arguments:
retry_backoff_ms (int) – Milliseconds to backoff when retrying on errors. Default: 100.
metadata_max_age_ms (int) – The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000
bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the client should contact to bootstrap initial cluster metadata. This does not have to be the full node list. It just needs to have at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, will default to localhost:9092.
allow_auto_create_topics (bool) – Enable/disable auto topic creation on metadata request. Only available with api_version >= (0, 11). Default: True
- add_coordinator(response, key_type, key)[source]
Update with metadata for a group or txn coordinator
- Parameters:
response (FindCoordinatorResponse) – broker response
key_type (str) – ‘group’ or ‘transaction’
key (str) – consumer_group or transactional_id
- Returns:
coordinator node_id if metadata is updated, None on error
- Return type:
string
- add_topic(topic)[source]
Add a topic to the list of topics tracked via metadata.
- Parameters:
topic (str) – topic to track
- Returns:
resolves after metadata request/response
- Return type:
Future
- Raises:
TypeError – if topic is not a string
ValueError – if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length
- attach(manager)[source]
Wire this cluster to its connection manager.
Construction is split from attach so ClusterMetadata can be built standalone (tests, snapshots) without a live manager. The reference is held via weakref.proxy so that manager <-> cluster does not form a GC cycle; manager.close() still calls cluster.close() to clear eagerly.
- available_partitions_for_topic(topic)[source]
Return set of partitions with known leaders
- Parameters:
topic (str) – topic to check for partitions
- Returns:
{partition (int), …} None if topic not found.
- Return type:
set
- bootstrap_brokers()[source]
Get bootstrap brokers only, extracted from the bootstrap_servers config option. Node ids are synthesized as ‘bootstrap-0’ etc.
- Returns:
[MetadataResponseBroker, …]
- Return type:
list
- broker_metadata(broker_id)[source]
Get MetadataResponseBroker
- Parameters:
broker_id (int or str) – node_id for a broker to check
- Returns:
MetadataResponseBroker or None if not found
- brokers()[source]
Get all MetadataResponseBroker
- Returns:
[MetadataResponseBroker, …]
- Return type:
list
- coordinator_for_group(group)[source]
Return node_id of group coordinator.
- Parameters:
group (str) – name of consumer group
- Returns:
node_id (int or str) for group coordinator, -1 if coordinator unknown None if the group does not exist.
- is_replica_node(partition, node_id)[source]
Return MetadataResponseBroker for
node_idonly when it is known AND still listed as a replica ofpartition(KIP-392).Used by the consumer’s preferred-read-replica routing to avoid sending fetches to a broker that has been demoted out of the partition’s replica set even though it still exists as a node.
- Parameters:
partition (TopicPartition) – topic / partition to look up.
node_id (int) – broker id to validate.
- Returns:
MetadataResponseBroker if the node exists in cluster metadata and is currently listed as a replica of
partition; otherwise None.
- leader_epoch_for_partition(partition)[source]
Return leader_epoch for partition, or None if topic/partition is unknown.
- property metadata_refresh_in_progress
True if a refresh is mid-flight.
- partitions_for_broker(broker_id)[source]
Return TopicPartitions for which the broker is a leader.
- Parameters:
broker_id (int or str) – node id for a broker
- Returns:
{TopicPartition, …} None if the broker either has no partitions or does not exist.
- Return type:
set
- partitions_for_topic(topic)[source]
Return set of all partitions for topic (whether available or not)
- Parameters:
topic (str) – topic to check for partitions
- Returns:
{partition (int), …} None if topic not found.
- Return type:
set
- async refresh_metadata(node_id=None)[source]
Send one MetadataRequest and apply the response.
Concurrent callers share a single in-flight request: if a refresh is already underway, additional callers await the same Future and see the same outcome (success or exception). This avoids duplicate broker requests when bootstrap and the refresh loop race, or when external callers invoke refresh while the loop is mid-flight.
- request_update()[source]
Flags metadata for update, return Future()
Actual update must be handled separately. This method will only change the reported ttl()
- Returns:
kafka.future.Future (value will be the cluster object after update)
- set_topics(topics)[source]
Set specific topics to track for metadata.
- Parameters:
topics (list of str) – topics to check for metadata
- Returns:
resolves after metadata request/response
- Return type:
Future
- start_refresh_loop()[source]
Spawn the periodic refresh coroutine. Idempotent. Triggers bootstrap if needed.
- topic_id(topic_name)[source]
Return the topic UUID for
topic_name, or None if unknown.Populated from MetadataResponse v10+ (Kafka 2.8+, KIP-516). Older responses leave this empty.
- topic_name_for_id(topic_id)[source]
Return the topic name for
topic_id(uuid.UUID), or None.Reverse lookup of
topic_id(). Populated from MetadataResponse v10+ (KIP-516).
- topics(exclude_internal_topics=True)[source]
Get set of known topics.
- Parameters:
exclude_internal_topics (bool) – Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to True the only way to receive records from an internal topic is subscribing to it. Default True
- Returns:
{topic (str), …}
- Return type:
set
- update_metadata(metadata)[source]
Update cluster state given a MetadataResponse.
- Parameters:
metadata (MetadataResponse) – broker response to a metadata request
Returns: None
- update_partition_leader(partition, leader_id, leader_epoch)[source]
Apply a KIP-951 current-leader hint from a Fetch/Produce response.
The cached leader id and epoch for
partitionare replaced only whenleader_epochis strictly newer than the cached value (and non-negative). When the leader id moves,_broker_partitionsis rewired so leader-based routing follows immediately.- Parameters:
partition (TopicPartition) – topic / partition the hint is about.
leader_id (int) – broker id named as the new leader.
leader_epoch (int) – epoch of that new leader.
- Returns:
True iff cached state was changed.
- Return type:
bool