Skip to content

Commit 3adb4b6

Browse files
committed
PLUGIN-1413 Added argument for BQ temporary staging bucket names
1 parent 4315efc commit 3adb4b6

File tree

5 files changed

+290
-14
lines changed

5 files changed

+290
-14
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
import com.google.auth.Credentials;
1919
import com.google.cloud.bigquery.BigQuery;
20+
import com.google.cloud.bigquery.Dataset;
2021
import com.google.cloud.bigquery.DatasetId;
2122
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
2223
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
2324
import com.google.cloud.kms.v1.CryptoKeyName;
25+
import com.google.cloud.storage.Bucket;
26+
import com.google.cloud.storage.Storage;
2427
import com.google.common.base.Strings;
2528
import io.cdap.cdap.api.data.batch.Output;
2629
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
@@ -32,7 +35,6 @@
3235
import io.cdap.cdap.etl.api.batch.BatchSink;
3336
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
3437
import io.cdap.plugin.common.Asset;
35-
import io.cdap.plugin.common.ReferenceNames;
3638
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
3739
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
3840
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -88,14 +90,23 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
8890
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
8991
collector.getOrThrowException();
9092
baseConfiguration = getBaseConfiguration(cmekKeyName);
91-
String bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, config.getBucket(), runUUID.toString());
93+
94+
// Get required dataset ID and dataset instance (if it exists)
95+
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
96+
Dataset dataset = bigQuery.getDataset(datasetId);
97+
98+
// Get the required bucket name and bucket instance (if it exists)
99+
Storage storage = GCPUtils.getStorage(project, credentials);
100+
String bucketName = getStagingBucketName(context, config, dataset);
101+
Bucket bucket = storage.get(bucketName);
102+
92103
if (!context.isPreviewEnabled()) {
93-
BigQuerySinkUtils.createResources(bigQuery, GCPUtils.getStorage(project, credentials),
94-
DatasetId.of(config.getDatasetProject(), config.getDataset()),
95-
bucket, config.getLocation(), cmekKeyName);
104+
BigQuerySinkUtils.createResources(bigQuery, dataset, datasetId,
105+
storage, bucket, bucketName,
106+
config.getLocation(), cmekKeyName);
96107
}
97108

98-
prepareRunInternal(context, bigQuery, bucket);
109+
prepareRunInternal(context, bigQuery, bucketName);
99110
}
100111

101112
@Override
@@ -291,4 +302,34 @@ protected Configuration getOutputConfiguration() throws IOException {
291302
return configuration;
292303
}
293304

