Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -5240,6 +5240,9 @@
"freeSlots" : {
"type" : "integer"
},
"assignedTasks" : {
"type" : "integer"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
Expand Down Expand Up @@ -5441,6 +5444,9 @@
"jobId" : {
"type" : "any"
},
"assignedTasks" : {
"type" : "integer"
},
"resource" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
Expand All @@ -5461,6 +5467,9 @@
"freeSlots" : {
"type" : "integer"
},
"assignedTasks" : {
"type" : "integer"
},
"hardware" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:instance:HardwareDescription",
Expand Down
9 changes: 9 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -3931,6 +3931,9 @@
"freeSlots" : {
"type" : "integer"
},
"assignedTasks" : {
"type" : "integer"
},
"totalResource" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
Expand Down Expand Up @@ -4093,6 +4096,9 @@
"freeSlots" : {
"type" : "integer"
},
"assignedTasks" : {
"type" : "integer"
},
"totalResource" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo",
Expand Down Expand Up @@ -4190,6 +4196,9 @@
"jobId" : {
"type" : "any"
},
"assignedTasks" : {
"type" : "integer"
},
"resource" : {
"type" : "object",
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:ResourceProfileInfo"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export interface TaskManagerDetail {
timeSinceLastHeartbeat: number;
slotsNumber: number;
freeSlots: number;
assignedTasks: number;
hardware: Hardware;
metrics: Metrics;
memoryConfiguration: MemoryConfiguration;
Expand All @@ -38,6 +39,7 @@ export interface TaskManagerDetail {

export interface AllocatedSlot {
jobId: string;
assignedTasks: number;
resource: Resources;
}

Expand Down Expand Up @@ -67,6 +69,7 @@ export interface TaskManagersItem {
timeSinceLastHeartbeat: number;
slotsNumber: number;
freeSlots: number;
assignedTasks: number;
hardware: Hardware;
blocked?: boolean;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<th [nzSortFn]="sortHeartBeatFn" [nzWidth]="'160px'">Last Heartbeat</th>
<th [nzSortFn]="sortSlotsNumberFn" [nzWidth]="'90px'">All Slots</th>
<th [nzSortFn]="sortFreeSlotsFn" [nzWidth]="'100px'">Free Slots</th>
<th [nzSortFn]="sortAssignedTasksFn" [nzWidth]="'100px'">Assigned Tasks</th>
<th [nzSortFn]="sortCpuCoresFn" [nzWidth]="'110px'">CPU Cores</th>
<th [nzSortFn]="sortPhysicalMemoryFn" [nzWidth]="'120px'">Physical MEM</th>
<th [nzSortFn]="sortFreeMemoryFn" [nzWidth]="'130px'">JVM Heap Size</th>
Expand All @@ -53,6 +54,7 @@
<td>{{ manager.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' }}</td>
<td>{{ manager.slotsNumber }}</td>
<td>{{ manager.freeSlots }}</td>
<td>{{ manager.assignedTasks }}</td>
<td>{{ manager.hardware.cpuCores }}</td>
<td [attr.title]="manager.hardware.physicalMemory + ' bytes'">
{{ manager.hardware.physicalMemory | humanizeBytes }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ export class TaskManagerListComponent implements OnInit, OnDestroy {
public readonly sortHeartBeatFn = createSortFn(item => item.timeSinceLastHeartbeat);
public readonly sortSlotsNumberFn = createSortFn(item => item.slotsNumber);
public readonly sortFreeSlotsFn = createSortFn(item => item.freeSlots);
public readonly sortAssignedTasksFn = createSortFn(item => item.assignedTasks);
public readonly sortCpuCoresFn = createSortFn(item => item.hardware?.cpuCores);
public readonly sortPhysicalMemoryFn = createSortFn(item => item.hardware?.physicalMemory);
public readonly sortFreeMemoryFn = createSortFn(item => item.hardware?.freeMemory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@
<tr>
<th>#</th>
<th>Job ID</th>
<th>Assigned Tasks</th>
<th>CPU (cores)</th>
<th>Task Heap memory (MB)</th>
<th>Task Off-Heap memory (MB)</th>
Expand All @@ -409,6 +410,7 @@
<strong>{{ i | number }}</strong>
</td>
<td>{{ slot.jobId }}</td>
<td>{{ slot.assignedTasks }}</td>
<td>{{ slot.resource.cpuCores | number }}</td>
<td>{{ slot.resource.taskHeapMemory | number }}</td>
<td>{{ slot.resource.taskOffHeapMemory | number }}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
<flink-blocked-badge *ngIf="taskManagerDetail?.blocked"></flink-blocked-badge>
</div>
<nz-descriptions *ngIf="taskManagerDetail" nzBordered nzSize="small">
<nz-descriptions-item [nzSpan]="2" nzTitle="Path">
<nz-descriptions-item [nzSpan]="1" nzTitle="Path">
{{ taskManagerDetail.path }}
</nz-descriptions-item>
<nz-descriptions-item [nzSpan]="1" nzTitle="Free/All Slots">
{{ taskManagerDetail.freeSlots }} / {{ taskManagerDetail.slotsNumber }}
</nz-descriptions-item>
<nz-descriptions-item [nzSpan]="1" nzTitle="Assigned Tasks">
{{ taskManagerDetail.assignedTasks }}
</nz-descriptions-item>
<nz-descriptions-item [nzSpan]="1" nzTitle="Last Heartbeat">
{{ taskManagerDetail.timeSinceLastHeartbeat | date: 'yyyy-MM-dd HH:mm:ss' }}
</nz-descriptions-item>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Dur
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(taskExecutor.getInstanceID()),
slotManager.getNumberFreeSlotsOf(taskExecutor.getInstanceID()),
slotManager.getAssignedTasksOf(taskExecutor.getInstanceID()),
slotManager.getRegisteredResourceOf(taskExecutor.getInstanceID()),
slotManager.getFreeResourceOf(taskExecutor.getInstanceID()),
taskExecutor.getHardwareDescription(),
Expand Down Expand Up @@ -717,6 +718,7 @@ public CompletableFuture<TaskManagerInfoWithSlots> requestTaskManagerDetailsInfo
taskManagerHeartbeatManager.getLastHeartbeatFrom(resourceId),
slotManager.getNumberRegisteredSlotsOf(instanceId),
slotManager.getNumberFreeSlotsOf(instanceId),
slotManager.getAssignedTasksOf(instanceId),
slotManager.getRegisteredResourceOf(instanceId),
slotManager.getFreeResourceOf(instanceId),
taskExecutor.getHardwareDescription(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
/** Provides statistics of cluster resources. */
public interface ClusterResourceStatisticsProvider {

/** Get total number of tasks assigned to the current instance. slots. */
int getAssignedTasks(InstanceID instanceId);

/** Get total number of registered slots. */
int getNumberRegisteredSlots();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -295,16 +296,15 @@ public boolean reportSlotStatus(InstanceID instanceId, SlotReport slotReport) {
private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo taskManager) {
final AllocationID allocationId = Preconditions.checkNotNull(slotStatus.getAllocationID());
final JobID jobId = Preconditions.checkNotNull(slotStatus.getJobID());
final int assignedTasks = slotStatus.getAssignedTasks();
try (MdcUtils.MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobId))) {
final ResourceProfile resourceProfile =
Preconditions.checkNotNull(slotStatus.getResourceProfile());

if (taskManager.getAllocatedSlots().containsKey(allocationId)) {
if (taskManager.getAllocatedSlots().get(allocationId).getState()
== SlotState.PENDING) {
TaskManagerSlotInformation slot = taskManager.getAllocatedSlots().get(allocationId);
if (Objects.nonNull(slot)) {
if (slot.getState() == SlotState.PENDING) {
// Allocation Complete
final TaskManagerSlotInformation slot =
taskManager.getAllocatedSlots().get(allocationId);
pendingSlotAllocations.remove(slot.getAllocationId());
taskManagerTracker.notifySlotStatus(
slot.getAllocationId(),
Expand All @@ -313,6 +313,7 @@ private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo t
slot.getResourceProfile(),
SlotState.ALLOCATED);
}
slot.setAssignedTasks(assignedTasks);
return true;
} else {
Preconditions.checkState(
Expand All @@ -325,6 +326,7 @@ private boolean syncAllocatedSlotStatus(SlotStatus slotStatus, TaskManagerInfo t
resourceProfile,
SlotState.ALLOCATED);
resourceTracker.notifyAcquiredResource(jobId, resourceProfile);
taskManager.getAllocatedSlots().get(allocationId).setAssignedTasks(assignedTasks);
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,11 @@ private Set<PendingTaskManagerId> allocateTaskManagersAccordingTo(
// Legacy APIs
// ---------------------------------------------------------------------------------------------

@Override
public int getAssignedTasksOf(InstanceID instanceId) {
return taskManagerTracker.getAssignedTasks(instanceId);
}

@Override
public int getNumberRegisteredSlots() {
return taskManagerTracker.getNumberRegisteredSlots();
Expand Down Expand Up @@ -803,7 +808,12 @@ public Collection<SlotInfo> getAllocatedSlotsOf(InstanceID instanceID) {
.map(Map::values)
.orElse(Collections.emptyList())
.stream()
.map(slot -> new SlotInfo(slot.getJobId(), slot.getResourceProfile()))
.map(
slot ->
new SlotInfo(
slot.getJobId(),
slot.getResourceProfile(),
slot.getAssignedTasks()))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class FineGrainedTaskManagerSlot implements TaskManagerSlotInformation {
/** Current state of this slot. Should be either PENDING or ALLOCATED. */
private SlotState state;

private int assignedTasks = 0;

public FineGrainedTaskManagerSlot(
AllocationID allocationId,
JobID jobId,
Expand All @@ -72,6 +74,16 @@ public ResourceProfile getResourceProfile() {
return resourceProfile;
}

@Override
public void setAssignedTasks(int assignedTasks) {
this.assignedTasks = assignedTasks;
}

@Override
public int getAssignedTasks() {
return assignedTasks;
}

@Override
public SlotState getState() {
return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -315,6 +316,22 @@ public Collection<PendingTaskManager> getPendingTaskManagers() {
Collections.emptySet()));
}

@Override
public int getAssignedTasks(InstanceID instanceId) {
FineGrainedTaskManagerRegistration taskManagerRegistration =
taskManagerRegistrations.get(instanceId);
if (Objects.isNull(taskManagerRegistration)) {
return 0;
}
int totalAssignedTasks = 0;
for (TaskManagerSlotInformation slot :
taskManagerRegistration.getAllocatedSlots().values()) {
final int assignedTasks = slot.getAssignedTasks();
totalAssignedTasks = totalAssignedTasks + assignedTasks;
}
return totalAssignedTasks;
}

@Override
public int getNumberRegisteredSlots() {
return taskManagerRegistrations.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
* failure, respectively.
*/
public interface SlotManager extends AutoCloseable {

int getAssignedTasksOf(InstanceID instanceId);

int getNumberRegisteredSlots();

int getNumberRegisteredSlotsOf(InstanceID instanceId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,18 @@ default boolean isMatchingRequirement(ResourceProfile required) {
* @return resource profile of this slot
*/
ResourceProfile getResourceProfile();

/**
* Set the number of tasks assigned to the current slot.
*
* @param assignedTasks The number of tasks assigned to the current slot.
*/
void setAssignedTasks(int assignedTasks);

/**
* Get the number of tasks assigned to the current slot.
*
* @return The number of tasks assigned to the current slot.
*/
int getAssignedTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,31 +46,43 @@ public class SlotInfo implements ResponseBody, Serializable {

public static final String FIELD_NAME_JOB_ID = "jobId";

public static final String FIELD_NAME_ASSIGNED_TASKS = "assignedTasks";

@JsonProperty(FIELD_NAME_RESOURCE)
private final ResourceProfileInfo resource;

@JsonProperty(FIELD_NAME_JOB_ID)
@JsonSerialize(using = JobIDSerializer.class)
private final JobID jobId;

@JsonProperty(FIELD_NAME_ASSIGNED_TASKS)
private final int assignedTasks;

@JsonCreator
public SlotInfo(
@JsonDeserialize(using = JobIDDeserializer.class) @JsonProperty(FIELD_NAME_JOB_ID)
JobID jobId,
@JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource) {
@JsonProperty(FIELD_NAME_RESOURCE) ResourceProfileInfo resource,
@JsonProperty(FIELD_NAME_ASSIGNED_TASKS) int assignedTasks) {
this.jobId = Preconditions.checkNotNull(jobId);
this.resource = Preconditions.checkNotNull(resource);
this.assignedTasks = assignedTasks;
}

public SlotInfo(JobID jobId, ResourceProfile resource) {
this(jobId, ResourceProfileInfo.fromResourceProfile(resource));
public SlotInfo(JobID jobId, ResourceProfile resource, int assignedTasks) {
this(jobId, ResourceProfileInfo.fromResourceProfile(resource), assignedTasks);
}

@JsonIgnore
public JobID getJobId() {
return jobId;
}

@JsonIgnore
public int getAssignedTasks() {
return assignedTasks;
}

@JsonIgnore
public ResourceProfileInfo getResource() {
return resource;
Expand All @@ -85,11 +97,13 @@ public boolean equals(Object o) {
return false;
}
SlotInfo that = (SlotInfo) o;
return Objects.equals(jobId, that.jobId) && Objects.equals(resource, that.resource);
return Objects.equals(jobId, that.jobId)
&& Objects.equals(resource, that.resource)
&& Objects.equals(assignedTasks, that.assignedTasks);
}

@Override
public int hashCode() {
return Objects.hash(jobId, resource);
return Objects.hash(jobId, resource, assignedTasks);
}
}
Loading