class kafka.cluster.ClusterMetadata(**configs)

A class to manage Kafka cluster metadata.

Keyword Arguments:
  • retry_backoff_ms (int) – Milliseconds to back off when retrying on errors. Default: 100.

  • metadata_max_age_ms (int) – The period of time in milliseconds after which a metadata refresh is forced, even if no partition leadership changes have been seen, to proactively discover any new brokers or partitions. Default: 300000.

  • bootstrap_servers – ‘host[:port]’ string (or list of ‘host[:port]’ strings) that the client should contact to bootstrap initial cluster metadata. This does not have to be the full node list; it only needs at least one broker that will respond to a Metadata API Request. Default port is 9092. If no servers are specified, the client defaults to localhost:9092.

  • allow_auto_create_topics (bool) – Enable/disable auto topic creation on metadata request. Only available with api_version >= (0, 11). Default: True

add_coordinator(response, key_type, key)

Update with metadata for a group or transaction coordinator.

Parameters:
  • response (FindCoordinatorResponse) – broker response

  • key_type (str) – ‘group’ or ‘transaction’

  • key (str) – consumer_group or transactional_id

Returns:

coordinator node_id if metadata is updated, None on error

Return type:

string

add_listener(listener)

Add a callback function to be called on each metadata update.

add_topic(topic)

Add a topic to the list of topics tracked via metadata.

Parameters:

topic (str) – topic to track

Returns:

resolves after metadata request/response

Return type:

Future

Raises:
  • TypeError – if topic is not a string

  • ValueError – if topic is invalid: it must contain only the characters a-zA-Z0-9._- and be fewer than 250 characters long
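The two error conditions above can be illustrated with a small standalone check. This is a minimal sketch, not the library's code: validate_topic_name is a hypothetical helper, and rejecting the empty string is an assumption that the rules above do not state explicitly.

```python
import re

# Hypothetical helper; the character set and length limit follow the rules above.
_TOPIC_PATTERN = re.compile(r"[a-zA-Z0-9._-]+")
_MAX_TOPIC_LENGTH = 249  # "fewer than 250 characters"


def validate_topic_name(topic):
    """Return topic unchanged, or raise TypeError / ValueError."""
    if not isinstance(topic, str):
        raise TypeError("topic must be a string, got %s" % type(topic).__name__)
    if not topic:
        # Assumption: an empty name is treated as invalid.
        raise ValueError("topic must not be empty")
    if len(topic) > _MAX_TOPIC_LENGTH:
        raise ValueError("topic is longer than %d characters" % _MAX_TOPIC_LENGTH)
    if _TOPIC_PATTERN.fullmatch(topic) is None:
        raise ValueError("topic %r has characters outside a-zA-Z0-9._-" % topic)
    return topic
```

Calling validate_topic_name("orders.v1") returns the name unchanged, while validate_topic_name("orders/v1") raises ValueError and validate_topic_name(42) raises TypeError.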

attach(manager)

Wire this cluster to its connection manager.

Construction is split from attach so ClusterMetadata can be built standalone (tests, snapshots) without a live manager. The reference is held via weakref.proxy so that manager <-> cluster does not form a GC cycle; manager.close() still calls cluster.close() to clear eagerly.
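The weak back-reference described above can be sketched with the standard library alone. Manager and Cluster here are hypothetical stand-ins, not the library's classes; the sketch only shows why a weakref.proxy avoids a manager <-> cluster reference cycle.

```python
import gc
import weakref


class Cluster:
    """Hypothetical stand-in for a metadata object built without a manager."""

    def __init__(self):
        self._manager = None            # usable standalone until attach()

    def attach(self, manager):
        # Weak back-reference: cluster -> manager does not keep manager alive.
        self._manager = weakref.proxy(manager)

    def close(self):
        self._manager = None            # eager clear, e.g. from manager.close()


class Manager:
    """Hypothetical stand-in for a connection manager."""

    def __init__(self, cluster):
        self.cluster = cluster          # strong reference: manager -> cluster
        cluster.attach(self)

    def send(self, request):
        return "sent %s" % request


cluster = Cluster()
manager = Manager(cluster)
first_reply = cluster._manager.send("MetadataRequest")

del manager                             # drop the only strong reference
gc.collect()                            # not needed on CPython, harmless elsewhere
try:
    cluster._manager.send("MetadataRequest")
    proxy_is_dead = False
except ReferenceError:
    proxy_is_dead = True
```

Because the only strong reference runs from the manager to the cluster, dropping the manager frees it without waiting for the cycle collector, and a later use of the stale proxy fails loudly with ReferenceError instead of silently keeping the manager alive.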

available_partitions_for_topic(topic)

Return set of partitions with known leaders

Parameters:

topic (str) – topic to check for partitions

Returns:

{partition (int), …}, or None if topic not found.

Return type:

set

bootstrap_brokers()

Get bootstrap brokers only, extracted from the bootstrap_servers config option. Node ids are synthesized as ‘bootstrap-0’ etc.

Returns:

[MetadataResponseBroker, …]

Return type:

list
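As a rough illustration of how ‘host[:port]’ entries could become broker records with synthesized ids, here is a minimal sketch. Broker and parse_bootstrap_servers are hypothetical names, the real MetadataResponseBroker may carry other fields, and bracketed IPv6 addresses are not handled.

```python
from collections import namedtuple

# Hypothetical record type standing in for MetadataResponseBroker.
Broker = namedtuple("Broker", ["node_id", "host", "port"])

DEFAULT_PORT = 9092


def parse_bootstrap_servers(bootstrap_servers=None):
    """Turn 'host[:port]' strings into Broker records with synthesized ids."""
    if not bootstrap_servers:
        bootstrap_servers = ["localhost"]          # documented default
    elif isinstance(bootstrap_servers, str):
        bootstrap_servers = [bootstrap_servers]    # a single string is allowed

    brokers = []
    for index, entry in enumerate(bootstrap_servers):
        host, sep, port = entry.strip().rpartition(":")
        if not sep:
            # No ':' present, so rpartition put the whole entry in the last slot.
            host, port = port, DEFAULT_PORT
        brokers.append(Broker("bootstrap-%d" % index, host, int(port)))
    return brokers
```

With this sketch, parse_bootstrap_servers(["kafka1:9093", "kafka2"]) yields one record per entry, with node ids ‘bootstrap-0’ and ‘bootstrap-1’ and port 9092 filled in for the second.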

broker_metadata(broker_id)

Get the MetadataResponseBroker for a node id.

Parameters:

broker_id (int or str) – node_id for a broker to check

Returns:

MetadataResponseBroker or None if not found

brokers()

Get all MetadataResponseBroker entries.

Returns:

[MetadataResponseBroker, …]

Return type:

list

coordinator_for_group(group)

Return node_id of group coordinator.

Parameters:

group (str) – name of consumer group

Returns:

node_id (int or str) for group coordinator, -1 if the coordinator is unknown, or None if the group does not exist.

failed_update(exception)

Update cluster state given a failed MetadataRequest.

is_replica_node(partition, node_id)

Return MetadataResponseBroker for node_id only when it is known AND still listed as a replica of partition (KIP-392).

Used by the consumer’s preferred-read-replica routing to avoid sending fetches to a broker that has been demoted out of the partition’s replica set even though it still exists as a node.

Parameters:
  • partition (TopicPartition) – topic / partition to look up.

  • node_id (int) – broker id to validate.

Returns:

MetadataResponseBroker if the node exists in cluster metadata and is currently listed as a replica of partition; otherwise None.
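The two-part condition (node is known, and node is still a replica) can be sketched as follows. The dictionaries, the Broker record, and the replica_node function are hypothetical stand-ins for illustration; the library's internal layout is not shown in this reference.

```python
from collections import namedtuple

# Hypothetical stand-ins for the library's types.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
Broker = namedtuple("Broker", ["node_id", "host", "port"])


def replica_node(brokers, replicas, partition, node_id):
    """Return the broker only if it is known and still a replica of partition.

    brokers:  dict of node_id -> Broker
    replicas: dict of TopicPartition -> list of replica node ids
    """
    broker = brokers.get(node_id)
    if broker is None:
        return None                    # node is not in cluster metadata
    if node_id not in replicas.get(partition, ()):
        return None                    # node exists but is not a replica
    return broker


brokers = {1: Broker(1, "kafka1", 9092), 2: Broker(2, "kafka2", 9092)}
replicas = {TopicPartition("orders", 0): [1]}
tp = TopicPartition("orders", 0)
```

Here replica_node(brokers, replicas, tp, 1) returns the first broker, while node 2 (known but not a replica) and node 3 (unknown) both give None, so a caller would fall back to another node such as the leader.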

leader_epoch_for_partition(partition)

Return leader_epoch for partition, or None if topic/partition is unknown.

leader_for_partition(partition)

Return node_id of leader, -1 if unavailable, None if unknown.

property metadata_refresh_in_progress

True if a refresh is mid-flight.

partitions_for_broker(broker_id)

Return TopicPartitions for which the broker is a leader.

Parameters:

broker_id (int or str) – node id for a broker

Returns:

{TopicPartition, …}, or None if the broker either has no partitions or does not exist.

Return type:

set

partitions_for_topic(topic)

Return set of all partitions for topic (whether available or not)

Parameters:

topic (str) – topic to check for partitions

Returns:

{partition (int), …}, or None if topic not found.

Return type:

set
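The difference between all partitions and available partitions can be sketched with a plain dictionary. The partition_leaders cache is a hypothetical structure, and treating a leader id of -1 as "no known leader" is an assumption taken from the leader_for_partition() description.

```python
# Hypothetical cache: topic -> {partition id -> leader node id}, with -1
# marking a partition whose leader is currently unavailable.
partition_leaders = {"orders": {0: 1, 1: -1, 2: 3}}


def partitions_for_topic(topic):
    """All partitions of topic, whether or not a leader is known."""
    partitions = partition_leaders.get(topic)
    return None if partitions is None else set(partitions)


def available_partitions_for_topic(topic):
    """Only the partitions of topic that currently have a leader."""
    partitions = partition_leaders.get(topic)
    if partitions is None:
        return None
    return {p for p, leader in partitions.items() if leader != -1}
```

Both functions return None for an unknown topic, so callers need to distinguish None (topic not found) from an empty set (topic known, nothing available).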

refresh_backoff()

Return milliseconds to wait before attempting to retry after failure.

async refresh_metadata(node_id=None)

Send one MetadataRequest and apply the response.

Concurrent callers share a single in-flight request: if a refresh is already underway, additional callers await the same Future and see the same outcome (success or exception). This avoids duplicate broker requests when bootstrap and the refresh loop race, or when external callers invoke refresh while the loop is mid-flight.
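The shared in-flight request described above is a common asyncio pattern and can be sketched without any Kafka dependency. Refresher and its fake request are hypothetical; the sketch only demonstrates that concurrent callers await one Future and trigger one request.

```python
import asyncio


class Refresher:
    """Hypothetical sketch: concurrent refresh() callers share one request."""

    def __init__(self):
        self._in_flight = None          # Task for the request currently underway
        self.requests_sent = 0

    async def _send_metadata_request(self):
        self.requests_sent += 1
        await asyncio.sleep(0.01)       # stands in for the broker round trip
        return {"brokers": [1, 2, 3]}

    def _clear(self, task):
        self._in_flight = None          # the next refresh() starts a new request

    async def refresh(self):
        if self._in_flight is None:
            # First caller starts the request and publishes its Task.
            self._in_flight = asyncio.create_task(self._send_metadata_request())
            self._in_flight.add_done_callback(self._clear)
        # Every caller awaits the same Task; shield keeps one cancelled
        # caller from cancelling the request the others depend on.
        return await asyncio.shield(self._in_flight)


async def demo():
    refresher = Refresher()
    results = await asyncio.gather(*(refresher.refresh() for _ in range(3)))
    return results, refresher.requests_sent
```

Running asyncio.run(demo()) gives three identical results from a single simulated request. An exception raised by the request would likewise propagate to every waiting caller.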

remove_listener(listener)

Remove a previously added listener callback.

request_update()

Flag metadata for update and return a Future.

The actual update must be handled separately. This method only changes the value reported by ttl().

Returns:

kafka.future.Future (value will be the cluster object after update)
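One way a flag set by request_update() could feed into ttl() is sketched below. MetadataClock is hypothetical, and the way it combines metadata_max_age_ms, retry_backoff_ms, and the update flag is an assumption for illustration, not a statement of the library's exact arithmetic.

```python
import time


class MetadataClock:
    """Hypothetical sketch of how a refresh deadline could be derived."""

    def __init__(self, metadata_max_age_ms=300000, retry_backoff_ms=100):
        self.metadata_max_age_ms = metadata_max_age_ms
        self.retry_backoff_ms = retry_backoff_ms
        self._need_update = False
        self._last_attempt_ms = 0       # last refresh attempt, success or not
        self._last_success_ms = 0       # last successful refresh

    def request_update(self):
        self._need_update = True        # only flags; no request is sent here

    def record_success(self, now_ms):
        self._last_attempt_ms = self._last_success_ms = now_ms
        self._need_update = False

    def ttl(self, now_ms=None):
        now_ms = time.time() * 1000 if now_ms is None else now_ms
        if self._need_update:
            until_stale = 0             # flagged: refresh as soon as allowed
        else:
            until_stale = self.metadata_max_age_ms - (now_ms - self._last_success_ms)
        # Assumption: a refresh is never due sooner than the retry backoff.
        until_retry_allowed = self.retry_backoff_ms - (now_ms - self._last_attempt_ms)
        return max(until_stale, until_retry_allowed, 0)
```

In this sketch, calling request_update() drops ttl() to zero once the backoff window has passed, which is the signal a separate refresh loop would act on.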

set_topics(topics)

Set specific topics to track for metadata.

Parameters:

topics (list of str) – topics to check for metadata

Returns:

resolves after metadata request/response

Return type:

Future

start_refresh_loop()

Spawn the periodic refresh coroutine. Idempotent. Triggers bootstrap if needed.

topic_id(topic_name)

Return the topic UUID for topic_name, or None if unknown.

Populated from MetadataResponse v10+ (Kafka 2.8+, KIP-516). Older responses leave this empty.

topic_name_for_id(topic_id)

Return the topic name for topic_id (uuid.UUID), or None.

Reverse lookup of topic_id(). Populated from MetadataResponse v10+ (KIP-516).
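The forward and reverse lookups can be pictured as two dictionaries kept in step. TopicIds is a hypothetical class, and using None to model a response that carries no topic id is an assumption made for this sketch.

```python
import uuid


class TopicIds:
    """Hypothetical two-way index between topic names and topic UUIDs."""

    def __init__(self):
        self._id_by_name = {}
        self._name_by_id = {}

    def update(self, topics):
        # topics: iterable of (name, uuid.UUID or None). None models a
        # response that does not report topic ids.
        self._id_by_name.clear()
        self._name_by_id.clear()
        for name, topic_id in topics:
            if topic_id is None:
                continue
            self._id_by_name[name] = topic_id
            self._name_by_id[topic_id] = name

    def topic_id(self, topic_name):
        return self._id_by_name.get(topic_name)

    def topic_name_for_id(self, topic_id):
        return self._name_by_id.get(topic_id)
```

Both lookups return None for anything not seen in the latest update, so callers talking to older brokers should expect None and fall back to topic names.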

topics(exclude_internal_topics=True)

Get set of known topics.

Parameters:

exclude_internal_topics (bool) – Whether to leave internal topics (such as offsets) out of the returned set. For a consumer, this means that when set to True the only way to receive records from an internal topic is subscribing to it. Default: True

Returns:

{topic (str), …}

Return type:

set

ttl()

Milliseconds until metadata should be refreshed.

update_metadata(metadata)

Update cluster state given a MetadataResponse.

Parameters:

metadata (MetadataResponse) – broker response to a metadata request

Returns: None

update_partition_leader(partition, leader_id, leader_epoch)

Apply a KIP-951 current-leader hint from a Fetch/Produce response.

The cached leader id and epoch for partition are replaced only when leader_epoch is strictly newer than the cached value (and non-negative). When the leader id moves, _broker_partitions is rewired so leader-based routing follows immediately.

Parameters:
  • partition (TopicPartition) – topic / partition the hint is about.

  • leader_id (int) – broker id named as the new leader.

  • leader_epoch (int) – epoch of that new leader.

Returns:

True iff cached state was changed.

Return type:

bool
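The epoch comparison and the rewiring of the broker-to-partitions index can be sketched as below. LeaderCache is a hypothetical class with an invented internal layout. Accepting a hint for a partition that has no cached entry is an assumption, since the description above does not cover that case.

```python
from collections import namedtuple

# Hypothetical stand-in for the library's TopicPartition.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class LeaderCache:
    """Hypothetical sketch of the epoch check and leader rewiring."""

    def __init__(self):
        self._leaders = {}             # TopicPartition -> (leader_id, leader_epoch)
        self._broker_partitions = {}   # leader_id -> set of TopicPartition

    def update_partition_leader(self, partition, leader_id, leader_epoch):
        if leader_epoch < 0:
            return False               # negative epochs are ignored
        cached = self._leaders.get(partition)
        if cached is not None and leader_epoch <= cached[1]:
            return False               # hint is not strictly newer
        if cached is not None and cached[0] != leader_id:
            # Leader moved: detach the partition from the old leader.
            self._broker_partitions.get(cached[0], set()).discard(partition)
        self._broker_partitions.setdefault(leader_id, set()).add(partition)
        self._leaders[partition] = (leader_id, leader_epoch)
        return True
```

A hint with the same epoch as the cached one is rejected even if it names a different leader, which keeps a stale or reordered response from flipping the route back and forth.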