Skip to content

Commit fd3be9a

Browse files
authored
Simplify interacting with KafkaFixture (client factories, create_topics) (#2808)
1 parent aade6ec commit fd3be9a

4 files changed

Lines changed: 85 additions & 152 deletions

File tree

test/integration/conftest.py

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44

55
import pytest
66

7+
from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
78
from test.testutil import env_kafka_version, random_string
8-
from test.integration.fixtures import KafkaFixture, ZookeeperFixture
9+
from test.integration.fixtures import KafkaFixture, ZookeeperFixture, create_topics, client_params
910

1011

1112
@pytest.fixture(scope="module")
@@ -55,10 +56,11 @@ def factory(**broker_params):
5556
zk.close()
5657

5758

59+
5860
@pytest.fixture
5961
def kafka_client(kafka_broker, request):
6062
"""Return a KafkaClient fixture"""
61-
(client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
63+
client = KafkaClient(**client_params(kafka_broker, request.node.name))
6264
yield client
6365
client.close()
6466

@@ -72,19 +74,23 @@ def kafka_consumer(kafka_consumer_factory):
7274
@pytest.fixture
7375
def kafka_consumer_factory(kafka_broker, topic, request):
7476
"""Return a KafkaConsumer factory fixture"""
75-
_consumer = [None]
76-
77-
def factory(topics=(topic,), **kafka_consumer_params):
78-
params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
79-
params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
80-
params.setdefault('auto_offset_reset', 'earliest')
81-
_consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=list(topics), **params))
82-
return _consumer[0]
77+
consumer = None
78+
79+
def factory(topics=(topic,), **override_params):
80+
nonlocal consumer
81+
params = {
82+
'heartbeat_interval_ms': 500,
83+
'auto_offset_reset': 'earliest',
84+
}
85+
params.update(override_params)
86+
params = client_params(kafka_broker, request.node.name, **params)
87+
consumer = KafkaConsumer(*topics, **params)
88+
return consumer
8389

8490
yield factory
8591

86-
if _consumer[0]:
87-
_consumer[0].close()
92+
if consumer:
93+
consumer.close()
8894

8995

9096
@pytest.fixture
@@ -95,19 +101,19 @@ def kafka_producer(kafka_producer_factory):
95101

96102
@pytest.fixture
97103
def kafka_producer_factory(kafka_broker, request):
98-
"""Return a KafkaProduce factory fixture"""
99-
_producer = [None]
104+
"""Return a KafkaProducer factory fixture"""
105+
producer = None
100106

101-
def factory(**kafka_producer_params):
102-
params = {} if kafka_producer_params is None else kafka_producer_params.copy()
103-
params.setdefault('client_id', 'producer_%s' % (request.node.name,))
104-
_producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
105-
return _producer[0]
107+
def factory(**params):
108+
nonlocal producer
109+
params = client_params(kafka_broker, request.node.name, **params)
110+
producer = KafkaProducer(**params)
111+
return producer
106112

107113
yield factory
108114

109-
if _producer[0]:
110-
_producer[0].close()
115+
if producer:
116+
producer.close()
111117

112118

113119
@pytest.fixture
@@ -119,24 +125,25 @@ def kafka_admin_client(kafka_admin_client_factory):
119125
@pytest.fixture
120126
def kafka_admin_client_factory(kafka_broker):
121127
"""Return a KafkaAdminClient factory fixture"""
122-
_admin_client = [None]
128+
admin_client = None
123129

124-
def factory(**kafka_admin_client_params):
125-
params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
126-
_admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
127-
return _admin_client[0]
130+
def factory(**params):
131+
nonlocal admin_client
132+
params = client_params(kafka_broker, 'admin', **params)
133+
admin_client = KafkaAdminClient(**params)
134+
return admin_client
128135

129136
yield factory
130137

131-
if _admin_client[0]:
132-
_admin_client[0].close()
138+
if admin_client:
139+
admin_client.close()
133140

134141

