Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,15 @@ public final void prepareRun(BatchSinkContext context) throws Exception {

@Override
public void onRunFinish(boolean succeeded, BatchSinkContext context) {
Path gcsPath;
String gcsPath;
String bucket = getConfig().getBucket();
if (bucket == null) {
gcsPath = new Path(String.format("gs://%s", runUUID.toString()));
gcsPath = String.format("gs://%s", runUUID.toString());
} else {
gcsPath = new Path(String.format(gcsPathFormat, bucket, runUUID.toString()));
gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString());
}
try {
FileSystem fs = gcsPath.getFileSystem(baseConfiguration);
if (fs.exists(gcsPath)) {
fs.delete(gcsPath, true);
LOG.debug("Deleted temporary directory '{}'", gcsPath);
}
BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath);
} catch (IOException e) {
LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,26 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin
* @return bucket name
*/
public static String configureBucket(Configuration baseConfiguration, @Nullable String bucket, String runId) {
  // When no bucket is supplied, the run ID doubles as the bucket name and the bucket is
  // flagged for deletion; a caller-supplied bucket is passed through and never flagged.
  final boolean bucketMissing = bucket == null;
  final String effectiveBucket = bucketMissing ? runId : bucket;
  return configureBucket(baseConfiguration, effectiveBucket, runId, bucketMissing);
}

/**
* Updates {@link Configuration} with bucket details.
* Uses provided bucket, otherwise uses provided runId as a bucket name.
*
* @return bucket name
*/
public static String configureBucket(Configuration baseConfiguration,
String bucket,
String runId,
boolean deleteBucket) {
if (deleteBucket) {
// By default, this option is false, meaning the job can not delete the bucket.
// So enable it only when bucket name is not provided.
baseConfiguration.setBoolean("fs.gs.bucket.delete.enable", true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ protected static BigQueryPushDataset getInstance(SQLPushRequest pushRequest,
String gcsPath = BigQuerySQLEngineUtils.getGCSPath(bucket, runId, table);
List<BigQueryTableFieldSchema> fields =
BigQuerySinkUtils.getBigQueryTableFieldsFromSchema(pushRequest.getDatasetSchema());
BigQuerySinkUtils.configureBucket(configuration, bucket, runId);
BigQuerySinkUtils.configureOutput(configuration, project, dataset, table, gcsPath, fields);

// Create empty table to store uploaded records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,30 @@ public void prepareRun(RuntimeContext context) throws Exception {
String cmekKey = context.getRuntimeArguments().get(GCPUtils.CMEK_KEY);
configuration = BigQueryUtil.getBigQueryConfig(sqlEngineConfig.getServiceAccount(), sqlEngineConfig.getProject(),
cmekKey, sqlEngineConfig.getServiceAccountType());
// Create resources needed for this execution
BigQuerySinkUtils.createResources(bigQuery, storage, dataset, bucket, sqlEngineConfig.getLocation(), cmekKey);
// Configure GCS bucket that is used to stage temporary files.
// If the bucket is created for this run, mark it for deletion after execution is completed
BigQuerySinkUtils.configureBucket(configuration, bucket, runId, sqlEngineConfig.getBucket() == null);
}

/**
 * Runs the parent's finish hook, then removes the temporary GCS location used by this run.
 * An {@link IOException} raised during cleanup is logged as a warning and not rethrown.
 */
@Override
public void onRunFinish(boolean succeeded, RuntimeContext context) {
  super.onRunFinish(succeeded, context);

  // No bucket in the engine config means the bucket was created for this run, so the path
  // targets the bucket itself; otherwise only the run's directory inside the provided bucket
  // is targeted.
  final boolean bucketCreatedForRun = sqlEngineConfig.getBucket() == null;
  final String gcsPath = bucketCreatedForRun
    ? String.format("gs://%s", bucket)
    : String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId);

  try {
    BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath);
  } catch (IOException e) {
    // Cleanup failure is reported but does not propagate out of the finish hook.
    LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage());
  }
}

@Override
Expand Down