Skip to content

[fix][broker]The partitions of the topic was wrongly created even though this cluster was not allowed to access it#25443

Open
poorbarcode wants to merge 3 commits intoapache:masterfrom
poorbarcode:fix/create_missing_partitions_without_allowed_clusters
Open

[fix][broker]The partitions of the topic was wrongly created even though this cluster was not allowed to access it#25443
poorbarcode wants to merge 3 commits intoapache:masterfrom
poorbarcode:fix/create_missing_partitions_without_allowed_clusters

Conversation

@poorbarcode
Copy link
Copy Markdown
Contributor

Motivation

Env:

  • cluster-1 and cluster-2 share configuration metadata store
  • create a partitioned topic public/default/tp1
    • The ZK node /admin/partitioned-topics/{topic} will be created on the shared configuration metadata store.
    • The topic will enable a binary way replication between cluster-1 and cluster-2
  • Remove cluster-2 from the topic-level replication policies
    • The partition public/default/tp1-partition-0 will be deleted automatically.

Issue: call pulsar-admin topics create-missed-partitions <topic>, the partition public/default/tp1-partition-0 was loaded up again.

Modifications

Fix the issue

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

…ugh this cluster was not allowed to access it
@poorbarcode poorbarcode added this to the 5.0.0 milestone Mar 31, 2026
@poorbarcode poorbarcode self-assigned this Mar 31, 2026
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Mar 31, 2026
Comment on lines +2019 to +2053
public CompletableFuture<Boolean> isAllowedCurrentClusterAccess(@NonNull TopicName topicName) {
final String cluster = getPulsar().getConfig().getClusterName();
return getCombinedTopicPolicies(topicName).thenApply(triple -> {
Optional<TopicPolicies> topicP = triple.getRight();
Optional<TopicPolicies> globalTopicP = triple.getMiddle();
Optional<Policies> nsPolicies = triple.getLeft();
// Disabled a cluster for a namespace manually.
if (nsPolicies.isPresent() && !nsPolicies.get().allowed_clusters.isEmpty()
&& !nsPolicies.get().allowed_clusters.contains(cluster)) {
return false;
}
// Manually enabled topic-level replication, which can skip to set a namespace-level replication.
if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) {
if (topicP.get().getReplicationClusters().contains(cluster)) {
return true;
} else {
return false;
}
}
if (globalTopicP.isPresent() && CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) {
if (globalTopicP.get().getReplicationClusters().contains(cluster)) {
return true;
} else {
return false;
}
}
// No settings for replication/allowed_clusters.
if (nsPolicies.isEmpty()) {
return true;
}
// Namespace level settings.
return nsPolicies.get().replication_clusters.isEmpty()
|| nsPolicies.get().replication_clusters.contains(cluster);
});
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already have a mechanism for retrieving the applied policies for a topic? We should use it to avoid writing duplicate code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we already have a mechanism for retrieving the applied policies for a topic? We should use it to avoid writing duplicate code.

No, we don't have such a method. There are currently two methods

  • Persistent topic has a HierarchyTopicPolicies of merged policies, and this result is relied upon for inspection.
  • Check when the namespace modifies replication/allowed-cluster, see also BrokerService.isCurrentClusterAllowed(Namespace)

The newly added method has maximised the reuse of code (by merging the namespace check method BrokerService.isCurrentClusterAllowed(Namespace).

BTW, the method newly added will become the method you mentioned: a mechanism for retrieving the applied policies for a topic

@poorbarcode
Copy link
Copy Markdown
Contributor Author

/pulsarbot rerun-failure-checks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants