@@ -16,16 +16,28 @@

package io.cdap.plugin.gcp.gcs.sink;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Output Committer which creates and delegates operations to other GCS Output Committer instances.
@@ -34,11 +46,17 @@
*/
public class DelegatingGCSOutputCommitter extends OutputCommitter {
private final Map<String, OutputCommitter> committerMap;
private TaskAttemptContext tc;

public DelegatingGCSOutputCommitter() {
committerMap = new HashMap<>();
}

/**
 * Stores the task attempt context; commitJob later uses it to build a FileOutputCommitter for each partition path.
 */
public void setTaskContext(TaskAttemptContext taskContext) {
tc = taskContext;
}

/**
* Add a new GCSOutputCommitter based on a supplied Output Format and Table Name.
*
@@ -59,11 +77,29 @@ public void addGCSOutputCommitterFromOutputFormat(OutputFormat outputFormat,
gcsOutputCommitter.setupJob(context);
gcsOutputCommitter.setupTask(context);
committerMap.put(tableName, gcsOutputCommitter);
writePartitionFile(context.getConfiguration().get(FileOutputFormat.OUTDIR), context);
}

@Override
public void setupJob(JobContext jobContext) throws IOException {
//no-op
Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
Path tempPath = new Path(outputPath, FileOutputCommitter.PENDING_DIR_NAME);
fs.mkdirs(tempPath);
}

private void writePartitionFile(String path, TaskAttemptContext context) throws IOException {
Path outputPath = new Path(context.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
Path tempPath = new Path(outputPath, FileOutputCommitter.PENDING_DIR_NAME);
FileSystem fs = tempPath.getFileSystem(context.getConfiguration());
String taskId = context.getTaskAttemptID().getTaskID().toString();
Path taskPartitionFile = new Path(tempPath, taskId + "_partitions.txt");
Review comment: If it's a task retry we should overwrite it.
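A hypothetical sketch of that (not part of this PR; assumes an org.apache.hadoop.fs.FSDataOutputStream import): creating the file with overwrite set to true truncates whatever a previous attempt left behind. Note that a plain overwrite keeps only the last path when one task writes several table paths, so the thread further down settles on removing the file at the beginning of the attempt instead.

// Hypothetical alternative to the exists/createNewFile/append sequence below.
try (FSDataOutputStream out = fs.create(taskPartitionFile, true)) {
  out.writeBytes(path + "\n");
}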

if (!fs.exists(taskPartitionFile)) {
fs.createNewFile(taskPartitionFile);
}
DataOutputStream out = fs.append(taskPartitionFile);
Review comment: Use try-with-resources to ensure the stream is always closed.
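For reference, a minimal sketch of the try-with-resources shape, assuming the same fs, taskPartitionFile, and path variables used in this method:

// Sketch only: the stream is closed even if writeBytes throws.
try (DataOutputStream out = fs.append(taskPartitionFile)) {
  out.writeBytes(path + "\n");
}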

out.writeBytes(path + "\n");
out.close();
}

@Override
@@ -95,9 +131,37 @@ public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException

@Override
public void commitJob(JobContext jobContext) throws IOException {
for (OutputCommitter committer : committerMap.values()) {
Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
Path tempPath = new Path(outputPath, FileOutputCommitter.PENDING_DIR_NAME);
Set<String> outputPaths = new HashSet<>();

for (FileStatus status : fs.listStatus(tempPath)) {
if (status.getPath().getName().endsWith("_partitions.txt")) {
FSDataInputStream dis = fs.open(status.getPath());
DataInputStream in = new DataInputStream(new BufferedInputStream(dis));
BufferedReader br = new BufferedReader(new java.io.InputStreamReader(in));
String line;
while ((line = br.readLine()) != null) {
Review comment: There should never be more than 1 line.

@vikasrathee-cs (Collaborator, Apr 12, 2024): It can have multiple lines if that split has records for multiple table names. E.g. with country as the tableName and a split that has records for both US and India, it will write the path up to country/suffix into this file.

@psainics (Collaborator, Author, Apr 12, 2024): One task can be responsible for processing multiple partitions, hence there can be multiple lines.

Review comment: Got you. Well, in this case we need to ensure the file is removed at the beginning of the attempt (in case of any attempt retries).
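A sketch of what that per-attempt cleanup could look like (hypothetical; placing it in setupTask and mirroring the file naming from writePartitionFile are assumptions, not part of this PR):

@Override
public void setupTask(TaskAttemptContext context) throws IOException {
  // Assumption: remove any partitions file left behind by a previous attempt of this
  // task, so a retry starts clean and the append in writePartitionFile only collects
  // paths written by the current attempt.
  Path baseDir = new Path(context.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
  Path pendingDir = new Path(baseDir, FileOutputCommitter.PENDING_DIR_NAME);
  FileSystem fs = pendingDir.getFileSystem(context.getConfiguration());
  Path taskPartitionFile = new Path(pendingDir, context.getTaskAttemptID().getTaskID().toString() + "_partitions.txt");
  if (fs.exists(taskPartitionFile)) {
    fs.delete(taskPartitionFile, false);
  }
}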

outputPaths.add(line);
}
in.close();
}
}
for (String output : outputPaths) {
tc.getConfiguration().set(FileOutputFormat.OUTDIR, output);
FileOutputCommitter committer = new FileOutputCommitter(new Path(output), tc);
committer.commitJob(jobContext);
}
cleanupJob(jobContext);
}


@Override
public void cleanupJob(JobContext jobContext) throws IOException {
Path outputPath = new Path(jobContext.getConfiguration().get(DelegatingGCSOutputFormat.OUTPUT_PATH_BASE_DIR));
FileSystem fs = outputPath.getFileSystem(jobContext.getConfiguration());
fs.delete(new Path(outputPath, FileOutputCommitter.PENDING_DIR_NAME), true);
}

@Override
@@ -124,6 +188,7 @@ public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException
@Override
public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
IOException ioe = null;
cleanupJob(jobContext);

for (OutputCommitter committer : committerMap.values()) {
try {
@@ -75,6 +75,7 @@ public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException

@Override
public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
outputCommitter.setTaskContext(context);
return outputCommitter;
}

@@ -17,6 +17,9 @@
package io.cdap.plugin.gcp.gcs.sink;

import io.cdap.cdap.api.data.format.StructuredRecord;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
@@ -69,7 +72,6 @@ public void write(NullWritable key, StructuredRecord record) throws IOException, InterruptedException
delegate = format.getRecordWriter(context);
delegateMap.put(tableName, delegate);
}

delegate.write(key, record);
}

@@ -84,7 +86,7 @@ public void close(TaskAttemptContext context) throws IOException, InterruptedException
// We need to do this at this stage because the OutputCommitter needs to be aware of the different partitions
// that have been stored so far.
delegatingGCSOutputCommitter.commitTask(context);
delegatingGCSOutputCommitter.commitJob(context);
// delegatingGCSOutputCommitter.commitJob(context);
}

}
@@ -24,6 +24,8 @@
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.StorageClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;