Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@


import io.openmessaging.benchmark.utils.distributor.KeyDistributorType;
import java.util.Map;

public class Workload {
public String name;
Expand All @@ -29,6 +30,23 @@ public class Workload {

public int messageSize;

/**
* Message size distribution for variable-sized payloads.
* Keys are size ranges (e.g., "0-256", "256-1024", "1KB-4KB"),
* values are relative weights.
* Mutually exclusive with messageSize - if set, messageSize is ignored.
*/
public Map<String, Integer> messageSizeDistribution;

/**
* Returns true if this workload uses a size distribution instead of fixed size.
*
* @return true if messageSizeDistribution is configured, false otherwise
*/
public boolean usesDistribution() {
return messageSizeDistribution != null && !messageSizeDistribution.isEmpty();
}

public boolean useRandomizedPayloads;
public double randomBytesRatio;
public int randomizedPayloadPoolSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.openmessaging.benchmark.utils.RandomGenerator;
import io.openmessaging.benchmark.utils.Timer;
import io.openmessaging.benchmark.utils.payload.FilePayloadReader;
import io.openmessaging.benchmark.utils.payload.MessageSizeDistribution;
import io.openmessaging.benchmark.utils.payload.PayloadReader;
import io.openmessaging.benchmark.worker.Worker;
import io.openmessaging.benchmark.worker.commands.ConsumerAssignment;
Expand Down Expand Up @@ -95,16 +96,39 @@ public TestResult run() throws Exception {
});
}

final PayloadReader payloadReader = new FilePayloadReader(workload.messageSize);

ProducerWorkAssignment producerWorkAssignment = new ProducerWorkAssignment();
producerWorkAssignment.keyDistributorType = workload.keyDistributor;
producerWorkAssignment.publishRate = targetPublishRate;
producerWorkAssignment.payloadData = new ArrayList<>();

