10 changes: 9 additions & 1 deletion tests/soak/README.md
@@ -30,4 +30,12 @@ OpenTelemetry reporting supported through OTLP.
5. Run some tests
```bash
TESTID=<testid> ./run.sh ccloud.config
```

## Share Consumer

To run with a share consumer instead of the regular consumer:
```bash
SHARE=true TESTID=<testid> ./run.sh ccloud.config
```
Requires a KIP-932-compatible librdkafka and broker.
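
With `SHARE=true`, `run.sh` passes `--share` through to `soakclient.py`, so the effective invocation is roughly:
```bash
opentelemetry-instrument ./soakclient.py -i <testid> -t <topic> -r 80 -f ccloud.config --share
```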
8 changes: 7 additions & 1 deletion tests/soak/run.sh
@@ -17,12 +17,18 @@ topic="pysoak-$TESTID-$librdkafka_version"
logfile="${TESTID}.log.bz2"
limit=$((50 * 1024 * 1024)) # 50MB
export HOSTNAME=$(hostname)

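# Pass --share through to soakclient.py when SHARE=true is exported (see "Share Consumer" in the README).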
share_flag=""
if [[ "${SHARE:-}" == "true" ]]; then
share_flag="--share"
fi

echo "Starting soak client using topic $topic. Logging to $logfile."
set +x
while [ "$run" = true ]; do
# Ignore SIGINT in children (inherited)
trap "" SIGINT
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 |& tee /dev/tty | bzip2 > $logfile &
time opentelemetry-instrument $testdir/soakclient.py -i $TESTID -t $topic -r 80 -f $1 $share_flag |& tee /dev/tty | bzip2 > $logfile &
PID=$!
terminate_last() {
# List children of $PID only
237 changes: 214 additions & 23 deletions tests/soak/soakclient.py
@@ -20,7 +20,7 @@
# long term validation testing.
#
# Usage:
# tests/soak/soakclient.py -i <testid> -t <topic> -r <produce-rate> -f <client-conf-file>
# tests/soak/soakclient.py -i <testid> -t <topic> -r <produce-rate> -f <client-conf-file> [--share]
#
# A unique topic should be used for each soakclient instance.
#
@@ -43,6 +43,12 @@
from confluent_kafka import Consumer, KafkaError, KafkaException, Producer, version
from confluent_kafka.admin import AdminClient, NewTopic

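# ShareConsumer (KIP-932) may not be present in older confluent_kafka builds,
# so detect its availability at import time.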
try:
from confluent_kafka import ShareConsumer
HAS_SHARE_CONSUMER = True
except ImportError:
HAS_SHARE_CONSUMER = False


class SoakRecord(object):
"""A private record type, with JSON serializer and deserializer"""
Expand All @@ -68,7 +74,7 @@ def deserialize(cls, binstr):

class SoakClient(object):
"""The SoakClient consists of a Producer sending messages at
the given rate, and a Consumer consuming the messages.
the given rate, and a Consumer or ShareConsumer consuming the messages.
Both clients print their message and error counters every 10 seconds.
The producer and consumer run in separate background threads.
"""
@@ -329,6 +335,147 @@ def producer_error_cb(self, err):
self.producer_error_cb_cnt += 1
self.incr_counter("producer.errorcb", 1)

def share_error_cb(self, err):
"""Share consumer error callback"""
self.logger.error("share: error_cb: {}".format(err))
self.share_error_cb_cnt += 1
self.incr_counter("consumer.errorcb", 1)

def share_status(self):
"""Print share consumer status"""
self.logger.info(
"share: {} messages consumed, {} duplicates, "
"{} missed, {} message errors, {} consumer errors, {} error_cbs".format(
self.share_msg_cnt,
self.share_msg_dup_cnt,
self.share_msg_miss_cnt,
self.share_msg_err_cnt,
self.share_err_cnt,
self.share_error_cb_cnt,
)
)

def share_run(self):
"""Share consumer main loop"""
self.share_consumer.subscribe([self.topic])

self.share_msg_cnt = 0
self.share_msg_dup_cnt = 0
self.share_msg_miss_cnt = 0
self.share_msg_err_cnt = 0
self.share_err_cnt = 0
self.share_error_cb_cnt = 0

# Track highest offset seen per partition for duplicate/gap detection.
# With implicit ack and a single share consumer, offsets should
# progress sequentially per partition.
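# Example: with hw == 10, a message at offset 8 counts 3 duplicates (8, 9, 10),
# while a message at offset 13 counts 2 missed messages (11 and 12).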
hwmarks = defaultdict(int)

next_status = time.time() + self.disprate

while self.run:
now = time.time()
if now > next_status:
self.share_status()
next_status = now + self.disprate

try:
messages = self.share_consumer.consume_batch(timeout=1.0)
except Exception as ex:
self.logger.error("share: consume_batch exception: {}".format(ex))
self.share_err_cnt += 1
self.incr_counter("consumer.error", 1)
continue

if not messages:
continue

for msg in messages:
if msg.error() is not None:
self.logger.error("share: error: {}".format(msg.error()))
self.share_err_cnt += 1
self.incr_counter("consumer.error", 1)
continue

try:
record = SoakRecord.deserialize(msg.value()) # noqa unused variable
except ValueError as ex:
self.logger.info(
"share: Failed to deserialize message in "
"{} [{}] at offset {} (headers {}): {}".format(
msg.topic(), msg.partition(), msg.offset(),
msg.headers(), ex
)
)
self.share_msg_err_cnt += 1
self.incr_counter("consumer.msgerr", 1)

self.share_msg_cnt += 1
self.incr_counter("consumer.msg", 1)

# end-to-end latency
headers = dict(msg.headers())
txtime = headers.get('time', None)
if txtime is not None:
latency = time.time() - float(txtime)
self.set_gauge(
"consumer.e2e_latency", latency,
tags={"partition": "{}".format(msg.partition())}
)

if (self.share_msg_cnt % self.disprate) == 0:
self.logger.info(
"share: {} messages consumed: Message {} "
"[{}] at offset {} (headers {})".format(
self.share_msg_cnt, msg.topic(),
msg.partition(), msg.offset(), msg.headers()
)
)

# Track per-partition high-water mark for duplicate/gap detection
hwkey = "{}-{}".format(msg.topic(), msg.partition())
hw = hwmarks[hwkey]

if hw > 0:
if msg.offset() <= hw:
self.logger.warning(
"share: Old or duplicate message {} "
"[{}] at offset {} (headers {}): wanted offset > {}".format(
msg.topic(), msg.partition(), msg.offset(),
msg.headers(), hw
)
)
self.share_msg_dup_cnt += (hw + 1) - msg.offset()
self.incr_counter("consumer.msgdup", 1)
elif msg.offset() > hw + 1:
self.logger.warning(
"share: Lost messages, now at {} "
"[{}] at offset {} (headers {}): "
"expected offset {}+1".format(
msg.topic(), msg.partition(), msg.offset(),
msg.headers(), hw
)
)
self.share_msg_miss_cnt += msg.offset() - (hw + 1)
self.incr_counter("consumer.missedmsg", 1)

hwmarks[hwkey] = msg.offset()

self.share_consumer.close()
self.share_status()

def share_thread_main(self):
"""Share consumer thread main function"""
try:
self.share_run()
except KeyboardInterrupt:
self.logger.info("share: aborted by user")
self.run = False
except Exception as ex:
self.logger.fatal("share: fatal exception: {}\n{}".format(
ex, traceback.format_exc()))
self.run = False

def rtt_stats(self, d):
"""Extract broker rtt statistics from the stats dict in @param d"""

@@ -394,7 +541,7 @@ def create_topic(self, topic, conf):
else:
raise

def __init__(self, testid, topic, rate, conf):
def __init__(self, testid, topic, rate, conf, enable_share=False):
"""SoakClient constructor. conf is the client configuration"""
self.topic = topic
self.rate = rate
@@ -446,50 +593,90 @@ def filter_config(conf, filter_out, strip_prefix):
return out

# Create topic (might already exist)
aconf = filter_config(conf, ["consumer.", "producer."], "admin.")
aconf = filter_config(conf, ["consumer.", "producer.", "share."], "admin.")
aconf['client.id'] = self.testid
self.create_topic(self.topic, aconf)

#
# Create Producer and Consumer, each running in its own thread.
# Create Producer and Consumer/ShareConsumer, each in its own thread.
#
conf['stats_cb'] = self.stats_cb
conf['statistics.interval.ms'] = 120000

# Producer
pconf = filter_config(conf, ["consumer.", "admin."], "producer.")
pconf = filter_config(conf, ["consumer.", "admin.", "share."], "producer.")
pconf['error_cb'] = self.producer_error_cb
pconf['client.id'] = self.testid
self.producer = Producer(pconf)

# Consumer
cconf = filter_config(conf, ["producer.", "admin."], "consumer.")
cconf['error_cb'] = self.consumer_error_cb
cconf['on_commit'] = self.consumer_commit_cb
self.logger.info("consumer: using group.id {}".format(cconf['group.id']))
cconf['client.id'] = self.testid
self.consumer = Consumer(cconf)

# Initialize some counters to zero to make them appear in the metrics
self.incr_counter("consumer.error", 0)
self.incr_counter("consumer.msgdup", 0)
self.incr_counter("producer.errorcb", 0)

# Create and start producer thread
self.producer_thread = threading.Thread(target=self.producer_thread_main)
self.producer_thread.start()

# Create and start consumer thread
self.consumer_thread = threading.Thread(target=self.consumer_thread_main)
self.consumer_thread.start()
self.consumer = None
self.consumer_thread = None
self.share_consumer = None
self.share_thread = None

if enable_share:
if not HAS_SHARE_CONSUMER:
raise RuntimeError(
"ShareConsumer requested but not available in this "
"confluent_kafka build."
)

sconf = filter_config(conf, ["consumer.", "producer.", "admin."], "share.")
sconf['error_cb'] = self.share_error_cb
sconf['stats_cb'] = self.stats_cb
sconf['statistics.interval.ms'] = 120000
sconf['client.id'] = self.testid

# Always set a share-specific group.id.
sconf['group.id'] = 'soakclient-share-{}-{}-{}'.format(
self.hostname, version(), sys.version.split(' ')[0]
)

self.logger.info("share: using group.id {}".format(sconf['group.id']))
self.share_consumer = ShareConsumer(sconf)

# Initialize counters to zero
self.incr_counter("consumer.error", 0)
self.incr_counter("consumer.msgdup", 0)
self.incr_counter("consumer.msgerr", 0)
self.incr_counter("consumer.errorcb", 0)

# Create and start share consumer thread
self.share_thread = threading.Thread(target=self.share_thread_main)
self.share_thread.start()
else:
# Consumer
cconf = filter_config(conf, ["producer.", "admin.", "share."], "consumer.")
cconf['error_cb'] = self.consumer_error_cb
cconf['on_commit'] = self.consumer_commit_cb
self.logger.info("consumer: using group.id {}".format(cconf['group.id']))
cconf['client.id'] = self.testid
self.consumer = Consumer(cconf)

# Initialize some counters to zero to make them appear in the metrics
self.incr_counter("consumer.error", 0)
self.incr_counter("consumer.msgdup", 0)

# Create and start consumer thread
self.consumer_thread = threading.Thread(target=self.consumer_thread_main)
self.consumer_thread.start()

def terminate(self):
"""Terminate Producer and Consumer"""
"""Terminate Producer and Consumer/Share Consumer"""
soak.logger.info("Terminating (ran for {}s)".format(time.time() - self.start_time))
self.run = False
# Wait for background threads to finish.
self.producer_thread.join()
self.consumer_thread.join()
if self.share_thread is not None:
self.share_thread.join()
else:
self.consumer_thread.join()

# Final resource usage
soak.get_rusage()
@@ -576,6 +763,10 @@ def get_rusage(self):
parser.add_argument(
'-f', dest='conffile', type=argparse.FileType('r'), help='Configuration file (configprop=value format)'
)
parser.add_argument(
'--share', dest='share', action='store_true', default=False,
help='Enable share consumer thread'
)

args = parser.parse_args()

@@ -606,7 +797,7 @@ def get_rusage(self):
conf['enable.partition.eof'] = False

# Create SoakClient
soak = SoakClient(args.testid, args.topic, args.rate, conf)
soak = SoakClient(args.testid, args.topic, args.rate, conf, enable_share=args.share)

# Get initial resource usage
soak.get_rusage()
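
For reference, a condensed sketch of the consume loop this patch adds to soakclient.py, using only the calls that appear in the diff (ShareConsumer, subscribe, consume_batch, close); it assumes a KIP-932-capable confluent_kafka build and broker, and the config values are placeholders:

```python
# Condensed sketch of the share-consumer loop (assumes confluent_kafka
# exposes ShareConsumer per KIP-932; config values are placeholders).
from confluent_kafka import ShareConsumer

conf = {
    "bootstrap.servers": "<brokers>",          # taken from ccloud.config in the soak client
    "group.id": "soakclient-share-<testid>",   # the patch derives this from hostname and versions
}

consumer = ShareConsumer(conf)
consumer.subscribe(["pysoak-<testid>-<librdkafka-version>"])

try:
    while True:
        # Poll a batch of messages; errors are reported per message.
        for msg in consumer.consume_batch(timeout=1.0):
            if msg.error() is not None:
                print("consume error:", msg.error())
                continue
            print(msg.topic(), msg.partition(), msg.offset())
finally:
    consumer.close()
```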