Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,33 +166,36 @@ private static Optional<EnsemblePlacementPolicyConfig> getEnsemblePlacementPolic
return Optional.empty();
}

private static Pair<Set<String>, Set<String>> getIsolationGroup(
@VisibleForTesting
Pair<Set<String>, Set<String>> getIsolationGroup(
EnsemblePlacementPolicyConfig ensemblePlacementPolicyConfig) {
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>();
String className = IsolatedBookieEnsemblePlacementPolicy.class.getName();
if (ensemblePlacementPolicyConfig.getPolicyClass().getName().equals(className)) {
// Retain compatibility with ZkIsolatedBookieEnsemblePlacementPolicy
if (IsolatedBookieEnsemblePlacementPolicy.class.isAssignableFrom(ensemblePlacementPolicyConfig.getPolicyClass())) {
MutablePair<Set<String>, Set<String>> pair = new MutablePair<>(Collections.emptySet(), Collections.emptySet());
Map<String, Object> properties = ensemblePlacementPolicyConfig.getProperties();
String primaryIsolationGroupString = ConfigurationStringUtil
.castToString(properties.getOrDefault(ISOLATION_BOOKIE_GROUPS, ""));
String secondaryIsolationGroupString = ConfigurationStringUtil
.castToString(properties.getOrDefault(SECONDARY_ISOLATION_BOOKIE_GROUPS, ""));
if (!primaryIsolationGroupString.isEmpty()) {
pair.setLeft(Sets.newHashSet(primaryIsolationGroupString.split(",")));
} else {
pair.setLeft(Collections.emptySet());
}
if (!secondaryIsolationGroupString.isEmpty()) {
pair.setRight(Sets.newHashSet(secondaryIsolationGroupString.split(",")));
} else {
pair.setRight(Collections.emptySet());
}
return pair;
} else {
log.info()
.attr("policyClass", ensemblePlacementPolicyConfig.getPolicyClass().getName())
.log("The ensemble placement policy class is not compatible with "
+ "IsolatedBookieEnsemblePlacementPolicy, fallback to use defaultIsolationGroups");
return defaultIsolationGroups;
}
return pair;
}

@VisibleForTesting
Set<BookieId> getExcludedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> excludedBookies = new HashSet<>();
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return excludedBookies;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,14 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
Expand All @@ -58,6 +60,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -839,6 +842,139 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
assertTrue(blacklist.isEmpty());
}

/**
* Regression test for the NPE reported in the stack trace below. When custom metadata carries an
* {@link EnsemblePlacementPolicyConfig} whose policy class does NOT match
* {@link IsolatedBookieEnsemblePlacementPolicy}, the old {@code getIsolationGroup()} returned a
* {@code MutablePair} with {@code null} left/right, which caused a {@link NullPointerException} in
* {@code getExcludedBookiesWithIsolationGroups} when {@code getLeft().contains(...)} was called.
*
* <pre>
* java.lang.NullPointerException: Cannot invoke "java.util.Set.contains(Object)"
* because the return value of "org.apache.commons.lang3.tuple.Pair.getLeft()" is null
* at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookiesWithIsolationGroups(...)
* at IsolatedBookieEnsemblePlacementPolicy.getExcludedBookies(...)
* at IsolatedBookieEnsemblePlacementPolicy.replaceBookie(...)
* </pre>
*/
@Test
public void testReplaceBookieWithNonMatchingPolicyClassShouldNotThrowNPE() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Map<String, BookieInfo> group1 = new HashMap<>();
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
group1.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
group1.put(BOOKIE3, BookieInfo.builder().rack("rack0").build());
group1.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
bookieMapping.put("group1", group1);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();

IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "group1");
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

// Use a policy class that does NOT match IsolatedBookieEnsemblePlacementPolicy.
// In the old code this caused getIsolationGroup() to return a MutablePair with null left/right,
// triggering NPE at the getLeft().contains() call in getExcludedBookiesWithIsolationGroups.
EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
RackawareEnsemblePlacementPolicy.class, Collections.emptyMap());
Map<String, byte[]> customMetadata = new HashMap<>();
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());

BookieId bookie1Id = new BookieSocketAddress(BOOKIE1).toBookieId();
BookieId bookie2Id = new BookieSocketAddress(BOOKIE2).toBookieId();