if (workload.useRandomizedPayloads) {
// create messages that are part random and part zeros
// better for testing effects of compression
if (workload.usesDistribution()) {
// Distribution mode: create one payload per bucket with weighted selection at runtime
MessageSizeDistribution dist = new MessageSizeDistribution(workload.messageSizeDistribution);
List<Integer> sizes = dist.getBucketMaxSizes();
Random r = new Random();

log.info(
"Creating {} payloads for size distribution (max sizes: {}, weighted avg: {} bytes)",
sizes.size(),
sizes,
dist.getAvgSize());

for (int size : sizes) {
byte[] payload = new byte[size];
if (workload.useRandomizedPayloads) {
int randomBytes = (int) (size * workload.randomBytesRatio);
r.nextBytes(payload);
// Zero out non-random portion for compressibility testing
for (int j = randomBytes; j < size; j++) {
payload[j] = 0;
}
}
producerWorkAssignment.payloadData.add(payload);
}
producerWorkAssignment.payloadWeights = dist.getWeights();

} else if (workload.useRandomizedPayloads) {
// Existing fixed-size randomized payload logic
Random r = new Random();
int randomBytes = (int) (workload.messageSize * workload.randomBytesRatio);
int zerodBytes = workload.messageSize - randomBytes;
Expand All @@ -116,6 +140,8 @@ public TestResult run() throws Exception {
producerWorkAssignment.payloadData.add(combined);
}
} else {
// Existing file-based payload logic
final PayloadReader payloadReader = new FilePayloadReader(workload.messageSize);
producerWorkAssignment.payloadData.add(payloadReader.load(workload.payloadFile));
}

Expand Down Expand Up @@ -275,13 +301,19 @@ private void buildAndDrainBacklog(int testDurationMinutes) throws IOException {

this.needToWaitForBacklogDraining = true;

// Use average size when distribution is configured, otherwise fixed messageSize
int effectiveMessageSize =
workload.usesDistribution()
? new MessageSizeDistribution(workload.messageSizeDistribution).getAvgSize()
: workload.messageSize;

long requestedBacklogSize = workload.consumerBacklogSizeGB * 1024 * 1024 * 1024;

while (true) {
CountersStats stats = worker.getCountersStats();
long currentBacklogSize =
(workload.subscriptionsPerTopic * stats.messagesSent - stats.messagesReceived)
* workload.messageSize;
* effectiveMessageSize;

if (currentBacklogSize >= requestedBacklogSize) {
break;
Expand All @@ -300,7 +332,7 @@ private void buildAndDrainBacklog(int testDurationMinutes) throws IOException {

worker.resumeConsumers();

long backlogMessageCapacity = requestedBacklogSize / workload.messageSize;
long backlogMessageCapacity = requestedBacklogSize / effectiveMessageSize;
long backlogEmptyLevel = (long) ((1.0 - workload.backlogDrainRatio) * backlogMessageCapacity);
final long minBacklog = Math.max(1000L, backlogEmptyLevel);

Expand Down Expand Up @@ -343,7 +375,11 @@ private TestResult printAndCollectStats(long testDurations, TimeUnit unit) throw
result.driver = driverName;
result.topics = workload.topics;
result.partitions = workload.partitionsPerTopic;
result.messageSize = workload.messageSize;
// Use average size when distribution is configured
result.messageSize =
workload.usesDistribution()
? new MessageSizeDistribution(workload.messageSizeDistribution).getAvgSize()
: workload.messageSize;
result.producersPerTopic = workload.producersPerTopic;
result.consumersPerTopic = workload.consumerPerSubscription;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.benchmark.utils.payload;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Parses and represents a message size distribution from workload config.
* Creates one payload size per bucket and provides weights for runtime selection.
*
* <p>Example configuration:
* <pre>
* messageSizeDistribution:
* "0-256": 234
* "256-1024": 456
* "1024-4096": 678
* </pre>
*/
public class MessageSizeDistribution {

private final List<Bucket> buckets;
private final int totalWeight;

/**
* Represents a single size bucket with min/max range and weight.
*/
public static class Bucket {
public final int minSize;
public final int maxSize;
public final int weight;

Bucket(int minSize, int maxSize, int weight) {
this.minSize = minSize;
this.maxSize = maxSize;
this.weight = weight;
}

/**
* Returns midpoint of range as the representative size for this bucket.
*
* @return the representative size (midpoint of min and max)
*/
public int getRepresentativeSize() {
return (minSize + maxSize) / 2;
}
}

public MessageSizeDistribution(Map<String, Integer> config) {
if (config == null || config.isEmpty()) {
throw new IllegalArgumentException("Distribution config cannot be null or empty");
}

List<Bucket> parsed = new ArrayList<>();
int total = 0;

for (Map.Entry<String, Integer> e : config.entrySet()) {
int[] range = parseRange(e.getKey());
int weight = e.getValue();
if (weight < 0) {
throw new IllegalArgumentException("Weight cannot be negative: " + weight);
}
parsed.add(new Bucket(range[0], range[1], weight));
total += weight;
}

this.buckets = parsed;
this.totalWeight = total;

if (totalWeight <= 0) {
throw new IllegalArgumentException("Distribution weights must sum to a positive value");
}
}

private int[] parseRange(String range) {
if (range == null || range.isEmpty()) {
throw new IllegalArgumentException("Range cannot be null or empty");
}

// Find the last hyphen that separates min and max (to handle negative numbers if any)
int lastHyphen = range.lastIndexOf('-');
if (lastHyphen <= 0) {
throw new IllegalArgumentException("Invalid range format (expected 'min-max'): " + range);
}

String minPart = range.substring(0, lastHyphen);
String maxPart = range.substring(lastHyphen + 1);

int minSize = parseSize(minPart);
int maxSize = parseSize(maxPart);

if (minSize < 0) {
throw new IllegalArgumentException("Min size cannot be negative: " + minSize);
}
if (maxSize < minSize) {
throw new IllegalArgumentException(
"Max size must be >= min size: " + minSize + " > " + maxSize);
}

return new int[] {minSize, maxSize};
}

private int parseSize(String s) {
s = s.trim().toUpperCase();
int mult = 1;

if (s.endsWith("KB")) {
mult = 1024;
s = s.substring(0, s.length() - 2);
} else if (s.endsWith("MB")) {
mult = 1024 * 1024;
s = s.substring(0, s.length() - 2);
} else if (s.endsWith("B")) {
s = s.substring(0, s.length() - 1);
}

try {
return Integer.parseInt(s.trim()) * mult;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid size format: " + s, e);
}
}

/**
* Returns list of representative sizes, one per bucket (for payload generation).
*
* @return list of representative sizes
*/
public List<Integer> getBucketSizes() {
List<Integer> sizes = new ArrayList<>();
for (Bucket b : buckets) {
sizes.add(b.getRepresentativeSize());
}
return sizes;
}

/**
* Returns list of max sizes, one per bucket (for payload generation).
* Using max sizes ensures the system is tested with the largest messages in each bucket range.
*
* @return list of max sizes per bucket
*/
public List<Integer> getBucketMaxSizes() {
List<Integer> sizes = new ArrayList<>();
for (Bucket b : buckets) {
sizes.add(b.maxSize);
}
return sizes;
}

/**
* Returns weights array matching bucket order (for runtime selection).
*
* @return array of weights for each bucket
*/
public int[] getWeights() {
int[] weights = new int[buckets.size()];
for (int i = 0; i < buckets.size(); i++) {
weights[i] = buckets.get(i).weight;
}
return weights;
}

/**
* Returns cumulative weights array for O(log n) binary search selection.
*
* @return array of cumulative weights
*/
public int[] getCumulativeWeights() {
int[] cumulative = new int[buckets.size()];
int sum = 0;
for (int i = 0; i < buckets.size(); i++) {
sum += buckets.get(i).weight;
cumulative[i] = sum;
}
return cumulative;
}

public int getTotalWeight() {
return totalWeight;
}

public int getMaxSize() {
return buckets.stream().mapToInt(b -> b.maxSize).max().orElse(0);
}

/**
* Returns weighted average size across all buckets.
*
* @return the weighted average size
*/
public int getAvgSize() {
long sum = 0;
for (Bucket b : buckets) {
sum += (long) b.getRepresentativeSize() * b.weight;
}
return (int) (sum / totalWeight);
}

public int getBucketCount() {
return buckets.size();
}

public List<Bucket> getBuckets() {
return buckets;
}
}

Loading