Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class ConnectionConfig extends PluginConfig implements DatabaseC
public static final String CONNECTION_ARGUMENTS = "connectionArguments";
public static final String JDBC_PLUGIN_NAME = "jdbcPluginName";
public static final String JDBC_PLUGIN_TYPE = "jdbc";
public static final String TRANSACTION_ISOLATION_LEVEL = "transactionIsolationLevel";


@Name(JDBC_PLUGIN_NAME)
@Description("Name of the JDBC driver to use. This is the value of the 'jdbcPluginName' key defined in the JSON " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.db.connector.AbstractDBConnectorConfig;
import io.cdap.plugin.db.source.AbstractDBSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -49,6 +51,7 @@ public abstract class AbstractDBSpecificSourceConfig extends PluginConfig implem
public static final String DATABASE = "database";
public static final String FETCH_SIZE = "fetchSize";
public static final String DEFAULT_FETCH_SIZE = "1000";
public static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificSourceConfig.class);

@Name(Constants.Reference.REFERENCE_NAME)
@Description(Constants.Reference.REFERENCE_NAME_DESCRIPTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.cdap.plugin.common.KeyValueListParser;
import io.cdap.plugin.common.db.DBConnectorProperties;
import io.cdap.plugin.db.ConnectionConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -37,6 +39,8 @@
*/
public abstract class AbstractDBConnectorConfig extends PluginConfig implements DBConnectorProperties {

private static final Logger LOG = LoggerFactory.getLogger(AbstractDBConnectorConfig.class);

@Name(ConnectionConfig.JDBC_PLUGIN_NAME)
@Description("Name of the JDBC driver to use. This is the value of the 'jdbcPluginName' key defined in the JSON " +
"file for the JDBC plugin.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.Connection;
Expand All @@ -56,6 +58,7 @@ public abstract class AbstractDBSpecificConnector<T extends DBWritable> extends
implements BatchConnector<LongWritable, T> {

private final AbstractDBConnectorConfig config;
private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificConnector.class);

protected AbstractDBSpecificConnector(AbstractDBConnectorConfig config) {
super(config);
Expand Down Expand Up @@ -99,6 +102,7 @@ public InputFormatProvider getInputFormatProvider(ConnectorContext context, Samp
tableQuery, null, false);
connectionConfigAccessor.setConnectionArguments(Maps.fromProperties(config.getConnectionArgumentsProperties()));
connectionConfigAccessor.getConfiguration().setInt(MRJobConfig.NUM_MAPS, 1);
LOG.debug("Moving inside AbstractDBConnectorConfig");
Map<String, String> additionalArguments = config.getAdditionalArguments();
for (Map.Entry<String, String> argument : additionalArguments.entrySet()) {
connectionConfigAccessor.getConfiguration().set(argument.getKey(), argument.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.plugin.db.ConnectionConfig;
import io.cdap.plugin.db.TransactionIsolationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

Expand All @@ -30,9 +34,11 @@
*/
public abstract class AbstractDBSpecificConnectorConfig extends AbstractDBConnectorConfig {

private static final Logger LOG = LoggerFactory.getLogger(AbstractDBSpecificConnectorConfig.class);
@Name(ConnectionConfig.HOST)
@Description("Database host")
@Macro

@Nullable
protected String host;

Expand All @@ -42,6 +48,12 @@ public abstract class AbstractDBSpecificConnectorConfig extends AbstractDBConnec
@Nullable
protected Integer port;

@Name(ConnectionConfig.TRANSACTION_ISOLATION_LEVEL)
@Description("The transaction isolation level for the database session.")
@Macro
@Nullable
protected String transactionIsolationLevel;

public String getHost() {
return host;
}
Expand All @@ -55,4 +67,22 @@ public int getPort() {
public boolean canConnect() {
return super.canConnect() && !containsMacro(ConnectionConfig.HOST) && !containsMacro(ConnectionConfig.PORT);
}

@Override
public Map<String, String> getAdditionalArguments() {
Map<String, String> additonalArguments = new HashMap<>();
LOG.debug("inside get AdditionalArguemnts of AbstractDBSpecificConnectorConfig");
if (getTransactionIsolationLevel() != null) {
additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel());
}
return additonalArguments;
}

public String getTransactionIsolationLevel() {
if (transactionIsolationLevel == null) {
return null;
}
return TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
}
}

6 changes: 6 additions & 0 deletions mysql-plugin/docs/MySQL-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ authentication. Optional for databases that do not require authentication.

**Password:** Password to use to connect to the specified database.

**Transaction Isolation Level** The transaction isolation level of the databse connection
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.

**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
will be passed to the JDBC driver, as connection arguments, for JDBC drivers that may need additional configurations.
This is a semicolon-separated list of key-value pairs, where each pair is separated by a equals '=' and specifies
Expand Down
6 changes: 6 additions & 0 deletions mysql-plugin/docs/Mysql-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ You also can use the macro function ${conn(connection-name)}.

**Password:** Password to use to connect to the specified database.

**Transaction Isolation Level** The transaction isolation level of the databse connection
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.

**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.

Expand Down
6 changes: 6 additions & 0 deletions mysql-plugin/docs/Mysql-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s

**Password:** Password to use to connect to the specified database.

**Transaction Isolation Level** The transaction isolation level of the databse connection
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.
- TRANSACTION_READ_UNCOMMITTED: Allows dirty reads (reading uncommitted changes from other transactions). Non-repeatable reads and phantom reads are possible.

**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,15 @@ public MysqlConnectorConfig getConnection() {
return connection;
}

@Override
public String getTransactionIsolationLevel() {
return connection.getTransactionIsolationLevel();
}

@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
connection.getAdditionalArguments();
super.validate(collector);
}

Expand Down
14 changes: 14 additions & 0 deletions mysql-plugin/widgets/MySQL-connector.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@
"widget-attributes": {
"default": "3306"
}
},
{
"widget-type": "select",
"label": "Transaction Isolation Level",
"name": "transactionIsolationLevel",
"widget-attributes": {
"values": [
"TRANSACTION_READ_UNCOMMITTED",
"TRANSACTION_READ_COMMITTED",
"TRANSACTION_REPEATABLE_READ",
"TRANSACTION_SERIALIZABLE"
],
"default": "TRANSACTION_SERIALIZABLE"
}
}
]
},
Expand Down
18 changes: 18 additions & 0 deletions mysql-plugin/widgets/Mysql-batchsink.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@
"label": "Password",
"name": "password"
},
{
"widget-type": "select",
"label": "Transaction Isolation Level",
"name": "transactionIsolationLevel",
"widget-attributes": {
"values": [
"TRANSACTION_READ_UNCOMMITTED",
"TRANSACTION_READ_COMMITTED",
"TRANSACTION_REPEATABLE_READ",
"TRANSACTION_SERIALIZABLE"
],
"default": "TRANSACTION_SERIALIZABLE"
}
},
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
Expand Down Expand Up @@ -225,6 +239,10 @@
"type": "property",
"name": "password"
},
{
"type": "property",
"name": "transactionIsolationLevel"
},
{
"type": "property",
"name": "host"
Expand Down
18 changes: 18 additions & 0 deletions mysql-plugin/widgets/Mysql-batchsource.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@
"label": "Password",
"name": "password"
},
{
"widget-type": "select",
"label": "Transaction Isolation Level",
"name": "transactionIsolationLevel",
"widget-attributes": {
"values": [
"TRANSACTION_READ_UNCOMMITTED",
"TRANSACTION_READ_COMMITTED",
"TRANSACTION_REPEATABLE_READ",
"TRANSACTION_SERIALIZABLE"
],
"default": "TRANSACTION_SERIALIZABLE"
}
},
{
"widget-type": "keyvalue",
"label": "Connection Arguments",
Expand Down Expand Up @@ -277,6 +291,10 @@
"type": "property",
"name": "password"
},
{
"type": "property",
"name": "transactionIsolationLevel"
},
{
"type": "property",
"name": "host"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,6 @@ public String getConnectionString() {
@Macro
private String database;

@Name(OracleConstants.TRANSACTION_ISOLATION_LEVEL)
@Description("The transaction isolation level for the database session.")
@Macro
@Nullable
private String transactionIsolationLevel;

@Name(OracleConstants.USE_SSL)
@Description("Turns on SSL encryption. Connection will fail if SSL is not available")
@Nullable
Expand Down Expand Up @@ -124,6 +118,7 @@ public Properties getConnectionArgumentsProperties() {
return prop;
}

@Override
public String getTransactionIsolationLevel() {
//if null default to the highest isolation level possible
if (transactionIsolationLevel == null) {
Expand All @@ -133,16 +128,7 @@ public String getTransactionIsolationLevel() {
//This ensures that the role is mapped to the right serialization level, even w/ incorrect user input
//if role is SYSDBA or SYSOP it will map to read_committed. else serialized
return (!getRole().equals(ROLE_NORMAL)) ? TransactionIsolationLevel.Level.TRANSACTION_READ_COMMITTED.name() :
TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
}

@Override
public Map<String, String> getAdditionalArguments() {
Map<String, String> additonalArguments = new HashMap<>();
if (getTransactionIsolationLevel() != null) {
additonalArguments.put(TransactionIsolationLevel.CONF_KEY, getTransactionIsolationLevel());
}
return additonalArguments;
TransactionIsolationLevel.Level.valueOf(transactionIsolationLevel).name();
}

@Override
Expand Down
5 changes: 5 additions & 0 deletions postgresql-plugin/docs/PostgreSQL-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ authentication. Optional for databases that do not require authentication.

**Password:** Password to use to connect to the specified database.

**Transaction Isolation Level** The transaction isolation level of the databse connection
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.

**Database:** The name of the database to connect to.

**Connection Arguments:** A list of arbitrary string tag/value pairs as connection arguments. These arguments
Expand Down
5 changes: 5 additions & 0 deletions postgresql-plugin/docs/Postgres-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ You also can use the macro function ${conn(connection-name)}.

**Password:** Password to use to connect to the specified database.

**Transaction Isolation Level** The transaction isolation level of the databse connection
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.

**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.

Expand Down
5 changes: 5 additions & 0 deletions postgresql-plugin/docs/Postgres-batchsource.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is s

**Password:** Password to use to connect to the specified database.

**Transaction Isolation Level** The transaction isolation level of the databse connection
- TRANSACTION_READ_COMMITTED: No dirty reads. Non-repeatable reads and phantom reads are possible.
- TRANSACTION_SERIALIZABLE: No dirty reads. Non-repeatable and phantom reads are prevented.
- TRANSACTION_REPEATABLE_READ: No dirty reads. Prevents non-repeatable reads, but phantom reads are still possible.

**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Macro;
import io.cdap.cdap.api.annotation.Name;
import io.cdap.plugin.db.TransactionIsolationLevel;
import io.cdap.plugin.db.connector.AbstractDBSpecificConnectorConfig;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.cdap.plugin.db.source.AbstractDBSource;
import io.cdap.plugin.util.DBUtils;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import javax.annotation.Nullable;
Expand All @@ -50,6 +52,8 @@
@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = PostgresConnector.NAME)})
public class PostgresSource extends AbstractDBSource<PostgresSource.PostgresSourceConfig> {

private static final Logger LOG = LoggerFactory.getLogger(PostgresSource.class);

private final PostgresSourceConfig postgresSourceConfig;

public PostgresSource(PostgresSourceConfig postgresSourceConfig) {
Expand Down Expand Up @@ -143,9 +147,15 @@ protected PostgresConnectorConfig getConnection() {
return connection;
}

@Override
public String getTransactionIsolationLevel() {
return connection.getTransactionIsolationLevel();
}

@Override
public void validate(FailureCollector collector) {
ConfigUtil.validateConnection(this, useConnection, connection, collector);
connection.getAdditionalArguments();
super.validate(collector);
}

Expand Down
Loading