Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
import copy
import itertools
import logging
import selectors
import socket
import time

from . import ConfigResourceType

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType, valid_acl_operations
from kafka.client_async import KafkaClient, selectors
from kafka.net.compat import KafkaNetClient
from kafka.protocol.consumer.metadata import (
ConsumerProtocolSubscription, ConsumerProtocolAssignment, ConsumerProtocolType,
)
Expand Down Expand Up @@ -191,7 +192,7 @@ class KafkaAdminClient:
'metric_reporters': [],
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'kafka_client': KafkaClient,
'kafka_client': KafkaNetClient,
}

def __init__(self, **configs):
Expand Down
5 changes: 3 additions & 2 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import copy
import logging
import re
import selectors
import socket
import time

from kafka.errors import KafkaConfigurationError, UnsupportedVersionError

from kafka.client_async import KafkaClient, selectors
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.metrics import MetricConfig, Metrics
from kafka.net.compat import KafkaNetClient
from kafka.protocol.consumer import OffsetResetStrategy
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import Timer
Expand Down Expand Up @@ -339,7 +340,7 @@ class KafkaConsumer:
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
'kafka_client': KafkaClient,
'kafka_client': KafkaNetClient,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

Expand Down
154 changes: 154 additions & 0 deletions kafka/net/compat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import logging
import threading
import time

import kafka.errors as Errors
from kafka.cluster import ClusterMetadata
from kafka.net.manager import KafkaConnectionManager
from kafka.net.selector import NetworkSelector


log = logging.getLogger(__name__)