// Must not throw NullPointerException; BKNotEnoughBookiesException is acceptable.
isolationPolicy.replaceBookie(2, 2, 2, customMetadata,
Arrays.asList(bookie1Id, bookie2Id), bookie2Id, null);
}

/**
* Verifies that {@link IsolatedBookieEnsemblePlacementPolicy#getIsolationGroup} treats
* {@link ZkIsolatedBookieEnsemblePlacementPolicy} (a subclass) exactly like
* {@link IsolatedBookieEnsemblePlacementPolicy} itself when reading isolation groups from
* {@link EnsemblePlacementPolicyConfig} properties.
*
* <p>Legacy Pulsar clusters may have persisted {@code EnsemblePlacementPolicyConfig} entries whose
* {@code policyClass} field is set to {@code ZkIsolatedBookieEnsemblePlacementPolicy}. The
* {@code isAssignableFrom} check in {@code getIsolationGroup} must recognise this subclass so that
* the isolation groups are read from the stored properties rather than falling back to the
* policy-level defaults.
*/
@Test
public void testGetIsolationGroupWithZkCompatiblePolicyClass() throws Exception {
// Group1 → default isolation group configured on the policy.
// Group2 → isolation group carried inside the custom metadata (ZkIsolated class).
final String defaultGroup = "Group1";
final String customGroup = "Group2";

Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Map<String, BookieInfo> group1 = new HashMap<>();
group1.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
group1.put(BOOKIE2, BookieInfo.builder().rack("rack0").build());
Map<String, BookieInfo> group2 = new HashMap<>();
group2.put(BOOKIE3, BookieInfo.builder().rack("rack1").build());
group2.put(BOOKIE4, BookieInfo.builder().rack("rack1").build());
bookieMapping.put(defaultGroup, group1);
bookieMapping.put(customGroup, group2);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();

IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, defaultGroup);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

// --- unit-level: getIsolationGroup should parse properties, not fall back to defaults ---
Map<String, Object> props = new HashMap<>();
props.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customGroup);
props.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "secondaryGroup");
EnsemblePlacementPolicyConfig zkConfig = new EnsemblePlacementPolicyConfig(
ZkIsolatedBookieEnsemblePlacementPolicy.class, props);

Pair<Set<String>, Set<String>> groups = isolationPolicy.getIsolationGroup(zkConfig);
assertEquals(groups.getLeft(), Sets.newHashSet(customGroup),
"primary group must be read from ZkIsolated config properties");
assertEquals(groups.getRight(), Sets.newHashSet("secondaryGroup"),
"secondary group must be read from ZkIsolated config properties");

// --- integration-level: newEnsemble must select bookies from the ZkIsolated config group ---
Map<String, Object> placementPolicyProperties = new HashMap<>();
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, customGroup);
placementPolicyProperties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, "");
EnsemblePlacementPolicyConfig policyConfig = new EnsemblePlacementPolicyConfig(
ZkIsolatedBookieEnsemblePlacementPolicy.class, placementPolicyProperties);
Map<String, byte[]> customMetadata = new HashMap<>();
customMetadata.put(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG, policyConfig.encode());

Set<BookieId> bookieIdGroup2 = new HashSet<>();
bookieIdGroup2.add(new BookieSocketAddress(BOOKIE3).toBookieId());
bookieIdGroup2.add(new BookieSocketAddress(BOOKIE4).toBookieId());

List<BookieId> ensemble = isolationPolicy
.newEnsemble(2, 2, 2, customMetadata, new HashSet<>()).getResult();
assertTrue(bookieIdGroup2.containsAll(ensemble),
"ensemble should come from " + customGroup + " (ZkIsolated config), got " + ensemble);

// Sanity-check: without custom metadata the default group1 bookies are chosen.
Set<BookieId> bookieIdGroup1 = new HashSet<>();
bookieIdGroup1.add(new BookieSocketAddress(BOOKIE1).toBookieId());
bookieIdGroup1.add(new BookieSocketAddress(BOOKIE2).toBookieId());
List<BookieId> defaultEnsemble = isolationPolicy
.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();
assertTrue(bookieIdGroup1.containsAll(defaultEnsemble),
"default ensemble should come from " + defaultGroup + ", got " + defaultEnsemble);
}

// The policy gets the bookie info asynchronously before each query or update, when putting the bookie info into
// the metadata store, the cache needs some time to receive the notification and update accordingly.
private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
Expand Down
Loading