import copy
import logging
import inspect
import random
import socket
import ssl
import time
from .inet import create_connection
from .connection import KafkaConnection
from .metrics import KafkaManagerMetrics
from .transport import KafkaSSLTransport, KafkaTCPTransport
from kafka.cluster import ClusterMetadata
import kafka.errors as Errors
from kafka.net.wakeup_notifier import WakeupNotifier
from kafka.protocol.broker_version_data import BrokerVersionData
from kafka.future import Future
from kafka.version import __version__
log = logging.getLogger(__name__)
[docs]
class KafkaConnectionManager:
# Default configuration. __init__ starts from a shallow copy of this dict and
# overrides only the keys listed here; keyword arguments with other names are
# not stored in self.config. The whole dict is also forwarded as keyword
# arguments to every KafkaConnection created by get_connection().
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost:9092',
'client_id': 'kafka-python-' + __version__,
'client_software_name': 'kafka-python',
'client_software_version': __version__,
'receive_message_max_bytes': 1000000,
# Base and cap (ms) for the per-node reconnect backoff computed by
# _calculate_exp_timeout().
'reconnect_backoff_ms': 50,
'reconnect_backoff_max_ms': 30000,
'request_timeout_ms': 30000,
# Base and cap (ms) for the per-node connect timeout; the base is used until
# a node has a recorded failure (see socket_connection_setup_timeout_ms()).
'socket_connection_setup_timeout_ms': 10000,
'socket_connection_setup_timeout_max_ms': 30000,
# Passed to create_connection() when a transport is built.
'socket_options': [
(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
],
'max_in_flight_requests_per_connection': 5,
# Converted to seconds in __init__ and used by close_idle_connections().
'connections_max_idle_ms': 9 * 60 * 1000,
# 'SSL' and 'SASL_SSL' select the SSL transport (see ssl_enabled).
'security_protocol': 'PLAINTEXT',
# ssl_* keys are read by _build_ssl_context(); a non-None 'ssl_context' is
# returned as-is and the remaining ssl_* keys are then not consulted there.
'ssl_context': None,
'ssl_check_hostname': True,
'ssl_cafile': None,
'ssl_certfile': None,
'ssl_keyfile': None,
'ssl_password': None,
'ssl_crlfile': None,
# sasl_* keys are not read by this class; they reach KafkaConnection through
# the forwarded config.
'sasl_mechanism': None,
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_name': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
# Passed to create_connection(); __init__ also accepts the deprecated
# 'socks5_proxy' keyword as a fallback for this value.
'proxy_url': None,
# When not None, __init__ builds BrokerVersionData from it.
'api_version': None,
# When truthy, __init__ creates a KafkaManagerMetrics with this object and
# 'metric_group_prefix'.
'metrics': None,
'metric_group_prefix': '',
# Forwarded to ClusterMetadata together with 'bootstrap_servers' and
# 'client_dns_lookup'.
'metadata_max_age_ms': 300000,
'client_dns_lookup': 'use_all_dns_ips',
}
# Accepted values for 'client_dns_lookup'; anything else makes __init__ raise
# ValueError.
_VALID_DNS_LOOKUP_MODES = ('use_all_dns_ips', 'resolve_canonical_bootstrap_servers_only')
def __init__(self, net, **configs):
    """Build the manager state and schedule the first idle-connection check.

    Args:
        net: Event-loop object. This class calls ``call_at``,
            ``call_later``, ``call_soon``, ``call_soon_with_future``,
            ``unschedule`` and ``run`` on it.
        **configs: Overrides for keys of ``DEFAULT_CONFIG``. Unknown keys
            are ignored, except the deprecated ``socks5_proxy`` which is
            used as ``proxy_url`` when no ``proxy_url`` was given.

    Raises:
        ValueError: If ``client_dns_lookup`` is not one of
            ``_VALID_DNS_LOOKUP_MODES``.
    """
    settings = copy.copy(self.DEFAULT_CONFIG)
    settings.update(
        (name, configs[name]) for name in self.DEFAULT_CONFIG if name in configs)
    self.config = settings

    dns_mode = settings['client_dns_lookup']
    if dns_mode not in self._VALID_DNS_LOOKUP_MODES:
        raise ValueError(
            "client_dns_lookup must be one of %s; got %r"
            % (self._VALID_DNS_LOOKUP_MODES, dns_mode))

    # Legacy keyword: only honoured when proxy_url itself was not supplied.
    if 'socks5_proxy' in configs and settings['proxy_url'] is None:
        log.warning('socks5_proxy is deprecated, use proxy_url instead')
        settings['proxy_url'] = configs['socks5_proxy']

    self._net = net
    cluster = ClusterMetadata(
        bootstrap_servers=settings['bootstrap_servers'],
        metadata_max_age_ms=settings['metadata_max_age_ms'],
        client_dns_lookup=settings['client_dns_lookup'],
    )
    self.cluster = cluster
    cluster.attach(self)

    self._conns = {}
    # node_id => (failures, backoff_until, socket_connect_setup_timeout_ms)
    self._backoff = {}
    # node_id => most recent authentication-class error for that node.
    # Kept so the error can be raised to the caller instead of retrying
    # silently; the entry is dropped again after a successful connect.
    self._auth_failures = {}

    self._idle_check_delay = settings['connections_max_idle_ms'] / 1000
    # Runs one idle sweep now; the sweep re-schedules itself on the loop.
    self.close_idle_connections()

    self.broker_version_data = None
    self._bootstrap_future = None
    self._bootstrap_wakeup = WakeupNotifier(self._net)

    metrics = settings['metrics']
    self._sensors = (
        KafkaManagerMetrics(metrics, settings['metric_group_prefix'], self._conns)
        if metrics else None
    )

    api_version = settings['api_version']
    if api_version is not None:
        self.broker_version_data = BrokerVersionData(api_version)
    self.closed = False
