Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions chunjun-connectors/chunjun-connector-oceanbase/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@
<artifactId>chunjun-connector-jdbc-base</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-oracle</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-connector-mysql</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.dtstack.chunjun.connector.oceanbase.config;

import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;

public class OceanBaseConf extends JdbcConfig {

private String oceanBaseMode = OceanBaseMode.MYSQL.name();

public String getOceanBaseMode() {
return oceanBaseMode;
}

public void setOceanBaseMode(String oceanBaseMode) {
this.oceanBaseMode = oceanBaseMode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.dtstack.chunjun.connector.oceanbase.config;

public enum OceanBaseMode {
MYSQL,
ORACLE
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.dtstack.chunjun.connector.oceanbase.converter;

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.connector.oracle.converter.BlobType;
import com.dtstack.chunjun.connector.oracle.converter.ClobType;
import com.dtstack.chunjun.connector.oracle.converter.ConvertUtil;
import com.dtstack.chunjun.connector.oracle.converter.OracleSyncConverter;
import com.dtstack.chunjun.converter.IDeserializationConverter;
import com.dtstack.chunjun.element.column.BytesColumn;
import com.dtstack.chunjun.element.column.StringColumn;
import com.dtstack.chunjun.element.column.TimestampColumn;
import com.dtstack.chunjun.element.column.ZonedTimestampColumn;

import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.ZonedTimestampType;

import com.oceanbase.jdbc.Blob;
import com.oceanbase.jdbc.Clob;
import com.oceanbase.jdbc.extend.datatype.DataTypeUtilities;
import com.oceanbase.jdbc.extend.datatype.TIMESTAMPLTZ;
import com.oceanbase.jdbc.extend.datatype.TIMESTAMPTZ;

import java.sql.Timestamp;
import java.util.TimeZone;

public class OceanbaseOracleSyncConverter extends OracleSyncConverter {

public OceanbaseOracleSyncConverter(RowType rowType, CommonConfig commonConfig) {
super(rowType, commonConfig);
}

@Override
protected IDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case VARCHAR:
if (type instanceof ClobType) {
return val -> {
Clob clob = (Clob) val;
return new StringColumn(ConvertUtil.convertClob(clob));
};
}
return val -> new StringColumn(val.toString());
case VARBINARY:
return val -> {
if (type instanceof BlobType) {
Blob blob = (Blob) val;
byte[] bytes = blob.getBytes(1, (int) blob.length());
return new BytesColumn(bytes);
} else {
return new BytesColumn((byte[]) val);
}
};
case TIMESTAMP_WITHOUT_TIME_ZONE:
final int precision = ((TimestampType) type).getPrecision();
if (precision == 6) {
return val -> new TimestampColumn((Timestamp) val, 0); // java.sql.Timestamp
}
case TIMESTAMP_WITH_TIME_ZONE:
if (type instanceof ZonedTimestampType) {
final int zonedPrecision = ((ZonedTimestampType) type).getPrecision();
return val -> {
TIMESTAMPTZ timestamptz = (TIMESTAMPTZ) val;
Timestamp timestamp = timestamptz.timestampValue();
return new ZonedTimestampColumn(timestamp, zonedPrecision);
};
}
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (type instanceof LocalZonedTimestampType) {
final int localPrecision = ((LocalZonedTimestampType) type).getPrecision();
return val -> {
TIMESTAMPLTZ timestamptz = (TIMESTAMPLTZ) val;
// 重写处理12个字节情况,TIMESTAMPLTZ#toTimestamp
byte[] bytes = timestamptz.toBytes(); // 获取字节码
TimeZone timeZone;
if (bytes.length >= 14) {
// 字节数组长度足够,尝试提取时区信息
String tzStr =
DataTypeUtilities.toTimezoneStr(
bytes[12], bytes[13], "GMT", true);
timeZone = TimeZone.getTimeZone(tzStr);
} else {
// 字节数组长度不足,使用默认时区
timeZone = TimeZone.getDefault();
}
Timestamp timestamp =
new Timestamp(DataTypeUtilities.getOriginTime(bytes, timeZone));
timestamp.setNanos(DataTypeUtilities.getNanos(bytes, 7));
return new ZonedTimestampColumn(timestamp, timeZone, localPrecision);
};
}
}
return super.createInternalConverter(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public static DataType apply(TypeConfig type) {
case "DECIMAL":
case "DECIMAL UNSIGNED":
case "NUMERIC":
case "NUMBER":
return DataTypes.DECIMAL(38, 18);
case "DOUBLE":
case "DOUBLE UNSIGNED":
Expand Down Expand Up @@ -81,6 +82,7 @@ public static DataType apply(TypeConfig type) {
case "LONGTEXT":
case "ENUM":
case "SET":
case "VARCHAR2":
return DataTypes.STRING();
default:
throw new UnsupportedTypeException(type);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.dtstack.chunjun.connector.oceanbase.dialect;

import com.dtstack.chunjun.connector.mysql.converter.MysqlRawTypeConverter;
import com.dtstack.chunjun.connector.mysql.dialect.MysqlDialect;
import com.dtstack.chunjun.converter.RawTypeMapper;

import java.util.Optional;

public class OceanbaseMysqlModeDialect extends MysqlDialect {
OceanbaseDialect oceanbaseDialect = new OceanbaseDialect();

@Override
public String dialectName() {
return oceanbaseDialect.dialectName();
}

@Override
public boolean canHandle(String url) {
return oceanbaseDialect.canHandle(url);
}

@Override
public RawTypeMapper getRawTypeConverter() {
return MysqlRawTypeConverter::apply;
}

@Override
public Optional<String> defaultDriverName() {
return oceanbaseDialect.defaultDriverName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.dtstack.chunjun.connector.oceanbase.dialect;

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatement;
import com.dtstack.chunjun.connector.oceanbase.converter.OceanbaseOracleSyncConverter;
import com.dtstack.chunjun.connector.oracle.converter.OracleRawTypeConverter;
import com.dtstack.chunjun.connector.oracle.dialect.OracleDialect;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.converter.RawTypeMapper;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import io.vertx.core.json.JsonArray;

import java.sql.ResultSet;
import java.util.Optional;

public class OceanbaseOracleModeDialect extends OracleDialect {
OceanbaseDialect oceanbaseDialect = new OceanbaseDialect();

@Override
public String dialectName() {
return oceanbaseDialect.dialectName();
}

@Override
public boolean canHandle(String url) {
return oceanbaseDialect.canHandle(url);
}

@Override
public RawTypeMapper getRawTypeConverter() {
return OracleRawTypeConverter::apply;
}

@Override
public Optional<String> defaultDriverName() {
return oceanbaseDialect.defaultDriverName();
}

@Override
public AbstractRowConverter<ResultSet, JsonArray, FieldNamedPreparedStatement, LogicalType>
getColumnConverter(RowType rowType, CommonConfig commonConfig) {
return new OceanbaseOracleSyncConverter(rowType, commonConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,31 @@
package com.dtstack.chunjun.connector.oceanbase.sink;

import com.dtstack.chunjun.config.SyncConfig;
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.sink.JdbcSinkFactory;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf;
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;

public class OceanbaseSinkFactory extends JdbcSinkFactory {

private OceanBaseConf oceanBaseConf;

public OceanbaseSinkFactory(SyncConfig syncConfig) {
super(syncConfig, new OceanbaseDialect());
super(syncConfig, new OceanbaseMysqlModeDialect());
this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig;
if (oceanBaseConf != null) {
OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
// 若是for oracle模式
if (mode == OceanBaseMode.ORACLE) {
this.jdbcDialect = new OceanbaseOracleModeDialect();
}
}
}

@Override
protected Class<? extends JdbcConfig> getConfClass() {
return OceanBaseConf.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,76 @@
package com.dtstack.chunjun.connector.oceanbase.source;

import com.dtstack.chunjun.config.SyncConfig;
import com.dtstack.chunjun.connector.jdbc.config.JdbcConfig;
import com.dtstack.chunjun.connector.jdbc.source.JdbcSourceFactory;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseDialect;
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseConf;
import com.dtstack.chunjun.connector.oceanbase.config.OceanBaseMode;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseOracleModeDialect;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.commons.lang3.StringUtils;

import java.util.Properties;

public class OceanbaseSourceFactory extends JdbcSourceFactory {
// 默认是Mysql流式拉取
private static final int DEFAULT_FETCH_SIZE = Integer.MIN_VALUE;
private static final String ORACLE_JDBC_READ_TIMEOUT = "oracle.jdbc.ReadTimeout";
private static final String ORACLE_NET_CONNECT_TIMEOUT = "oracle.net.CONNECT_TIMEOUT";

private OceanBaseConf oceanBaseConf;

public OceanbaseSourceFactory(SyncConfig syncConfig, StreamExecutionEnvironment env) {
super(syncConfig, env, new OceanbaseDialect());
if (jdbcConfig.isPolling()
&& StringUtils.isEmpty(jdbcConfig.getStartLocation())
&& jdbcConfig.getFetchSize() == 0) {
jdbcConfig.setFetchSize(1000);
super(syncConfig, env, new OceanbaseMysqlModeDialect()); // 默认为mysql方言
this.oceanBaseConf = (OceanBaseConf) this.jdbcConfig;
if (oceanBaseConf != null) {
OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
// 若是for oracle模式
if (mode == OceanBaseMode.ORACLE) {
// 设置for oracle方言
this.jdbcDialect = new OceanbaseOracleModeDialect();
Properties properties = jdbcConfig.getProperties();
if (properties == null) {
properties = new Properties();
}
if (jdbcConfig.getConnectTimeOut() != 0) {
// queryTimeOut单位是秒 需要转换成毫秒
properties.putIfAbsent(
ORACLE_JDBC_READ_TIMEOUT,
String.valueOf(jdbcConfig.getQueryTimeOut() * 1000));
properties.putIfAbsent(
ORACLE_NET_CONNECT_TIMEOUT,
String.valueOf(jdbcConfig.getQueryTimeOut() * 3 * 1000));
jdbcConfig.setProperties(properties);
}
} else {
// 其他情况:for mysql模式 初始化
// 避免result.next阻塞
if (jdbcConfig.isPolling()
&& StringUtils.isEmpty(jdbcConfig.getStartLocation())
&& jdbcConfig.getFetchSize() == 0) {
jdbcConfig.setFetchSize(1000);
}
}
}
}

@Override
protected Class<? extends JdbcConfig> getConfClass() {
return OceanBaseConf.class;
}

@Override
protected int getDefaultFetchSize() {
if (oceanBaseConf != null) {
OceanBaseMode mode = OceanBaseMode.valueOf(oceanBaseConf.getOceanBaseMode());
// 处理for oracle情况
if (mode == OceanBaseMode.ORACLE) {
return super.getDefaultFetchSize();
}
}
return DEFAULT_FETCH_SIZE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.dtstack.chunjun.connector.oceanbase.table;

import com.dtstack.chunjun.connector.jdbc.dialect.JdbcDialect;
import com.dtstack.chunjun.connector.mysql.table.MysqlDynamicTableFactory;
import com.dtstack.chunjun.connector.oceanbase.dialect.OceanbaseMysqlModeDialect;

public class MysqlDynamicTableFactoryProxy extends MysqlDynamicTableFactory {
@Override
protected JdbcDialect getDialect() {
return new OceanbaseMysqlModeDialect();
}
}
Loading
Loading