Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions kafka/net/inet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,47 @@
import socket

import kafka.errors as Errors
from kafka.socks5_wrapper import Socks5Wrapper


log = logging.getLogger(__name__)


async def create_connection(net, host, port, socket_options=()):
async def create_connection(net, host, port, socket_options=(), socks5_proxy=None):
"""Connect to host:port; raises KafkaConnectionError on failure"""
if socks5_proxy and Socks5Wrapper.use_remote_lookup(socks5_proxy):
addrs = [(socket.AF_UNSPEC, socket.SOCK_STREAM, 0, '', (host, port))]
else:
addrs = dns_lookup(host, port)

exceptions = [Errors.KafkaConnectionError('DNS Resolution failure')]
for res in dns_lookup(host, port):
for res in addrs:
af, _socktype, _proto, _canonname, sa = res
# TODO: socks5 proxy
try:
sock = socket.socket(af, socket.SOCK_STREAM)
proxy = Socks5Wrapper(socks5_proxy, af) if socks5_proxy else None
sock = (proxy or socket).socket(af, socket.SOCK_STREAM)
sock.setblocking(False)
for option in socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
sock = await connect_sock(net, sock, sa, proxy=proxy)
except (socket.error, OSError) as e:
exceptions.append(Errors.KafkaConnectionError('unable to initialize socket object: %s' % (e,)))
exceptions.append(Errors.KafkaConnectionError('unable to connect: %s' % (e,)))
continue
try:
sock = await connect_sock(net, sock, sa)
except Errors.KafkaConnectionError as e:
exceptions.append(e)
continue
else:
exceptions = []
return sock
else:
raise exceptions[-1]
raise exceptions[-1]


async def connect_sock(net, sock, sockaddr):
async def connect_sock(net, sock, sockaddr, proxy=None):
while True:
ret = None
try:
ret = sock.connect_ex(sockaddr)
ret = (proxy or sock).connect_ex(sockaddr)
except BlockingIOError:
ret = errno.EWOULDBLOCK
except socket.error as err:
ret = err.errno

Expand Down
4 changes: 3 additions & 1 deletion kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class KafkaConnectionManager:
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
}
def __init__(self, net, cluster, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
Expand Down Expand Up @@ -159,7 +160,8 @@ async def _connect(self, node, conn):

try:
sock = await create_connection(self._net, node.host, node.port,
self.config['socket_options'])
self.config['socket_options'],
socks5_proxy=self.config['socks5_proxy'])
except Errors.KafkaConnectionError as e:
conn.connection_lost(e)
self.update_backoff(node.node_id)
Expand Down
80 changes: 80 additions & 0 deletions test/net/test_inet.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,83 @@ def test_so_error_after_wait(self):
pass
rsock.close()
wsock.close()


class TestConnectSockWithProxy:
def test_proxy_connect_ex_called(self):
net = NetworkSelector()
sock = MagicMock()
proxy = MagicMock()
proxy.connect_ex.return_value = 0
task = net.run_until_done(connect_sock(net, sock, ('127.0.0.1', 9092), proxy=proxy))
assert task.result is sock
proxy.connect_ex.assert_called_once_with(('127.0.0.1', 9092))
sock.connect_ex.assert_not_called()

def test_proxy_blocking_io_retries(self):
net = NetworkSelector()
rsock, wsock = socket.socketpair()
rsock.setblocking(False)
wsock.setblocking(False)
proxy = MagicMock()
proxy.connect_ex.side_effect = [BlockingIOError, 0]
# Use real socket fd so wait_write works
mock_sock = MagicMock()
mock_sock.fileno.return_value = wsock.fileno()
task = net.run_until_done(connect_sock(net, mock_sock, ('127.0.0.1', 9092), proxy=proxy))
assert task.result is mock_sock
assert proxy.connect_ex.call_count == 2
rsock.close()
wsock.close()

def test_proxy_connection_refused(self):
net = NetworkSelector()
sock = MagicMock()
proxy = MagicMock()
proxy.connect_ex.return_value = errno.ECONNREFUSED
task = net.run_until_done(connect_sock(net, sock, ('127.0.0.1', 9092), proxy=proxy))
assert isinstance(task.exception, Errors.KafkaConnectionError)


class TestCreateConnectionWithProxy:
def test_proxy_creates_socket_via_wrapper(self):
net = NetworkSelector()
mock_sock = MagicMock()
mock_sock.connect_ex.return_value = 0
mock_proxy = MagicMock()
mock_proxy.socket.return_value = mock_sock
mock_proxy.connect_ex.return_value = 0
with patch('kafka.net.inet.Socks5Wrapper', return_value=mock_proxy) as mock_cls:
task = net.run_until_done(
create_connection(net, 'broker', 9092, socks5_proxy='socks5://proxy:1080'))
mock_cls.assert_called_once_with('socks5://proxy:1080', socket.AF_UNSPEC)
mock_proxy.socket.assert_called_once()
assert task.result is mock_sock

def test_proxy_remote_dns_skips_local_lookup(self):
net = NetworkSelector()
mock_sock = MagicMock()
mock_sock.connect_ex.return_value = 0
mock_proxy = MagicMock()
mock_proxy.socket.return_value = mock_sock
mock_proxy.connect_ex.return_value = 0
with patch('kafka.net.inet.Socks5Wrapper') as mock_cls, \
patch('kafka.net.inet.dns_lookup') as mock_dns:
mock_cls.return_value = mock_proxy
mock_cls.use_remote_lookup.return_value = True
task = net.run_until_done(
create_connection(net, 'broker', 9092, socks5_proxy='socks5h://proxy:1080'))
mock_dns.assert_not_called()

def test_no_proxy_uses_direct_socket(self):
net = NetworkSelector()
fake_addr = [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.0.0.1', 9092))]
mock_sock = MagicMock()
mock_sock.connect_ex.return_value = 0
with patch('kafka.net.inet.dns_lookup', return_value=fake_addr), \
patch('kafka.net.inet.socket.socket', return_value=mock_sock), \
patch('kafka.net.inet.Socks5Wrapper') as mock_cls:
task = net.run_until_done(
create_connection(net, 'host', 9092))
mock_cls.assert_not_called()
assert task.result is mock_sock
Loading