@property
def broker_version(self):
    """Broker version from ``broker_version_data``, or None while that is unset."""
    data = self.broker_version_data
    return None if data is None else data.broker_version
def least_used_connections(self):
    """Return the connected connections, least recently active first.

    Ordering uses each connection's ``transport.last_activity``; entries
    whose ``connected`` flag is falsy are left out.
    """
    live = [conn for conn in self._conns.values() if conn.connected]
    live.sort(key=lambda conn: conn.transport.last_activity)
    return live
# Coroutine that bootstraps cluster metadata.
#
# Repeatedly picks a random bootstrap broker, connects to it and refreshes
# cluster metadata through it. `deadline` is a time.monotonic() timestamp, or
# None for no time limit.
#
# Returns True after the first successful attempt.
# Raises KafkaTimeoutError when the loop ends without success, which happens
# when the deadline passes or the manager has been closed.
# Re-raises IncompatibleBrokerVersion from the connection.
async def _do_bootstrap(self, deadline):
while not self.closed and (deadline is None or time.monotonic() < deadline):
bootstrap_broker = random.choice(self.cluster.bootstrap_brokers())
log.debug('Attempting bootstrap with %s', bootstrap_broker)
try:
# Remaining time budget for this attempt, in ms (None = use the
# per-node default chosen inside get_connection).
timeout_ms = (deadline - time.monotonic()) * 1000 if deadline is not None else None
# The three flags turn off get_connection's automatic bookkeeping
# (removal from _conns on close, metadata refresh on error, backoff
# reset on connect); this coroutine does that bookkeeping itself below.
conn = self.get_connection(bootstrap_broker.node_id,
timeout_ms=timeout_ms,
pop_on_close=False,
refresh_metadata_on_err=False,
reset_backoff_on_connect=False)
except Errors.NodeNotReadyError:
# get_connection raises this while the node is in reconnect backoff.
# Wait for the backoff, but never past the deadline.
delay = self.connection_delay(bootstrap_broker.node_id)
if deadline is not None:
delay = min(delay, max(0, deadline - time.monotonic()))
log.debug('Bootstrap %s NodeNotReadyError: backoff %s', bootstrap_broker, delay)
# close() calls _bootstrap_wakeup.notify(); presumably that ends this
# wait early -- confirm against WakeupNotifier.
await self._bootstrap_wakeup(delay)
continue
try:
# NOTE(review): awaiting the connection presumably resolves once it
# has finished initializing -- confirm against KafkaConnection.
await conn
except Errors.IncompatibleBrokerVersion:
log.error('Did you attempt to connect to a kafka controller (no metadata support)?')
raise
except Exception as exc:
# Drop the failed connection from _conns (falling back to `conn` when
# it is no longer registered), close it with the error, and try again.
self._conns.pop(bootstrap_broker.node_id, conn).close(exc)
continue
try:
await self.cluster.refresh_metadata(bootstrap_broker.node_id)
if not self.cluster.brokers():
log.warning('Bootstrap metadata response has no brokers. Retrying.')
self.update_backoff(bootstrap_broker.node_id)
continue
except Exception as exc:
log.error(f'Bootstrap attempt to {bootstrap_broker.node_id} failed: {exc}')
self.update_backoff(bootstrap_broker.node_id)
continue
else:
# Success path: metadata was refreshed and lists at least one broker.
self.reset_backoff(bootstrap_broker.node_id)
self.cluster.start_refresh_loop()
log.info('Bootstrap complete: %s', self.cluster)
return True
finally:
# Runs on every exit from the metadata step (continue, return or
# exception): the bootstrap connection is always removed and closed.
self._conns.pop(bootstrap_broker.node_id, conn).close()
else:
# while/else: reached only when the loop condition turns false, since
# the loop body never uses `break`.
raise Errors.KafkaTimeoutError(
'Unable to bootstrap from %s' % (self.cluster.config['bootstrap_servers'],))
def bootstrap_async(self, timeout_ms=None, refresh=True):
    """Start a bootstrap on the event loop, or reuse the current one.

    Args:
        timeout_ms: Overall time limit in ms; None means no limit.
        refresh: When False, any existing bootstrap future is returned,
            finished or not. When True, an existing future is reused only
            while it is still running.

    Returns:
        The future tracking the bootstrap coroutine.
    """
    current = self._bootstrap_future
    if current is not None:
        if not refresh or not current.is_done:
            return current

    if timeout_ms is None:
        deadline = None
    else:
        deadline = time.monotonic() + timeout_ms / 1000

    log.debug('Starting new bootstrap')
    pending = self.call_soon(self._do_bootstrap, deadline)
    self._bootstrap_future = pending
    pending.add_errback(lambda exc: log.error('Bootstrap failed: %s', exc))
    return pending
