Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import com.google.auth.Credentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Dataset;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.common.base.Strings;
import io.cdap.cdap.api.data.batch.Output;
import io.cdap.cdap.api.data.batch.OutputFormatProvider;
Expand All @@ -32,7 +35,6 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
Expand Down Expand Up @@ -88,14 +90,23 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
collector.getOrThrowException();
baseConfiguration = getBaseConfiguration(cmekKeyName);
String bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, config.getBucket(), runUUID.toString());

// Get required dataset ID and dataset instance (if it exists)
DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
Dataset dataset = bigQuery.getDataset(datasetId);

// Get the required bucket name and bucket instance (if it exists)
Storage storage = GCPUtils.getStorage(project, credentials);
String bucketName = getStagingBucketName(context, config, dataset);
Bucket bucket = storage.get(bucketName);

if (!context.isPreviewEnabled()) {
BigQuerySinkUtils.createResources(bigQuery, GCPUtils.getStorage(project, credentials),
DatasetId.of(config.getDatasetProject(), config.getDataset()),
bucket, config.getLocation(), cmekKeyName);
BigQuerySinkUtils.createResources(bigQuery, dataset, datasetId,
storage, bucket, bucketName,
config.getLocation(), cmekKeyName);
}

prepareRunInternal(context, bigQuery, bucket);
prepareRunInternal(context, bigQuery, bucketName);
}

@Override
Expand Down Expand Up @@ -291,4 +302,34 @@ protected Configuration getOutputConfiguration() throws IOException {
return configuration;
}

/**
 * Determines the staging bucket name for this sink run from the pipeline context and sink
 * configuration.
 *
 * Resolution order: the bucket explicitly configured on the sink wins; otherwise, when a shared
 * bucket prefix has been supplied via runtime arguments, a per-location bucket name is derived
 * from that prefix; otherwise the name is left null and configureBucket prepares for creating a
 * new bucket.
 *
 * @param context sink context providing the runtime arguments
 * @param config sink configuration
 * @param dataset the destination BigQuery dataset if it already exists, or null
 * @return bucket name to use for this sink
 */
