import errno
import logging
import socket
import time
from urllib.parse import urlparse
import kafka.errors as Errors
log = logging.getLogger(__name__)
[docs]
async def create_connection(net, host, port, socket_options=(), proxy_url=None, timeout_at=None):
"""Connect to host:port; raises KafkaConnectionError on failure"""
socket_factory = KafkaNetSocket(proxy_url)
addrs = socket_factory.dns_lookup(host, port)
exceptions = [Errors.KafkaConnectionError('DNS Resolution failure')]
for res in addrs:
try:
log.debug('%s: Attempting to connect to %s (options: %s)', socket_factory, res, socket_options)
sock = await socket_factory.connect(net, res, socket_options, timeout_at=timeout_at)
except (socket.error, OSError) as e:
exceptions.append(Errors.KafkaConnectionError('unable to connect: %s' % (e,)))
continue
except Errors.KafkaTimeoutError:
raise Errors.KafkaConnectionError('Connection timed out')
except Errors.KafkaConnectionError as e:
exceptions.append(e)
continue
else:
return sock
raise exceptions[-1]
[docs]
class KafkaNetSocket:
# scheme => handling class
_registry = {}
[docs]
@classmethod
def register_class(cls, klass):
for scheme in klass.SCHEMES:
cls._registry[scheme] = klass
def __init_subclass__(cls, **kw):
super().__init_subclass__(**kw)
KafkaNetSocket.register_class(cls)
def __new__(cls, proxy_url=None):
if proxy_url is None:
return super().__new__(cls)
try:
parsed = urlparse(proxy_url)
except Exception:
raise ValueError('Unable to parse proxy_url: %s' % (proxy_url,))
if not parsed.scheme:
raise ValueError('proxy_url requires scheme:// (%s)' % (proxy_url,))
try:
klass = KafkaNetSocket._registry[parsed.scheme]
except KeyError:
raise ValueError('Unsupported proxy url scheme: %s' % (parsed.scheme))
return super().__new__(klass)
def __init__(self, proxy_url=None):
pass
# simple sockets / no proxy
[docs]
def dns_lookup(self, host, port, raise_error=False):
# XXX: all DNS functions in Python are blocking. If we really
# want to be non-blocking here, we need to use a 3rd-party
# library like python-adns, or move resolution onto its
# own thread. This will be subject to the default libc
# name resolution timeout (5s on most Linux boxes)
try:
return socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.gaierror as ex:
err_str = "DNS lookup failed for %s:%d, %r" % (host, port, ex)
if not raise_error:
log.warning(err_str)
return []
raise Errors.KafkaConnectionError(err_str)
[docs]
def socket(self, family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP):
return socket.socket(family, sock_type, proto)
[docs]
async def connect(self, net, addrinfo, socket_options=(), timeout_at=None):
"""Create non-blocking socket (with options) and connect to addrinfo tuple"""
family, sock_type, proto, _canonname, sockaddr = addrinfo
sock = self.socket(family, sock_type, proto)
sock.setblocking(False)
for option in socket_options:
sock.setsockopt(*option)
return await self.sock_connect(net, sock, sockaddr, timeout_at=timeout_at)
[docs]
async def sock_connect(self, net, sock, sockaddr, timeout_at=None):
while timeout_at is None or time.monotonic() < timeout_at:
ret = None
try:
ret = self.connect_ex(sock, sockaddr)
except BlockingIOError:
ret = errno.EWOULDBLOCK
except socket.error as err:
ret = err.errno
# Connection succeeded
if not ret or ret == errno.EISCONN:
log.debug('Connected: %s', sock)
return sock
# Needs retry
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
elif ret in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022):
await net.wait_write(sock, timeout_at=timeout_at)
# Connection failed
else:
errstr = errno.errorcode.get(ret, 'UNKNOWN')
raise Errors.KafkaConnectionError('{} {}'.format(ret, errstr))
else:
raise Errors.KafkaTimeoutError('Connection timed out')
[docs]
def connect_ex(self, sock, sockaddr):
return sock.connect_ex(sockaddr)