Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -750,34 +750,47 @@ public CompletableFuture<Void> splitNamespaceBundleAsync(ServiceUnitId bundle,
final String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(bundle.toString());
NamespaceBundle namespaceBundle =
pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(namespaceName, bundleRange);
return pulsar.getNamespaceService().getSplitBoundary(namespaceBundle, splitAlgorithm, boundaries)
.thenCompose(splitBundlesPair -> {
if (splitBundlesPair == null) {
String msg = format("Bundle %s not found under namespace", namespaceBundle);
log.error(msg);
return FutureUtil.failedFuture(new IllegalStateException(msg));
return pulsar.getNamespaceService().getSplitBoundary(namespaceBundle, boundaries, splitAlgorithm)
.thenCompose(splitBoundaries -> {
if (splitBoundaries == null || splitBoundaries.isEmpty()) {
log.info("[{}] No valid boundary found in {} to split bundle {}",
namespaceBundle.getNamespaceObject(), boundaries, namespaceBundle.getBundleRange());
return CompletableFuture.completedFuture(null);
}

return getOwnershipAsync(Optional.empty(), bundle)
.thenCompose(brokerOpt -> {
if (brokerOpt.isEmpty()) {
String msg = String.format("Namespace bundle: %s is not owned by any broker.",
bundle);
log.warn(msg);
throw new IllegalStateException(msg);
return pulsar.getNamespaceService().getNamespaceBundleFactory()
.splitBundles(namespaceBundle, splitBoundaries.size() + 1, splitBoundaries)
.thenCompose(splitBundlesPair -> {
if (splitBundlesPair == null) {
String msg = format("Bundle %s not found under namespace", namespaceBundle);
log.error(msg);
return FutureUtil.failedFuture(new IllegalStateException(msg));
}
String sourceBroker = brokerOpt.get();
SplitDecision splitDecision = new SplitDecision();
List<NamespaceBundle> splitBundles = splitBundlesPair.getRight();
Map<String, Optional<String>> splitServiceUnitToDestBroker = new HashMap<>();
splitBundles.forEach(splitBundle -> splitServiceUnitToDestBroker
.put(splitBundle.getBundleRange(), Optional.empty()));
splitDecision.setSplit(
new Split(bundle.toString(), sourceBroker, splitServiceUnitToDestBroker));
splitDecision.setLabel(Success);
splitDecision.setReason(Admin);
return splitAsync(splitDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);

return getOwnershipAsync(Optional.empty(), bundle)
.thenCompose(brokerOpt -> {
if (brokerOpt.isEmpty()) {
String msg = String.format(
"Namespace bundle: %s is not owned by any broker.",
bundle);
log.warn(msg);
throw new IllegalStateException(msg);
}
String sourceBroker = brokerOpt.get();
SplitDecision splitDecision = new SplitDecision();
List<NamespaceBundle> splitBundles = splitBundlesPair.getRight();
Map<String, Optional<String>> splitServiceUnitToDestBroker =
new HashMap<>();
splitBundles.forEach(splitBundle -> splitServiceUnitToDestBroker
.put(splitBundle.getBundleRange(), Optional.empty()));
splitDecision.setSplit(
new Split(bundle.toString(), sourceBroker,
splitServiceUnitToDestBroker));
splitDecision.setLabel(Success);
splitDecision.setReason(Admin);
return splitAsync(splitDecision,
conf.getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS);
});
});
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.pulsar.common.lookup.GetTopicsResult;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.BundleSplitOption;
import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.FlowOrQpsEquallyDivideBundleSplitOption;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -950,7 +951,7 @@ void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
CompletableFuture<Void> completionFuture,
NamespaceBundleSplitAlgorithm splitAlgorithm,
List<Long> boundaries) {
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config, splitAlgorithm);

splitAlgorithm.getSplitBoundary(bundleSplitOption).whenComplete((splitBoundaries, ex) -> {
CompletableFuture<List<NamespaceBundle>> updateFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -1093,16 +1094,16 @@ public CompletableFuture<Pair<NamespaceBundles, List<NamespaceBundle>>> getSplit

public CompletableFuture<List<Long>> getSplitBoundary(
NamespaceBundle bundle, List<Long> boundaries, NamespaceBundleSplitAlgorithm nsBundleSplitAlgorithm) {
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config);
BundleSplitOption bundleSplitOption = getBundleSplitOption(bundle, boundaries, config, nsBundleSplitAlgorithm);
return nsBundleSplitAlgorithm.getSplitBoundary(bundleSplitOption);
}

private BundleSplitOption getBundleSplitOption(NamespaceBundle bundle,
List<Long> boundaries,
ServiceConfiguration config) {
ServiceConfiguration config,
NamespaceBundleSplitAlgorithm splitAlgorithm) {
BundleSplitOption bundleSplitOption;
if (config.getDefaultNamespaceBundleSplitAlgorithm()
.equals(NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE)) {
if (splitAlgorithm instanceof FlowOrQpsEquallyDivideBundleSplitAlgorithm) {
Map<String, TopicStatsImpl> topicStatsMap = pulsar.getBrokerService().getTopicStats(bundle);
bundleSplitOption = new FlowOrQpsEquallyDivideBundleSplitOption(this, bundle, boundaries,
topicStatsMap,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,24 @@ public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throw
}
}

@Test
public void testNamespaceSplitBundleWithFlowOrQpsAlgorithmUsesRequestedAlgorithm() throws Exception {
final String namespace = "prop-xyz/flow-or-qps-split";
final String topicName = "persistent://" + namespace + "/topic-1";
admin.namespaces().createNamespace(namespace, 1);
admin.topics().createNonPartitionedTopic(topicName);
admin.lookups().lookupTopic(topicName);

try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true,
NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE);
} catch (Exception e) {
fail("split bundle with flow_or_qps_equally_divide shouldn't have thrown exception", e);
}

assertEquals(admin.namespaces().getBundles(namespace).getNumBundles(), 1);
}

@Test
public void testNamespacesGetTopicHashPositions() throws Exception {
// Force to create a namespace with only one bundle and create a topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -1023,6 +1024,21 @@ public void testSplitBundleWithSpecificPositionAdminAPI() throws Exception {
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
}

@Test(timeOut = 30 * 1000)
public void testSplitBundleWithFlowOrQpsAdminAPINoValidBoundary() throws Exception {
String namespace = "public/test-split-with-flow-or-qps";
String topic = "persistent://" + namespace + "/test-split-with-flow-or-qps";
admin.namespaces().createNamespace(namespace, 1);
admin.topics().createNonPartitionedTopic(topic);
admin.lookups().lookupTopic(topic);

admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true,
NamespaceBundleSplitAlgorithm.FLOW_OR_QPS_EQUALLY_DIVIDE);

assertEquals(admin.namespaces().getBundles(namespace).getNumBundles(), 1);
}

@Test(timeOut = 30 * 1000)
public void testDeleteNamespaceBundle() throws Exception {
final String namespace = "public/testDeleteNamespaceBundle";
Expand Down
Loading