def bootstrap(self, timeout_ms=None, refresh=True):
    """Run ``bootstrap_async`` through the event loop's ``run()``.

    Arguments are passed on to ``bootstrap_async`` unchanged. The result of
    ``run()`` is discarded, so this method returns None.
    """
    loop = self._net
    loop.run(self.bootstrap_async, timeout_ms, refresh)
@property
def bootstrapped(self):
    """Whether a bootstrap has been started and its future reports success."""
    fut = self._bootstrap_future
    if fut is None:
        return False
    return fut.succeeded()
def _connection_idle_at(self, conn):
    """Time at which ``conn`` counts as idle: its last activity plus the idle delay."""
    last_seen = conn.transport.last_activity
    return last_seen + self._idle_check_delay
def close_idle_connections(self):
    """Close connections that have been idle too long, then re-schedule itself.

    Connections are visited least recently active first. The sweep stops at
    the first connection that is not yet idle and the next sweep is set for
    the moment that connection becomes idle. If every visited connection
    was closed (or there were none), the next sweep runs one full idle
    delay from now.
    """
    wake_at = None
    for conn in self.least_used_connections():
        idle_at = self._connection_idle_at(conn)
        if time.monotonic() < idle_at:
            # Everything after this one was active even more recently.
            wake_at = idle_at
            break
        log.info('Closing idle connection to node %s', conn.node_id)
        conn.close()

    if wake_at is None:
        wake_at = time.monotonic() + self._idle_check_delay
    log.debug('Next idle connections check in %d secs', wake_at - time.monotonic())
    self._net.call_at(wake_at, self.close_idle_connections)
