-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker]Part-3 of PIP-433: always allow replicator to register a new compatible schema #25461
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[improve][broker]Part-3 of PIP-433: always allow replicator to register a new compatible schema #25461
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1808,8 +1808,10 @@ protected void handleProducer(final CommandProducer cmdProducer) { | |
| } | ||
|
|
||
| disableTcpNoDelayIfNeeded(topicName.toString(), producerName); | ||
|
|
||
| CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema); | ||
| boolean isReplicatorProducer = Producer.isRemoteOrShadow(producerName, | ||
| getBrokerService().getPulsar().getConfig().getReplicatorPrefix()); | ||
| CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema, | ||
| isReplicatorProducer); | ||
|
|
||
| schemaVersionFuture.exceptionallyAsync(exception -> { | ||
| if (producerFuture.completeExceptionally(exception)) { | ||
|
|
@@ -2934,7 +2936,13 @@ protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCrea | |
| service.getTopicIfExists(topicName).thenAccept(topicOpt -> { | ||
| if (topicOpt.isPresent()) { | ||
| Topic topic = topicOpt.get(); | ||
| CompletableFuture<SchemaVersion> schemaVersionFuture = tryAddSchema(topic, schema); | ||
| boolean isReplicatorProducer = false; | ||
| if (commandGetOrCreateSchema.hasProducerName()) { | ||
| isReplicatorProducer = Producer.isRemoteOrShadow(commandGetOrCreateSchema.getProducerName(), | ||
| getBrokerService().getPulsar().getConfig().getReplicatorPrefix()); | ||
| } | ||
| CompletableFuture<SchemaVersion> schemaVersionFuture = | ||
| tryAddSchema(topic, schema, isReplicatorProducer); | ||
|
Comment on lines
+2939
to
+2945
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will it be a security issue that user can just set producer name to pulsar.repl.* to get rid of the schema upload control? if user disabled is_allow_auto_update_schema
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it will. |
||
| schemaVersionFuture.exceptionally(ex -> { | ||
| ServerError errorCode = BrokerServiceException.getClientErrorCode(ex); | ||
| String message = ex.getMessage(); | ||
|
|
@@ -3419,9 +3427,10 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { | |
| }); | ||
| } | ||
|
|
||
| private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) { | ||
| private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema, | ||
| boolean isReplicatorProducer) { | ||
| if (schema != null) { | ||
| return topic.addSchema(schema); | ||
| return topic.addSchema(schema, isReplicatorProducer); | ||
| } else { | ||
| return topic.hasSchema().thenCompose((hasSchema) -> { | ||
| log.debug() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed isAllowAutoUpdateSchemaWithReplicator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It has been modified at the line-2583