class KafkaNetClient:
    """Drop-in replacement for KafkaClient backed by KafkaConnectionManager.

    Provides the KafkaClient API surface that existing consumer/producer/admin
    code depends on. Goal: shrink over time as components transition to using
    KafkaConnectionManager directly (fire-and-forget via _request_buffer).
    """
    def __init__(self, **configs):
        # RLock serializing poll(); re-entrant so a callback fired during
        # poll() may call back into this client on the same thread.
        self._lock = threading.RLock()
        self._net = NetworkSelector(**configs)
        # Cluster metadata is created here and handed to the manager, which
        # owns it from then on (exposed read-only via the `cluster` property).
        cluster = ClusterMetadata(
            bootstrap_servers=configs.get('bootstrap_servers', ['localhost:9092']),
            metadata_max_age_ms=configs.get('metadata_max_age_ms', 300000),
        )
        self._manager = KafkaConnectionManager(self._net, cluster, **configs)

    @property
    def cluster(self):
        """ClusterMetadata instance owned by the connection manager."""
        return self._manager.cluster

    # Connection state queries

    def connected(self, node_id):
        """Return True if an established connection to `node_id` exists."""
        conn = self._manager._conns.get(node_id)
        return conn is not None and conn.connected

    def is_disconnected(self, node_id):
        """Return True when `node_id` has no established connection.

        NOTE(review): this is simply `not connected(...)`, so a node we have
        never attempted to contact also reports as "disconnected".
        """
        return not self.connected(node_id)

    def is_ready(self, node_id):
        """Return True if the connection to `node_id` can accept requests
        (established and not paused)."""
        conn = self._manager._conns.get(node_id)
        return conn is not None and conn.connected and not conn.paused

    def ready(self, node_id, **kwargs):
        """Check readiness, initiating a connection attempt if needed.

        Returns True only when already ready; otherwise kicks off (or
        re-uses) a connection attempt and returns False — matching the
        non-blocking KafkaClient.ready() contract.
        """
        if self.is_ready(node_id):
            return True
        try:
            self._manager.get_connection(node_id)
        except Errors.NodeNotReadyError:
            # Expected while the connection is still being established;
            # other exceptions propagate to the caller.
            pass
        return False

    def maybe_connect(self, node_id, **kwargs):
        """Start connecting to `node_id` if not already connected/connecting."""
        try:
            self._manager.get_connection(node_id)
        except Errors.NodeNotReadyError:
            # Connection attempt is in flight; nothing more to do here.
            pass

    def await_ready(self, node_id, timeout_ms=30000):
        """Block until `node_id` is ready or `timeout_ms` elapses.

        Raises KafkaConnectionError if the node is still not ready after
        polling. Note: if maybe_connect() fails to create a connection at
        all (conn is None), we skip the poll and raise immediately.
        """
        if self.is_ready(node_id):
            return True
        self.maybe_connect(node_id)
        conn = self._manager._conns.get(node_id)
        if conn is not None and not conn.init_future.is_done:
            # Drive the event loop until the connection init future resolves
            # (or the timeout expires inside poll).
            self._manager.poll(timeout_ms=timeout_ms, future=conn.init_future)
        if not self.is_ready(node_id):
            raise Errors.KafkaConnectionError('Node %s not ready after %s ms' % (node_id, timeout_ms))
        return True

    # In-flight request tracking

    def in_flight_request_count(self, node_id=None):
        """Count in-flight requests for one node, or across all connections
        when `node_id` is None. Unknown nodes count as 0."""
        if node_id is not None:
            conn = self._manager._conns.get(node_id)
            return len(conn.in_flight_requests) if conn is not None else 0
        return sum(len(c.in_flight_requests) for c in self._manager._conns.values())

    def throttle_delay(self, node_id):
        """Return remaining broker throttle delay for `node_id` in ms (0 if
        none or unknown node).

        NOTE(review): assumes conn._throttle_time is a time.monotonic()
        deadline in seconds (hence the *1000 conversion) — confirm against
        KafkaConnection.
        """
        conn = self._manager._conns.get(node_id)
        if conn is None:
            return 0
        remaining = conn._throttle_time - time.monotonic()
        return max(0, remaining) * 1000

    # Bootstrap / version

    def bootstrap_connected(self):
        """Return True once the manager's bootstrap sequence has completed."""
        return self._manager.bootstrapped

    def get_broker_version(self, timeout_ms=None):
        """Return the probed broker version, triggering a version check if
        `timeout_ms` is given; otherwise returns None when not yet known."""
        if self._manager.broker_version_data is None:
            if timeout_ms is not None:
                self.check_version(timeout_ms=timeout_ms)
            else:
                return None
        return self._manager.broker_version_data.broker_version

    def check_version(self, node_id=None, timeout_ms=10000):
        """Bootstrap (if needed) and return the broker version.

        NOTE(review): `node_id` is accepted for KafkaClient API compatibility
        but ignored — the version comes from the manager's bootstrap, not a
        specific node. Confirm callers don't rely on per-node checks.
        """
        f = self._manager.bootstrap(timeout_ms=timeout_ms)
        self._manager.poll(timeout_ms=timeout_ms, future=f)
        if f.failed():
            raise f.exception
        return self._manager.broker_version_data.broker_version

    # Request sending

    def send(self, node_id, request, **kwargs):
        """Queue `request` for `node_id`; returns a Future for the response."""
        return self._manager.send(request, node_id=node_id)

    def send_and_receive(self, node_id, request, timeout_ms=30000):
        """Blocking convenience: ensure the node is ready, send, and poll
        until the response future resolves.

        Returns the response value on success; raises the future's exception
        on failure, or KafkaTimeoutError if the future never resolved.
        """
        self.await_ready(node_id, timeout_ms=timeout_ms)
        f = self.send(node_id, request)
        self._manager.poll(timeout_ms=timeout_ms, future=f)
        if f.succeeded():
            return f.value
        elif f.failed():
            raise f.exception
        raise Errors.KafkaTimeoutError('Request timed out')

    # Delegation

    def poll(self, timeout_ms=None, future=None):
        """Drive network I/O; piggy-backs a metadata refresh when the
        cluster metadata is stale. Serialized via the instance lock."""
        with self._lock:
            if self._manager.cluster.need_update:
                self._manager.update_metadata()
            return self._manager.poll(timeout_ms=timeout_ms, future=future)

    def close(self, node_id=None):
        """Close one connection, or (node_id=None) everything including the
        underlying network selector."""
        self._manager.close(node_id=node_id)
        if node_id is None:
            self._net.close()

    def least_loaded_node(self):
        """Delegate least-loaded-node selection to the manager."""
        return self._manager.least_loaded_node()

    def least_loaded_node_refresh_ms(self):
        """Return ms to wait before the next metadata-refresh attempt.

        NOTE(review): mixes units — the fallback `reconnect_backoff_ms` is
        already in ms, while the min connection delay is multiplied by 1000,
        implying connection_delay() returns seconds. Verify both paths return
        the same unit (upstream KafkaClient.connection_delay returns ms).
        """
        brokers = self._manager.cluster.brokers() or self._manager.cluster.bootstrap_brokers()
        if not brokers:
            return self._manager.config['reconnect_backoff_ms']
        delays = [self._manager.connection_delay(broker.node_id) for broker in brokers]
        return min(delays) * 1000

    def connection_delay(self, node_id):
        """Delegate reconnect-backoff delay lookup to the manager."""
        return self._manager.connection_delay(node_id)

    def wakeup(self):
        """Interrupt a blocking poll() from another thread."""
        self._net.wakeup()

    def api_version(self, operation, max_version=None):
        """Pick the request version for `operation` supported by the broker.

        Requires that the broker version is already known (bootstrap or
        explicit `api_version` config) — asserts otherwise.
        """
        assert self._manager.broker_version_data is not None
        return self._manager.broker_version_data.api_version(operation, max_version=max_version)
