Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ jobs:
distribution: 'temurin'
cache: 'maven'
- name: add dependencies
run: |
wget http://nexus.dev.dtstack.cn/nexus/content/repositories/dtstack-release/com/esen/jdbc/gbase/8.3.81.53/gbase-8.3.81.53.jar
wget https://cdn.gbase.cn/products/27/czrl6z38BvTfEQS4uyQcS/gbasedbtjdbc_3.5.1.jar
./mvnw install:install-file -DgroupId=com.esen.jdbc -DartifactId=gbase -Dversion=8.3.81.53 -Dpackaging=jar -Dfile=./gbase-8.3.81.53.jar
./mvnw install:install-file -DgroupId=com.gbasedbt.jdbc.Driver -DartifactId=gbasedbt -Dversion=3.5.1_1_d0c87a -Dpackaging=jar -Dfile=./gbasedbtjdbc_3.5.1.jar
- name: build project
run: |
./mvnw clean package -Dmaven.test.skip --no-snapshot-updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ClickhouseRawTypeConverter {
public static DataType apply(TypeConfig type) {
switch (type.getType().toUpperCase(Locale.ENGLISH)) {
case "BOOLEAN":
case "BOOL":
return DataTypes.BOOLEAN();
case "TINYINT":
case "INT8":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,4 +525,18 @@ default Function<Tuple3<String, Integer, Integer>, TypeConfig> typeBuilder() {
default TableIdentify getTableIdentify(String confSchema, String confTable) {
return new TableIdentify(null, confSchema, confTable, this::quoteIdentifier, false);
}

/**
* Add additional parameters to jdbc properties,for reader only.
*
* @param jdbcConf jdbc datasource configuration
*/
default void putReaderExtParam(JdbcConfig jdbcConf) {}

/**
* Add additional parameters to jdbc properties,for writer only.
*
* @param jdbcConf jdbc datasource configuration
*/
default void putWriterExtParam(JdbcConfig jdbcConf) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public JdbcSinkFactory(SyncConfig syncConfig, JdbcDialect jdbcDialect) {

@Override
public DataStreamSink<RowData> createSink(DataStream<RowData> dataSet) {
jdbcDialect.putWriterExtParam(jdbcConfig);
JdbcOutputFormatBuilder builder = getBuilder();
initColumnInfo();
builder.setJdbcConf(jdbcConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ protected Class<? extends JdbcConfig> getConfClass() {

@Override
public DataStream<RowData> createSource() {
jdbcDialect.putReaderExtParam(jdbcConfig);
initColumnInfo();
initRestoreConfig();
initPollingConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ protected JdbcConfig getSinkConnectionConfig(

jdbcConfig.setUniqueKey(keyFields);
resetTableInfo(jdbcConfig);
getDialect().putWriterExtParam(jdbcConfig);
return jdbcConfig;
}

Expand Down Expand Up @@ -257,6 +258,7 @@ protected JdbcConfig getSourceConnectionConfig(ReadableConfig readableConfig) {
if (StringUtils.isBlank(jdbcConfig.getCustomSql())) {
resetTableInfo(jdbcConfig);
}
getDialect().putReaderExtParam(jdbcConfig);
return jdbcConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.jdbc.conf.TableIdentify;
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter;
Expand All @@ -37,6 +38,7 @@
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -166,4 +168,28 @@ public Function<Tuple3<String, Integer, Integer>, TypeConfig> typeBuilder() {
return typeConfig;
});
}

@Override
public void putWriterExtParam(JdbcConfig jdbcConf) {
Properties properties = jdbcConf.getProperties();
if (properties == null) {
properties = new Properties();
}
properties.putIfAbsent("useCursorFetch", "true");
properties.putIfAbsent("rewriteBatchedStatements", "true");
properties.put("tinyInt1isBit", "false");
jdbcConf.setProperties(properties);
}

@Override
public void putReaderExtParam(JdbcConfig jdbcConf) {
Properties properties = jdbcConf.getProperties();
if (properties == null) {
properties = new Properties();
}
properties.putIfAbsent("useCursorFetch", "true");
properties.putIfAbsent("rewriteBatchedStatements", "true");
properties.put("tinyInt1isBit", "false");
jdbcConf.setProperties(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;

public class MysqlSinkFactory extends JdbcSinkFactory {

public MysqlSinkFactory(SyncConfig syncConfig) {
super(syncConfig, new MysqlDialect());
JdbcUtil.putExtParam(jdbcConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -42,7 +41,6 @@ public MysqlSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env)
&& jdbcConfig.getFetchSize() == 0) {
jdbcConfig.setFetchSize(1000);
}
JdbcUtil.putExtParam(jdbcConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,14 @@

package com.dtstack.chunjun.connector.mysql.table;

import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.table.JdbcDynamicTableFactory;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.ResolvedSchema;

public class MysqlDynamicTableFactory extends JdbcDynamicTableFactory {

// 默认是Mysql流式拉取
Expand All @@ -54,21 +49,6 @@ protected int getDefaultFetchSize() {
return DEFAULT_FETCH_SIZE;
}

@Override
protected JdbcConfig getSourceConnectionConfig(ReadableConfig readableConfig) {
JdbcConfig jdbcConfig = super.getSourceConnectionConfig(readableConfig);
JdbcUtil.putExtParam(jdbcConfig);
return jdbcConfig;
}

@Override
protected JdbcConfig getSinkConnectionConfig(
ReadableConfig readableConfig, ResolvedSchema schema) {
JdbcConfig jdbcConfig = super.getSinkConnectionConfig(readableConfig, schema);
JdbcUtil.putExtParam(jdbcConfig);
return jdbcConfig;
}

@Override
protected JdbcInputFormatBuilder getInputFormatBuilder() {
return new JdbcInputFormatBuilder(new JdbcInputFormat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.dtstack.chunjun.connector.jdbc.source.distribute.DistributedJdbcInputFormat;
import com.dtstack.chunjun.connector.jdbc.source.distribute.DistributedJdbcInputFormatBuilder;
import com.dtstack.chunjun.connector.jdbc.source.distribute.DistributedJdbcSourceFactory;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
import com.dtstack.chunjun.connector.mysqld.utils.MySqlDataSource;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
Expand Down Expand Up @@ -57,7 +56,6 @@ public class MysqldSourceFactory extends DistributedJdbcSourceFactory {

public MysqldSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) {
super(syncConfig, env, new MysqlDialect());
JdbcUtil.putExtParam(jdbcConfig);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public static DataType apply(TypeConfig type) {
if (type.getType().contains("TIME ZONE")) {
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
} else {
return DataTypes.TIMESTAMP();
return type.toTimestampDataType(0);
}
}
throw new UnsupportedTypeException(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.dtstack.chunjun.connector.postgresql.dialect;

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
Expand All @@ -36,6 +37,7 @@
import java.sql.ResultSet;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;

public class PostgresqlDialect implements JdbcDialect {
Expand Down Expand Up @@ -160,4 +162,14 @@ public String getCopyStatement(
return String.format(
COPY_SQL_TEMPL, tableLocation, fieldsExpression, fieldDelimiter, nullVal);
}

@Override
public void putWriterExtParam(JdbcConfig jdbcConf) {
Properties properties = jdbcConf.getProperties();
if (properties == null) {
properties = new Properties();
}
properties.putIfAbsent("reWriteBatchedInserts", "true");
jdbcConf.setProperties(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.config.TypeConfig;
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.jdbc.source.JdbcInputSplit;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
Expand Down Expand Up @@ -55,6 +56,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -333,4 +335,16 @@ public Function<Tuple3<String, Integer, Integer>, TypeConfig> typeBuilder() {
return typeConfig;
});
}

@Override
public void putWriterExtParam(JdbcConfig jdbcConf) {
Properties properties = jdbcConf.getProperties();
if (properties == null) {
properties = new Properties();
}
properties.putIfAbsent("enablePrepareOnFirstPreparedStatementCall", "true");
properties.putIfAbsent("disableStatementPooling", "false");
properties.putIfAbsent("statementPoolingCacheSize", "50");
jdbcConf.setProperties(properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.dtstack.chunjun.config.SyncConfig;
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
import com.dtstack.chunjun.connector.jdbc.util.JdbcUtil;
import com.dtstack.chunjun.connector.sybase.dialect.SybaseDialect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -36,6 +35,5 @@ public SybaseSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env
&& jdbcConfig.getFetchSize() == 0) {
jdbcConfig.setFetchSize(1000);
}
JdbcUtil.putExtParam(jdbcConfig);
}
}