Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ <h4 class="content-wrap">
<xhtml:div class="detail">{{ operator }}</xhtml:div>
<xhtml:div class="detail description" *ngIf="!pending">{{ description }}</xhtml:div>
<xhtml:div class="node-label">Parallelism: {{ parallelism }}</xhtml:div>
<xhtml:div class="node-label" *ngIf="slotSharingGroupName && !pending">
Slot Sharing Group: {{ slotSharingGroupName }}
</xhtml:div>
<xhtml:div
class="node-label metric"
title="Maximum back pressured percentage across all subtasks"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export class NodeComponent {
backPressuredPercentage: number | undefined = NaN;
busyPercentage: number | undefined = NaN;
dataSkewPercentage: number | undefined = NaN;
slotSharingGroupId: string | null | undefined;
slotSharingGroupName: string | null | undefined;
pending: boolean = true;
backgroundColor: string;
borderColor: string;
Expand Down Expand Up @@ -67,6 +69,8 @@ export class NodeComponent {
this.operatorStrategy = this.decodeHTML(value.operator_strategy);
this.parallelism = value.parallelism;
this.lowWatermark = value.lowWatermark;
this.slotSharingGroupId = value.detail?.slotSharingGroupId;
this.slotSharingGroupName = value.detail?.slotSharingGroupName;
if (value?.job_vertex_id) {
this.pending = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ export interface VerticesItem {
duration: number;
tasks: TasksStatus;
metrics: MetricsStatus;
slotSharingGroupId?: string | null;
slotSharingGroupName?: string | null;
}

export interface VerticesItemRange extends VerticesItem {
Expand Down Expand Up @@ -170,4 +172,4 @@ export interface JobDetailCorrect extends JobDetail {
streamNodes: NodesItemCorrect[];
streamLinks: NodesItemLink[];
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,16 @@ private static JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(
counts.getAccumulateIdleTime(),
counts.getAccumulateBusyTime());

// Get slot sharing group name, default to "default" if not set
String slotSharingGroupName = ejv.getSlotSharingGroup().getSlotSharingGroupName();
if (slotSharingGroupName == null || slotSharingGroupName.isEmpty()) {
slotSharingGroupName = "default";
}

return new JobDetailsInfo.JobVertexDetailsInfo(
ejv.getJobVertexId(),
ejv.getSlotSharingGroup().getSlotSharingGroupId(),
slotSharingGroupName,
ejv.getName(),
ejv.getMaxParallelism(),
ejv.getParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,8 @@ public static final class JobVertexDetailsInfo {

public static final String FIELD_NAME_SLOT_SHARING_GROUP_ID = "slotSharingGroupId";

public static final String FIELD_NAME_SLOT_SHARING_GROUP_NAME = "slotSharingGroupName";

public static final String FIELD_NAME_JOB_VERTEX_NAME = "name";

public static final String FIELD_NAME_MAX_PARALLELISM = "maxParallelism";
Expand All @@ -371,6 +373,9 @@ public static final class JobVertexDetailsInfo {
@JsonSerialize(using = SlotSharingGroupIDSerializer.class)
private final SlotSharingGroupId slotSharingGroupId;

@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME)
private final String slotSharingGroupName;

@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME)
private final String name;

Expand Down Expand Up @@ -406,6 +411,7 @@ public JobVertexDetailsInfo(
@JsonDeserialize(using = SlotSharingGroupIDDeserializer.class)
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_ID)
SlotSharingGroupId slotSharingGroupId,
@JsonProperty(FIELD_NAME_SLOT_SHARING_GROUP_NAME) String slotSharingGroupName,
@JsonProperty(FIELD_NAME_JOB_VERTEX_NAME) String name,
@JsonProperty(FIELD_NAME_MAX_PARALLELISM) int maxParallelism,
@JsonProperty(FIELD_NAME_PARALLELISM) int parallelism,
Expand All @@ -418,6 +424,7 @@ public JobVertexDetailsInfo(
@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) {
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.slotSharingGroupId = Preconditions.checkNotNull(slotSharingGroupId);
this.slotSharingGroupName = slotSharingGroupName;
this.name = Preconditions.checkNotNull(name);
this.maxParallelism = maxParallelism;
this.parallelism = parallelism;
Expand All @@ -439,6 +446,11 @@ public SlotSharingGroupId getSlotSharingGroupId() {
return slotSharingGroupId;
}

@JsonIgnore
public String getSlotSharingGroupName() {
return slotSharingGroupName;
}

@JsonIgnore
public String getName() {
return name;
Expand Down Expand Up @@ -500,6 +512,7 @@ public boolean equals(Object o) {
&& duration == that.duration
&& Objects.equals(jobVertexID, that.jobVertexID)
&& Objects.equals(slotSharingGroupId, that.slotSharingGroupId)
&& Objects.equals(slotSharingGroupName, that.slotSharingGroupName)
&& Objects.equals(name, that.name)
&& executionState == that.executionState
&& Objects.equals(tasksPerState, that.tasksPerState)
Expand All @@ -511,6 +524,7 @@ public int hashCode() {
return Objects.hash(
jobVertexID,
slotSharingGroupId,
slotSharingGroupName,
name,
maxParallelism,
parallelism,
Expand All @@ -522,4 +536,4 @@ public int hashCode() {
jobVertexMetrics);
}
}
}
}