@property
def ssl_enabled(self):
    """True when ``security_protocol`` is 'SSL' or 'SASL_SSL'."""
    protocol = self.config['security_protocol']
    return protocol == 'SSL' or protocol == 'SASL_SSL'
def _build_ssl_context(self):
    """Return the SSL context to use for broker connections.

    A caller-supplied ``ssl_context`` is returned unchanged. Otherwise a
    client-side TLS context is created with TLS 1.2 as the minimum version
    and configured from the ``ssl_*`` settings.

    Returns:
        ssl.SSLContext: The supplied or newly built context.

    Raises:
        OSError: If a configured CA, certificate, key or CRL file cannot be
            read (``ssl.SSLError`` is a subclass and covers malformed files).
    """
    preset = self.config['ssl_context']
    if preset is not None:
        return preset

    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.check_hostname = self.config['ssl_check_hostname']

    if self.config['ssl_cafile']:
        ctx.load_verify_locations(self.config['ssl_cafile'])
    else:
        ctx.load_default_certs()

    if self.config['ssl_certfile']:
        ctx.load_cert_chain(
            certfile=self.config['ssl_certfile'],
            keyfile=self.config['ssl_keyfile'],
            password=self.config['ssl_password'],
        )

    if self.config['ssl_crlfile']:
        # load_verify_locations() accepts only cafile/capath/cadata; there is
        # no `crl` keyword. CRLs are loaded through `cafile` and take effect
        # once the CRL verify flag is set.
        ctx.load_verify_locations(cafile=self.config['ssl_crlfile'])
        ctx.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
    return ctx
async def _build_transport(self, node, timeout_at=None):
    """Open a socket to ``node`` and return a transport that has completed its handshake.

    Args:
        node: Broker metadata; ``host`` and ``port`` are read from it.
        timeout_at: Passed through to ``create_connection``.

    Returns:
        A ``KafkaSSLTransport`` when ``ssl_enabled`` is true, otherwise a
        ``KafkaTCPTransport``.

    Raises:
        Errors.KafkaConnectionError: If the transport handshake fails. The
            original exception is attached as ``__cause__``.
    """
    sock = await create_connection(self._net, node.host, node.port,
                                   self.config['socket_options'],
                                   proxy_url=self.config['proxy_url'],
                                   timeout_at=timeout_at)
    if self.ssl_enabled:
        transport = KafkaSSLTransport(
            self._net, sock, self._build_ssl_context(),
            host=node.host,
            ssl_check_hostname=self.config['ssl_check_hostname'])
    else:
        transport = KafkaTCPTransport(self._net, sock, host=node.host)

    try:
        await transport.handshake()
    except Exception as e:
        # Chain explicitly so the traceback shows the handshake error as the
        # direct cause of the connection error.
        raise Errors.KafkaConnectionError('Handshake failed: %s' % e) from e
    return transport
