Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.apache.flink.annotation.PublicEvolving;

/**
* Strategy for handling unknown commit failures in sink operators.
*
* <p>This strategy applies to {@link
* org.apache.flink.api.connector.sink2.Committer.CommitRequest#signalFailedWithUnknownReason}. It
* does not affect known failures (which are always discarded) or committables that exhaust all
* retries.
*
* @see SinkOptions#COMMITTER_FAILURE_STRATEGY
*/
@PublicEvolving
public enum CommitFailureStrategy {

/** Fail the job on unknown commit errors (default, current behavior). */
FAIL,

/** Log a warning and skip the committable on unknown commit errors. */
WARN
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,18 @@ public class SinkOptions {
.withDescription(
"The number of retries a Flink application attempts for committable operations (such as transactions) on retriable errors, as specified by the sink connector, before Flink fails and potentially restarts.");

/** Strategy for handling commit failures on unknown errors. */
@Documentation.Section(Documentation.Sections.COMMON_MISCELLANEOUS)
public static final ConfigOption<CommitFailureStrategy> COMMITTER_FAILURE_STRATEGY =
key("sink.committer.failure-strategy")
.enumType(CommitFailureStrategy.class)
.defaultValue(CommitFailureStrategy.FAIL)
.withDescription(
"Strategy for handling commit failures on unknown errors. "
+ "FAIL (default) fails the job. "
+ "WARN logs the error and skips the committable, allowing recovery to proceed. "
+ "Note: this only applies to unknown failures signaled by the connector; "
+ "committables that exhaust all retries will still fail the job regardless of this setting.");

private SinkOptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.configuration.CommitFailureStrategy;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
Expand Down Expand Up @@ -78,6 +79,7 @@ class CommitterOperator<CommT> extends AbstractStreamOperator<CommittableMessage
private CommittableCollector<CommT> committableCollector;
private long lastCompletedCheckpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
private int maxRetries;
private CommitFailureStrategy failureStrategy;

/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
Expand Down Expand Up @@ -114,6 +116,7 @@ protected void setup(
metricGroup = InternalSinkCommitterMetricGroup.wrap(getMetricGroup());
committableCollector = CommittableCollector.of(metricGroup);
maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
failureStrategy = config.getConfiguration().get(SinkOptions.COMMITTER_FAILURE_STRATEGY);
}

@Override
Expand Down Expand Up @@ -175,7 +178,7 @@ private void commitAndEmitCheckpoints(long checkpointId)

private void commitAndEmit(CheckpointCommittableManager<CommT> committableManager)
throws IOException, InterruptedException {
committableManager.commit(committer, maxRetries);
committableManager.commit(committer, maxRetries, failureStrategy);
if (emitDownstream) {
emit(committableManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.configuration.CommitFailureStrategy;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
Expand Down Expand Up @@ -121,6 +122,7 @@ public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamO
private SimpleVersionedSerializer<CommT> committableSerializer;
private SinkCommitterMetricGroup metricGroup;
private int maxRetries;
private CommitFailureStrategy failureStrategy;

@Nullable private SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer;
private List<GlobalCommT> sinkV1State = new ArrayList<>();
Expand All @@ -144,6 +146,7 @@ protected void setup(
committableCollector = CommittableCollector.of(metricGroup);
committableSerializer = committableSerializerFactory.get();
maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
failureStrategy = config.getConfiguration().get(SinkOptions.COMMITTER_FAILURE_STRATEGY);
}

@Override
Expand Down Expand Up @@ -211,7 +214,7 @@ private void commit(long checkpointIdOrEOI) throws IOException, InterruptedExcep
if (!checkpointManager.hasGloballyReceivedAll()) {
return;
}
checkpointManager.commit(committer, maxRetries);
checkpointManager.commit(committer, maxRetries, failureStrategy);
committableCollector.remove(checkpointManager);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.configuration.CommitFailureStrategy;

import java.io.IOException;
import java.util.Collection;
Expand Down Expand Up @@ -56,14 +57,28 @@ public interface CheckpointCommittableManager<CommT> {
*/
boolean hasGloballyReceivedAll();

/**
* Commits all due committables if all respective committables of the specific subtask and
* checkpoint have been received. Uses {@link CommitFailureStrategy#FAIL} as the default
* strategy.
*
* @param committer used to commit to the external system
* @param maxRetries
*/
default void commit(Committer<CommT> committer, int maxRetries)
throws IOException, InterruptedException {
commit(committer, maxRetries, CommitFailureStrategy.FAIL);
}

/**
* Commits all due committables if all respective committables of the specific subtask and
* checkpoint have been received.
*
* @param committer used to commit to the external system
* @param maxRetries
* @param failureStrategy strategy for handling unknown commit failures
*/
void commit(Committer<CommT> committer, int maxRetries)
void commit(Committer<CommT> committer, int maxRetries, CommitFailureStrategy failureStrategy)
throws IOException, InterruptedException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.configuration.CommitFailureStrategy;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
Expand Down Expand Up @@ -142,10 +143,12 @@ public boolean hasGloballyReceivedAll() {
}

@Override
public void commit(Committer<CommT> committer, int maxRetries)
public void commit(
Committer<CommT> committer, int maxRetries, CommitFailureStrategy failureStrategy)
throws IOException, InterruptedException {
Collection<CommitRequestImpl<CommT>> requests =
getPendingRequests().collect(Collectors.toList());
requests.forEach(r -> r.setFailureStrategy(failureStrategy));
for (int retry = 0; !requests.isEmpty() && retry <= maxRetries; retry++) {
requests.forEach(CommitRequestImpl::setSelected);
committer.commit(Collections.unmodifiableCollection(requests));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.configuration.CommitFailureStrategy;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/**
Expand All @@ -32,10 +36,13 @@
@Internal
public class CommitRequestImpl<CommT> implements Committer.CommitRequest<CommT> {

private static final Logger LOG = LoggerFactory.getLogger(CommitRequestImpl.class);

private CommT committable;
private int numRetries;
private CommitRequestState state;
private SinkCommitterMetricGroup metricGroup;
private CommitFailureStrategy failureStrategy = CommitFailureStrategy.FAIL;

protected CommitRequestImpl(CommT committable, SinkCommitterMetricGroup metricGroup) {
this.committable = committable;
Expand All @@ -54,6 +61,10 @@ protected CommitRequestImpl(
this.metricGroup = metricGroup;
}

void setFailureStrategy(CommitFailureStrategy failureStrategy) {
this.failureStrategy = failureStrategy;
}

boolean isFinished() {
return state.isFinalState();
}
Expand All @@ -76,15 +87,23 @@ public int getNumberOfRetries() {
public void signalFailedWithKnownReason(Throwable t) {
state = CommitRequestState.FAILED;
metricGroup.getNumCommittablesFailureCounter().inc();
// let the user configure a strategy for failing and apply it here
// Known failures are always discarded; see CommitFailureStrategy for unknown failures.
LOG.warn("Commit failed for committable [{}] with known reason. Discarding.", committable, t);
}

@Override
public void signalFailedWithUnknownReason(Throwable t) {
state = CommitRequestState.FAILED;
metricGroup.getNumCommittablesFailureCounter().inc();
// let the user configure a strategy for failing and apply it here
throw new IllegalStateException("Failed to commit " + committable, t);
if (failureStrategy == CommitFailureStrategy.WARN) {
LOG.warn(
"Commit failed for committable [{}] with unknown reason. "
+ "Skipping due to failure strategy WARN.",
committable,
t);
} else {
throw new IllegalStateException("Failed to commit " + committable, t);
}
}

@Override
Expand Down Expand Up @@ -118,7 +137,10 @@ void setCommittedIfNoError() {
}

CommitRequestImpl<CommT> copy() {
return new CommitRequestImpl<>(committable, numRetries, state, metricGroup);
CommitRequestImpl<CommT> copied =
new CommitRequestImpl<>(committable, numRetries, state, metricGroup);
copied.failureStrategy = this.failureStrategy;
return copied;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.runtime.operators.sink.committables;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.configuration.CommitFailureStrategy;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
Expand Down Expand Up @@ -122,6 +123,57 @@ public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) {
.returns(checkpointId, CheckpointCommittableManagerImpl::getCheckpointId);
}

@Test
void testCommitWithWarnStrategySkipsUnknownFailures() throws IOException, InterruptedException {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L, METRIC_GROUP);
checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0));
checkpointCommittables.addCommittable(new CommittableWithLineage<>(42, 1L, 1));

final Committer<Integer> failingCommitter = new FailWithUnknownReasonCommitter();
assertThatCode(
() ->
checkpointCommittables.commit(
failingCommitter,
MAX_RETRIES,
CommitFailureStrategy.WARN))
.doesNotThrowAnyException();
assertThat(checkpointCommittables.getSuccessfulCommittables()).isEmpty();
assertThat(checkpointCommittables.isFinished()).isTrue();
}

@Test
void testCommitWithFailStrategyThrowsOnUnknownFailures() {
final CheckpointCommittableManagerImpl<Integer> checkpointCommittables =
new CheckpointCommittableManagerImpl<>(new HashMap<>(), 1, 1L, METRIC_GROUP);
checkpointCommittables.addSummary(new CommittableSummary<>(1, 1, 1L, 1, 0));
checkpointCommittables.addCommittable(new CommittableWithLineage<>(42, 1L, 1));

final Committer<Integer> failingCommitter = new FailWithUnknownReasonCommitter();
assertThatThrownBy(
() ->
checkpointCommittables.commit(
failingCommitter,
MAX_RETRIES,
CommitFailureStrategy.FAIL))
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("Failed to commit");
}

private static class FailWithUnknownReasonCommitter implements Committer<Integer> {

@Override
public void commit(Collection<CommitRequest<Integer>> committables) {
committables.forEach(
c ->
c.signalFailedWithUnknownReason(
new RuntimeException("Unknown failure")));
}

@Override
public void close() throws Exception {}
}

private static class NoOpCommitter implements Committer<Integer> {

@Override
Expand Down
Loading