Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ export interface CompletedSubTaskCheckpointStatistics {
start_delay: number;
unaligned_checkpoint: boolean;
aborted: boolean;
ip: string | null;
}

export interface PendingSubTaskCheckpointStatistics {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
<thead>
<tr>
<th><strong>ID</strong></th>
<th [nzSortFn]="sortIPAddressFn"><strong>IP Address</strong></th>
<th [nzSortFn]="sortAckTimestampFn"><strong>Acknowledged</strong></th>
<th [nzSortFn]="sortEndToEndDurationFn"><strong>End to End Duration</strong></th>
<th [nzSortFn]="sortCheckpointedSizeFn">
Expand All @@ -166,6 +167,7 @@
<tbody>
<tr *ngFor="let subTask of table.data">
<td>{{ subTask['index'] }}</td>
<td>{{ subTask['ip'] || '-' }}</td>
<ng-container *ngIf="subTask['status'] === 'completed'">
<td>{{ subTask['ack_timestamp'] | date: 'yyyy-MM-dd HH:mm:ss.SSS' }}</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ import { NzTableModule, NzTableSortFn } from 'ng-zorro-antd/table';
import { JobLocalService } from '../../job-local.service';

function createSortFn(
selector: (item: CompletedSubTaskCheckpointStatistics) => number | boolean
selector: (item: CompletedSubTaskCheckpointStatistics) => number | boolean | string | null
): NzTableSortFn<SubTaskCheckpointStatisticsItem> {
// FIXME This type-asserts that pre / next are a specific subtype.
return (pre, next) =>
selector(pre as CompletedSubTaskCheckpointStatistics) > selector(next as CompletedSubTaskCheckpointStatistics)
? 1
: -1;
return (pre, next) => {
const a = selector(pre as CompletedSubTaskCheckpointStatistics) ?? '';
const b = selector(next as CompletedSubTaskCheckpointStatistics) ?? '';
return a > b ? 1 : -1;
};
}

@Component({
Expand All @@ -77,6 +77,17 @@ export class JobCheckpointsSubtaskComponent implements OnInit, OnChanges, OnDest
public mapOfSubtask: Map<number, JobVertexSubTaskData> = new Map();

public readonly sortAckTimestampFn = createSortFn(item => item.ack_timestamp);
public readonly sortIPAddressFn: NzTableSortFn<SubTaskCheckpointStatisticsItem> = (a, b) => {
const ipA = (a as CompletedSubTaskCheckpointStatistics).ip ?? '';
const ipB = (b as CompletedSubTaskCheckpointStatistics).ip ?? '';
const normalize = (ip: string): string =>
ip
.split('.')
.map(seg => seg.padStart(3, '0'))
.join('.');

return normalize(ipA) > normalize(ipB) ? 1 : normalize(ipA) < normalize(ipB) ? -1 : 0;
};
public readonly sortEndToEndDurationFn = createSortFn(item => item.end_to_end_duration);
public readonly sortCheckpointedSizeFn = createSortFn(item => item.checkpointed_size);
public readonly sortStateSizeFn = createSortFn(item => item.state_size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,8 @@ public void reportIncompleteStats(
metrics.getAlignmentDurationNanos() / 1_000_000,
metrics.getCheckpointStartDelayNanos() / 1_000_000,
metrics.getUnalignedCheckpoint(),
false));
false,
null));
dirty = true;
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -427,6 +428,14 @@ public TaskAcknowledgeResult acknowledgeTask(
long checkpointStartDelayMillis =
metrics.getCheckpointStartDelayNanos() / 1_000_000;

String taskManagerIp = null;
if (vertex.getCurrentExecutionAttempt() != null) {
TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
if (location != null) {
taskManagerIp = location.address().getHostAddress();
}
}

SubtaskStateStats subtaskStateStats =
new SubtaskStateStats(
vertex.getParallelSubtaskIndex(),
Expand All @@ -440,7 +449,8 @@ public TaskAcknowledgeResult acknowledgeTask(
alignmentDurationMillis,
checkpointStartDelayMillis,
metrics.getUnalignedCheckpoint(),
true);
true,
taskManagerIp);

LOG.trace(
"Checkpoint {} stats for {}: size={}Kb, duration={}ms, sync part={}ms, async part={}ms",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public class SubtaskStateStats implements Serializable {
/** Is the checkpoint completed by this subtask. */
private final boolean completed;

private final String ip;

SubtaskStateStats(int subtaskIndex, long ackTimestamp) {
this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true);
this(subtaskIndex, ackTimestamp, 0, 0, 0, 0, 0, 0, 0, 0, false, true, null);
}

SubtaskStateStats(
Expand All @@ -82,8 +84,8 @@ public class SubtaskStateStats implements Serializable {
long alignmentDuration,
long checkpointStartDelay,
boolean unalignedCheckpoint,
boolean completed) {

boolean completed,
String ip) {
checkArgument(subtaskIndex >= 0, "Negative subtask index");
this.subtaskIndex = subtaskIndex;
checkArgument(checkpointedSize >= 0, "Negative incremental state size");
Expand All @@ -99,6 +101,7 @@ public class SubtaskStateStats implements Serializable {
this.checkpointStartDelay = checkpointStartDelay;
this.unalignedCheckpoint = unalignedCheckpoint;
this.completed = completed;
this.ip = ip;
}

public int getSubtaskIndex() {
Expand Down Expand Up @@ -194,4 +197,8 @@ public boolean getUnalignedCheckpoint() {
public boolean isCompleted() {
return completed;
}

public String getIp() {
return ip;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ private static List<SubtaskCheckpointStatistics> createSubtaskCheckpointStatisti
subtask.getAlignmentDuration()),
subtask.getCheckpointStartDelay(),
subtask.getUnalignedCheckpoint(),
!subtask.isCompleted()));
!subtask.isCompleted(),
subtask.getIp()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ public static final class CompletedSubtaskCheckpointStatistics

public static final String FIELD_NAME_CHECKPOINTED_SIZE = "checkpointed_size";

public static final String FIELD_NAME_IP_ADDRESS = "ip";

/**
* The accurate name of this field should be 'checkpointed_data_size', keep it as before to
* not break backwards compatibility for old web UI.
Expand Down Expand Up @@ -156,6 +158,9 @@ public static final class CompletedSubtaskCheckpointStatistics
@JsonProperty(FIELD_NAME_ABORTED)
private final boolean aborted;

@JsonProperty(value = FIELD_NAME_IP_ADDRESS, required = false)
private final String ip;

@JsonCreator
public CompletedSubtaskCheckpointStatistics(
@JsonProperty(FIELD_NAME_INDEX) int index,
Expand All @@ -167,7 +172,8 @@ public CompletedSubtaskCheckpointStatistics(
@JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment,
@JsonProperty(FIELD_NAME_START_DELAY) long startDelay,
@JsonProperty(FIELD_NAME_UNALIGNED_CHECKPOINT) boolean unalignedCheckpoint,
@JsonProperty(FIELD_NAME_ABORTED) boolean aborted) {
@JsonProperty(FIELD_NAME_ABORTED) boolean aborted,
@JsonProperty(FIELD_NAME_IP_ADDRESS) String ip) {
super(index, "completed");
this.ackTimestamp = ackTimestamp;
this.duration = duration;
Expand All @@ -178,6 +184,7 @@ public CompletedSubtaskCheckpointStatistics(
this.startDelay = startDelay;
this.unalignedCheckpoint = unalignedCheckpoint;
this.aborted = aborted;
this.ip = ip;
}

public long getAckTimestamp() {
Expand Down Expand Up @@ -216,6 +223,10 @@ public boolean isAborted() {
return aborted;
}

public String getIp() {
return ip;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -233,7 +244,8 @@ public boolean equals(Object o) {
&& Objects.equals(alignment, that.alignment)
&& startDelay == that.startDelay
&& unalignedCheckpoint == that.unalignedCheckpoint
&& aborted == that.aborted;
&& aborted == that.aborted
&& Objects.equals(ip, that.ip);
}

@Override
Expand All @@ -247,7 +259,8 @@ public int hashCode() {
alignment,
startDelay,
unalignedCheckpoint,
aborted);
aborted,
ip);
}

/** Duration of the checkpoint. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ void testIsJavaSerializable() throws Exception {
44L,
true,
new SubtaskStateStats(
123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true),
123, 213123, 123123, 123123, 0, 0, 0, 0, 0, 0, false, true, null),
null);

CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,13 +673,14 @@ public void addEvent(EventBuilder eventBuilder) {
subtasksByVertex);

pending.reportSubtaskStats(
jobVertexID0, new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true));
jobVertexID0,
new SubtaskStateStats(0, 1, 2, 3, 24, 5, 6, 7, 28, 9, false, true, null));
pending.reportSubtaskStats(
jobVertexID0,
new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true));
new SubtaskStateStats(1, 12, 13, 14, 15, 16, 17, 18, 19, 20, false, true, null));
pending.reportSubtaskStats(
jobVertexID1,
new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true));
new SubtaskStateStats(0, 21, 22, 23, 4, 25, 26, 27, 8, 29, true, true, null));
// Complete checkpoint => new snapshot
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null, 1984));
return reportedSpansOut;
Expand Down Expand Up @@ -984,7 +985,8 @@ public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
ignored,
ignored,
false,
true);
true,
null);

assertThat(pending.reportSubtaskStats(jobVertexID, subtaskStats)).isTrue();

Expand Down Expand Up @@ -1071,7 +1073,7 @@ private SubtaskStateStats createSubtaskStats(int index) {
}

private SubtaskStateStats createSubtaskStats(int index, boolean unaligned) {
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true);
return new SubtaskStateStats(index, 0, 0, 0, 0, 0, 0, 0, 0, 0, unaligned, true, null);
}

private void reportRestoredCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ private SubtaskStateStats createSubtaskStats(int index, boolean unalignedCheckpo
Integer.MAX_VALUE + (long) index,
Integer.MAX_VALUE + (long) index,
unalignedCheckpoint,
true);
true,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.StringSerializer;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint.TaskAcknowledgeResult;
import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.operators.coordination.TestingOperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.concurrent.Executors;

Expand All @@ -49,11 +52,13 @@

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -509,6 +514,85 @@ void testReportTaskFinishedOperators() throws IOException {
.contains(ACK_TASKS.get(0).getVertex());
}

@Test
void testAcknowledgeTaskCapturesTaskManagerIp() throws Exception {
final String expectedIp = "10.0.0.1";
final ExecutionAttemptID ipTestAttemptId = createExecutionAttemptId();
final JobVertexID jobVertexId = new JobVertexID();

ExecutionJobVertex ejv = mock(ExecutionJobVertex.class);
when(ejv.getOperatorIDs())
.thenReturn(Collections.singletonList(OperatorIDPair.generatedIDOnly(OPERATOR_ID)));

TaskManagerLocation location =
new TaskManagerLocation(
ResourceID.generate(), InetAddress.getByName(expectedIp), 6121);

Execution currentAttempt = mock(Execution.class);
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getMaxParallelism()).thenReturn(MAX_PARALLELISM);
when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(PARALLELISM);
when(vertex.getJobVertex()).thenReturn(ejv);
when(vertex.getJobvertexId()).thenReturn(jobVertexId);
when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("test-task (0/1)");
when(vertex.getCurrentExecutionAttempt()).thenReturn(currentAttempt);
when(vertex.getCurrentAssignedResourceLocation()).thenReturn(location);

Execution execution = mock(Execution.class);
when(execution.getAttemptId()).thenReturn(ipTestAttemptId);
when(execution.getVertex()).thenReturn(vertex);

Map<JobVertexID, Integer> taskStatsCounts = new HashMap<>();
taskStatsCounts.put(jobVertexId, PARALLELISM);
PendingCheckpointStats pendingStats =
new PendingCheckpointStats(
0,
1,
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
taskStatsCounts);

CheckpointPlan plan =
new DefaultCheckpointPlan(
Collections.emptyList(),
Collections.singletonList(execution),
Collections.singletonList(vertex),
Collections.emptyList(),
Collections.emptyList(),
true);

final Path checkpointDir = new Path(TempDirUtils.newFolder(tmpFolder).toURI());
final FsCheckpointStorageLocation storageLocation =
new FsCheckpointStorageLocation(
LocalFileSystem.getSharedInstance(),
checkpointDir,
checkpointDir,
checkpointDir,
CheckpointStorageLocationReference.getDefault(),
1024,
4096);

PendingCheckpoint checkpoint =
new PendingCheckpoint(
new JobID(),
0,
1,
plan,
Collections.emptyList(),
Collections.emptyList(),
CheckpointProperties.forCheckpoint(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
new CompletableFuture<>(),
pendingStats,
new CompletableFuture<>());
checkpoint.setCheckpointTargetLocation(storageLocation);

checkpoint.acknowledgeTask(ipTestAttemptId, null, new CheckpointMetrics());

assertThat(pendingStats.getLatestAcknowledgedSubtaskStats()).isNotNull();
assertThat(pendingStats.getLatestAcknowledgedSubtaskStats().getIp()).isEqualTo(expectedIp);
}

// ------------------------------------------------------------------------

private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props)
Expand Down
Loading