# Source code for kafka.net.socks5

import errno
import logging
import random
import socket
import struct
from urllib.parse import urlparse

from kafka.errors import KafkaConnectionError
from kafka.net.inet import KafkaNetSocket


log = logging.getLogger(__name__)


class ProxyConnectionStates:
    """State names for the SOCKS5 handshake driven by Socks5Proxy.connect_ex.

    The values are plain strings; connect_ex compares the current state
    against them and advances through them in the order listed below.
    """
    # Initial state; also the state connect_ex falls back to on any failure.
    DISCONNECTED = '<disconnected>'
    # TCP connect to the proxy has been started but has not completed yet.
    CONNECTING = '<connecting>'
    # Connected to the proxy; the authentication-method proposal is built here.
    NEGOTIATE_PROPOSE = '<negotiate_propose>'
    # Proposal is being sent and the proxy's method selection is awaited.
    NEGOTIATING = '<negotiating>'
    # Username/password message is being sent and its result is awaited.
    AUTHENTICATING = '<authenticating>'
    # Authentication is settled; the connect request for the target is built here.
    REQUEST_SUBMIT = '<request_submit>'
    # Connect request is being sent and the first two reply bytes are awaited.
    REQUESTING = '<requesting>'
    # Remaining reply bytes (the address the proxy reports) are being drained.
    READ_ADDRESS = '<read_address>'
    # Handshake finished; connect_ex returns 0 from here on.
    COMPLETE = '<complete>'