async def _connect(self, node, conn, reset_backoff_on_connect=True, timeout_at=None):
    """Connect ``conn`` to ``node`` and record the outcome.

    Failures are not raised. They are logged, passed to
    ``conn.connection_lost``, and counted towards the node's backoff.
    SASL authentication and authorization errors are also cached per node
    so ``maybe_raise_auth_failure`` can report them.

    Args:
        node: Broker metadata for the target node.
        conn: The connection object to attach the new transport to.
        reset_backoff_on_connect: Clear the node's backoff after a
            successful connect.
        timeout_at: Passed to transport construction and to
            ``conn.initialize``.
    """
    try:
        transport = await self._build_transport(node, timeout_at=timeout_at)
        conn.connection_made(transport)
        await conn.initialize(timeout_at=timeout_at)
    except Exception as exc:
        log.error('Connection failed: %s', exc)
        conn.connection_lost(exc)
        self.update_backoff(node.node_id)
        auth_errors = (Errors.SaslAuthenticationFailedError,
                       Errors.AuthorizationError)
        if isinstance(exc, auth_errors):
            self._auth_failures[node.node_id] = exc
    else:
        if self._sensors:
            self._sensors.connection_created.record()
        if reset_backoff_on_connect:
            self.reset_backoff(node.node_id)
        # A successful connect clears any cached auth failure for the node.
        self._auth_failures.pop(node.node_id, None)
        # Only a bootstrap node's version data is adopted for the manager.
        if (conn.broker_version_data is not None
                and self.cluster.is_bootstrap(node.node_id)):
            self.broker_version_data = conn.broker_version_data
def get_connection(self, node_id, timeout_ms=None,
                   pop_on_close=True,
                   refresh_metadata_on_err=True,
                   reset_backoff_on_connect=True):
    """Return the connection for ``node_id``, creating and starting one if needed.

    Args:
        node_id: Target broker id.
        timeout_ms: Connect timeout for a new connection. None selects the
            node's current connection setup timeout.
        pop_on_close: Remove a new connection from the registry when it
            closes.
        refresh_metadata_on_err: Request a cluster metadata update when a
            new connection closes with an error.
        reset_backoff_on_connect: Passed on to ``_connect``.

    Returns:
        The existing or newly created connection. A new connection is
        returned before its connect attempt has run.

    Raises:
        Errors.NodeNotReadyError: ``node_id`` is None or the node is in
            reconnect backoff.
        Errors.UnknownBrokerIdError: The cluster has no metadata for
            ``node_id``.
        Exception: The cached authentication failure for the node, if any.
    """
    if node_id is None:
        raise Errors.NodeNotReadyError('No node_id provided')
    self.maybe_raise_auth_failure(node_id)
    if self.connection_delay(node_id) > 0:
        raise Errors.NodeNotReadyError(node_id)

    try:
        return self._conns[node_id]
    except KeyError:
        pass

    node = self.cluster.broker_metadata(node_id)
    if node is None:
        raise Errors.UnknownBrokerIdError(node_id)

    conn = KafkaConnection(self._net, node_id=node_id,
                           broker_version_data=self.broker_version_data,
                           **self.config)

    def _forget(_):
        return self._conns.pop(node.node_id, None)

    def _count_close(_):
        return self._sensors.connection_closed.record()

    def _ask_for_metadata(_):
        return self.cluster.request_update()

    # Registration order is kept: registry removal, metric, metadata refresh.
    if pop_on_close:
        conn.close_future.add_both(_forget)
    if self._sensors:
        conn.close_future.add_both(_count_close)
    if refresh_metadata_on_err:
        conn.close_future.add_errback(_ask_for_metadata)

    self._conns[node_id] = conn

    if timeout_ms is None:
        timeout_ms = self.socket_connection_setup_timeout_ms(node_id)
    timeout_at = time.monotonic() + timeout_ms / 1000

    def _start_connect():
        return self._connect(node, conn,
                             reset_backoff_on_connect=reset_backoff_on_connect,
                             timeout_at=timeout_at)

    self._net.call_soon(_start_connect)
    return conn
def send(self, request, node_id=None, request_timeout_ms=None):
    """Send ``request`` to a node and return the response future.

    Args:
        request: The request object handed to the connection.
        node_id: Target node; None selects ``least_loaded_node()``.
        request_timeout_ms: Passed through to the connection.

    Returns:
        The future from the connection's ``send_request``, or an already
        failed Future when the node is not ready. Other errors raised by
        ``get_connection`` propagate to the caller.
    """
    if node_id is None:
        node_id = self.least_loaded_node()
    try:
        conn = self.get_connection(node_id)
    except Errors.NodeNotReadyError as e:
        return Future().failure(e)
    return conn.send_request(request, request_timeout_ms=request_timeout_ms)
