65 changes: 36 additions & 29 deletions test/integration/conftest.py
@@ -4,8 +4,9 @@

import pytest

from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from test.testutil import env_kafka_version, random_string
from test.integration.fixtures import KafkaFixture, ZookeeperFixture
from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params


@pytest.fixture(scope="module")
@@ -55,10 +56,11 @@ def factory(**broker_params):
zk.close()



@pytest.fixture
def kafka_client(kafka_broker, request):
"""Return a KafkaClient fixture"""
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
client = KafkaClient(**client_params(kafka_broker, request.node.name))
yield client
client.close()

@@ -72,19 +74,23 @@ def kafka_consumer(kafka_consumer_factory):
@pytest.fixture
def kafka_consumer_factory(kafka_broker, topic, request):
"""Return a KafkaConsumer factory fixture"""
_consumer = [None]

def factory(topics=(topic,), **kafka_consumer_params):
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
params.setdefault('auto_offset_reset', 'earliest')
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
return _consumer[0]
consumer = None

def factory(topics=(topic,), **override_params):
nonlocal consumer
params = {
'heartbeat_interval_ms': 500,
'auto_offset_reset': 'earliest',
}
params.update(override_params)
params = client_params(kafka_broker, request.node.name, **params)
consumer = KafkaConsumer(*topics, **params)
return consumer

yield factory

if _consumer[0]:
_consumer[0].close()
if consumer:
consumer.close()


@pytest.fixture
@@ -95,19 +101,19 @@ def kafka_producer(kafka_producer_factory):

@pytest.fixture
def kafka_producer_factory(kafka_broker, request):
"""Return a KafkaProduce factory fixture"""
_producer = [None]
"""Return a KafkaProducer factory fixture"""
producer = None

def factory(**kafka_producer_params):
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
return _producer[0]
def factory(**params):
nonlocal producer
params = client_params(kafka_broker, request.node.name, **params)
producer = KafkaProducer(**params)
return producer

yield factory

if _producer[0]:
_producer[0].close()
if producer:
producer.close()


@pytest.fixture
@@ -119,24 +125,25 @@ def kafka_admin_client(kafka_admin_client_factory):
@pytest.fixture
def kafka_admin_client_factory(kafka_broker):
"""Return a KafkaAdminClient factory fixture"""
_admin_client = [None]
admin_client = None

def factory(**kafka_admin_client_params):
params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
_admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
return _admin_client[0]
def factory(**params):
nonlocal admin_client
params = client_params(kafka_broker, 'admin', **params)
admin_client = KafkaAdminClient(**params)
return admin_client

yield factory

if _admin_client[0]:
_admin_client[0].close()
if admin_client:
admin_client.close()


@pytest.fixture
def topic(kafka_broker, request):
"""Return a topic fixture"""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
create_topics(kafka_broker, [topic_name])
return topic_name


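
The conftest fixtures are rewritten from the old single-element-list workaround (`_consumer = [None]`, `_producer = [None]`) to Python 3's `nonlocal`, which lets the factory closure rebind a plain local in the enclosing fixture. A minimal, self-contained sketch of the pattern (the `Resource` class is a hypothetical stand-in for a Kafka client):

```python
import pytest


class Resource:
    """Hypothetical stand-in for a client object that needs explicit cleanup."""
    def __init__(self, **params):
        self.params = params
        self.closed = False

    def close(self):
        self.closed = True


@pytest.fixture
def resource_factory():
    resource = None  # rebound by the closure below

    def factory(**params):
        nonlocal resource  # replaces the old `_resource = [None]` list-cell hack
        resource = Resource(**params)
        return resource

    yield factory  # the test body runs here

    if resource:  # close only if the test actually called the factory
        resource.close()


def test_uses_factory(resource_factory):
    r = resource_factory(client_id='example')
    assert not r.closed
```

As in the PR's fixtures, only the most recently created object is closed on teardown; a test that calls the factory several times gets automatic cleanup only for the last one.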
155 changes: 39 additions & 116 deletions test/integration/fixtures.py
@@ -11,16 +11,51 @@

import py

from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
from kafka.protocol.new.admin import CreateTopicsRequest
from kafka.protocol.new.metadata import MetadataRequest
from kafka import errors, KafkaAdminClient
from kafka.errors import InvalidReplicationFactorError
from test.testutil import env_kafka_version, random_string
from test.service import ExternalService, SpawnedService

log = logging.getLogger(__name__)


def create_topics(broker, topic_names, num_partitions=None, replication_factor=None):
"""Create topics on the given broker fixture.

Uses KafkaAdminClient for Kafka 0.10.1+, falls back to CLI.
"""
if num_partitions is None:
num_partitions = broker.partitions
if replication_factor is None:
replication_factor = broker.replicas
if env_kafka_version() >= (0, 10, 1, 0):
_create_topics_via_admin(broker, topic_names, num_partitions, replication_factor)
else:
for topic_name in topic_names:
# TODO: verify kafka-topics.sh support for early 0.8 brokers
broker._create_topic_via_cli(topic_name, num_partitions, replication_factor)


def _create_topics_via_admin(broker, topic_names, num_partitions, replication_factor):
from kafka.admin import NewTopic
params = broker._enrich_client_params({}, client_id='topic_creator')
admin = KafkaAdminClient(**params)
try:
topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names]
admin.create_topics(topics)
except InvalidReplicationFactorError:
time.sleep(0.5)
topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names]
admin.create_topics(topics)
finally:
admin.close()


def client_params(broker, client_id='client', **overrides):
"""Build connection params for a client from a broker fixture."""
return broker._enrich_client_params(overrides, client_id='%s_%s' % (client_id, random_string(4)))


def get_open_port():
sock = socket.socket()
sock.bind(("127.0.0.1", 0))
@@ -313,10 +348,8 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,

if self.external:
self.child = ExternalService(self.host, self.port)
self._client = next(self.get_clients(1, client_id='_internal_client'))
self.running = True
else:
self._client = None
self.running = False

self.sasl_config = ''
@@ -500,8 +533,6 @@ def start(self):
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')

self._client = next(self.get_clients(1, client_id='_internal_client'))

self.out("Done!")
self.running = True

@@ -603,82 +634,6 @@ def _format_log_dirs(self):
raise RuntimeError("Failed to format log dirs!")
return True

def _send_request(self, request, timeout=None):
def _failure(error):
raise error
retries = 10
while True:
node_id = self._client.least_loaded_node()
for connect_retry in range(40):
self._client.maybe_connect(node_id)
if self._client.connected(node_id):
break
self._client.poll(timeout_ms=100)
else:
raise RuntimeError('Could not connect to broker with node id %s' % (node_id,))

try:
future = self._client.send(node_id, request)
future.error_on_callbacks = True
future.add_errback(_failure)
self._client.poll(future=future, timeout_ms=timeout)
if not future.is_done:
raise KafkaTimeoutError()
return future.value
except Exception as exc:
time.sleep(1)
retries -= 1
if retries == 0:
raise exc
else:
pass # retry

def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000):
if num_partitions is None:
num_partitions = self.partitions
if replication_factor is None:
replication_factor = self.replicas

# Try different methods to create a topic, from the fastest to the slowest
if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
self._create_topic_via_metadata(topic_name, timeout_ms)
elif env_kafka_version() >= (0, 10, 1, 0):
try:
self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
except InvalidReplicationFactorError:
# wait and try again
# in CI the brokers sometimes take a while to find themselves
time.sleep(0.5)
self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
else:
self._create_topic_via_cli(topic_name, num_partitions, replication_factor)

def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
timeout_at = time.time() + timeout_ms / 1000
while time.time() < timeout_at:
response = self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
if response.topics[0][0] == 0:
return
log.warning("Unable to create topic via MetadataRequest: err %d", response.topics[0][0])
time.sleep(1)
else:
raise RuntimeError('Unable to create topic via MetadataRequest')

def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
version = self._client.api_version(CreateTopicsRequest, max_version=7)
topic = CreateTopicsRequest.CreatableTopic(
name=topic_name,
num_partitions=num_partitions,
replication_factor=replication_factor,
assignments=[],
configs=[]
)
request = CreateTopicsRequest[version](topics=[topic], timeout_ms=timeout_ms)
response = self._send_request(request, timeout=timeout_ms)
for topic_result in response.topics:
if topic_result.error_code != 0:
raise errors.for_code(topic_result.error_code)

def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
args = self.run_script('kafka-topics.sh',
'--create',
@@ -726,10 +681,6 @@ def get_topic_names(self):
raise RuntimeError("Failed to list topics!")
return stdout.decode().splitlines(False)

def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
for topic_name in topic_names:
self._create_topic(topic_name, num_partitions, replication_factor)

def _enrich_client_params(self, params, **defaults):
params = params.copy()
for key, value in defaults.items():
@@ -746,34 +697,6 @@ def _enrich_client_params(self, params, **defaults):
params.setdefault('ssl_check_hostname', False)
return params

@staticmethod
def _create_many_clients(cnt, cls, *args, **params):
client_id = params['client_id']
for _ in range(cnt):
params['client_id'] = '%s_%s' % (client_id, random_string(4))
yield cls(*args, **params)

def get_clients(self, cnt=1, **params):
params = self._enrich_client_params(params, client_id='client')
for client in self._create_many_clients(cnt, KafkaClient, **params):
yield client

def get_admin_clients(self, cnt, **params):
params = self._enrich_client_params(params, client_id='admin_client')
for client in self._create_many_clients(cnt, KafkaAdminClient, **params):
yield client

def get_consumers(self, cnt, topics, **params):
params = self._enrich_client_params(
params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest'
)
for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params):
yield client

def get_producers(self, cnt, **params):
params = self._enrich_client_params(params, client_id='producer')
for client in self._create_many_clients(cnt, KafkaProducer, **params):
yield client


def get_api_versions():
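
The `create_topics` and `client_params` helpers introduced at the top of fixtures.py replace the removed `get_clients`/`get_consumers`/`get_producers` generators: tests now construct `KafkaClient`, `KafkaConsumer`, `KafkaProducer`, and `KafkaAdminClient` directly. A sketch of typical usage, assuming the `kafka_broker` fixture from conftest (the topic name and timeout values are illustrative):

```python
from kafka import KafkaConsumer, KafkaProducer
from test.integration.fixtures import client_params, create_topics


def test_round_trip(kafka_broker):
    # create_topics uses KafkaAdminClient on brokers >= 0.10.1, else kafka-topics.sh
    create_topics(kafka_broker, ['example_topic'], num_partitions=2)

    # client_params merges broker connection settings (and a randomized
    # client_id suffix) into the per-test overrides
    producer = KafkaProducer(**client_params(kafka_broker, 'producer'))
    producer.send('example_topic', b'hello')
    producer.flush()
    producer.close()

    consumer = KafkaConsumer(
        'example_topic',
        **client_params(kafka_broker, 'consumer',
                        auto_offset_reset='earliest',
                        consumer_timeout_ms=5000))
    messages = list(consumer)
    consumer.close()
    assert messages and messages[0].value == b'hello'
```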
3 changes: 2 additions & 1 deletion test/integration/test_admin_integration.py
@@ -17,6 +17,7 @@
)
from kafka.structs import TopicPartition
from test.testutil import env_kafka_version, random_string
from test.integration.fixtures import create_topics


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -329,7 +330,7 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
def _topic2(kafka_broker, request):
"""Same as `topic` fixture, but a different name if you need to topics."""
topic_name = '%s_%s' % (request.node.name, random_string(10))
kafka_broker.create_topics([topic_name])
create_topics(kafka_broker, [topic_name])
return topic_name

@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")
14 changes: 8 additions & 6 deletions test/integration/test_sasl_integration.py
@@ -4,9 +4,11 @@

import pytest

from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
from kafka.admin import NewTopic
from kafka.protocol.new.metadata import MetadataRequest
from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
from test.integration.fixtures import client_params, create_topics


@pytest.fixture(
@@ -32,15 +34,15 @@ def sasl_kafka(request, kafka_broker_factory):

def test_admin(request, sasl_kafka):
topic_name = special_to_underscore(request.node.name + random_string(4))
admin, = sasl_kafka.get_admin_clients(1)
admin = KafkaAdminClient(**client_params(sasl_kafka, 'admin'))
admin.create_topics([NewTopic(topic_name, 1, 1)])
assert topic_name in sasl_kafka.get_topic_names()


def test_produce_and_consume(request, sasl_kafka):
topic_name = special_to_underscore(request.node.name + random_string(4))
sasl_kafka.create_topics([topic_name], num_partitions=2)
producer, = sasl_kafka.get_producers(1)
create_topics(sasl_kafka, [topic_name], num_partitions=2)
producer = KafkaProducer(**client_params(sasl_kafka, 'producer'))

messages_and_futures = [] # [(message, produce_future),]
for i in range(100):
@@ -52,7 +54,7 @@ def test_produce_and_consume(request, sasl_kafka):
for (msg, f) in messages_and_futures:
assert f.succeeded()

consumer, = sasl_kafka.get_consumers(1, [topic_name])
consumer = KafkaConsumer(topic_name, **client_params(sasl_kafka, 'consumer', auto_offset_reset='earliest'))
messages = {0: [], 1: []}
for i, message in enumerate(consumer, 1):
logging.debug("Consumed message %s", repr(message))
@@ -66,9 +68,9 @@

def test_client(request, sasl_kafka):
topic_name = special_to_underscore(request.node.name + random_string(4))
sasl_kafka.create_topics([topic_name], num_partitions=1)
create_topics(sasl_kafka, [topic_name], num_partitions=1)

client, = sasl_kafka.get_clients(1)
client = KafkaClient(**client_params(sasl_kafka, 'client'))
request = MetadataRequest(topics=None, version=1)
timeout_at = time.time() + 1
while not client.is_ready(0):
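
The tail of `test_client` is cut off in the diff view above. For reference, a hedged sketch of how such a readiness loop typically finishes, modeled on the removed `_send_request` helper earlier in this diff (these lines are illustrative, not the PR's actual code):

```python
# Continuation sketch: wait for the connection to node 0, then send the request.
while not client.is_ready(0):
    client.maybe_connect(0)
    client.poll(timeout_ms=100)
    if time.time() > timeout_at:
        raise RuntimeError('Broker connection was not ready within the timeout')

future = client.send(0, request)
client.poll(future=future, timeout_ms=10000)
assert future.succeeded()
client.close()
```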