Source code for kafka.cluster

import collections
import copy
import logging
import random
import re
import socket
import threading
import time
import uuid
import weakref

from kafka import errors as Errors
from kafka.future import Future
from kafka.net.wakeup_notifier import WakeupNotifier
from kafka.protocol.metadata import MetadataRequest, MetadataResponse
from kafka.structs import TopicPartition
from kafka.util import ensure_valid_topic_name

# Module-level logger. Every log call in this module (ClusterMetadata and the
# bootstrap-host parsing helpers) goes through this one logger.
log = logging.getLogger(__name__)


class ClusterMetadata:
    """
    A class to manage kafka cluster metadata.

    Keyword Arguments:
        retry_backoff_ms (int): Milliseconds to backoff when retrying on
            errors. Default: 100.
        metadata_max_age_ms (int): The period of time in milliseconds after
            which we force a refresh of metadata even if we haven't seen any
            partition leadership changes to proactively discover any new
            brokers or partitions. Default: 300000
        bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
            strings) that the client should contact to bootstrap initial
            cluster metadata. This does not have to be the full node list.
            It just needs to have at least one broker that will respond to a
            Metadata API Request. Default port is 9092. Default: [] -- an
            empty value synthesizes no bootstrap brokers in this class.
            NOTE(review): earlier docs promised a localhost:9092 fallback;
            that is not applied here -- confirm whether the caller supplies it.
        allow_auto_create_topics (bool): Enable/disable auto topic creation
            on metadata request. Only available with api_version >= (0, 11).
            Default: True
        client_dns_lookup (str): 'use_all_dns_ips' (default) keeps bootstrap
            entries as given; 'resolve_canonical_bootstrap_servers_only'
            expands each bootstrap host to its canonical DNS name(s).
    """
    DEFAULT_CONFIG = {
        'retry_backoff_ms': 100,
        'metadata_max_age_ms': 300000,
        'bootstrap_servers': [],
        'allow_auto_create_topics': True,
        'client_dns_lookup': 'use_all_dns_ips',
    }

    def __init__(self, **configs):
        self._manager = None
        self._topics = set()
        self._brokers = {}  # node_id -> MetadataResponseBroker
        self._partitions = {}  # topic -> partition -> PartitionMetadata
        self._broker_partitions = collections.defaultdict(set)  # node_id -> {TopicPartition...}
        self._topic_ids = {}  # topic name -> uuid.UUID
        self._topic_names_by_id = {}  # uuid.UUID -> topic name
        self._coordinators = {}  # (key_type, key) -> node_id
        self._last_refresh_ms = 0
        self._last_successful_refresh_ms = 0
        self._need_update = True
        self._future = None
        self._listeners = set()
        self._lock = threading.Lock()
        self.need_all_topic_metadata = False
        self.unauthorized_topics = set()
        self.internal_topics = set()
        self.controller = None
        self.cluster_id = None
        self._refresh_loop_future = None
        self._refresh_future = None
        self._wakeup = None
        self.closed = False

        # Only keys present in DEFAULT_CONFIG are honoured; others are ignored.
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]

        self._bootstrap_brokers = self._generate_bootstrap_brokers()
        self._coordinator_brokers = {}

    @property
    def metadata_refresh_in_progress(self):
        """True if a refresh is mid-flight."""
        return self._refresh_future is not None and not self._refresh_future.is_done

    def attach(self, manager):
        """Wire this cluster to its connection manager.

        Construction is split from attach so ClusterMetadata can be built
        standalone (tests, snapshots) without a live manager. The reference
        is held via weakref.proxy so that manager <-> cluster does not form
        a GC cycle; manager.close() still calls cluster.close() to clear
        eagerly.
        """
        self._manager = weakref.proxy(manager)
        self._wakeup = WakeupNotifier(self._manager._net)

    def close(self):
        """Mark the cluster closed and wake the refresh loop so it can exit.

        Safe on an instance that was never attach()ed: there is then no
        wakeup notifier (and no refresh loop) to poke.
        """
        self.closed = True
        if self._wakeup is not None:
            self._wakeup.notify()

    def start_refresh_loop(self):
        """Spawn the periodic refresh coroutine. Idempotent. Triggers bootstrap if needed."""
        if self._manager is None:
            raise RuntimeError('start_refresh_loop requires prior attach()')
        if self._refresh_loop_future is not None:
            return
        self._refresh_loop_future = self._manager.call_soon(self._refresh_loop)

    async def _refresh_loop(self):
        """Awaits ttl() then triggers refresh_metadata(); request_update() wakes early.

        Runs until close() sets ``self.closed``. Failures of individual
        refreshes are logged and retried after ttl() (which accounts for
        retry/exponential backoff); they must not terminate the loop.
        """
        if self._manager is None:
            raise RuntimeError('start_refresh_loop requires prior attach()')
        if not self._manager.bootstrapped:
            log.debug('Metadata refresh loop needs bootstrap...')
            await self._manager.bootstrap_async()
        log.info('Starting metadata refresh loop')
        while not self.closed:
            if self.metadata_refresh_in_progress:
                # A refresh started elsewhere is in flight. Awaiting the shared
                # future re-raises its failure; that failure belongs to the
                # caller that started it, so log and let ttl() decide when to
                # retry instead of letting it kill this loop.
                try:
                    await self._refresh_future
                except Exception as exc:
                    log.debug('In-flight metadata refresh failed: %s', exc)
            ttl_ms = self.ttl()
            if ttl_ms == 0:
                try:
                    await self.refresh_metadata()
                except Errors.KafkaError as exc:
                    log.debug('Metadata refresh failed: %s', exc)
                    log.exception(exc)
                continue
            try:
                log.debug('Sleeping %s for next Metadata refresh', ttl_ms / 1000)
                await self._wakeup(ttl_ms / 1000)
            except Exception as exc:
                log.error('Metadata refresh loop error: %s', exc)
        log.info('Stopping metadata refresh loop')

    async def refresh_metadata(self, node_id=None):
        """Send one MetadataRequest and apply the response.

        Concurrent callers share a single in-flight request: if a refresh is
        already underway, additional callers await the same Future and see
        the same outcome (success or exception). This avoids duplicate broker
        requests when bootstrap and the refresh loop race, or when external
        callers invoke refresh while the loop is mid-flight.

        Arguments:
            node_id (int or str, optional): broker to query; defaults to the
                manager's least-loaded node. Ignored when joining a refresh
                that is already in flight.

        Raises:
            RuntimeError: if attach() has not been called.
            Exception: whatever the underlying refresh raised.
        """
        if self._manager is None:
            raise RuntimeError('refresh_metadata requires prior attach()')
        if self.metadata_refresh_in_progress:
            log.debug('Metadata refresh already in flight; awaiting existing')
            await self._refresh_future
            return
        self._refresh_future = Future()
        try:
            await self._do_refresh_metadata(node_id)
        except Exception as exc:
            self._refresh_future.failure(exc)
            raise
        else:
            self._refresh_future.success(None)

    async def _do_refresh_metadata(self, node_id):
        """Pick a node (if not given), send a MetadataRequest, apply the response."""
        if node_id is None:
            node_id = self._manager.least_loaded_node()
        if node_id is None:
            log.warning('No node available for metadata refresh - backoff/retry')
            self._manager.update_backoff('metadata')
            raise Errors.NodeNotReadyError('metadata')
        self._manager.reset_backoff('metadata')

        log.info('Metadata refresh (node_id=%s)', node_id)
        try:
            request = self.metadata_request()
            log.debug("Sending metadata request %s to node %s", request, node_id)
            response = await self._manager.send(request, node_id)
        except Exception as exc:
            log.error('Metadata refresh: failed %s', exc)
            # Records the attempt time so ttl() applies retry_backoff_ms.
            self.failed_update(exc)
            raise
        log.debug('Metadata refresh: success')
        self.update_metadata(response)

    def _generate_bootstrap_brokers(self):
        """Build {'bootstrap-N': MetadataResponseBroker} from bootstrap_servers."""
        # collect_hosts does not perform DNS, so we should be fine to re-use
        bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
        if self.config['client_dns_lookup'] == 'resolve_canonical_bootstrap_servers_only':
            bootstrap_hosts = expand_to_canonical_bootstrap_hosts(bootstrap_hosts)

        brokers = {}
        for i, (host, port, _) in enumerate(bootstrap_hosts):
            node_id = 'bootstrap-%s' % i
            brokers[node_id] = MetadataResponse.MetadataResponseBroker(node_id, host, port, None)
        return brokers

    def is_bootstrap(self, node_id):
        """Return True if ``node_id`` is one of the synthesized bootstrap ids."""
        return node_id in self._bootstrap_brokers

    def set_topics(self, topics):
        """Set specific topics to track for metadata.

        The tracked set is replaced by ``topics`` (so it may shrink). A
        metadata refresh is requested only when ``topics`` contains names
        not previously tracked; otherwise an already-resolved Future is
        returned.

        Arguments:
            topics (iterable of str): topics to check for metadata

        Returns:
            Future: resolves after metadata request/response
        """
        # Materialize once so generators are not exhausted by validation.
        topics = set(topics)
        for topic in topics:
            ensure_valid_topic_name(topic)

        added = topics - self._topics
        # Always record the new set -- returning early on a subset used to
        # leave removed topics tracked (and requested) forever.
        self._topics = topics
        if not added:
            return Future().success(self)

        # TODO: handle future when old metadata request is currently in-flight
        # TODO: handle future when set_topics called multiple times before new request
        return self.request_update()

    def add_topic(self, topic):
        """Add a topic to the list of topics tracked via metadata.

        Arguments:
            topic (str): topic to track

        Returns:
            Future: resolves after metadata request/response

        Raises:
            TypeError: if topic is not a string
            ValueError: if topic is invalid: must be chars (a-zA-Z0-9._-), and less than 250 length
        """
        ensure_valid_topic_name(topic)

        if topic in self._topics:
            return Future().success(self)

        # TODO: handle future when old metadata request is currently in-flight
        self._topics.add(topic)
        return self.request_update()

    def brokers(self):
        """Get all MetadataResponseBroker

        Returns:
            list: [MetadataResponseBroker, ...]
        """
        return list(self._brokers.values())

    def bootstrap_brokers(self):
        """Get bootstrap brokers only, extracted from the bootstrap_servers config option.

        Node ids are synthesized as 'bootstrap-0' etc.

        Returns:
            list: [MetadataResponseBroker, ...]
        """
        return list(self._bootstrap_brokers.values())

    def broker_metadata(self, broker_id):
        """Get MetadataResponseBroker

        Lookup order: cluster brokers, then bootstrap brokers, then
        coordinator-specific brokers.

        Arguments:
            broker_id (int or str): node_id for a broker to check

        Returns:
            MetadataResponseBroker or None if not found
        """
        return (
            self._brokers.get(broker_id) or
            self._bootstrap_brokers.get(broker_id) or
            self._coordinator_brokers.get(broker_id)
        )

    def partitions_for_topic(self, topic):
        """Return set of all partitions for topic (whether available or not)

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
            None if topic not found.
        """
        partitions = self._partitions.get(topic)
        if partitions is None:
            return None
        return set(partitions)

    def available_partitions_for_topic(self, topic):
        """Return set of partitions with known leaders

        Arguments:
            topic (str): topic to check for partitions

        Returns:
            set: {partition (int), ...}
            None if topic not found.
        """
        partitions = self._partitions.get(topic)
        if partitions is None:
            return None
        return {partition for partition, metadata in partitions.items()
                if metadata.leader_id != -1}

    def leader_for_partition(self, partition):
        """Return node_id of leader, -1 unavailable, None if unknown."""
        p_data = self._partitions.get(partition.topic, {}).get(partition.partition)
        if p_data is None:
            return None
        return p_data.leader_id

    def is_replica_node(self, partition, node_id):
        """Return MetadataResponseBroker for ``node_id`` only when it is
        known AND still listed as a replica of ``partition`` (KIP-392).

        Used by the consumer's preferred-read-replica routing to avoid
        sending fetches to a broker that has been demoted out of the
        partition's replica set even though it still exists as a node.

        Arguments:
            partition (TopicPartition): topic / partition to look up.
            node_id (int): broker id to validate.

        Returns:
            MetadataResponseBroker if the node exists in cluster metadata
            and is currently listed as a replica of ``partition``;
            otherwise None.
        """
        broker = self.broker_metadata(node_id)
        if broker is None:
            return None
        partition_data = self._partitions.get(partition.topic, {}).get(partition.partition)
        if partition_data is None:
            return None
        if node_id not in partition_data.replica_nodes:
            return None
        return broker

    def leader_epoch_for_partition(self, partition):
        """Return leader_epoch for partition, or None if topic/partition is unknown."""
        p_data = self._partitions.get(partition.topic, {}).get(partition.partition)
        if p_data is None:
            return None
        return p_data.leader_epoch

    def update_partition_leader(self, partition, leader_id, leader_epoch):
        """Apply a KIP-951 current-leader hint from a Fetch/Produce response.

        The cached leader id and epoch for ``partition`` are replaced only
        when ``leader_epoch`` is strictly newer than the cached value (and
        non-negative). When the leader id moves, ``_broker_partitions`` is
        rewired so leader-based routing follows immediately.

        Arguments:
            partition (TopicPartition): topic / partition the hint is about.
            leader_id (int): broker id named as the new leader.
            leader_epoch (int): epoch of that new leader.

        Returns:
            bool: True iff cached state was changed.
        """
        with self._lock:
            p_data = self._partitions.get(partition.topic, {}).get(partition.partition)
            if p_data is None:
                return False
            if leader_epoch < 0 or leader_epoch <= p_data.leader_epoch:
                return False
            old_leader = p_data.leader_id
            p_data.leader_id = leader_id
            p_data.leader_epoch = leader_epoch
            if old_leader != leader_id:
                if old_leader in self._broker_partitions:
                    self._broker_partitions[old_leader].discard(partition)
                if leader_id != -1:
                    self._broker_partitions[leader_id].add(partition)
            return True

    def partitions_for_broker(self, broker_id):
        """Return TopicPartitions for which the broker is a leader.

        Arguments:
            broker_id (int or str): node id for a broker

        Returns:
            set: {TopicPartition, ...}
            None if the broker either has no partitions or does not exist.
            (A broker whose last partition moved away via
            update_partition_leader() may yield an empty set instead.)
        """
        return self._broker_partitions.get(broker_id)

    def coordinator_for_group(self, group):
        """Return node_id of group coordinator.

        Arguments:
            group (str): name of consumer group

        Returns:
            node_id (int or str) for group coordinator, -1 if coordinator unknown
            None if the group does not exist.
        """
        return self._coordinators.get(('group', group))

    def ttl(self):
        """Milliseconds until metadata should be refreshed"""
        now = time.monotonic() * 1000
        if self._manager is not None:
            # Query once: the delay is time-dependent, so testing one value and
            # returning another could disagree.
            backoff = self._manager.connection_delay('metadata')
            if backoff:
                # Exponential backoff - KIP-580
                return backoff * 1000

        if self._need_update:
            ttl = 0
        else:
            metadata_age = now - self._last_successful_refresh_ms
            ttl = self.config['metadata_max_age_ms'] - metadata_age

        # Never retry sooner than retry_backoff_ms after the last attempt.
        retry_age = now - self._last_refresh_ms
        next_retry = self.config['retry_backoff_ms'] - retry_age

        return max(ttl, next_retry, 0)

    def refresh_backoff(self):
        """Return milliseconds to wait before attempting to retry after failure"""
        return self.config['retry_backoff_ms']

    def request_update(self):
        """Flags metadata for update, return Future()

        Actual update must be handled separately. This method will only
        change the reported ttl(). When attached, it also ensures the refresh
        loop is running and wakes it early.

        Returns:
            kafka.future.Future (value will be the cluster object after update)
        """
        with self._lock:
            self._need_update = True
            if not self._future or self._future.is_done:
                self._future = Future()
            ret = self._future
        # Call out to the manager without holding our lock.
        if self._manager is not None:
            self.start_refresh_loop()
            self._wakeup.notify()
        return ret

    @property
    def need_update(self):
        """True when a metadata refresh has been requested or is retrying."""
        return self._need_update

    def topics(self, exclude_internal_topics=True):
        """Get set of known topics.

        Arguments:
            exclude_internal_topics (bool): Whether records from internal topics
                (such as offsets) should be exposed to the consumer. If set to
                True the only way to receive records from an internal topic is
                subscribing to it. Default True

        Returns:
            set: {topic (str), ...}
        """
        topics = set(self._partitions.keys())
        if exclude_internal_topics:
            return topics - self.internal_topics
        return topics

    def metadata_request(self):
        """Build the MetadataRequest for the currently tracked topics."""
        if self.need_all_topic_metadata:
            topics = MetadataRequest.ALL_TOPICS
        elif not self._topics:
            topics = MetadataRequest.NO_TOPICS
        else:
            topics = [MetadataRequest.MetadataRequestTopic(name=topic) for topic in self._topics]
        return MetadataRequest(
            topics=topics,
            allow_auto_topic_creation=self.config['allow_auto_create_topics'],
            include_cluster_authorized_operations=False,
            include_topic_authorized_operations=False,
        )

    def topic_id(self, topic_name):
        """Return the topic UUID for ``topic_name``, or None if unknown.

        Populated from MetadataResponse v10+ (Kafka 2.8+, KIP-516). Older
        responses leave this empty.
        """
        return self._topic_ids.get(topic_name)

    def topic_name_for_id(self, topic_id):
        """Return the topic name for ``topic_id`` (uuid.UUID), or None.

        Reverse lookup of :meth:`topic_id`. Populated from MetadataResponse
        v10+ (KIP-516).
        """
        return self._topic_names_by_id.get(topic_id)

    def failed_update(self, exception):
        """Update cluster state given a failed MetadataRequest."""
        f = None
        with self._lock:
            if self._future:
                f = self._future
                self._future = None
            self._last_refresh_ms = time.monotonic() * 1000
        if f:
            f.failure(exception)

    def update_metadata(self, metadata):
        """Update cluster state given a MetadataResponse.

        Arguments:
            metadata (MetadataResponse): broker response to a metadata request

        Returns: None
        """
        if not metadata.brokers:
            log.warning("No broker metadata found in MetadataResponse -- ignoring.")
            return self.failed_update(Errors.MetadataEmptyBrokerList(metadata))

        _new_brokers = {}
        for broker in metadata.brokers:
            if metadata.API_VERSION == 0:
                node_id, host, port = broker
                rack = None
            else:
                node_id, host, port, rack = broker
            _new_brokers[node_id] = MetadataResponse.MetadataResponseBroker(
                node_id, host, port, rack)

        if metadata.API_VERSION == 0:
            _new_controller = None
        else:
            _new_controller = _new_brokers.get(metadata.controller_id)

        if metadata.API_VERSION < 2:
            _new_cluster_id = None
        else:
            _new_cluster_id = metadata.cluster_id

        _new_partitions = {}
        _new_broker_partitions = collections.defaultdict(set)
        _new_unauthorized_topics = set()
        _new_internal_topics = set()
        _new_topic_ids = {}
        _new_topic_names_by_id = {}
        _retry_topics = set()

        # KAFKA-9212: pre-2.4 brokers may emit stale leader_epoch values
        # during partition reassignment (the controller failed to update
        # its own cached epoch before sending UpdateMetadata, so the
        # propagated epoch lags behind the actual leader's). Caching that
        # stale value would loop us on FENCED_LEADER_EPOCH for every
        # subsequent ListOffsets / Fetch / OffsetCommit. The Java client's
        # fix gates on response version >= 9 (Kafka 2.4+, where the
        # controller bug is fixed); we do the same.
        epoch_reliable = metadata.API_VERSION >= 9

        # KIP-516: brokers report the all-zero UUID for topics they could not
        # resolve (it is never zero when error_code is zero). Treat it as
        # "no id" so it neither pollutes the id index (all unknown topics
        # would collide on one key) nor triggers a false "recreated" reset
        # once the topic later appears with its real id.
        zero_topic_id = uuid.UUID(int=0)

        for t in metadata.topics:
            topic = t.name
            if t.is_internal:
                _new_internal_topics.add(topic)
            error_type = Errors.for_code(t.error_code)

            new_topic_id = t.topic_id
            if new_topic_id == zero_topic_id:
                new_topic_id = None
            recreated = False
            if new_topic_id is not None and topic is not None:
                prior = self._topic_ids.get(topic)
                if prior is not None and prior != new_topic_id:
                    log.warning(
                        "Topic %s topic_id changed from %s to %s -- likely"
                        " recreated; resetting cached leader epochs.",
                        topic, prior, new_topic_id)
                    recreated = True
                _new_topic_ids[topic] = new_topic_id
                _new_topic_names_by_id[new_topic_id] = topic

            if error_type is Errors.NoError:
                _new_partitions[topic] = {}
                for p_data in t.partitions:
                    partition = p_data.partition_index
                    if not epoch_reliable or recreated:
                        p_data.leader_epoch = -1
                    _new_partitions[topic][partition] = p_data
                    if p_data.leader_id != -1:
                        _new_broker_partitions[p_data.leader_id].add(
                            TopicPartition(topic, partition))

            # Only log errors for topics we are specifically tracking
            elif topic in self._topics:
                if error_type.retriable:
                    _retry_topics.add(topic)
                if error_type is Errors.LeaderNotAvailableError:
                    log.warning("Topic %s is not available during auto-create"
                                " initialization", topic)
                elif error_type is Errors.UnknownTopicOrPartitionError:
                    log.error("Topic %s not found in cluster metadata", topic)
                elif error_type is Errors.TopicAuthorizationFailedError:
                    log.error("Topic %s is not authorized for this client", topic)
                    _new_unauthorized_topics.add(topic)
                elif error_type is Errors.InvalidTopicError:
                    log.error("'%s' is not a valid topic name", topic)
                    self._topics.discard(topic)
                else:
                    log.error("Error fetching metadata for topic %s: %s",
                              topic, error_type)

        with self._lock:
            self._brokers = _new_brokers
            self.controller = _new_controller
            self.cluster_id = _new_cluster_id
            self._partitions = _new_partitions
            self._broker_partitions = _new_broker_partitions
            self.unauthorized_topics = _new_unauthorized_topics
            self.internal_topics = _new_internal_topics
            # Pre-v10 responses don't carry topic_id, so the wholesale swap
            # would clobber known ids during a rolling downgrade (or any
            # cross-broker version skew). Only replace the index when the
            # response actually had a chance to populate it.
            if metadata.API_VERSION >= 10:
                self._topic_ids = _new_topic_ids
                self._topic_names_by_id = _new_topic_names_by_id
            self._need_update = len(_retry_topics) > 0
            f = None
            if self._future:
                f = self._future
            self._future = None
            now = time.monotonic() * 1000
            self._last_refresh_ms = now
            self._last_successful_refresh_ms = now

        if f:
            # In the common case where we ask for a single topic and get back an
            # error, we should fail the future
            if len(metadata.topics) == 1 and metadata.topics[0].error_code != Errors.NoError.errno:
                only = metadata.topics[0]
                error = Errors.for_code(only.error_code)(only.name)
                f.failure(error)
            else:
                f.success(self)

        log.info("Updated metadata: %s", self)

        # Iterate a snapshot so a listener may add/remove listeners safely.
        for listener in list(self._listeners):
            listener(self)

    def add_listener(self, listener):
        """Add a callback function to be called on each metadata update"""
        self._listeners.add(listener)

    def remove_listener(self, listener):
        """Remove a previously added listener callback."""
        self._listeners.discard(listener)

    def add_coordinator(self, response, key_type, key):
        """Update with metadata for a group or txn coordinator

        Arguments:
            response (FindCoordinatorResponse): broker response
            key_type (str): 'group' or 'transaction'
            key (str): consumer_group or transactional_id

        Returns:
            string: coordinator node_id if metadata is updated, None on error
        """
        log.debug("Updating coordinator for %s/%s: %s", key_type, key, response)
        error_type = Errors.for_code(response.error_code)
        if error_type is not Errors.NoError:
            log.error("FindCoordinatorResponse error: %s", error_type)
            self._coordinators[(key_type, key)] = -1
            return

        # Use a coordinator-specific node id so that requests
        # get a dedicated connection
        node_id = 'coordinator-{}'.format(response.node_id)
        coordinator = MetadataResponse.MetadataResponseBroker(
            node_id,
            response.host,
            response.port,
            None)

        log.info("Coordinator for %s/%s is %s", key_type, key, coordinator)
        self._coordinator_brokers[node_id] = coordinator
        self._coordinators[(key_type, key)] = node_id
        return node_id

    def __str__(self):
        return 'ClusterMetadata(brokers: %d, topics: %d, coordinators: %d)' % \
               (len(self._brokers), len(self._partitions), len(self._coordinators))