class Socks5Proxy(KafkaNetSocket):
    """Socks5 proxy

    Manages connection through socks5 proxy with support for username/password
    authentication.

    The proxy is described by a URL such as ``socks5://user:pass@host:port``.
    With the ``socks5h`` scheme the target hostname is not resolved locally;
    it is sent to the proxy as a domain name instead.
    """

    # socks5h for remote dns
    SCHEMES = ('socks5', 'socks5h')

    def __init__(self, proxy_url):
        """Parse ``proxy_url`` and resolve the proxy address.

        Raises:
            ValueError: if the URL scheme is not one of SCHEMES.
            KafkaConnectionError: if the proxy host cannot be resolved.
        """
        self._buffer_in = b''
        self._buffer_out = b''
        self._proxy_url = urlparse(proxy_url)
        if self._proxy_url.scheme not in self.SCHEMES:
            raise ValueError('Unsupported proxy scheme: %s' % (self._proxy_url.scheme,))
        self._sock = None
        self._state = ProxyConnectionStates.DISCONNECTED
        self._target_afi = socket.AF_UNSPEC
        self._proxy_addr = self._get_proxy_addr()

    def _get_proxy_addr(self):
        """Resolve the proxy host and return one of its addresses at random."""
        proxy_addrs = self.dns_lookup(self._proxy_url.hostname, self._proxy_url.port, proxy=True)
        if not proxy_addrs:
            raise KafkaConnectionError('Unable to resolve proxy_url via dns')
        return random.choice(proxy_addrs)

    def _use_remote_lookup(self):
        """Return True when the proxy (not this host) should resolve the target."""
        return self._proxy_url.scheme == 'socks5h'

    def dns_lookup(self, host, port, proxy=False):
        """Resolve ``host``/``port`` into getaddrinfo-style 5-tuples.

        With ``proxy=True`` the proxy's own address is resolved through the
        parent class with ``raise_error=True``. Otherwise, in socks5h mode no
        local lookup is done and a single placeholder entry carrying the
        unresolved ``(host, port)`` is returned; in socks5 mode the lookup is
        delegated to the parent class.
        """
        if proxy:
            return super().dns_lookup(host, port, raise_error=True)
        elif self._use_remote_lookup():
            return [(socket.AF_UNSPEC, socket.SOCK_STREAM, socket.IPPROTO_TCP, '', (host, port))]
        else:
            return super().dns_lookup(host, port)

    def socket(self, family=socket.AF_UNSPEC, sock_type=socket.SOCK_STREAM, proto=socket.IPPROTO_TCP):
        """Open and record a socket.

        Returns the actual underlying socket object to ensure e.g. selects and
        ssl wrapping works as expected.
        """
        self._target_afi = family  # Store the address family of the target
        # The socket itself talks to the proxy, so it uses the proxy's family.
        proxy_family, _, _, _, _ = self._proxy_addr
        self._sock = socket.socket(proxy_family, sock_type, proto)
        return self._sock

    def _flush_buf(self):
        """Send out all data that is stored in the outgoing buffer.

        It is expected that the caller handles error handling, including
        non-blocking as well as connection failure exceptions.
        """
        while self._buffer_out:
            sent_bytes = self._sock.send(self._buffer_out)
            self._buffer_out = self._buffer_out[sent_bytes:]

    def _peek_buf(self, datalen):
        """Ensure local inbound buffer has enough data, and return that data
        without consuming the local buffer.

        Fewer than ``datalen`` bytes are returned if recv() returns no data
        before enough bytes arrive.

        It's expected that the caller handles e.g. blocking exceptions.
        """
        while True:
            bytes_remaining = datalen - len(self._buffer_in)
            if bytes_remaining <= 0:
                break
            data = self._sock.recv(bytes_remaining)
            if not data:
                break
            self._buffer_in = self._buffer_in + data
        return self._buffer_in[:datalen]

    def _read_buf(self, datalen):
        """Read and consume bytes from socket connection.

        It's expected that the caller handles e.g. blocking exceptions.
        """
        buf = self._peek_buf(datalen)
        if buf:
            self._buffer_in = self._buffer_in[len(buf):]
        return buf

    def _fail(self, msg, *args):
        """Log an error, reset the handshake, close the socket, return ECONNREFUSED."""
        log.error(msg, *args)
        self._state = ProxyConnectionStates.DISCONNECTED
        # Drop any partial handshake data so it cannot leak into a later attempt.
        self._buffer_in = b''
        self._buffer_out = b''
        if self._sock:
            self._sock.close()
        return errno.ECONNREFUSED

    def connect_ex(self, sock, addr):
        """Runs a state machine through connection to authentication to proxy
        connection request.

        The somewhat strange setup is to facilitate non-intrusive use from
        BrokerConnection state machine.

        This function is called with a socket in non-blocking mode. Both
        send and receive calls can return in EWOULDBLOCK/EAGAIN which we
        specifically avoid handling here. These are handled in main
        BrokerConnection connection loop, which then would retry calls
        to this function.

        Arguments:
            sock: the socket previously returned by socket(); must be the
                same object that is recorded on this instance.
            addr: target address; addr[0] is the host (an IP literal, or a
                hostname in socks5h mode) and addr[1] is the port.

        Returns:
            0 once the proxy has accepted the connection request, the errno
            from the underlying connect while it is still in progress, or
            errno.ECONNREFUSED if the handshake fails.
        """
        assert sock is self._sock

        if self._state == ProxyConnectionStates.DISCONNECTED:
            self._state = ProxyConnectionStates.CONNECTING

        if self._state == ProxyConnectionStates.CONNECTING:
            _, _, _, _, sockaddr = self._proxy_addr
            ret = self._sock.connect_ex(sockaddr)
            if not ret or ret == errno.EISCONN:
                self._state = ProxyConnectionStates.NEGOTIATE_PROPOSE
            else:
                return ret

        if self._state == ProxyConnectionStates.NEGOTIATE_PROPOSE:
            if self._proxy_url.username and self._proxy_url.password:
                # Propose username/password
                self._buffer_out = b"\x05\x01\x02"
            else:
                # Propose no auth
                self._buffer_out = b"\x05\x01\x00"
            self._state = ProxyConnectionStates.NEGOTIATING

        if self._state == ProxyConnectionStates.NEGOTIATING:
            self._flush_buf()
            buf = self._read_buf(2)
            if buf[0:1] != b"\x05":
                return self._fail("Unrecognized SOCKS version")

            if buf[1:2] == b"\x00":
                # No authentication required
                self._state = ProxyConnectionStates.REQUEST_SUBMIT
            elif (buf[1:2] == b"\x02"
                  and self._proxy_url.username and self._proxy_url.password):
                # Username/password authentication selected. Only honoured when
                # credentials are configured, i.e. when it was actually offered.
                # Lengths are taken from the encoded bytes and packed as
                # unsigned octets, so non-ASCII and >127-byte values work.
                username = self._proxy_url.username.encode()
                password = self._proxy_url.password.encode()
                if len(username) > 255 or len(password) > 255:
                    return self._fail("Socks5 proxy credentials exceed 255 bytes")
                self._buffer_out = struct.pack(
                    "!BB{}sB{}s".format(len(username), len(password)),
                    1,  # version
                    len(username),
                    username,
                    len(password),
                    password,
                )
                self._state = ProxyConnectionStates.AUTHENTICATING
            else:
                return self._fail("Unrecognized SOCKS authentication method")

        if self._state == ProxyConnectionStates.AUTHENTICATING:
            self._flush_buf()
            buf = self._read_buf(2)
            if buf == b"\x01\x00":
                # Authentication successful
                self._state = ProxyConnectionStates.REQUEST_SUBMIT
            else:
                return self._fail("Socks5 proxy authentication failure")

        if self._state == ProxyConnectionStates.REQUEST_SUBMIT:
            # Addr format depends on type
            if self._use_remote_lookup():
                # len + domain name (no null terminator)
                addr_type = 3
                hostname = addr[0].encode('ascii')
                if len(hostname) > 255:
                    return self._fail("Target hostname exceeds 255 bytes")
                dest = struct.pack("!B", len(hostname)) + hostname
            elif self._target_afi == socket.AF_INET:
                # 4 bytes of actual address
                addr_type = 1
                dest = socket.inet_pton(self._target_afi, addr[0])
            elif self._target_afi == socket.AF_INET6:
                # 16 bytes of actual address
                addr_type = 4
                dest = socket.inet_pton(self._target_afi, addr[0])
            else:
                return self._fail("Unknown address family, %r", self._target_afi)

            self._buffer_out = struct.pack(
                "!BBBB",
                5,  # version
                1,  # command: connect
                0,  # reserved
                addr_type,  # 1 for ipv4, 4 for ipv6 address, 3 for domain name
            ) + dest + struct.pack("!H", addr[1])  # port
            self._state = ProxyConnectionStates.REQUESTING

        if self._state == ProxyConnectionStates.REQUESTING:
            self._flush_buf()
            buf = self._read_buf(2)
            if buf[0:2] == b"\x05\x00":
                self._state = ProxyConnectionStates.READ_ADDRESS
            else:
                return self._fail("Proxy request failed: %r", buf[1:2])

        if self._state == ProxyConnectionStates.READ_ADDRESS:
            # we don't really care about the remote endpoint address, but need
            # to clear the stream. The reply uses the same address type codes
            # as the request: 1 ipv4, 3 domain name, 4 ipv6.
            buf = self._peek_buf(2)
            if buf[0:2] == b"\x00\x01":
                reply_len = 2 + 4 + 2  # ipv4 address + port
            elif buf[0:2] == b"\x00\x04":
                reply_len = 2 + 16 + 2  # ipv6 address + port
            elif buf[0:2] == b"\x00\x03":
                # domain name: one length octet precedes the name
                buf = self._peek_buf(3)
                if len(buf) < 3:
                    return self._fail("Socks5 proxy closed connection during handshake")
                reply_len = 2 + 1 + buf[2] + 2  # len octet + name + port
            else:
                return self._fail("Unrecognized remote address type %r", buf[1:2])
            if len(self._read_buf(reply_len)) != reply_len:
                # recv() returned no data before the full reply arrived
                return self._fail("Socks5 proxy closed connection during handshake")
            self._state = ProxyConnectionStates.COMPLETE

        if self._state == ProxyConnectionStates.COMPLETE:
            return 0

        # not reached;
        # Send and recv will raise socket error on EWOULDBLOCK/EAGAIN that is
        # assumed to be handled by the caller. The caller re-enters this state
        # machine from retry logic with timer or via select & family
        log.error("Internal error, state %r not handled correctly", self._state)
        self._state = ProxyConnectionStates.DISCONNECTED
        if self._sock:
            self._sock.close()
        return errno.ECONNREFUSED