[docs]
def least_loaded_node(self):
    """Pick the node with the fewest in-flight requests.

    Nodes are examined in random order. The first node that has a
    connected, unpaused connection with nothing in flight is returned
    immediately. Otherwise the result is the node with the smallest
    in-flight count among those that are neither paused nor in reconnect
    backoff; a node without a connection counts as zero in flight.

    Returns:
        node_id or None if no suitable node was found
    """
    candidates = [broker.node_id for broker in self.cluster.brokers()]
    random.shuffle(candidates)

    best_id = None
    best_load = float('inf')
    for candidate in candidates:
        conn = self._conns.get(candidate)
        has_conn = conn is not None
        ready = has_conn and conn.connected and not conn.paused
        unavailable = (conn and conn.paused) or self.connection_delay(candidate) > 0
        load = len(conn.in_flight_requests) if has_conn else 0

        if ready and load == 0:
            # An idle established connection cannot be beaten; stop here.
            return candidate
        if unavailable or load >= best_load:
            continue
        best_id, best_load = candidate, load
    return best_id
def reset_backoff(self, node_id):
    """Forget the recorded backoff state for ``node_id``; a no-op if none exists."""
    self._backoff.pop(node_id, None)
def jitter_pct(self):
    """Return a random multiplier drawn uniformly from 0.8 to 1.2."""
    low, high = 0.8, 1.2
    return random.uniform(low, high)
def _calculate_exp_timeout(self, key, failures):
    """Return an exponentially growing timeout with jitter applied.

    The base value ``self.config[key]`` is doubled for every failure after
    the first. For ``reconnect_backoff_ms`` and
    ``socket_connection_setup_timeout_ms`` the result is capped at the
    matching ``*_max_ms`` setting before jitter is applied.

    Args:
        key: Name of the base timeout in ``self.config``.
        failures: Consecutive failure count; 1 yields the base value.

    Returns:
        The timeout in ms multiplied by ``jitter_pct()``.

    Raises:
        OverflowError: Only for a key without a configured cap, when the
            base value is a float and the doubled value exceeds float range.
    """
    max_keys = {
        'reconnect_backoff_ms': 'reconnect_backoff_max_ms',
        'socket_connection_setup_timeout_ms': 'socket_connection_setup_timeout_max_ms',
    }
    cap_key = max_keys.get(key)
    max_ms = None if cap_key is None else self.config[cap_key]

    try:
        timeout_ms = self.config[key] * 2 ** (failures - 1)
    except OverflowError:
        # A float base with a very large failure count cannot be represented.
        # The uncapped value would exceed any cap, so use the cap itself.
        if max_ms is None:
            raise
        timeout_ms = max_ms
    else:
        if max_ms is not None:
            timeout_ms = min(max_ms, timeout_ms)
    return timeout_ms * self.jitter_pct()
def update_backoff(self, node_id):
    """Record one more failure for ``node_id`` and recompute its backoff state.

    Stores ``(failures, backoff_until, connect_timeout_ms)`` for the node,
    where ``backoff_until`` is a ``time.monotonic()`` timestamp.
    """
    previous = self._backoff.get(node_id)
    failures = 1 if previous is None else previous[0] + 1

    backoff_ms = self._calculate_exp_timeout('reconnect_backoff_ms', failures)
    connect_ms = self._calculate_exp_timeout('socket_connection_setup_timeout_ms', failures)
    log.debug('%s reconnect backoff %d ms / connect timeout %d ms after %s failures',
              node_id, backoff_ms, connect_ms, failures)

    retry_after = time.monotonic() + (backoff_ms / 1000)
    self._backoff[node_id] = (failures, retry_after, connect_ms)
[docs]
def connection_delay(self, node_id):
    """Seconds left before ``node_id`` may be connected to again.

    Returns 0 when the node has no backoff entry or its backoff has already
    expired. Backoff values follow exponential backoff with jitter (KIP-144).
    """
    entry = self._backoff.get(node_id)
    if entry is None:
        return 0
    retry_after = entry[1]
    return max(0, retry_after - time.monotonic())