protected String getStagingBucketName(BatchSinkContext context,
                                      AbstractBigQuerySinkConfig config,
                                      @Nullable Dataset dataset) {
  // Always read the prefix: getBucketPrefix also validates it, and an invalid prefix must
  // fail fast even when an explicit bucket is configured.
  String prefix = BigQueryUtil.getBucketPrefix(context.getArguments());
  String stagingBucket = config.getBucket();

  if (stagingBucket == null && prefix != null) {
    // Prefer the existing dataset's location so the staging bucket is co-located with the
    // destination; fall back to the location from configuration when the dataset does not exist.
    String location = (dataset == null) ? config.getLocation() : dataset.getLocation();
    stagingBucket = BigQueryUtil.getBucketNameForLocation(prefix, location);
  }

  // configureBucket records the choice in the base configuration; when the name is still null
  // it prepares for a new bucket creation.
  return BigQuerySinkUtils.configureBucket(baseConfiguration, stagingBucket, runUUID.toString());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -111,25 +111,50 @@ public final class BigQuerySinkUtils {
/**
 * Looks up the dataset and bucket using the supplied clients and delegates to the overload
 * that creates whichever of the two does not exist yet.
 *
 * @param bigQuery the bigquery client for the project
 * @param storage the storage client for the project
 * @param datasetId the Id of the dataset
 * @param bucketName the name of the bucket
 * @param location the location to use if neither resource exists yet
 * @param cmekKeyName the name of the cmek key
 * @throws IOException if the resources could not be created
 */
public static void createResources(BigQuery bigQuery, Storage storage,
                                   DatasetId datasetId, String bucketName, @Nullable String location,
                                   @Nullable CryptoKeyName cmekKeyName) throws IOException {
  // Both lookups yield null when the resource is absent; the delegate treats null as "create it".
  Dataset existingDataset = bigQuery.getDataset(datasetId);
  Bucket existingBucket = storage.get(bucketName);

  createResources(bigQuery, existingDataset, datasetId,
                  storage, existingBucket, bucketName,
                  location, cmekKeyName);
}

/**
* Creates the given dataset and bucket if the supplied ones are null.
*
* If the dataset already exists but the
* bucket does not, the bucket will be created in the same location as the dataset. If the bucket already exists
* but the dataset does not, the dataset will attempt to be created in the same location. This may fail if the bucket
* is in a location that BigQuery does not yet support.
*
* @param bigQuery the bigquery client for the project
* @param dataset the bigquery dataset instance (may be null)
* @param datasetId the Id of the dataset
* @param storage the storage client for the project
* @param bucket the storage bucket instance (may be null)
* @param bucketName the name of the bucket
* @param location the location of the resources, this is only applied if both the bucket and dataset do not exist
* @param cmekKey the name of the cmek key
* @throws IOException if creating the dataset or the bucket fails
*/
public static void createResources(BigQuery bigQuery, @Nullable Dataset dataset, DatasetId datasetId,
Storage storage, @Nullable Bucket bucket, String bucketName,
@Nullable String location, @Nullable CryptoKeyName cmekKey) throws IOException {
if (dataset == null && bucket == null) {
createBucket(storage, bucketName, location, cmekKeyName,
createBucket(storage, bucketName, location, cmekKey,
() -> String.format("Unable to create Cloud Storage bucket '%s'", bucketName));
createDataset(bigQuery, datasetId, location, cmekKeyName,
createDataset(bigQuery, datasetId, location, cmekKey,
() -> String.format("Unable to create BigQuery dataset '%s.%s'", datasetId.getProject(),
datasetId.getDataset()));
} else if (bucket == null) {
createBucket(
storage, bucketName, dataset.getLocation(), cmekKeyName,
storage, bucketName, dataset.getLocation(), cmekKey,
() -> String.format(
"Unable to create Cloud Storage bucket '%s' in the same location ('%s') as BigQuery dataset '%s'. "
+ "Please use a bucket that is in the same location as the dataset.",
bucketName, dataset.getLocation(), datasetId.getProject() + "." + datasetId.getDataset()));
} else if (dataset == null) {
createDataset(
bigQuery, datasetId, bucket.getLocation(), cmekKeyName,
bigQuery, datasetId, bucket.getLocation(), cmekKey,
() -> String.format(
"Unable to create BigQuery dataset '%s' in the same location ('%s') as Cloud Storage bucket '%s'. "
+ "Please use a bucket that is in a supported location.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
Expand All @@ -52,11 +51,9 @@
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.common.ReferenceNames;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
Expand Down Expand Up @@ -150,10 +147,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
config.getServiceAccountType());

String bucketName = getBucketName(context, dataset);

// Configure GCS Bucket to use
String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
storage,
config.getBucket(),
bucketName,
dataset,
bucketPath,
cmekKeyName);
Expand Down Expand Up @@ -393,4 +392,20 @@ private void emitLineage(BatchSourceContext context, Schema schema, Type sourceT
schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
}
}

/**
 * Resolves the temporary GCS bucket name for this source.
 *
 * The bucket configured on the source takes precedence. When none is configured and a shared
 * bucket prefix is supplied via runtime arguments, a bucket name is derived from the prefix and
 * the source dataset's location. May return null when neither is available.
 *
 * @param context source context providing the runtime arguments
 * @param dataset the source BigQuery dataset, used for its location
 * @return the bucket name to use, or null if none is configured or derivable
 */
@Nullable
private String getBucketName(BatchSourceContext context, Dataset dataset) {
  String configuredBucket = config.getBucket();
  // Read (and thereby validate) the prefix even when a bucket is configured explicitly.
  String prefix = BigQueryUtil.getBucketPrefix(context.getArguments());

  if (configuredBucket != null || prefix == null) {
    // Explicit bucket wins; with no prefix there is nothing to derive (result may be null).
    return configuredBucket;
  }
  return BigQueryUtil.getBucketNameForLocation(prefix, dataset.getLocation());
}
}
67 changes: 67 additions & 0 deletions src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.etl.api.FailureCollector;
import io.cdap.cdap.etl.api.action.SettableArguments;
import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
import io.cdap.cdap.etl.api.validation.InvalidStageException;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
Expand Down Expand Up @@ -63,6 +65,8 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.zip.CRC32;
import java.util.zip.Checksum;
import javax.annotation.Nullable;

/**
Expand All @@ -73,6 +77,7 @@ public final class BigQueryUtil {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);

private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix";

public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
public static final String DATASET_PATTERN = "[A-Za-z0-9_]+";
Expand Down Expand Up @@ -772,4 +777,66 @@ public static String getFQN(String datasetProject, String datasetName, String ta
return String.join(":", BigQueryConstants.BQ_FQN_PREFIX,
datasetProject, datasetName, tableName);
}

/**
 * Reads the common temporary-bucket prefix ({@code gcp.bigquery.bucket.prefix}) from the runtime
 * arguments, validating it before returning it.
 *
 * @param arguments runtime arguments to inspect
 * @return the configured bucket prefix, or {@code null} when the argument is not set
 * @throws IllegalArgumentException if the configured prefix cannot form a valid bucket name
 */
@Nullable
public static String getBucketPrefix(SettableArguments arguments) {
  if (!arguments.has(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) {
    // No prefix configured for this pipeline.
    return null;
  }

  String bucketPrefix = arguments.get(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME);
  // Fail fast on prefixes that could never be part of a valid GCS bucket name.
  validateBucketPrefix(bucketPrefix);
  LOG.debug("Using bucket prefix for temporary buckets: {}", bucketPrefix);
  return bucketPrefix;
}

/**
 * Ensures the configured bucket prefix is valid per the GCS bucket naming convention.
 *
 * The generated bucket name is {@code <prefix>-<crc32 hex>}, so the prefix supplies the first
 * character of the bucket name and must obey the same leading-character rule as a bucket name.
 *
 * @param bucketPrefix the configured prefix to validate
 * @throws IllegalArgumentException if the prefix cannot form a valid bucket name
 */
private static void validateBucketPrefix(String bucketPrefix) {
  if (!bucketPrefix.matches("^[a-z0-9-_.]+$")) {
    throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' is not a valid bucket " +
                                         "name. Bucket names can only contain lowercase letters, numeric " +
                                         "characters, dashes (-), underscores (_), and dots (.).");
  }

  // GCS bucket names must start with a letter or number. Since the prefix becomes the start of
  // the generated bucket name, reject prefixes beginning with '-', '_' or '.' up front instead of
  // failing later at bucket creation time.
  if (!Character.isLetterOrDigit(bucketPrefix.charAt(0))) {
    throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' is not a valid bucket " +
                                         "name. Bucket names must start with a lowercase letter or a number.");
  }

  // Dot-less GCS bucket names are limited to 63 characters; 50 leaves room for the
  // "-<crc32 hex>" suffix (at most 9 characters) appended when deriving the bucket name.
  if (!bucketPrefix.contains(".") && bucketPrefix.length() > 50) {
    throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' should be 50 " +
                                         "characters or shorter.");
  }
}

/**
 * Generates the CRC32 checksum of a location string as a lowercase hex string of at most
 * 8 characters, giving derived bucket names a short, bounded-size location component.
 *
 * The input is lower-cased with {@link java.util.Locale#ROOT} and encoded as UTF-8 so the
 * checksum is stable regardless of the JVM's default locale and charset (e.g. the Turkish
 * locale lower-cases 'I' to a dotless 'ı', which would otherwise change the checksum).
 *
 * @param location location to checksum (case-insensitive)
 * @return checksum value as a hex string (up to 8 characters; leading zeros are not padded)
 */
@VisibleForTesting
public static String crc32location(String location) {
  byte[] bytes = location.toLowerCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
  Checksum checksum = new CRC32();
  checksum.update(bytes, 0, bytes.length);
  return Long.toHexString(checksum.getValue());
}

/**
 * Builds a bucket name for a location by joining the configured prefix and the location's
 * CRC32 hash with a hyphen.
 *
 * @param bucketPrefix bucket name prefix
 * @param location location whose hash forms the bucket name suffix
 * @return derived bucket name of the form {@code <bucketPrefix>-<crc32(location)>}
 */
public static String getBucketNameForLocation(String bucketPrefix, String location) {
  return bucketPrefix + '-' + crc32location(location);
}
}
Loading