305+
/**
306+
* Identify a stating bucket name from the pipeline context and sink configuration
307+
* @param context Sink Context
308+
* @param config Sink Configuration
309+
* @return Bucket name to use for this sink.
310+
*/
311+
protected String getStagingBucketName(BatchSinkContext context,
312+
AbstractBigQuerySinkConfig config,
313+
@Nullable Dataset dataset) {
314+
// Get temporary bucket name from sink configuration
315+
String bucketName = config.getBucket();
316+
317+
// Get Bucket Prefix from configuration
318+
String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments());
319+
320+
// If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified,
321+
// we must set this prefix along with the destination location in order to create/reuse the bucket.
322+
// Otherwise, if temp bucket name is defined, or a prefix is not set, the configureBucket method will prepare
323+
// for a new bucket creation.
324+
if (bucketName == null && bucketPrefix != null) {
325+
// If the destination dataset exists, use the dataset location. Otherwise, use location from configuration.
326+
String datasetLocation = dataset != null ? dataset.getLocation() : config.getLocation();
327+
328+
// Get the bucket name for the specified location.
329+
bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, datasetLocation);
330+
}
331+
332+
return BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
333+
}
334+
294335
}

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,25 +111,50 @@ public final class BigQuerySinkUtils {
111111
public static void createResources(BigQuery bigQuery, Storage storage,
112112
DatasetId datasetId, String bucketName, @Nullable String location,
113113
@Nullable CryptoKeyName cmekKeyName) throws IOException {
114+
// Get the required Dataset and bucket instances using the supplied clients
114115
Dataset dataset = bigQuery.getDataset(datasetId);
115116
Bucket bucket = storage.get(bucketName);
116117

118+
createResources(bigQuery, dataset, datasetId, storage, bucket, bucketName, location, cmekKeyName);
119+
}
120+
121+
/**
122+
* Creates the given dataset and bucket if the supplied ones are null.
123+
*
124+
* If the dataset already exists but the
125+
* bucket does not, the bucket will be created in the same location as the dataset. If the bucket already exists
126+
* but the dataset does not, an attempt will be made to create the dataset in the same location. This may fail if the bucket
127+
* is in a location that BigQuery does not yet support.
128+
*
129+
* @param bigQuery the bigquery client for the project
130+
* @param dataset the bigquery dataset instance (may be null)
131+
* @param datasetId the Id of the dataset
132+
* @param storage the storage client for the project
133+
* @param bucket the storage bucket instance (may be null)
134+
* @param bucketName the name of the bucket
135+
* @param location the location of the resources, this is only applied if both the bucket and dataset do not exist
136+
* @param cmekKey the name of the cmek key
137+
* @throws IOException
138+
*/
139+
public static void createResources(BigQuery bigQuery, @Nullable Dataset dataset, DatasetId datasetId,
140+
Storage storage, @Nullable Bucket bucket, String bucketName,
141+
@Nullable String location, @Nullable CryptoKeyName cmekKey) throws IOException {
117142
if (dataset == null && bucket == null) {
118-
createBucket(storage, bucketName, location, cmekKeyName,
143+
createBucket(storage, bucketName, location, cmekKey,
119144
() -> String.format("Unable to create Cloud Storage bucket '%s'", bucketName));
120-
createDataset(bigQuery, datasetId, location, cmekKeyName,
145+
createDataset(bigQuery, datasetId, location, cmekKey,
121146
() -> String.format("Unable to create BigQuery dataset '%s.%s'", datasetId.getProject(),
122147
datasetId.getDataset()));
123148
} else if (bucket == null) {
124149
createBucket(
125-
storage, bucketName, dataset.getLocation(), cmekKeyName,
150+
storage, bucketName, dataset.getLocation(), cmekKey,
126151
() -> String.format(
127152
"Unable to create Cloud Storage bucket '%s' in the same location ('%s') as BigQuery dataset '%s'. "
128153
+ "Please use a bucket that is in the same location as the dataset.",
129154
bucketName, dataset.getLocation(), datasetId.getProject() + "." + datasetId.getDataset()));
130155
} else if (dataset == null) {
131156
createDataset(
132-
bigQuery, datasetId, bucket.getLocation(), cmekKeyName,
157+
bigQuery, datasetId, bucket.getLocation(), cmekKey,
133158
() -> String.format(
134159
"Unable to create BigQuery dataset '%s' in the same location ('%s') as Cloud Storage bucket '%s'. "
135160
+ "Please use a bucket that is in a supported location.",

src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.google.cloud.bigquery.TimePartitioning;
2929
import com.google.cloud.kms.v1.CryptoKeyName;
3030
import com.google.cloud.storage.Storage;
31-
import com.google.common.base.Strings;
3231
import com.google.common.collect.ImmutableMap;
3332
import com.google.gson.Gson;
3433
import io.cdap.cdap.api.annotation.Description;
@@ -52,11 +51,9 @@
5251
import io.cdap.cdap.etl.api.validation.ValidationFailure;
5352
import io.cdap.plugin.common.Asset;
5453
import io.cdap.plugin.common.LineageRecorder;
55-
import io.cdap.plugin.common.ReferenceNames;
5654
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
5755
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
5856
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
59-
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
6057
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
6158
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
6259
import io.cdap.plugin.gcp.common.CmekUtils;
@@ -150,10 +147,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
150147
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
151148
config.getServiceAccountType());
152149

150+
String bucketName = getBucketName(context, dataset);
151+
153152
// Configure GCS Bucket to use
154153
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
155154
storage,
156-
config.getBucket(),
155+
bucketName,
157156
dataset,
158157
bucketPath,
159158
cmekKeyName);
@@ -393,4 +392,20 @@ private void emitLineage(BatchSourceContext context, Schema schema, Type sourceT
393392
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
394393
}
395394
}
395+
396+
@Nullable
397+
private String getBucketName(BatchSourceContext context, Dataset dataset) {
398+
// Get the bucket name from configuration, and the bucket prefix if defined.
399+
String bucketName = config.getBucket();
400+
String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments());
401+
402+
// If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified,
403+
// we must set this prefix along with the source dataset location.
404+
// Otherwise, return the original bucket name (may be null).
405+
if (bucketName == null && bucketPrefix != null) {
406+
bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, dataset.getLocation());
407+
}
408+
409+
return bucketName;
410+
}
396411
}

src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929
import com.google.cloud.bigquery.TimePartitioning;
3030
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
3131
import com.google.cloud.kms.v1.CryptoKeyName;
32+
import com.google.common.annotations.VisibleForTesting;
3233
import com.google.common.base.Strings;
3334
import com.google.common.collect.ImmutableMap;
3435
import com.google.common.collect.ImmutableSet;
3536
import io.cdap.cdap.api.data.schema.Schema;
3637
import io.cdap.cdap.etl.api.FailureCollector;
38+
import io.cdap.cdap.etl.api.action.SettableArguments;
3739
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
3840
import io.cdap.cdap.etl.api.validation.InvalidStageException;
3941
import io.cdap.cdap.etl.api.validation.ValidationFailure;
@@ -63,6 +65,8 @@
6365
import java.util.regex.Pattern;
6466
import java.util.stream.Collectors;
6567
import java.util.stream.StreamSupport;
68+
import java.util.zip.CRC32;
69+
import java.util.zip.Checksum;
6670
import javax.annotation.Nullable;
6771

6872
/**
@@ -73,6 +77,7 @@ public final class BigQueryUtil {
7377
private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);
7478

7579
private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
80+
private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix";
7681

7782
public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
7883
public static final String DATASET_PATTERN = "[A-Za-z0-9_]+";
@@ -772,4 +777,66 @@ public static String getFQN(String datasetProject, String datasetName, String ta
772777
return String.join(":", BigQueryConstants.BQ_FQN_PREFIX,
773778
datasetProject, datasetName, tableName);
774779
}
780+
781+
/**
782+
* Get the bucket prefix from the runtime arguments. If not set, it will be created and set.
783+
*
784+
* @param arguments settable arguments instance to verify
785+
* @return the bucket prefix to use for this pipeline
786+
*/
787+
@Nullable
788+
public static String getBucketPrefix(SettableArguments arguments) {
789+
// If the bucket prefix property is set, use it.
790+
if (arguments.has(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) {
791+
String bucketPrefix = arguments.get(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME);
792+
validateBucketPrefix(bucketPrefix);
793+
LOG.debug("Using bucket prefix for temporary buckets: {}", bucketPrefix);
794+
return bucketPrefix;
795+
}
796+
return null;
797+
}
798+
799+
/**
800+
* Ensures configured bucket prefix is valid per the GCS naming convention.
801+
*
802+
* @param bucketPrefix
803+
*/
804+
private static void validateBucketPrefix(String bucketPrefix) {
805+
if (!bucketPrefix.matches("^[a-z0-9-_.]+$")) {
806+
throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' is not a valid bucket " +
807+
"name. Bucket names can only contain lowercase letters, numeric " +
808+
"characters, dashes (-), underscores (_), and dots (.).");
809+
}
810+
811+
if (!bucketPrefix.contains(".") && bucketPrefix.length() > 50) {
812+
throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' should be 50 " +
813+
"characters or shorter.");
814+
}
815+
}
816+
817+
/**
818+
* Method to generate the CRC32 checksum for a location.
819+
* We use this to ensure location name length is constant (only 8 characters).
820+
*
821+
* @param location location to checksum
822+
* @return checksum value as an 8 character string (hex).
823+
*/
824+
@VisibleForTesting
825+
public static String crc32location(String location) {
826+
byte[] bytes = location.toLowerCase().getBytes();
827+
Checksum checksum = new CRC32();
828+
checksum.update(bytes, 0, bytes.length);
829+
return Long.toHexString(checksum.getValue());
830+
}
831+
832+
/**
833+
* Build bucket name concatenating the bucket prefix with the location crc32 hash using a hyphen (-)
834+
*
835+
* @param bucketPrefix Bucket prefix
836+
* @param location location to use.
837+
* @return String containing the bucket location.
838+
*/
839+
public static String getBucketNameForLocation(String bucketPrefix, String location) {
840+
return String.format("%s-%s", bucketPrefix, crc32location(location));
841+
}
775842
}

0 commit comments

Comments
 (0)