def collect_hosts(hosts, randomize=True):
    """
    Processes a list (or comma-separated string) of hosts strings (host:port)
    and returns a list of (host, port, family) tuples. Optionally randomizes
    the returned list.

    Blank entries (an empty string, or the gap left by a trailing or doubled
    comma) are skipped rather than turned into a ('', 9092) host.
    """
    if isinstance(hosts, str):
        hosts = hosts.strip().split(',')

    result = []
    for host_port in hosts:
        # ignore leading SECURITY_PROTOCOL:// to mimic java client
        host_port = re.sub('^.*://', '', host_port)
        if not host_port.strip():
            continue
        host, port, afi = get_ip_port_afi(host_port)
        result.append((host, port, afi))

    if randomize:
        random.shuffle(result)
    return result


def expand_to_canonical_bootstrap_hosts(hosts):
    """Expand each bootstrap entry to one entry per canonical FQDN.

    Mirrors Java's ``client.dns.lookup=resolve_canonical_bootstrap_servers_only``:
    forward-resolve each host, take the ``canonname`` reported by the
    resolver, and emit one bootstrap entry per unique canonical name.
    Useful for Kerberos round-robin DNS deployments where the principal
    must match each individual broker FQDN.

    If a host fails to resolve, the original entry is preserved verbatim --
    matching Java's best-effort behaviour so bootstrap doesn't fail outright.
    If it resolves but the resolver reports no canonical name at all, the
    original host name is kept (with the family of the first result).
    """
    expanded = []
    for host, port, afi in hosts:
        try:
            addrinfos = socket.getaddrinfo(
                host, port, afi, socket.SOCK_STREAM, 0, socket.AI_CANONNAME)
        except socket.gaierror as exc:
            log.warning('Canonical bootstrap resolution failed for %s:%s: %s; '
                        'keeping original entry', host, port, exc)
            expanded.append((host, port, afi))
            continue

        seen = set()
        for family, _socktype, _proto, canonname, _sockaddr in addrinfos:
            # getaddrinfo(3) fills ai_canonname only on the first result; the
            # rest come back as ''. Skip those instead of falling back to the
            # (non-canonical) input host, which used to add a bogus extra entry.
            if not canonname or canonname in seen:
                continue
            seen.add(canonname)
            expanded.append((canonname, port, family))
        if not seen:
            expanded.append((host, port, addrinfos[0][0]))
    return expanded


