Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,19 @@ private static Set<String> getDeprecatedFieldsFromConfigClass(final Class<?> con

private static String retrieveDbzFieldWithReflection(final java.lang.reflect.Field reflectionField) {
try {
return ((Field) reflectionField.get(null)).name();
Object object = reflectionField.get(null);
if (object instanceof Field field) {
return field.name();
} else if (object instanceof String fieldString) {
String removedDotAndCapitalize = Stream
.of(fieldString.split("."))
.map(field -> StringUtils.capitalize(field))
.collect(Collectors.joining(""));
return StringUtils.uncapitalize(removedDotAndCapitalize);
} else {
throw new IllegalArgumentException(
"Error occurred in field : " + reflectionField.getName() + " retrieved value is " + object);
}
} catch (IllegalAccessException e) {
throw new IllegalArgumentException("Error occurred in field : " + reflectionField.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "snapshotIncludeCollectionList": getOrCreateConfiguration(target).setSnapshotIncludeCollectionList(property(camelContext, java.lang.String.class, value)); return true;
case "snapshotlocktimeoutms":
case "snapshotLockTimeoutMs": getOrCreateConfiguration(target).setSnapshotLockTimeoutMs(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
case "snapshotmaxthreadsmultiplier":
case "snapshotMaxThreadsMultiplier": getOrCreateConfiguration(target).setSnapshotMaxThreadsMultiplier(property(camelContext, int.class, value)); return true;
case "snapshotmode":
case "snapshotMode": getOrCreateConfiguration(target).setSnapshotMode(property(camelContext, java.lang.String.class, value)); return true;
case "snapshotmodeconfigurationbasedsnapshotdata":
Expand Down Expand Up @@ -367,6 +369,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "snapshotIncludeCollectionList": return java.lang.String.class;
case "snapshotlocktimeoutms":
case "snapshotLockTimeoutMs": return long.class;
case "snapshotmaxthreadsmultiplier":
case "snapshotMaxThreadsMultiplier": return int.class;
case "snapshotmode":
case "snapshotMode": return java.lang.String.class;
case "snapshotmodeconfigurationbasedsnapshotdata":
Expand Down Expand Up @@ -559,6 +563,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "snapshotIncludeCollectionList": return getOrCreateConfiguration(target).getSnapshotIncludeCollectionList();
case "snapshotlocktimeoutms":
case "snapshotLockTimeoutMs": return getOrCreateConfiguration(target).getSnapshotLockTimeoutMs();
case "snapshotmaxthreadsmultiplier":
case "snapshotMaxThreadsMultiplier": return getOrCreateConfiguration(target).getSnapshotMaxThreadsMultiplier();
case "snapshotmode":
case "snapshotMode": return getOrCreateConfiguration(target).getSnapshotMode();
case "snapshotmodeconfigurationbasedsnapshotdata":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "snapshotIncludeCollectionList": target.getConfiguration().setSnapshotIncludeCollectionList(property(camelContext, java.lang.String.class, value)); return true;
case "snapshotlocktimeoutms":
case "snapshotLockTimeoutMs": target.getConfiguration().setSnapshotLockTimeoutMs(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
case "snapshotmaxthreadsmultiplier":
case "snapshotMaxThreadsMultiplier": target.getConfiguration().setSnapshotMaxThreadsMultiplier(property(camelContext, int.class, value)); return true;
case "snapshotmode":
case "snapshotMode": target.getConfiguration().setSnapshotMode(property(camelContext, java.lang.String.class, value)); return true;
case "snapshotmodeconfigurationbasedsnapshotdata":
Expand Down Expand Up @@ -362,6 +364,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "snapshotIncludeCollectionList": return java.lang.String.class;
case "snapshotlocktimeoutms":
case "snapshotLockTimeoutMs": return long.class;
case "snapshotmaxthreadsmultiplier":
case "snapshotMaxThreadsMultiplier": return int.class;
case "snapshotmode":
case "snapshotMode": return java.lang.String.class;
case "snapshotmodeconfigurationbasedsnapshotdata":
Expand Down Expand Up @@ -555,6 +559,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "snapshotIncludeCollectionList": return target.getConfiguration().getSnapshotIncludeCollectionList();
case "snapshotlocktimeoutms":
case "snapshotLockTimeoutMs": return target.getConfiguration().getSnapshotLockTimeoutMs();
case "snapshotmaxthreadsmultiplier":
case "snapshotMaxThreadsMultiplier": return target.getConfiguration().getSnapshotMaxThreadsMultiplier();
case "snapshotmode":
case "snapshotMode": return target.getConfiguration().getSnapshotMode();
case "snapshotmodeconfigurationbasedsnapshotdata":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DebeziumDb2EndpointUriFactory extends org.apache.camel.support.comp
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(94);
Set<String> props = new HashSet<>(95);
props.add("additionalProperties");
props.add("bridgeErrorHandler");
props.add("cdcChangeTablesSchema");
Expand Down Expand Up @@ -99,6 +99,7 @@ public class DebeziumDb2EndpointUriFactory extends org.apache.camel.support.comp
props.add("snapshotFetchSize");
props.add("snapshotIncludeCollectionList");
props.add("snapshotLockTimeoutMs");
props.add("snapshotMaxThreadsMultiplier");
props.add("snapshotMode");
props.add("snapshotModeConfigurationBasedSnapshotData");
props.add("snapshotModeConfigurationBasedSnapshotOnDataError");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class Db2ConnectorEmbeddedDebeziumConfiguration
private String heartbeatTopicsPrefix = "__debezium-heartbeat";
@UriParam(label = LABEL_NAME)
private int snapshotFetchSize;
@UriParam(label = LABEL_NAME, defaultValue = "1")
private int snapshotMaxThreadsMultiplier = 1;
@UriParam(label = LABEL_NAME)
private String openlineageIntegrationJobTags;
@UriParam(label = LABEL_NAME, defaultValue = "10s", javaType = "java.time.Duration")
Expand Down Expand Up @@ -363,6 +365,22 @@ public int getSnapshotFetchSize() {
return snapshotFetchSize;
}

/**
* The factor used to scale the number of snapshot chunks per table. The
* default behavior is to take 'row_count/snapshot.max.threads' to compute
* the number of rows per chunks. This may not be ideal for larger tables,
* and using the multiplier, the formula is adjusted to increase the number
* of chunks by using 'row_count/(snapshot.max.threads *
* snapshot.max.threads.multiplier).
*/
public void setSnapshotMaxThreadsMultiplier(int snapshotMaxThreadsMultiplier) {
this.snapshotMaxThreadsMultiplier = snapshotMaxThreadsMultiplier;
}

public int getSnapshotMaxThreadsMultiplier() {
return snapshotMaxThreadsMultiplier;
}

/**
* The job's tags emitted by Debezium. A comma-separated list of key-value
* pairs.For example: k1=v1,k2=v2
Expand Down Expand Up @@ -1240,6 +1258,7 @@ protected Configuration createConnectorConfiguration() {
addPropertyIfNotNull(configBuilder, "converters", converters);
addPropertyIfNotNull(configBuilder, "heartbeat.topics.prefix", heartbeatTopicsPrefix);
addPropertyIfNotNull(configBuilder, "snapshot.fetch.size", snapshotFetchSize);
addPropertyIfNotNull(configBuilder, "snapshot.max.threads.multiplier", snapshotMaxThreadsMultiplier);
addPropertyIfNotNull(configBuilder, "openlineage.integration.job.tags", openlineageIntegrationJobTags);
addPropertyIfNotNull(configBuilder, "snapshot.lock.timeout.ms", snapshotLockTimeoutMs);
addPropertyIfNotNull(configBuilder, "cdc.change.tables.schema", cdcChangeTablesSchema);
Expand Down
Loading
Loading