# Source code for kafka.net.manager

import copy
import logging
import inspect
import random
import socket
import ssl
import time

from .inet import create_connection
from .connection import KafkaConnection
from .metrics import KafkaManagerMetrics
from .transport import KafkaSSLTransport, KafkaTCPTransport
from kafka.cluster import ClusterMetadata
import kafka.errors as Errors
from kafka.net.wakeup_notifier import WakeupNotifier
from kafka.protocol.broker_version_data import BrokerVersionData
from kafka.future import Future
from kafka.version import __version__


# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)

class KafkaConnectionManager:
    """Owns broker connections, reconnect backoff state and bootstrap for a
    client running on the ``net`` event loop.

    Connections are created lazily by :meth:`get_connection`, keyed by
    node_id, and (by default) dropped from the table when they close.
    Per-node reconnect backoff and connection-setup timeouts grow
    exponentially, with jitter, on each failure. The most recent SASL
    authentication / authorization failure for a node is cached so that it
    can be raised to the caller instead of retrying silently.
    """

    DEFAULT_CONFIG = {
        'bootstrap_servers': 'localhost:9092',
        'client_id': 'kafka-python-' + __version__,
        'client_software_name': 'kafka-python',
        'client_software_version': __version__,
        'receive_message_max_bytes': 1000000,
        'reconnect_backoff_ms': 50,
        'reconnect_backoff_max_ms': 30000,
        'request_timeout_ms': 30000,
        'socket_connection_setup_timeout_ms': 10000,
        'socket_connection_setup_timeout_max_ms': 30000,
        'socket_options': [
            (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
        ],
        'max_in_flight_requests_per_connection': 5,
        'connections_max_idle_ms': 9 * 60 * 1000,
        'security_protocol': 'PLAINTEXT',
        'ssl_context': None,
        'ssl_check_hostname': True,
        'ssl_cafile': None,
        'ssl_certfile': None,
        'ssl_keyfile': None,
        'ssl_password': None,
        'ssl_crlfile': None,
        'sasl_mechanism': None,
        'sasl_plain_username': None,
        'sasl_plain_password': None,
        'sasl_kerberos_name': None,
        'sasl_kerberos_service_name': 'kafka',
        'sasl_kerberos_domain_name': None,
        'sasl_oauth_token_provider': None,
        'proxy_url': None,
        'api_version': None,
        'metrics': None,
        'metric_group_prefix': '',
        'metadata_max_age_ms': 300000,
        'client_dns_lookup': 'use_all_dns_ips',
    }

    _VALID_DNS_LOOKUP_MODES = ('use_all_dns_ips', 'resolve_canonical_bootstrap_servers_only')

    def __init__(self, net, **configs):
        """Create the manager and start the periodic idle-connection check.

        Args:
            net: event loop / network driver used to schedule callbacks and
                run coroutines (``call_soon``, ``call_at``, ``run``, ...).
            **configs: overrides for keys in ``DEFAULT_CONFIG``. Unknown keys
                are ignored, except the deprecated ``socks5_proxy``, which is
                used as ``proxy_url`` when ``proxy_url`` is not set.

        Raises:
            ValueError: if ``client_dns_lookup`` is not a supported mode.
        """
        self.config = copy.copy(self.DEFAULT_CONFIG)
        for key in self.config:
            if key in configs:
                self.config[key] = configs[key]
        if self.config['client_dns_lookup'] not in self._VALID_DNS_LOOKUP_MODES:
            raise ValueError(
                "client_dns_lookup must be one of %s; got %r"
                % (self._VALID_DNS_LOOKUP_MODES, self.config['client_dns_lookup']))
        if 'socks5_proxy' in configs:
            if self.config['proxy_url'] is None:
                log.warning('socks5_proxy is deprecated, use proxy_url instead')
                self.config['proxy_url'] = configs['socks5_proxy']
        self._net = net
        self.cluster = ClusterMetadata(
            bootstrap_servers=self.config['bootstrap_servers'],
            metadata_max_age_ms=self.config['metadata_max_age_ms'],
            client_dns_lookup=self.config['client_dns_lookup'],
        )
        self.cluster.attach(self)
        self._conns = {}  # node_id => KafkaConnection
        self._backoff = {}  # node_id => (failures, backoff_until, socket_connect_setup_timeout_ms)
        # Cache the most recent SASL authentication / authorization failure
        # per node so we can surface it to the user instead of silently
        # retrying forever. Cleared on successful connect; note that
        # get_connection() raises a cached failure before dialing, so an
        # entry is effectively sticky.
        self._auth_failures = {}  # node_id => exception
        self._idle_check_delay = self.config['connections_max_idle_ms'] / 1000
        # Must exist before the idle-check loop starts: close_idle_connections()
        # stops rescheduling itself once the manager is closed.
        self.closed = False
        self.close_idle_connections()
        self.broker_version_data = None
        self._bootstrap_future = None
        self._bootstrap_wakeup = WakeupNotifier(self._net)
        if self.config['metrics']:
            self._sensors = KafkaManagerMetrics(
                self.config['metrics'], self.config['metric_group_prefix'], self._conns)
        else:
            self._sensors = None
        if self.config['api_version'] is not None:
            self.broker_version_data = BrokerVersionData(self.config['api_version'])

    @property
    def broker_version(self):
        """Broker version from ``broker_version_data``, or None if unknown."""
        if self.broker_version_data is None:
            return None
        return self.broker_version_data.broker_version

    def least_used_connections(self):
        """Return connected connections, least recently active first."""
        return sorted(filter(lambda conn: conn.connected, self._conns.values()),
                      key=lambda conn: conn.transport.last_activity)

    async def _do_bootstrap(self, deadline):
        """Try random bootstrap brokers until cluster metadata is fetched.

        Each attempt opens a temporary connection to a bootstrap broker,
        refreshes metadata through it, and closes it again. On success the
        cluster's refresh loop is started.

        Args:
            deadline: absolute ``time.monotonic()`` deadline, or None to keep
                retrying until the manager is closed.

        Returns:
            True once bootstrap succeeds.

        Raises:
            IncompatibleBrokerVersion: propagated from a bootstrap connection.
            KafkaTimeoutError: if the deadline passes (or the manager is
                closed) before bootstrap succeeds.
        """
        while not self.closed and (deadline is None or time.monotonic() < deadline):
            bootstrap_broker = random.choice(self.cluster.bootstrap_brokers())
            node_id = bootstrap_broker.node_id
            log.debug('Attempting bootstrap with %s', bootstrap_broker)
            try:
                timeout_ms = (deadline - time.monotonic()) * 1000 if deadline is not None else None
                conn = self.get_connection(node_id, timeout_ms=timeout_ms,
                                           pop_on_close=False,
                                           refresh_metadata_on_err=False,
                                           reset_backoff_on_connect=False)
            except Errors.NodeNotReadyError:
                # Node is in reconnect backoff: wait for it to expire (capped
                # at the deadline); close() calls notify() to cut this short.
                delay = self.connection_delay(node_id)
                if deadline is not None:
                    delay = min(delay, max(0, deadline - time.monotonic()))
                log.debug('Bootstrap %s NodeNotReadyError: backoff %s', bootstrap_broker, delay)
                await self._bootstrap_wakeup(delay)
                continue

            try:
                await conn
            except Errors.IncompatibleBrokerVersion as exc:
                log.error('Did you attempt to connect to a kafka controller (no metadata support)?')
                # Bootstrap connections are not removed on close
                # (pop_on_close=False); drop it before propagating so no dead
                # connection is left in the table.
                self._conns.pop(node_id, conn).close(exc)
                raise
            except Exception as exc:
                self._conns.pop(node_id, conn).close(exc)
                continue

            try:
                await self.cluster.refresh_metadata(node_id)
                if not self.cluster.brokers():
                    log.warning('Bootstrap metadata response has no brokers. Retrying.')
                    self.update_backoff(node_id)
                    continue
            except Exception as exc:
                log.error('Bootstrap attempt to %s failed: %s', node_id, exc)
                self.update_backoff(node_id)
                continue
            else:
                self.reset_backoff(node_id)
                self.cluster.start_refresh_loop()
                log.info('Bootstrap complete: %s', self.cluster)
                return True
            finally:
                # The bootstrap connection is temporary in every outcome.
                self._conns.pop(node_id, conn).close()

        raise Errors.KafkaTimeoutError(
            'Unable to bootstrap from %s' % (self.cluster.config['bootstrap_servers'],))

    def bootstrap_async(self, timeout_ms=None, refresh=True):
        """Start (or reuse) a bootstrap attempt on the event loop.

        The existing bootstrap future is returned unchanged if it is still
        running, or if ``refresh`` is False. Otherwise a new attempt is
        scheduled.

        Args:
            timeout_ms: overall bootstrap timeout, or None for no deadline.
            refresh: when True, a completed bootstrap is replaced by a new
                attempt.

        Returns:
            Future for the bootstrap attempt; failures are also logged.
        """
        if self._bootstrap_future is not None and (not refresh or not self._bootstrap_future.is_done):
            return self._bootstrap_future
        deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000
        log.debug('Starting new bootstrap')
        self._bootstrap_future = self.call_soon(self._do_bootstrap, deadline)
        self._bootstrap_future.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc))
        return self._bootstrap_future

    def bootstrap(self, timeout_ms=None, refresh=True):
        """Run :meth:`bootstrap_async` via ``net.run`` (blocking)."""
        self._net.run(self.bootstrap_async, timeout_ms, refresh)

    @property
    def bootstrapped(self):
        """True once a bootstrap attempt has completed successfully."""
        return self._bootstrap_future is not None and self._bootstrap_future.succeeded()

    def _connection_idle_at(self, conn):
        """Monotonic time at which ``conn`` becomes idle."""
        return conn.transport.last_activity + self._idle_check_delay

    def close_idle_connections(self):
        """Close connections idle for ``connections_max_idle_ms`` and
        schedule the next check.

        The next check runs when the least recently active remaining
        connection would become idle, or one full idle period from now if
        none remain. The loop stops once the manager is closed.
        """
        if self.closed:
            return
        for conn in self.least_used_connections():
            next_idle_at = self._connection_idle_at(conn)
            if time.monotonic() >= next_idle_at:
                log.info('Closing idle connection to node %s', conn.node_id)
                conn.close()
            else:
                # Sorted by last activity: later connections are fresher.
                break
        else:
            next_idle_at = time.monotonic() + self._idle_check_delay
        log.debug('Next idle connections check in %d secs', next_idle_at - time.monotonic())
        self._net.call_at(next_idle_at, self.close_idle_connections)

    @property
    def ssl_enabled(self):
        """True when ``security_protocol`` is SSL or SASL_SSL."""
        return self.config['security_protocol'] in ('SSL', 'SASL_SSL')

    def _build_ssl_context(self):
        """Return the configured ``ssl_context`` or build a TLS client context.

        The built context requires TLS 1.2+. It checks hostnames according to
        ``ssl_check_hostname`` and trusts ``ssl_cafile`` (or the default
        system CAs). It loads an optional client certificate chain, and
        enables leaf CRL checking when ``ssl_crlfile`` is set.
        """
        if self.config['ssl_context'] is not None:
            return self.config['ssl_context']
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
        ctx.check_hostname = self.config['ssl_check_hostname']
        if self.config['ssl_cafile']:
            ctx.load_verify_locations(self.config['ssl_cafile'])
        else:
            ctx.load_default_certs()
        if self.config['ssl_certfile']:
            ctx.load_cert_chain(
                certfile=self.config['ssl_certfile'],
                keyfile=self.config['ssl_keyfile'],
                password=self.config['ssl_password'],
            )
        if self.config['ssl_crlfile']:
            # SSLContext.load_verify_locations() has no ``crl`` keyword (it
            # raised TypeError); a PEM CRL file is loaded through ``cafile``.
            ctx.load_verify_locations(cafile=self.config['ssl_crlfile'])
            ctx.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
        return ctx

    async def _build_transport(self, node, timeout_at=None):
        """Open a socket to ``node`` and complete the transport handshake.

        Raises:
            KafkaConnectionError: if the handshake fails; the original error
                is chained as ``__cause__``.
        """
        sock = await create_connection(self._net, node.host, node.port,
                                       self.config['socket_options'],
                                       proxy_url=self.config['proxy_url'],
                                       timeout_at=timeout_at)
        if self.ssl_enabled:
            transport = KafkaSSLTransport(self._net, sock, self._build_ssl_context(),
                                          host=node.host,
                                          ssl_check_hostname=self.config['ssl_check_hostname'])
        else:
            transport = KafkaTCPTransport(self._net, sock, host=node.host)
        try:
            await transport.handshake()
        except Exception as e:
            # NOTE(review): the socket/transport is not closed here and the
            # connection never receives it, so it is presumably released
            # only on garbage collection. Confirm the transport close API
            # before adding explicit cleanup.
            raise Errors.KafkaConnectionError('Handshake failed: %s' % e) from e
        return transport

    async def _connect(self, node, conn, reset_backoff_on_connect=True, timeout_at=None):
        """Establish and initialize ``conn`` to ``node`` in the background.

        On failure the connection is marked lost and the node's reconnect
        backoff is increased. SASL authentication / authorization errors are
        cached for :meth:`maybe_raise_auth_failure`. Errors are not re-raised
        here; they presumably reach awaiters of ``conn`` via
        ``connection_lost``. On success the cached auth failure is cleared,
        the backoff is reset (if ``reset_backoff_on_connect``), and a
        bootstrap node's broker version data is adopted by the manager.
        """
        try:
            transport = await self._build_transport(node, timeout_at=timeout_at)
            conn.connection_made(transport)
            await conn.initialize(timeout_at=timeout_at)
        except Exception as exc:
            log.error('Connection failed: %s', exc)
            conn.connection_lost(exc)
            self.update_backoff(node.node_id)
            if isinstance(exc, (Errors.SaslAuthenticationFailedError, Errors.AuthorizationError)):
                self._auth_failures[node.node_id] = exc
            return
        if self._sensors:
            self._sensors.connection_created.record()
        if reset_backoff_on_connect:
            self.reset_backoff(node.node_id)
        self._auth_failures.pop(node.node_id, None)
        if conn.broker_version_data is not None:
            if self.cluster.is_bootstrap(node.node_id):
                self.broker_version_data = conn.broker_version_data

    def _forget_connection(self, node_id, conn):
        """Drop ``conn`` from the table only if it is still registered for ``node_id``."""
        if self._conns.get(node_id) is conn:
            del self._conns[node_id]

    def get_connection(self, node_id, timeout_ms=None, pop_on_close=True,
                       refresh_metadata_on_err=True, reset_backoff_on_connect=True):
        """Return the connection for ``node_id``, creating one if needed.

        A new connection is registered immediately and connected in the
        background via :meth:`_connect`.

        Args:
            node_id: broker to connect to.
            timeout_ms: connection setup timeout; defaults to the node's
                current (backoff-scaled) setup timeout.
            pop_on_close: remove the connection from the table when it closes.
            refresh_metadata_on_err: request a metadata update if the
                connection closes with an error.
            reset_backoff_on_connect: clear the node's backoff on success.

        Raises:
            NodeNotReadyError: if node_id is None or the node is in backoff.
            UnknownBrokerIdError: if cluster metadata has no such broker.
            The cached SASL / authorization error for node_id, if any.
        """
        if node_id is None:
            raise Errors.NodeNotReadyError('No node_id provided')
        self.maybe_raise_auth_failure(node_id)
        if self.connection_delay(node_id) > 0:
            raise Errors.NodeNotReadyError(node_id)
        if node_id in self._conns:
            return self._conns[node_id]
        node = self.cluster.broker_metadata(node_id)
        if node is None:
            raise Errors.UnknownBrokerIdError(node_id)
        conn = KafkaConnection(self._net, node_id=node_id,
                               broker_version_data=self.broker_version_data,
                               **self.config)
        if pop_on_close:
            # Identity-checked removal under the key used for registration, so
            # a late close callback can never evict a newer connection.
            conn.close_future.add_both(lambda _: self._forget_connection(node_id, conn))
        if self._sensors:
            conn.close_future.add_both(lambda _: self._sensors.connection_closed.record())
        if refresh_metadata_on_err:
            conn.close_future.add_errback(lambda _: self.cluster.request_update())
        self._conns[node_id] = conn
        if timeout_ms is None:
            timeout_ms = self.socket_connection_setup_timeout_ms(node_id)
        timeout_at = time.monotonic() + timeout_ms / 1000
        self._net.call_soon(lambda: self._connect(node, conn,
                                                  reset_backoff_on_connect=reset_backoff_on_connect,
                                                  timeout_at=timeout_at))
        return conn

    def send(self, request, node_id=None, request_timeout_ms=None):
        """Send ``request`` to ``node_id`` (or the least loaded node).

        Returns:
            Future for the response. It is a failed Future if the node is
            not ready; other errors from get_connection() (auth failures,
            unknown broker) are raised directly.
        """
        node_id = node_id if node_id is not None else self.least_loaded_node()
        try:
            conn = self.get_connection(node_id)
        except Errors.NodeNotReadyError as e:
            return Future().failure(e)
        return conn.send_request(request, request_timeout_ms=request_timeout_ms)

    def least_loaded_node(self):
        """Choose the node with the fewest outstanding requests.

        Candidates are ``self.cluster.brokers()``, visited in random order. A
        node with an established, unpaused connection and no in-flight
        requests is returned immediately. Otherwise the result is the node
        with the fewest in-flight requests among those that are neither
        paused nor in reconnect backoff.

        NOTE(review): whether bootstrap brokers are returned here before any
        metadata is fetched depends on ClusterMetadata.brokers(); confirm.

        Returns:
            node_id or None if no suitable node was found.
        """
        nodes = [broker.node_id for broker in self.cluster.brokers()]
        random.shuffle(nodes)
        inflight = float('inf')
        found = None
        for node_id in nodes:
            conn = self._conns.get(node_id)
            connected = conn is not None and conn.connected and not conn.paused
            blacked_out = (conn is not None and conn.paused) or self.connection_delay(node_id) > 0
            curr_inflight = len(conn.in_flight_requests) if conn is not None else 0
            if connected and curr_inflight == 0:
                # Idle established connection: can't do better.
                return node_id
            elif not blacked_out and curr_inflight < inflight:
                # Best candidate so far.
                inflight = curr_inflight
                found = node_id
        return found

    def reset_backoff(self, node_id):
        """Forget all recorded connection failures for ``node_id``."""
        self._backoff.pop(node_id, None)

    def jitter_pct(self):
        """Random multiplier in [0.8, 1.2] applied to backoff/timeouts."""
        return random.uniform(0.8, 1.2)

    def _calculate_exp_timeout(self, key, failures):
        """Return ``config[key] * 2 ** (failures - 1)`` in ms.

        The value is capped by the matching ``*_max_ms`` setting when one
        exists, then scaled by :meth:`jitter_pct`.
        """
        max_keys = {
            'reconnect_backoff_ms': 'reconnect_backoff_max_ms',
            'socket_connection_setup_timeout_ms': 'socket_connection_setup_timeout_max_ms',
        }
        timeout_ms = self.config[key] * 2 ** (failures - 1)
        if key in max_keys:
            max_ms = self.config[max_keys[key]]
            timeout_ms = min(max_ms, timeout_ms)
        return timeout_ms * self.jitter_pct()

    def update_backoff(self, node_id):
        """Record another connection failure for ``node_id``.

        Grows both the reconnect backoff and the next connection-setup
        timeout.
        """
        failures, _, _ = self._backoff.get(node_id, (0, 0, 0))
        failures += 1
        backoff_ms = self._calculate_exp_timeout('reconnect_backoff_ms', failures)
        connect_ms = self._calculate_exp_timeout('socket_connection_setup_timeout_ms', failures)
        log.debug('%s reconnect backoff %d ms / connect timeout %d ms after %s failures',
                  node_id, backoff_ms, connect_ms, failures)
        backoff_until_time = time.monotonic() + (backoff_ms / 1000)
        self._backoff[node_id] = (failures, backoff_until_time, connect_ms)

    def connection_delay(self, node_id):
        """Connection delay in seconds.

        Uses exponential backoff/retry with jitter. See KIP-144.
        """
        if node_id not in self._backoff:
            return 0
        return max(0, self._backoff[node_id][1] - time.monotonic())

    def socket_connection_setup_timeout_ms(self, node_id):
        """Connection setup timeout (ms) for the next attempt to ``node_id``."""
        if node_id not in self._backoff:
            return self.config['socket_connection_setup_timeout_ms']
        return self._backoff[node_id][2]

    def auth_failure(self, node_id):
        """Return the most recent auth-class failure for ``node_id``, or None
        if there is no sticky failure on record."""
        return self._auth_failures.get(node_id)

    def maybe_raise_auth_failure(self, node_id):
        """Raise the cached auth-class failure for ``node_id`` if any."""
        exc = self._auth_failures.get(node_id)
        if exc is not None:
            raise exc

    def close(self, node_id=None, timeout_ms=None):
        """Close one node's connection, or shut down the whole manager.

        With ``node_id`` None (and only once), this marks the manager
        closed and wakes any pending bootstrap wait. It then closes all
        connections and the cluster metadata. ``timeout_ms`` is currently
        unused.
        """
        if node_id is not None:
            conn = self._conns.get(node_id)
            if conn is not None:
                conn.close()
        elif not self.closed:
            self.closed = True
            self._bootstrap_wakeup.notify()
            for conn in list(self._conns.values()):
                conn.close()
            self.cluster.close()

    async def wait_for(self, future, timeout_ms):
        """Await `future` with a timeout in ms. Raises KafkaTimeoutError on timeout.

        Must be awaited from a coroutine running on this loop. The underlying
        future is not cancelled on timeout - it continues to run; the timeout
        only unblocks the awaiter.
        """
        if timeout_ms is None:
            return await future
        wrapper = Future()

        def _on_success(value):
            if not wrapper.is_done:
                wrapper.success(value)

        def _on_failure(exc):
            if not wrapper.is_done:
                wrapper.failure(exc)

        future.add_callback(_on_success)
        future.add_errback(_on_failure)

        def _on_timeout():
            if not wrapper.is_done:
                wrapper.failure(Errors.KafkaTimeoutError(
                    'Timed out after %s ms' % timeout_ms))

        timer = self._net.call_later(timeout_ms / 1000, _on_timeout)
        try:
            return await wrapper
        finally:
            if not timer.is_done:
                try:
                    self._net.unschedule(timer)
                except ValueError:
                    pass

    def call_soon(self, coro, *args):
        """Accepts a coroutine / awaitable / function and schedules it on the
        event loop. Thread-safe.

        Returns: Future
        """
        return self._net.call_soon_with_future(coro, *args)

    def run(self, coro, *args):
        """Schedules coro on the event loop, blocks until complete, returns
        value or raises.

        If an IO thread is running (via start()), the caller thread blocks on
        a cross-thread Event while the coroutine runs on the IO thread. Safe
        to call concurrently from multiple caller threads. If no IO thread is
        running, falls back to driving the loop on the caller thread (legacy
        behavior).
        """
        return self._net.run(coro, *args)