Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2754,13 +2754,18 @@ protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnfor
"schemaValidationEnforced");
}

protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema) {
protected void internalSetIsAllowAutoUpdateSchema(boolean isAllowAutoUpdateSchema,
Boolean isAllowAutoUpdateSchemaWithReplicator) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

mutatePolicy((policies) -> {
policies.is_allow_auto_update_schema = isAllowAutoUpdateSchema;
if (isAllowAutoUpdateSchemaWithReplicator != null) {
policies.is_allow_auto_update_schema_with_replicator =
isAllowAutoUpdateSchemaWithReplicator;
}
return policies;
}, (policies) -> policies.is_allow_auto_update_schema,
"isAllowAutoUpdateSchema");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed isAllowAutoUpdateSchemaWithReplicator?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been modified at the line-2583

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2813,10 +2813,13 @@ public void getIsAllowAutoUpdateSchema(
public void setIsAllowAutoUpdateSchema(
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@QueryParam("allowAutoUpdateSchemaWithReplicator")
@ApiParam(value = "Allow replicator to auto update schema")
Boolean allowAutoUpdateSchemaWithReplicator,
@ApiParam(value = "Flag of whether to allow auto update schema", required = true)
boolean isAllowAutoUpdateSchema) {
validateNamespaceName(tenant, namespace);
internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema);
internalSetIsAllowAutoUpdateSchema(isAllowAutoUpdateSchema, allowAutoUpdateSchemaWithReplicator);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
protected volatile boolean isEncryptionRequired = false;

protected volatile Boolean isAllowAutoUpdateSchema;
@Getter
protected volatile Boolean isAllowAutoUpdateSchemaWithReplicator;

protected volatile PublishRateLimiter topicPublishRateLimiter;
protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;
Expand Down Expand Up @@ -733,14 +735,19 @@ public CompletableFuture<Boolean> hasSchema() {

@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
return addSchema(schema, false);
}

@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema, boolean isReplicatorProducer) {
if (schema == null) {
return CompletableFuture.completedFuture(SchemaVersion.Empty);
}

String id = getSchemaId();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();

if (allowAutoUpdateSchema()) {
if (allowAutoUpdateSchema(isReplicatorProducer)) {
return schemaRegistryService.putSchemaIfAbsent(id, schema, getSchemaCompatibilityStrategy());
} else {
return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
Expand All @@ -756,14 +763,19 @@ public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
}
}

private boolean allowAutoUpdateSchema() {
private boolean allowAutoUpdateSchema(boolean isReplicatorProducer) {
if (brokerService.isSystemTopic(topic)) {
return true;
}
if (isAllowAutoUpdateSchema == null) {
return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
// Allowed auto updating.
boolean allowSchemaAutoUpdate = isAllowAutoUpdateSchema == null
? brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled()
: isAllowAutoUpdateSchema;
if (allowSchemaAutoUpdate) {
return true;
}
return isAllowAutoUpdateSchema;
// Allowed replicator to update schemas.
return isReplicatorProducer && isAllowAutoUpdateSchemaWithReplicator;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1808,8 +1808,10 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}

disableTcpNoDelayIfNeeded(topicName.toString(), producerName);

CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
boolean isReplicatorProducer = Producer.isRemoteOrShadow(producerName,
getBrokerService().getPulsar().getConfig().getReplicatorPrefix());
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema,
isReplicatorProducer);

schemaVersionFuture.exceptionallyAsync(exception -> {
if (producerFuture.completeExceptionally(exception)) {
Expand Down Expand Up @@ -2934,7 +2936,13 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea
service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
if (topicOpt.isPresent()) {
Topic topic = topicOpt.get();
CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema);
boolean isReplicatorProducer = false;
if (commandGetOrCreateSchema.hasProducerName()) {
isReplicatorProducer = Producer.isRemoteOrShadow(commandGetOrCreateSchema.getProducerName(),
getBrokerService().getPulsar().getConfig().getReplicatorPrefix());
}
CompletableFuture<SchemaVersion> schemaVersionFuture =
tryAddSchema(topic, schema, isReplicatorProducer);
Comment on lines +2939 to +2945
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it be a security issue that user can just set producer name to pulsar.repl.* to get rid of the schema upload control? if user disabled is_allow_auto_update_schema

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it will.

schemaVersionFuture.exceptionally(ex -> {
ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
String message = ex.getMessage();
Expand Down Expand Up @@ -3419,9 +3427,10 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
});
}

private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema,
boolean isReplicatorProducer) {
if (schema != null) {
return topic.addSchema(schema);
return topic.addSchema(schema, isReplicatorProducer);
} else {
return topic.hasSchema().thenCompose((hasSchema) -> {
log.debug()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,14 @@ default CompletableFuture<Position> getLastDispatchablePosition() {
*/
CompletableFuture<SchemaVersion> addSchema(SchemaData schema);

/**
* Add a schema to the topic, with an optional override for replication schema auto-update.
* The default implementation preserves existing behavior.
*/
default CompletableFuture<SchemaVersion> addSchema(SchemaData schema, boolean isReplicatorProducer) {
return addSchema(schema);
}

/**
* Delete the schema if this topic has a schema defined for it.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ public CompletableFuture<Void> initialize() {
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
isAllowAutoUpdateSchemaWithReplicator =
policies.is_allow_auto_update_schema_with_replicator;
}
updatePublishRateLimiter();
updateResourceGroupLimiter(policies);
Expand Down Expand Up @@ -1157,6 +1159,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {

isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
isAllowAutoUpdateSchemaWithReplicator = data.is_allow_auto_update_schema_with_replicator;

List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ public CompletableFuture<Void> initialize() {
this.isEncryptionRequired = policies.encryption_required;

isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
isAllowAutoUpdateSchemaWithReplicator = policies.is_allow_auto_update_schema_with_replicator;
}, getOrderedExecutor())
.thenCompose(ignore -> initTopicPolicy())
.thenCompose(ignore -> removeOrphanReplicationCursors())
Expand Down Expand Up @@ -3803,6 +3804,7 @@ public CompletableFuture<Void> onPoliciesUpdate(@NonNull Policies data) {
checkReplicatedSubscriptionControllerState();
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
isAllowAutoUpdateSchemaWithReplicator = data.is_allow_auto_update_schema_with_replicator;

// Apply policies for components.
List<CompletableFuture<Void>> applyPolicyTasks = applyUpdatedTopicPolicies();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.changed;
import static org.apache.pulsar.common.policies.data.NamespaceIsolationPolicyUnloadScope.none;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -3305,6 +3306,8 @@ public void testMaxProducersPerTopicUnlimited() throws Exception {
}
}

static final ThreadLocal<Boolean> COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY = new ThreadLocal<>();

private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
final var topics = pulsar.getBrokerService().getTopics();
AbstractTopic topic = (AbstractTopic) topics.get(topicName).join().get();
Expand All @@ -3314,9 +3317,23 @@ private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
counter.incrementAndGet();
return invocation.callRealMethod();
COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.set(true);
try {
return invocation.callRealMethod();
} finally {
COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.set(false);
}
}
}).when(spyTopic).addSchema(any(SchemaData.class));
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
if (!COUNTER_AVOID_COUNTING_ADD_SCHEMA_REPEATEDLY.get()) {
counter.incrementAndGet();
}
return invocation.callRealMethod();
}
}).when(spyTopic).addSchema(any(SchemaData.class), anyBoolean());
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import lombok.CustomLog;
import org.apache.avro.reflect.AvroAlias;
import org.apache.avro.reflect.AvroDefault;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -280,4 +283,68 @@ public void testDisabledV1() throws Exception {
testAutoUpdateDisabled("prop-xyz/ns1", "persistent://prop-xyz/ns1/disabled");
testAutoUpdateDisabled("prop-xyz/ns2", "non-persistent://prop-xyz/ns2/disabled-np");
}

@Test(timeOut = 60_000)
@SuppressWarnings("deprecation")
public void testIsAllowAutoUpdateSchemaWithReplicator() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("prop-xyz/ns");
admin.namespaces().createNamespace(namespace);
final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
admin.topics().createNonPartitionedTopic(topic);
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();

// By default, it is true.
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(admin.namespaces().getPolicies(namespace).is_allow_auto_update_schema_with_replicator);
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
});

admin.namespaces().setIsAllowAutoUpdateSchema(
namespace, true, false);
Awaitility.await().untilAsserted(() -> {
// namespace level.
Policies policies = admin.namespaces().getPolicies(namespace);
Assert.assertTrue(policies.is_allow_auto_update_schema);
Assert.assertFalse(policies.is_allow_auto_update_schema_with_replicator);
// topic level.
Assert.assertFalse(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
});

admin.namespaces().setIsAllowAutoUpdateSchema(
namespace, false, false);
Awaitility.await().untilAsserted(() -> {
// namespace level.
Policies policies = admin.namespaces().getPolicies(namespace);
Assert.assertFalse(policies.is_allow_auto_update_schema);
Assert.assertFalse(policies.is_allow_auto_update_schema_with_replicator);
// topic level.
Assert.assertFalse(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
});

admin.namespaces().setIsAllowAutoUpdateSchema(
namespace, true, true);
Awaitility.await().untilAsserted(() -> {
// namespace level.
Policies policies = admin.namespaces().getPolicies(namespace);
Assert.assertTrue(policies.is_allow_auto_update_schema);
Assert.assertTrue(policies.is_allow_auto_update_schema_with_replicator);
// topic level.
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
});

admin.namespaces().setIsAllowAutoUpdateSchema(
namespace, false, true);
Awaitility.await().untilAsserted(() -> {
// namespace level.
Policies policies = admin.namespaces().getPolicies(namespace);
Assert.assertFalse(policies.is_allow_auto_update_schema);
Assert.assertTrue(policies.is_allow_auto_update_schema_with_replicator);
// topic level.
Assert.assertTrue(persistentTopic.getIsAllowAutoUpdateSchemaWithReplicator());
});

// cleanup.
admin.topics().delete(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2141,7 +2141,7 @@ public void testIsAllowAutoUpdateSchema() {
execFlag = setAuthorizationPolicyOperationChecker(subject,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY, PolicyOperation.WRITE);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true));
() -> subAdmin.namespaces().setIsAllowAutoUpdateSchema(namespace, true, true));
Assert.assertTrue(execFlag.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void testNegativeAcksDeleteFromUnackedTracker() throws Exception {
*/
@Test
public void testNegativeAcksWithBatch() throws Exception {
admin.namespaces().setIsAllowAutoUpdateSchema(getNamespace(), true);
admin.namespaces().setIsAllowAutoUpdateSchema(getNamespace(), true, true);
String topic = newTopicName();

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
admin.schemas().createSchema(fqtn, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());

admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false, true);
ProducerBuilder<Schemas.PersonTwo> producerThreeBuilder = pulsarClient
.newProducer(Schema.AVRO(SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
Expand All @@ -359,7 +359,7 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili
Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
}

admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true, true);
ConsumerBuilder<Schemas.PersonTwo> comsumerBuilder = pulsarClient.newConsumer(Schema.AVRO(
SchemaDefinition.<Schemas.PersonTwo>builder().withAlwaysAllowNull
(false).withSupportSchemaVersioning(true).
Expand All @@ -382,7 +382,7 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili
producer.close();
consumerTwo.close();

admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false, true);

producer = producerThreeBuilder.create();
consumerTwo = comsumerBuilder.subscribe();
Expand Down Expand Up @@ -427,7 +427,7 @@ public void testSchemaComparison() throws Exception {
SchemaInfo schemaInfo = SchemaInfo.builder().type(SchemaType.AVRO).schema(changeSchemaBytes).build();
admin.schemas().createSchema(fqtn, schemaInfo);

admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false, true);
ProducerBuilder<Schemas.PersonOne> producerOneBuilder = pulsarClient
.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(fqtn);
Expand Down
Loading
Loading