Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,5 @@ public class SinkConfig {
private String transformFunction;
private String transformFunctionClassName;
private String transformFunctionConfig;
private String logTopic;
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,5 @@ public class SourceConfig {
private BatchSourceConfig batchSourceConfig;
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;
private String logTopic;
}
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--transform-function-config", description = "Configuration of the transform function "
+ "applied before the Sink")
protected String transformFunctionConfig;
@Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;

protected SinkConfig sinkConfig;

Expand Down Expand Up @@ -605,6 +607,9 @@ void processArguments() throws Exception {
if (transformFunctionConfig != null) {
sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
}
if (null != logTopic) {
sinkConfig.setLogTopic(logTopic);
}

// check if configs are valid
validateSinkConfigs(sinkConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
@Parameter(names = "--secrets", description = "The map of secretName to an object that encapsulates "
+ "how the secret is fetched by the underlying secrets provider")
protected String secretsString;
@Parameter(names = "--log-topic", description = "The topic to which the logs of a Pulsar Sink are produced")
protected String logTopic;

protected SourceConfig sourceConfig;

Expand Down Expand Up @@ -500,6 +502,9 @@ void processArguments() throws Exception {
}
sourceConfig.setSecrets(secretsMap);
}
if (null != logTopic) {
sourceConfig.setLogTopic(logTopic);
}

// check if source configs are valid
validateSourceConfigs(sourceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
if (sinkConfig.getName() != null) {
functionDetailsBuilder.setName(sinkConfig.getName());
}
if (sinkConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sinkConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
Expand Down Expand Up @@ -321,6 +324,9 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
sinkConfig.setRetainOrdering(false);
sinkConfig.setRetainKeyOrdering(false);
}
if (!isEmpty(functionDetails.getLogTopic())) {
sinkConfig.setLogTopic(functionDetails.getLogTopic());
}

sinkConfig.setProcessingGuarantees(convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));

Expand Down Expand Up @@ -426,6 +432,12 @@ public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConf
throw new IllegalArgumentException(String.format("Input topic %s is invalid", topic));
}
}
if (!isEmpty(sinkConfig.getLogTopic())) {
if (!TopicName.isValid(sinkConfig.getLogTopic())) {
throw new IllegalArgumentException(
String.format("LogTopic topic %s is invalid", sinkConfig.getLogTopic()));
}
}

if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Sink parallelism must be a positive number");
Expand Down Expand Up @@ -613,6 +625,9 @@ public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig ne
if (mergedConfig.getInputSpecs() == null) {
mergedConfig.setInputSpecs(new HashMap<>());
}
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}

if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
if (sourceConfig.getName() != null) {
functionDetailsBuilder.setName(sourceConfig.getName());
}
if (sourceConfig.getLogTopic() != null) {
functionDetailsBuilder.setLogTopic(sourceConfig.getLogTopic());
}
functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
if (sourceConfig.getParallelism() != null) {
functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
Expand Down Expand Up @@ -274,6 +277,9 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
producerConfig.setCompressionType(convertFromFunctionDetailsCompressionType(spec.getCompressionType()));
sourceConfig.setProducerConfig(producerConfig);
}
if (!isEmpty(functionDetails.getLogTopic())) {
sourceConfig.setLogTopic(functionDetails.getLogTopic());
}
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
Expand Down Expand Up @@ -308,6 +314,12 @@ public static ExtractedSourceDetails validateAndExtractDetails(SourceConfig sour
if (!isEmpty(sourceConfig.getTopicName()) && !TopicName.isValid(sourceConfig.getTopicName())) {
throw new IllegalArgumentException("Topic name is invalid");
}
if (!isEmpty(sourceConfig.getLogTopic())) {
if (!TopicName.isValid(sourceConfig.getLogTopic())) {
throw new IllegalArgumentException(
String.format("LogTopic topic %s is invalid", sourceConfig.getLogTopic()));
}
}
if (sourceConfig.getParallelism() != null && sourceConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Source parallelism must be a positive number");
}
Expand Down Expand Up @@ -434,6 +446,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
if (newConfig.getSecrets() != null) {
mergedConfig.setSecrets(newConfig.getSecrets());
}
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ public void testConvertBackFidelity() throws IOException {

sinkConfig.setTransformFunction("builtin://transform");
sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
sinkConfig.setLogTopic("log-topic");

Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null, null));
assertEquals(Function.SubscriptionType.SHARED, functionDetails.getSource().getSubscriptionType());
Expand Down Expand Up @@ -522,6 +523,22 @@ public void testMergeDifferentTransformFunctionConfig() {
);
}

@Test
public void testMergeDifferentLogTopic() {
SinkConfig sinkConfig = createSinkConfig();
SinkConfig newSinkConfig = createUpdatedSinkConfig("logTopic", "Different");
SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
assertEquals(
mergedConfig.getLogTopic(),
"Different"
);
mergedConfig.setLogTopic(sinkConfig.getLogTopic());
assertEquals(
new Gson().toJson(sinkConfig),
new Gson().toJson(mergedConfig)
);
}

@Test
public void testValidateConfig() {
SinkConfig sinkConfig = createSinkConfig();
Expand Down Expand Up @@ -559,6 +576,7 @@ private SinkConfig createSinkConfig() {
sinkConfig.setTransformFunction("builtin://transform");
sinkConfig.setTransformFunctionClassName("Transform");
sinkConfig.setTransformFunctionConfig("{\"key\": \"value\"}");
sinkConfig.setLogTopic("log-topic");
return sinkConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,22 @@ public void testMergeDifferentProducerConfig() {
);
}

@Test
public void testMergeDifferentLogTopic() {
SourceConfig sourceConfig = createSourceConfig();
SourceConfig newSourceConfig = createUpdatedSourceConfig("logTopic", "Different");
SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(sourceConfig, newSourceConfig);
assertEquals(
mergedConfig.getLogTopic(),
"Different"
);
mergedConfig.setLogTopic(sourceConfig.getLogTopic());
assertEquals(
new Gson().toJson(sourceConfig),
new Gson().toJson(mergedConfig)
);
}

@Test
public void testValidateConfig() {
SourceConfig sourceConfig = createSourceConfig();
Expand Down Expand Up @@ -399,6 +415,7 @@ private SourceConfig createSourceConfig() {
sourceConfig.setProducerConfig(producerConfig);

sourceConfig.setConfigs(configs);
sourceConfig.setLogTopic("log-topic");
return sourceConfig;
}

Expand Down