63 changes: 58 additions & 5 deletions kafka/net/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from .connection import KafkaConnection
from .transport import KafkaSSLTransport, KafkaTCPTransport
import kafka.errors as Errors
from kafka.protocol.broker_version_data import BrokerVersionData
from kafka.protocol.broker_version_data import BrokerVersionData, VERSION_CHECKS
from kafka.future import Future


Expand Down Expand Up @@ -40,6 +40,7 @@ class KafkaConnectionManager:
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
'api_version': None,
}
def __init__(self, net, cluster, **configs):
self.config = copy.copy(self.DEFAULT_CONFIG)
Expand All @@ -57,6 +58,8 @@ def __init__(self, net, cluster, **configs):
self.broker_version_data = None
self._bootstrap_future = None
self._metadata_future = None
if self.config['api_version'] is not None:
self.broker_version_data = BrokerVersionData(self.config['api_version'])

def least_used_connections(self):
return sorted(filter(lambda conn: conn.connected, self._conns.values()), key=lambda conn: conn.transport.last_activity)
Expand Down Expand Up @@ -93,6 +96,10 @@ async def _do_bootstrap(self, future, deadline):
try:
response = await self.send(self.cluster.metadata_request(), node_id=bootstrap_broker.node_id)
self.cluster.update_metadata(response)
if not self.cluster.brokers():
log.warning('Bootstrap metadata response has no brokers. Retrying.')
await self._net.sleep(self.config['reconnect_backoff_ms'] / 1000)
continue
self._conns.pop(bootstrap_broker.node_id, conn).close()
future.success(True)
return
Expand Down Expand Up @@ -185,12 +192,58 @@ async def _connect(self, node, conn):
try:
await conn.init_future
except Exception:
self.update_backoff(node.node_id)
pass

if conn.broker_version_data is not None:
self.reset_backoff(node.node_id)
if self.cluster.is_bootstrap(node.node_id):
self.broker_version_data = conn.broker_version_data
return

