@@ -97,7 +97,9 @@ public final void prepareRun(BatchSinkContext context) throws Exception {

// Get the required bucket name and bucket instance (if it exists)
Storage storage = GCPUtils.getStorage(project, credentials);
- String bucketName = getStagingBucketName(context, config, dataset);
+ String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
+ dataset, config.getBucket());
+ bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
Bucket bucket = storage.get(bucketName);

if (!context.isPreviewEnabled()) {
@@ -306,34 +308,4 @@ protected Configuration getOutputConfiguration() throws IOException {
return configuration;
}

- /**
- * Identify a stating bucket name from the pipeline context and sink configuration
- * @param context Sink Context
- * @param config Sink Configuration
- * @return Bucket name to use for this sink.
- */
- protected String getStagingBucketName(BatchSinkContext context,
- AbstractBigQuerySinkConfig config,
- @Nullable Dataset dataset) {
- // Get temporary bucket name from sink configuration
- String bucketName = config.getBucket();
-
- // Get Bucket Prefix from configuration
- String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments());
-
- // If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified,
- // we must set this prefix along with the destination location in order to create/reuse the bucket.
- // Otherwise, if temp bucket name is defined, or a prefix is not set, the configureBucket method will prepare
- // for a new bucket creation.
- if (bucketName == null && bucketPrefix != null) {
- // If the destination dataset exists, use the dataset location. Otherwise, use location from configuration.
- String datasetLocation = dataset != null ? dataset.getLocation() : config.getLocation();
-
- // Get the bucket name for the specified location.
- bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, datasetLocation);
- }
-
- return BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
- }
-
}
@@ -147,7 +147,8 @@ public void prepareRun(BatchSourceContext context) throws Exception {
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
config.getServiceAccountType());

- String bucketName = getBucketName(context, dataset);
+ String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), null,
+ dataset, config.getBucket());

// Configure GCS Bucket to use
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
@@ -392,20 +393,4 @@ private void emitLineage(BatchSourceContext context, Schema schema, Type sourceType,
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
}
}
-
- @Nullable
- private String getBucketName(BatchSourceContext context, Dataset dataset) {
- // Get the bucket name from configuration, and the bucket prefix if defined.
- String bucketName = config.getBucket();
- String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments());
-
- // If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified,
- // we must set this prefix along with the source dataset location.
- // Otherwise, return the original bucket name (may be null).
- if (bucketName == null && bucketPrefix != null) {
- bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, dataset.getLocation());
- }
-
- return bucketName;
- }
}
@@ -19,14 +19,14 @@
import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
+ import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
- import io.cdap.cdap.api.RuntimeContext;
import io.cdap.cdap.api.SQLEngineContext;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
@@ -116,7 +116,7 @@ public class BigQuerySQLEngine
private Configuration configuration;
private String project;
private String datasetProject;
- private String dataset;
+ private String datasetName;
private String bucket;
private String runId;
private Map<String, BigQuerySQLDataset> datasets;
@@ -153,12 +153,14 @@ public void prepareRun(SQLEngineContext context) throws Exception {
null : GCPUtils.loadServiceAccountCredentials(serviceAccount, sqlEngineConfig.isServiceAccountFilePath());
project = sqlEngineConfig.getProject();
datasetProject = sqlEngineConfig.getDatasetProject();
- dataset = sqlEngineConfig.getDataset();
- bucket = sqlEngineConfig.getBucket() != null ? sqlEngineConfig.getBucket() : "bqpushdown-" + runId;
+ datasetName = sqlEngineConfig.getDataset();

// Initialize BQ and GCS clients.
bigQuery = GCPUtils.getBigQuery(project, credentials);
storage = GCPUtils.getStorage(project, credentials);
+ Dataset dataset = bigQuery.getDataset(DatasetId.of(datasetProject, datasetName));
+ bucket = BigQueryUtil.getStagingBucketName(context.getRuntimeArguments(), sqlEngineConfig.getLocation(),
+ dataset, sqlEngineConfig.getBucket());

String cmekKey = !Strings.isNullOrEmpty(sqlEngineConfig.cmekKey) ? sqlEngineConfig.cmekKey :
ctx.getRuntimeArguments().get(CmekUtils.CMEK_KEY);
@@ -169,11 +171,12 @@ public void prepareRun(SQLEngineContext context) throws Exception {
configuration = BigQueryUtil.getBigQueryConfig(sqlEngineConfig.getServiceAccount(), sqlEngineConfig.getProject(),
cmekKeyName, sqlEngineConfig.getServiceAccountType());
// Create resources needed for this execution
- BigQuerySinkUtils.createResources(bigQuery, storage, DatasetId.of(datasetProject, dataset), bucket,
+ BigQuerySinkUtils.createResources(bigQuery, storage, DatasetId.of(datasetProject, datasetName), bucket,
sqlEngineConfig.getLocation(), cmekKeyName);
// Configure GCS bucket that is used to stage temporary files.
- // If the bucket is created for this run, mar it for deletion after executon is completed
- BigQuerySinkUtils.configureBucket(configuration, bucket, runId, sqlEngineConfig.getBucket() == null);
+ // If the bucket is created for this run, mark it for deletion after execution is completed
+ String fallbackBucketName = "bqpushdown-" + runId;
+ BigQuerySinkUtils.configureBucket(configuration, bucket, fallbackBucketName);

// Configure credentials for the source
BigQuerySourceUtils.configureServiceAccount(configuration, sqlEngineConfig.connection);
@@ -210,7 +213,7 @@ public SQLPushDataset<StructuredRecord, StructuredRecord, NullWritable> getPushP
sqlEngineConfig,
configuration,
bigQuery,
- DatasetId.of(datasetProject, dataset),
+ DatasetId.of(datasetProject, datasetName),
bucket,
runId);

@@ -241,7 +244,7 @@ public SQLPullDataset<StructuredRecord, LongWritable, GenericData.Record> getPul
return BigQueryPullDataset.getInstance(sqlPullRequest,
configuration,
bigQuery,
- DatasetId.of(datasetProject, dataset),
+ DatasetId.of(datasetProject, datasetName),
table,
bucket,
runId);
@@ -306,7 +309,7 @@ public SQLDataset join(SQLJoinRequest sqlJoinRequest) throws SQLEngineException
// Get SQL builder for this Join operation
BigQueryJoinSQLBuilder builder = new BigQueryJoinSQLBuilder(
sqlJoinRequest.getJoinDefinition(),
- DatasetId.of(datasetProject, dataset),
+ DatasetId.of(datasetProject, datasetName),
getStageNameToBQTableNameMap());

// Execute Select job with the supplied query.
@@ -328,7 +331,7 @@ public SQLDatasetProducer getProducer(SQLPullRequest pullRequest, PullCapability

return new BigQuerySparkDatasetProducer(sqlEngineConfig,
datasetProject,
- dataset,
+ datasetName,
table,
pullRequest.getDatasetSchema());
}
@@ -363,7 +366,7 @@ public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException
String destinationTable = BigQuerySQLEngineUtils.getNewTableName(runId);

// Create empty table to store query results.
- TableId destinationTableId = TableId.of(datasetProject, dataset, destinationTable);
+ TableId destinationTableId = TableId.of(datasetProject, this.datasetName, destinationTable);

// Build Big Query Write instance and execute write operation.
BigQueryReadDataset readDataset = BigQueryReadDataset.getInstance(datasetName,
@@ -393,7 +396,7 @@ public SQLWriteResult write(SQLWriteRequest writeRequest) {

// Get source table information (from the stage we are attempting to write into the sink)
String sourceTable = datasets.get(writeRequest.getDatasetName()).getBigQueryTable();
- TableId sourceTableId = TableId.of(datasetProject, dataset, sourceTable);
+ TableId sourceTableId = TableId.of(datasetProject, this.datasetName, sourceTable);

// Build Big Query Write instance and execute write operation.
BigQueryWrite bigQueryWrite = BigQueryWrite.getInstance(datasetName,
@@ -560,15 +563,15 @@ private BigQuerySelectDataset executeSelect(String datasetName,
String table = BigQuerySQLEngineUtils.getNewTableName(runId);

// Create empty table to store query results.
- BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, project, dataset, table);
+ BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, project, this.datasetName, table);

BigQuerySelectDataset selectDataset = BigQuerySelectDataset.getInstance(
datasetName,
outputSchema,
sqlEngineConfig,
bigQuery,
project,
- DatasetId.of(datasetProject, dataset),
+ DatasetId.of(datasetProject, this.datasetName),
table,
jobId,
jobType,
@@ -640,7 +643,7 @@ protected void deleteTable(String stageName, BigQuerySQLDataset bqDataset) throw
}

String tableName = bqDataset.getBigQueryTable();
- TableId tableId = TableId.of(datasetProject, dataset, tableName);
+ TableId tableId = TableId.of(datasetProject, datasetName, tableName);

// Delete this table if found
if (bigQuery.getTable(tableId) != null) {
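The configureBucket change in the prepareRun hunk above replaces the old runId-plus-boolean arguments (sqlEngineConfig.getBucket() == null) with a fallback bucket name: whether the bucket is run-scoped now follows from whether a name was resolved upstream. A minimal sketch of that implied contract, with placeholder property keys (the real BigQuerySinkUtils implementation is not part of this diff):

import org.apache.hadoop.conf.Configuration;

// Sketch only: models the contract implied by the new three-argument
// BigQuerySinkUtils.configureBucket(conf, bucket, fallbackBucketName).
// A null bucket means no bucket was resolved, so the fallback name is used
// and the bucket is treated as run-scoped (deletable once execution completes).
final class ConfigureBucketSketch {
  static String configureBucket(Configuration conf, String bucket, String fallbackBucketName) {
    boolean createdForRun = bucket == null;
    if (createdForRun) {
      // e.g. "bqpushdown-" + runId here, "dataplex-" + runUUID in the Dataplex sink
      bucket = fallbackBucketName;
    }
    conf.set("example.staging.bucket", bucket); // placeholder key, not the plugin's real property
    conf.setBoolean("example.staging.bucket.delete-on-completion", createdForRun); // placeholder key
    return bucket;
  }
}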
32 changes: 30 additions & 2 deletions src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -19,6 +19,7 @@
import com.google.api.client.googleapis.media.MediaHttpUploader;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
+ import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
@@ -792,9 +793,9 @@ public static String getFQN(String datasetProject, String datasetName, String ta
* @return the bucket prefix to use for this pipeline
*/
@Nullable
- public static String getBucketPrefix(SettableArguments arguments) {
+ public static String getBucketPrefix(Map<String, String> arguments) {
// If the bucket prefix property is set, use it.
- if (arguments.has(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) {
+ if (arguments.containsKey(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) {
String bucketPrefix = arguments.get(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME);
validateBucketPrefix(bucketPrefix);
LOG.debug("Using bucket prefix for temporary buckets: {}", bucketPrefix);
@@ -858,4 +859,31 @@ public static Map<String, String> getJobTags(String jobType) {
labels.put("type", jobType);
return labels;
}
+
+ /**
+ * Identify a staging bucket name from the pipeline context and plugin configuration
+ * @param arguments runtime arguments
+ * @param configLocation location from plugin configuration
+ * @param dataset BigQuery dataset
+ * @param bucket bucket from plugin configuration
+ * @return Bucket name to use, or null if no bucket is configured and no prefix is set.
+ */
+ @Nullable
+ public static String getStagingBucketName(Map<String, String> arguments, @Nullable String configLocation,
+ @Nullable Dataset dataset, @Nullable String bucket) {
+ // Get Bucket Prefix from configuration
+ String bucketPrefix = BigQueryUtil.getBucketPrefix(arguments);
+
+ // If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified,
+ // we must set this prefix along with the destination location in order to create/reuse the bucket.
+ // Otherwise, if temp bucket name is defined, or a prefix is not set, the configureBucket method will prepare
+ // for a new bucket creation.
+ if (Strings.isNullOrEmpty(bucket) && bucketPrefix != null) {
+ // If the destination dataset exists, use the dataset location. Otherwise, use location from configuration.
+ String datasetLocation = dataset != null ? dataset.getLocation() : configLocation;
+ // Get the bucket name for the specified location.
+ bucket = BigQueryUtil.getBucketNameForLocation(bucketPrefix, datasetLocation);
+ }
+ return bucket;
[Review comment — Contributor]
How does this behave if the supplied bucket is null and there is no prefix set? As far as I can tell, the returned bucket will be null.

If this is the case, please add the @Nullable annotation to this function.

A follow-up question: should this function return a bucket name if none is supplied and no prefix is set?

[Reply — itsankit-google (Contributor Author), Nov 7, 2022]
Yeah, it is expected to return null in that case. Added the @Nullable annotation.

No, we don't expect this method to return a bucket name if none is supplied and no prefix is set.
BigQuerySinkUtils#configureBucket and BigQuerySourceUtils#getOrCreateBucket expect the bucket argument to be nullable and will keep the existing behaviour.

+ }
}
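As the exchange above settles, the helper returns null when neither a bucket nor a prefix is supplied, and the callers (BigQuerySinkUtils#configureBucket, BigQuerySourceUtils#getOrCreateBucket) accept a null bucket. A self-contained sketch of the resolution order; the prefix argument key and the name-for-location scheme are assumptions for illustration, not the plugin's actual values:

import java.util.Map;

// Illustrates the three outcomes of BigQueryUtil.getStagingBucketName.
final class StagingBucketResolutionSketch {
  private static final String PREFIX_KEY = "gcp.bigquery.bucket.prefix"; // assumed argument key

  static String resolve(Map<String, String> arguments, String configLocation,
                        String datasetLocation, String bucket) {
    String prefix = arguments.get(PREFIX_KEY);
    if ((bucket == null || bucket.isEmpty()) && prefix != null) {
      // Prefer the existing dataset's location; fall back to the configured one.
      String location = datasetLocation != null ? datasetLocation : configLocation;
      return prefix + "-" + location.toLowerCase(); // naming scheme assumed
    }
    return bucket; // may be null: callers then fall back to a run-scoped bucket
  }

  public static void main(String[] args) {
    // 1. An explicit bucket always wins, prefix or not.
    System.out.println(resolve(Map.of(PREFIX_KEY, "etl-tmp"), "US", "EU", "my-bucket")); // my-bucket
    // 2. No bucket but a prefix: derive a per-location name, preferring the dataset location.
    System.out.println(resolve(Map.of(PREFIX_KEY, "etl-tmp"), "US", "EU", null));        // etl-tmp-eu
    // 3. Neither bucket nor prefix: null, per the @Nullable contract discussed above.
    System.out.println(resolve(Map.of(), "US", null, null));                             // null
  }
}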
@@ -20,6 +20,7 @@
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.BigQuery;
+ import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Job;
@@ -284,23 +285,29 @@ private void prepareRunBigQueryDataset(BatchSinkContext context) throws Exception {
baseConfiguration = getBaseConfiguration(cmekKeyName);
// asset.getResourceSpec().getName() will be of format 'projects/datasetProjectName/datasets/datasetName'
String[] assetValues = asset.getResourceSpec().getName().split("/");
- String dataset = assetValues[assetValues.length - 1];
+ String datasetName = assetValues[assetValues.length - 1];
String datasetProject = assetValues[assetValues.length - 3];
bigQuery = GCPUtils.getBigQuery(datasetProject, credentials);
- String bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, null, runUUID.toString());
+ // Get required dataset ID and dataset instance (if it exists)
+ DatasetId datasetId = DatasetId.of(datasetProject, datasetName);
+ Dataset dataset = bigQuery.getDataset(datasetId);
+ String bucket = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(),
+ dataset, null);
+ String fallbackBucketName = "dataplex-" + runUUID;
+ bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, bucket, fallbackBucketName);
if (!context.isPreviewEnabled()) {
BigQuerySinkUtils.createResources(bigQuery, GCPUtils.getStorage(project, credentials),
- DatasetId.of(datasetProject, dataset),
+ DatasetId.of(datasetProject, datasetName),
bucket, config.getLocation(), cmekKeyName);
}

Schema configSchema = config.getSchema(collector);
Schema outputSchema = configSchema == null ? context.getInputSchema() : configSchema;
- configureTable(outputSchema, dataset, datasetProject, collector);
+ configureTable(outputSchema, datasetName, datasetProject, collector);
configureBigQuerySink();
initOutput(context, bigQuery,
- config.getReferenceName(BigQueryUtil.getFQN(datasetProject, dataset, config.getTable())),
- config.getTable(), outputSchema, bucket, collector, dataset, datasetProject);
+ config.getReferenceName(BigQueryUtil.getFQN(datasetProject, datasetName, config.getTable())),
+ config.getTable(), outputSchema, bucket, collector, datasetName, datasetProject);
}