135142
@pytest.fixture
136143
def topic(kafka_broker, request):
137144
"""Return a topic fixture"""
138145
topic_name = '%s_%s' % (request.node.name, random_string(10))
139-
kafka_broker.create_topics([topic_name])
146+
create_topics(kafka_broker, [topic_name])
140147
return topic_name
141148

142149

test/integration/fixtures.py

Lines changed: 39 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,51 @@
1111

1212
import py
1313

14-
from kafka import errors, KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
15-
from kafka.errors import InvalidReplicationFactorError, KafkaTimeoutError
16-
from kafka.protocol.new.admin import CreateTopicsRequest
17-
from kafka.protocol.new.metadata import MetadataRequest
14+
from kafka import errors, KafkaAdminClient
15+
from kafka.errors import InvalidReplicationFactorError
1816
from test.testutil import env_kafka_version, random_string
1917
from test.service import ExternalService, SpawnedService
2018

2119
log = logging.getLogger(__name__)
2220

2321

22+
def create_topics(broker, topic_names, num_partitions=None, replication_factor=None):
23+
"""Create topics on the given broker fixture.
24+
25+
Uses KafkaAdminClient for Kafka 0.10.1+, falls back to CLI.
26+
"""
27+
if num_partitions is None:
28+
num_partitions = broker.partitions
29+
if replication_factor is None:
30+
replication_factor = broker.replicas
31+
if env_kafka_version() >= (0, 10, 1, 0):
32+
_create_topics_via_admin(broker, topic_names, num_partitions, replication_factor)
33+
else:
34+
for topic_name in topic_names:
35+
# TODO: verify kafka-topics.sh support for early 0.8 brokers
36+
broker._create_topic_via_cli(topic_name, num_partitions, replication_factor)
37+
38+
39+
def _create_topics_via_admin(broker, topic_names, num_partitions, replication_factor):
40+
from kafka.admin import NewTopic
41+
params = broker._enrich_client_params({}, client_id='topic_creator')
42+
admin = KafkaAdminClient(**params)
43+
try:
44+
topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names]
45+
admin.create_topics(topics)
46+
except InvalidReplicationFactorError:
47+
time.sleep(0.5)
48+
topics = [NewTopic(name, num_partitions, replication_factor) for name in topic_names]
49+
admin.create_topics(topics)
50+
finally:
51+
admin.close()
52+
53+
54+
def client_params(broker, client_id='client', **overrides):
55+
"""Build connection params for a client from a broker fixture."""
56+
return broker._enrich_client_params(overrides, client_id='%s_%s' % (client_id, random_string(4)))
57+
58+
2459
def get_open_port():
2560
sock = socket.socket()
2661
sock.bind(("127.0.0.1", 0))
@@ -313,10 +348,8 @@ def __init__(self, host, port, broker_id, zookeeper=None, zk_chroot=None,
313348

314349
if self.external:
315350
self.child = ExternalService(self.host, self.port)
316-
self._client = next(self.get_clients(1, client_id='_internal_client'))
317351
self.running = True
318352
else:
319-
self._client = None
320353
self.running = False
321354

322355
self.sasl_config = ''
@@ -500,8 +533,6 @@ def start(self):
500533
else:
501534
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
502535

503-
self._client = next(self.get_clients(1, client_id='_internal_client'))
504-
505536
self.out("Done!")
506537
self.running = True
507538

@@ -603,82 +634,6 @@ def _format_log_dirs(self):
603634
raise RuntimeError("Failed to format log dirs!")
604635
return True
605636

606-
def _send_request(self, request, timeout=None):
607-
def _failure(error):
608-
raise error
609-
retries = 10
610-
while True:
611-
node_id = self._client.least_loaded_node()
612-
for connect_retry in range(40):
613-
self._client.maybe_connect(node_id)
614-
if self._client.connected(node_id):
615-
break
616-
self._client.poll(timeout_ms=100)
617-
else:
618-
raise RuntimeError('Could not connect to broker with node id %s' % (node_id,))
619-
620-
try:
621-
future = self._client.send(node_id, request)
622-
future.error_on_callbacks = True
623-
future.add_errback(_failure)
624-
self._client.poll(future=future, timeout_ms=timeout)
625-
if not future.is_done:
626-
raise KafkaTimeoutError()
627-
return future.value
628-
except Exception as exc:
629-
time.sleep(1)
630-
retries -= 1
631-
if retries == 0:
632-
raise exc
633-
else:
634-
pass # retry
635-
636-
def _create_topic(self, topic_name, num_partitions=None, replication_factor=None, timeout_ms=10000):
637-
if num_partitions is None:
638-
num_partitions = self.partitions
639-
if replication_factor is None:
640-
replication_factor = self.replicas
641-
642-
# Try different methods to create a topic, from the fastest to the slowest
643-
if self.auto_create_topic and num_partitions == self.partitions and replication_factor == self.replicas:
644-
self._create_topic_via_metadata(topic_name, timeout_ms)
645-
elif env_kafka_version() >= (0, 10, 1, 0):
646-
try:
647-
self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
648-
except InvalidReplicationFactorError:
649-
# wait and try again
650-
# in CI the brokers sometimes take a while to find themselves
651-
time.sleep(0.5)
652-
self._create_topic_via_admin_api(topic_name, num_partitions, replication_factor, timeout_ms)
653-
else:
654-
self._create_topic_via_cli(topic_name, num_partitions, replication_factor)
655-
656-
def _create_topic_via_metadata(self, topic_name, timeout_ms=10000):
657-
timeout_at = time.time() + timeout_ms / 1000
658-
while time.time() < timeout_at:
659-
response = self._send_request(MetadataRequest[0]([topic_name]), timeout_ms)
660-
if response.topics[0][0] == 0:
661-
return
662-
log.warning("Unable to create topic via MetadataRequest: err %d", response.topics[0][0])
663-
time.sleep(1)
664-
else:
665-
raise RuntimeError('Unable to create topic via MetadataRequest')
666-
667-
def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_factor, timeout_ms=10000):
668-
version = self._client.api_version(CreateTopicsRequest, max_version=7)
669-
topic = CreateTopicsRequest.CreatableTopic(
670-
name=topic_name,
671-
num_partitions=num_partitions,
672-
replication_factor=replication_factor,
673-
assignments=[],
674-
configs=[]
675-
)
676-
request = CreateTopicsRequest[version](topics=[topic], timeout_ms=timeout_ms)
677-
response = self._send_request(request, timeout=timeout_ms)
678-
for topic_result in response.topics:
679-
if topic_result.error_code != 0:
680-
raise errors.for_code(topic_result.error_code)
681-
682637
def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
683638
args = self.run_script('kafka-topics.sh',
684639
'--create',
@@ -726,10 +681,6 @@ def get_topic_names(self):
726681
raise RuntimeError("Failed to list topics!")
727682
return stdout.decode().splitlines(False)
728683

729-
def create_topics(self, topic_names, num_partitions=None, replication_factor=None):
730-
for topic_name in topic_names:
731-
self._create_topic(topic_name, num_partitions, replication_factor)
732-
733684
def _enrich_client_params(self, params, **defaults):
734685
params = params.copy()
735686
for key, value in defaults.items():
@@ -746,34 +697,6 @@ def _enrich_client_params(self, params, **defaults):
746697
params.setdefault('ssl_check_hostname', False)
747698
return params
748699

749-
@staticmethod
750-
def _create_many_clients(cnt, cls, *args, **params):
751-
client_id = params['client_id']
752-
for _ in range(cnt):
753-
params['client_id'] = '%s_%s' % (client_id, random_string(4))
754-
yield cls(*args, **params)
755-
756-
def get_clients(self, cnt=1, **params):
757-
params = self._enrich_client_params(params, client_id='client')
758-
for client in self._create_many_clients(cnt, KafkaClient, **params):
759-
yield client
760-
761-
def get_admin_clients(self, cnt, **params):
762-
params = self._enrich_client_params(params, client_id='admin_client')
763-
for client in self._create_many_clients(cnt, KafkaAdminClient, **params):
764-
yield client
765-
766-
def get_consumers(self, cnt, topics, **params):
767-
params = self._enrich_client_params(
768-
params, client_id='consumer', heartbeat_interval_ms=500, auto_offset_reset='earliest'
769-
)
770-
for client in self._create_many_clients(cnt, KafkaConsumer, *topics, **params):
771-
yield client
772-
773-
def get_producers(self, cnt, **params):
774-
params = self._enrich_client_params(params, client_id='producer')
775-
for client in self._create_many_clients(cnt, KafkaProducer, **params):
776-
yield client
777700

778701

779702
def get_api_versions():

test/integration/test_admin_integration.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818
from kafka.structs import TopicPartition
1919
from test.testutil import env_kafka_version, random_string
20+
from test.integration.fixtures import create_topics
2021

2122

2223
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -329,7 +330,7 @@ def test_delete_consumergroups_with_errors(kafka_admin_client, kafka_consumer_fa
329330
def _topic2(kafka_broker, request):
330331
"""Same as `topic` fixture, but a different name if you need two topics."""
331332
topic_name = '%s_%s' % (request.node.name, random_string(10))
332-
kafka_broker.create_topics([topic_name])
333+
create_topics(kafka_broker, [topic_name])
333334
return topic_name
334335

335336
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Delete records requires broker >=0.11.0")

test/integration/test_sasl_integration.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
import pytest
66

7+
from kafka import KafkaAdminClient, KafkaClient, KafkaConsumer, KafkaProducer
78
from kafka.admin import NewTopic
89
from kafka.protocol.new.metadata import MetadataRequest
910
from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
11+
from test.integration.fixtures import client_params, create_topics
1012

1113

1214
@pytest.fixture(
@@ -32,15 +34,15 @@ def sasl_kafka(request, kafka_broker_factory):
3234

3335
def test_admin(request, sasl_kafka):
3436
topic_name = special_to_underscore(request.node.name + random_string(4))
35-
admin, = sasl_kafka.get_admin_clients(1)
37+
admin = KafkaAdminClient(**client_params(sasl_kafka, 'admin'))
3638
admin.create_topics([NewTopic(topic_name, 1, 1)])
3739
assert topic_name in sasl_kafka.get_topic_names()
3840

3941

4042
def test_produce_and_consume(request, sasl_kafka):
4143
topic_name = special_to_underscore(request.node.name + random_string(4))
42-
sasl_kafka.create_topics([topic_name], num_partitions=2)
43-
producer, = sasl_kafka.get_producers(1)
44+
create_topics(sasl_kafka, [topic_name], num_partitions=2)
45+
producer = KafkaProducer(**client_params(sasl_kafka, 'producer'))
4446

4547
messages_and_futures = [] # [(message, produce_future),]
4648
for i in range(100):
@@ -52,7 +54,7 @@ def test_produce_and_consume(request, sasl_kafka):
5254
for (msg, f) in messages_and_futures:
5355
assert f.succeeded()
5456

55-
consumer, = sasl_kafka.get_consumers(1, [topic_name])
57+
consumer = KafkaConsumer(topic_name, **client_params(sasl_kafka, 'consumer', auto_offset_reset='earliest'))
5658
messages = {0: [], 1: []}
5759
for i, message in enumerate(consumer, 1):
5860
logging.debug("Consumed message %s", repr(message))
@@ -66,9 +68,9 @@ def test_produce_and_consume(request, sasl_kafka):
6668

6769
def test_client(request, sasl_kafka):
6870
topic_name = special_to_underscore(request.node.name + random_string(4))
69-
sasl_kafka.create_topics([topic_name], num_partitions=1)
71+
create_topics(sasl_kafka, [topic_name], num_partitions=1)
7072

71-
client, = sasl_kafka.get_clients(1)
73+
client = KafkaClient(**client_params(sasl_kafka, 'client'))
7274
request = MetadataRequest(topics=None, version=1)
7375
timeout_at = time.time() + 1
7476
while not client.is_ready(0):

0 commit comments

Comments
 (0)