# ApiVersions failed (old broker < 0.10). Probe with known requests
# to identify the broker version. Each probe needs a fresh connection
# because old brokers disconnect on unrecognized requests.
for version, request in VERSION_CHECKS:
log.debug('Probing broker %s version %s with %s', node.node_id, version, request)
try:
sock = await create_connection(self._net, node.host, node.port,
self.config['socket_options'],
socks5_proxy=self.config['socks5_proxy'])
if self.ssl_enabled:
hostname = node.host if self.config['ssl_check_hostname'] else None
probe_transport = KafkaSSLTransport(self._net, sock, self._build_ssl_context(), hostname)
else:
probe_transport = KafkaTCPTransport(self._net, sock)
await probe_transport.handshake()

# Create a new connection with the probed version pre-set
probe_conn = KafkaConnection(self._net, node_id=node.node_id, **self.config)
probe_conn.broker_version_data = BrokerVersionData(version)
probe_conn.connection_made(probe_transport)
# _check_version will see broker_version_data is set and call _init_complete
await probe_conn.init_future
# Verify the probe request succeeds
await probe_conn.send_request(request)
except Exception:
try:
probe_conn.close()
except Exception:
pass
continue

log.info('Broker %s version identified as %s via probe', node.node_id, version)
log.info('Set configuration api_version=%s to skip auto'
' check_version requests on startup', version)
# Replace the failed conn with the working probe conn
self._conns[node.node_id] = probe_conn
probe_conn.close_future.add_both(lambda _: self._conns.pop(node.node_id, None))
probe_conn.close_future.add_errback(lambda _: self.cluster.request_update())
self.reset_backoff(node.node_id)
if self.cluster.is_bootstrap(node.node_id):
self.broker_version_data = probe_conn.broker_version_data
return

self.reset_backoff(node.node_id)
if self.cluster.is_bootstrap(node.node_id):
self.broker_version_data = conn.broker_version_data
self.update_backoff(node.node_id)

def get_connection(self, node_id):
if node_id is None:
Expand Down
5 changes: 3 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import atexit
import copy
import logging
import selectors
import socket
import threading
import warnings
import weakref

import kafka.errors as Errors
from kafka.client_async import KafkaClient, selectors
from kafka.net.compat import KafkaNetClient
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
from kafka.metrics import MetricConfig, Metrics
from kafka.partitioner.default import DefaultPartitioner
Expand Down Expand Up @@ -418,7 +419,7 @@ class KafkaProducer:
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
'socks5_proxy': None,
'kafka_client': KafkaClient,
'kafka_client': KafkaNetClient,
}

DEPRECATED_CONFIGS = ('buffer_memory',)
Expand Down
5 changes: 1 addition & 4 deletions test/integration/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import pytest

from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState
from kafka.structs import TopicPartition
Expand All @@ -23,9 +22,7 @@ def test_consumer(kafka_broker, topic):
# 0.8.2 brokers need a topic to function well
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
consumer.poll(timeout_ms=500)
assert len(consumer._client._conns) > 0
node_id = list(consumer._client._conns.keys())[0]
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
assert consumer._client.cluster.brokers()
consumer.close()


Expand Down
15 changes: 15 additions & 0 deletions test/integration/test_ssl_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,18 @@ def test_kafka_client_ssl(self, ssl_kafka):
)
assert client.broker_version_data
client.close()

    def test_legacy_kafka_client_ssl(self, ssl_kafka):
        """Test KafkaNetClient (kafka.net compat shim) can connect over SSL."""
        # Imported locally so the module stays importable when kafka.net is
        # absent; hostname check disabled because the test cert has no SAN
        # for localhost — presumably; verify against the fixture's cert.
        from kafka.net.compat import KafkaNetClient

        client = KafkaNetClient(
            bootstrap_servers='localhost:%d' % ssl_kafka.port,
            security_protocol='SSL',
            ssl_cafile=os.path.join(ssl_kafka.ssl_dir, 'ca-cert'),
            ssl_check_hostname=False,
        )
        # check_version bootstraps over TLS and probes the broker version.
        version = client.check_version(timeout_ms=5000)
        assert version is not None
        assert client.cluster.brokers()
        client.close()
Loading
Loading