def _address_family(address):
    """
        Attempt to determine the family of an address (or hostname)

        :return: either socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC if the address family
                 could not be determined
    """
    if address.startswith('[') and address.endswith(']'):
        return socket.AF_INET6
    for af in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(af, address)
            return af
        except (ValueError, AttributeError, socket.error):
            continue
    return socket.AF_UNSPEC


DEFAULT_KAFKA_PORT = 9092


def get_ip_port_afi(host_and_port_str):
    """
        Parse the IP and port from a string in the format of:

            * host_or_ip          <- Can be either IPv4 address literal or hostname/fqdn
            * host_or_ipv4:port   <- Can be either IPv4 address literal or hostname/fqdn
            * [host_or_ip]        <- IPv6 address literal
            * [host_or_ip]:port.  <- IPv6 address literal

        .. note:: IPv6 address literals with ports *must* be enclosed in brackets

        .. note:: If the port is not specified, default will be returned.

        :return: tuple (host, port, afi), afi will be socket.AF_INET or socket.AF_INET6 or socket.AF_UNSPEC
    """
    host_and_port_str = host_and_port_str.strip()
    if host_and_port_str.startswith('['):
        af = socket.AF_INET6
        host, rest = host_and_port_str[1:].split(']')
        if rest:
            port = int(rest[1:])
        else:
            port = DEFAULT_KAFKA_PORT
        return host, port, af

    if ':' not in host_and_port_str:
        af = _address_family(host_and_port_str)
        return host_and_port_str, DEFAULT_KAFKA_PORT, af

    # now we have something with a colon in it and no square brackets. It could be
    # either an IPv6 address literal (e.g., "::1") or an IP:port pair or a host:port pair
    try:
        # if it decodes as an IPv6 address, use that
        socket.inet_pton(socket.AF_INET6, host_and_port_str)
        return host_and_port_str, DEFAULT_KAFKA_PORT, socket.AF_INET6
    except AttributeError:
        log.warning('socket.inet_pton not available on this platform.'
                    ' consider `pip install win_inet_pton`')
    except (ValueError, socket.error):
        # it's a host:port pair
        pass
    host, port = host_and_port_str.rsplit(':', 1)
    port = int(port)

    af = _address_family(host)
    return host, port, af