def socket_connection_setup_timeout_ms(self, node_id):
    """Connect timeout in ms for ``node_id``.

    Uses the timeout stored with the node's backoff entry when one exists,
    otherwise the configured ``socket_connection_setup_timeout_ms``.
    """
    try:
        entry = self._backoff[node_id]
    except KeyError:
        return self.config['socket_connection_setup_timeout_ms']
    return entry[2]
[docs]
def auth_failure(self, node_id):
    """Look up the cached auth-class failure for ``node_id``.

    Returns:
        The stored exception, or None when nothing is recorded for the node.
    """
    recorded = self._auth_failures
    return recorded.get(node_id)
[docs]
def maybe_raise_auth_failure(self, node_id):
    """Raise the cached auth-class failure for ``node_id``; do nothing if there is none."""
    try:
        cached = self._auth_failures[node_id]
    except KeyError:
        return
    if cached is not None:
        raise cached
def close(self, node_id=None, timeout_ms=None):
    """Close one connection, or shut the whole manager down.

    Args:
        node_id: When given, only that node's connection is closed (if one
            exists) and the manager stays open. When None, the manager is
            marked closed, the bootstrap wakeup is notified, every
            connection is closed and the cluster is closed. Repeated
            shutdown calls do nothing.
        timeout_ms: Accepted but not used by this method.
    """
    if node_id is not None:
        target = self._conns.get(node_id)
        if target is not None:
            target.close()
        return

    if self.closed:
        return
    self.closed = True
    self._bootstrap_wakeup.notify()
    # Iterate a snapshot: closing a connection may remove it from _conns.
    for open_conn in tuple(self._conns.values()):
        open_conn.close()
    self.cluster.close()
[docs]
async def wait_for(self, future, timeout_ms):
    """Await ``future``, giving up after ``timeout_ms`` milliseconds.

    With ``timeout_ms`` None the future is awaited directly. Otherwise the
    result or error of ``future`` is relayed through a wrapper future that
    a timer fails with KafkaTimeoutError if nothing arrived in time.

    The underlying future is not cancelled on timeout; it keeps running
    and only the awaiter is released. Must be awaited from a coroutine
    running on this loop.

    Raises:
        Errors.KafkaTimeoutError: The timeout fired first.
    """
    if timeout_ms is None:
        return await future

    proxy = Future()

    def _relay_value(value):
        if proxy.is_done:
            return
        proxy.success(value)

    def _relay_error(exc):
        if proxy.is_done:
            return
        proxy.failure(exc)

    def _expire():
        if proxy.is_done:
            return
        proxy.failure(Errors.KafkaTimeoutError(
            'Timed out after %s ms' % timeout_ms))

    future.add_callback(_relay_value)
    future.add_errback(_relay_error)
    timer = self._net.call_later(timeout_ms / 1000, _expire)
    try:
        return await proxy
    finally:
        # Drop the timer if it has not fired; a ValueError from
        # unschedule() is ignored.
        if not timer.is_done:
            try:
                self._net.unschedule(timer)
            except ValueError:
                pass
[docs]
def call_soon(self, coro, *args):
    """Schedule a coroutine, awaitable or function on the event loop.

    Delegates to the loop's ``call_soon_with_future``. Thread-safe.

    Returns: Future
    """
    loop = self._net
    return loop.call_soon_with_future(coro, *args)
[docs]
def run(self, coro, *args):
    """Run ``coro`` on the event loop and block until it finishes.

    Delegates to the loop's ``run()`` and returns its value or raises its
    error.

    When an IO thread is running (via start()), the calling thread waits on
    a cross-thread Event while the coroutine runs on the IO thread, and
    several caller threads may call this concurrently. Without an IO
    thread the loop is driven on the calling thread (legacy behavior).
    """
    loop = self._net
    return loop.run(coro, *args)