Enhance error handling in DelegatingMultiSinkOutputCommitter #1606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Base branch: develop
```diff
@@ -23,6 +23,8 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.util.HashMap;
@@ -35,6 +37,7 @@
  * Delegated instances are supplied along with a schema, which is used to configure the commit operation.
  */
 public class DelegatingMultiSinkOutputCommitter extends OutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(DelegatingMultiSinkOutputCommitter.class);
   private final Map<String, OutputCommitter> committerMap;
   private final Map<String, Schema> schemaMap;
   private final String projectName;
```
|
```diff
@@ -99,18 +102,28 @@ public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOE
   @Override
   public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
     for (String tableName : committerMap.keySet()) {
-      configureContext(taskAttemptContext, tableName);
-      committerMap.get(tableName).commitTask(taskAttemptContext);
+      try {
+        configureContext(taskAttemptContext, tableName);
+        committerMap.get(tableName).commitTask(taskAttemptContext);
+      } catch (IOException e) {
+        LOG.warn("BigQuery multi-sink table '{}' failed during task commit. Reason: {}",
+          tableName, getFailureReason(e), e);
+        throw e;
+      }
     }
   }

   @Override
   public void commitJob(JobContext jobContext) throws IOException {
     for (String tableName : committerMap.keySet()) {
-      configureContext(jobContext, tableName);
-      committerMap.get(tableName).commitJob(jobContext);
+      try {
+        configureContext(jobContext, tableName);
+        committerMap.get(tableName).commitJob(jobContext);
+      } catch (IOException e) {
+        LOG.warn("BigQuery multi-sink table '{}' failed during job commit. Reason: {}",
+          tableName, getFailureReason(e), e);
+        throw e;
+      }
     }
   }
```

**Contributor:** Logging and re-throwing an error is considered an anti-pattern. Isn't this error propagated upwards and logged somewhere in the pipeline run?
|
**Review comment (lines 118 to 127):** Similar to the handling in `abortTask`/`abortJob`, collect the failures rather than rethrowing the first one:

```java
IOException ioe = null;
for (String tableName : committerMap.keySet()) {
  try {
    configureContext(jobContext, tableName);
    committerMap.get(tableName).commitJob(jobContext);
  } catch (IOException e) {
    LOG.error("BigQuery multi-sink table '{}' failed during job commit. Reason: {}",
      tableName, getFailureReason(e), e);
    if (ioe == null) {
      ioe = e;
    } else {
      ioe.addSuppressed(e);
    }
  }
}
if (ioe != null) {
  throw ioe;
}
```
```diff
@@ -168,4 +181,12 @@ public void configureContext(JobContext context, String tableName) throws IOExce
       gcsPath,
       fields);
   }
+
+  private String getFailureReason(IOException exception) {
+    Throwable rootCause = exception;
+    while (rootCause.getCause() != null) {
+      rootCause = rootCause.getCause();
+    }
+    return rootCause.getMessage() == null ? exception.getMessage() : rootCause.getMessage();
+  }
 }
```
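The new helper walks the cause chain and prefers the innermost message, falling back to the outer exception's message when the root cause has none. A standalone copy of that logic (the exception messages here are invented for illustration) behaves like this:

```java
import java.io.IOException;

// Standalone copy of the PR's getFailureReason logic: walk to the root
// cause and prefer its message; if the root cause has no message, fall
// back to the outer exception's message.
public class FailureReasonDemo {
    static String getFailureReason(IOException exception) {
        Throwable rootCause = exception;
        while (rootCause.getCause() != null) {
            rootCause = rootCause.getCause();
        }
        return rootCause.getMessage() == null ? exception.getMessage() : rootCause.getMessage();
    }

    public static void main(String[] args) {
        // Root cause carries the useful message.
        IOException wrapped = new IOException("commit failed",
            new RuntimeException("table not found: dataset.orders"));
        System.out.println(getFailureReason(wrapped));  // table not found: dataset.orders

        // Root cause has no message: fall back to the outer message.
        IOException bare = new IOException("commit failed", new RuntimeException());
        System.out.println(getFailureReason(bare));     // commit failed
    }
}
```

Note that if both the root cause's message and the outer message are `null`, this returns `null` — the weakness the review comment below points at.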
**Review comment (lines +185 to +191):** The current implementation of `getFailureReason` […] A more robust approach is to fall back to […]
**Review comment:** In a multi-sink scenario, failing fast on the first error prevents other sinks from attempting to commit their data, which can lead to partial and inconsistent job states. It is better to attempt the commit for all sinks and collect any exceptions using `addSuppressed`, similar to the implementation in `abortTask` and `abortJob`. Additionally, since a commit failure is a fatal event for the task, `LOG.error` is more appropriate than `LOG.warn`.