Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ public class WorkerModel {
String cpu;
int cpuNb;
int memorySize;
boolean teeEnabled;
boolean gpuEnabled;
// TODO remove or rename to sgxEnabled in the future
boolean teeEnabled;
boolean tdxEnabled;

@JsonPOJOBuilder(withPrefix = "")
public static class WorkerModelBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void shouldSerializeAndDeserialize() throws JsonProcessingException {
WorkerModel model = WorkerModel.builder().build();
String jsonString = mapper.writeValueAsString(model);
assertThat(jsonString).isEqualTo("{\"name\":null,\"walletAddress\":null,\"os\":null,\"cpu\":null," +
"\"cpuNb\":0,\"memorySize\":0,\"teeEnabled\":false,\"gpuEnabled\":false}");
"\"cpuNb\":0,\"memorySize\":0,\"gpuEnabled\":false,\"teeEnabled\":false,\"tdxEnabled\":false}");
WorkerModel parsedModel = mapper.readValue(jsonString, WorkerModel.class);
assertThat(parsedModel).isEqualTo(model);
}
Expand Down
36 changes: 11 additions & 25 deletions src/main/java/com/iexec/core/replicate/ReplicateSupplyService.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,52 +98,38 @@ Optional<ReplicateTaskSummary> getAvailableReplicateTaskSummary(long workerLastB
return Optional.empty();
}

final Optional<Worker> optional = workerService.getWorker(walletAddress);
if (optional.isEmpty()) {
final Worker worker = workerService.getWorker(walletAddress).orElse(null);
// return empty if the worker is not found or if max computing task is reached
if (worker == null || worker.hasNoRemainingComputingSlot()) {
return Optional.empty();
}
final Worker worker = optional.get();

// return empty if max computing task is reached or if the worker is not found
if (!workerService.canAcceptMoreWorks(worker)) {
return Optional.empty();
}

return getReplicateTaskSummaryForAnyAvailableTask(
walletAddress,
worker.isTeeEnabled()
);
return getReplicateTaskSummaryForAnyAvailableTask(worker);
}

/**
* Loops through available tasks
* and finds the first one that needs a new {@link Replicate}.
*
* @param walletAddress Wallet address of the worker asking for work.
* @param isTeeEnabled Whether this worker supports TEE.
* @param worker scheduler model of the worker asking for work
* @return An {@link Optional} containing a {@link ReplicateTaskSummary}
* if any {@link Task} is available and can be handled by this worker,
* {@link Optional#empty()} otherwise.
*/
private Optional<ReplicateTaskSummary> getReplicateTaskSummaryForAnyAvailableTask(
String walletAddress,
boolean isTeeEnabled) {
private Optional<ReplicateTaskSummary> getReplicateTaskSummaryForAnyAvailableTask(final Worker worker) {
final List<String> alreadyScannedTasks = new ArrayList<>();
final List<String> excludedTags = worker.getExcludedTags();

Optional<ReplicateTaskSummary> replicateTaskSummary = Optional.empty();
while (replicateTaskSummary.isEmpty()) {
final Optional<Task> oTask = taskService.getPrioritizedInitializedOrRunningTask(
!isTeeEnabled,
alreadyScannedTasks
);
if (oTask.isEmpty()) {
final Task task = taskService.getPrioritizedInitializedOrRunningTask(
excludedTags, alreadyScannedTasks).orElse(null);
if (task == null) {
// No more tasks waiting for a new replicate.
return Optional.empty();
}

final Task task = oTask.get();
alreadyScannedTasks.add(task.getChainTaskId());
replicateTaskSummary = getReplicateTaskSummary(task, walletAddress);
replicateTaskSummary = getReplicateTaskSummary(task, worker.getWalletAddress());
}
return replicateTaskSummary;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/iexec/core/task/Task.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -146,7 +146,7 @@ public boolean inCompletionPhase() {
}

public boolean isTeeTask() {
return TeeUtils.isTeeTag(getTag());
return TeeUtils.getTeeFramework(tag) != null;
}

TaskModel generateModel() {
Expand Down
15 changes: 5 additions & 10 deletions src/main/java/com/iexec/core/task/TaskService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.iexec.commons.poco.chain.ChainTask;
import com.iexec.commons.poco.chain.ChainTaskStatus;
import com.iexec.commons.poco.tee.TeeUtils;
import com.iexec.core.chain.IexecHubService;
import com.iexec.core.replicate.ReplicatesList;
import com.iexec.core.task.event.TaskCreatedEvent;
Expand Down Expand Up @@ -229,21 +228,17 @@ public List<Task> findByCurrentStatus(List<TaskStatus> statusList) {
* <p>
* Tasks can be excluded with {@code excludedChainTaskIds}.
*
* @param shouldExcludeTeeTasks Whether TEE tasks should be retrieved
* as well as standard tasks.
* @param excludedChainTaskIds Tasks to exclude from retrieval.
* @param excludedTags Whether some tags should not be eligible, it is focused on TEE tags at the moment
* @param excludedChainTaskIds Tasks to exclude from retrieval.
* @return The first task which is {@link TaskStatus#INITIALIZED}
* or {@link TaskStatus#RUNNING},
* or {@link Optional#empty()} if no task meets the requirements.
*/
public Optional<Task> getPrioritizedInitializedOrRunningTask(
boolean shouldExcludeTeeTasks,
List<String> excludedChainTaskIds) {
final List<String> excludedTags = shouldExcludeTeeTasks
? List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG)
: null;
final List<String> excludedTags,
final List<String> excludedChainTaskIds) {
return findPrioritizedTask(
Arrays.asList(INITIALIZED, RUNNING),
List.of(INITIALIZED, RUNNING),
excludedTags,
excludedChainTaskIds,
Sort.by(Sort.Order.desc(CURRENT_STATUS_FIELD_NAME),
Expand Down
44 changes: 36 additions & 8 deletions src/main/java/com/iexec/core/worker/Worker.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2024 IEXEC BLOCKCHAIN TECH
* Copyright 2020-2025 IEXEC BLOCKCHAIN TECH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,22 +16,24 @@

package com.iexec.core.worker;

import com.iexec.commons.poco.tee.TeeUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

import java.util.Date;
import java.util.List;

@Slf4j
@Document
@Data
@Builder
@AllArgsConstructor
public class Worker {

@Id
private String id;
private String name;
Expand All @@ -44,8 +46,10 @@ public class Worker {
private int cpuNb;
private int maxNbTasks;
private int memorySize;
private boolean teeEnabled;
private boolean gpuEnabled;
// TODO remove or rename to sgxEnabled in the future
private boolean teeEnabled;
private boolean tdxEnabled;
@Builder.Default
private List<String> participatingChainTaskIds = List.of();
@Builder.Default
Expand All @@ -60,12 +64,36 @@ void addChainTaskId(String chainTaskId) {
computingChainTaskIds.add(chainTaskId);
}

void removeChainTaskId(String chainTaskId) {
participatingChainTaskIds.remove(chainTaskId);
computingChainTaskIds.remove(chainTaskId);
/**
* Returns excluded tags depending on worker configuration
*
* @return The list of excluded tags
*/
public List<String> getExcludedTags() {
if (!teeEnabled && !tdxEnabled) {
return List.of(TeeUtils.TEE_TDX_ONLY_TAG, TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG);
} else if (!teeEnabled) {
return List.of(TeeUtils.TEE_SCONE_ONLY_TAG, TeeUtils.TEE_GRAMINE_ONLY_TAG);
} else if (!tdxEnabled) {
return List.of(TeeUtils.TEE_TDX_ONLY_TAG);
} else {
// /!\ teeEnabled and tdxEnabled are both true in this branch
log.warn("Worker seems to support both SGX and TDX, this should not happen [wallet:{}]", walletAddress);
return List.of();
}
}

void removeComputedChainTaskId(String chainTaskId) {
computingChainTaskIds.remove(chainTaskId);
/**
* Returns whether the worker can accept more work or not.
*
* @return {@literal true} when the worker is at max capacity, {@literal false} otherwise
*/
public boolean hasNoRemainingComputingSlot() {
final boolean areAllComputingSlotsInUse = computingChainTaskIds.size() >= maxNbTasks;
if (areAllComputingSlotsInUse) {
log.debug("Worker is computing at max capacity [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
walletAddress, computingChainTaskIds.size(), maxNbTasks);
}
return areAllComputingSlotsInUse;
}
}
15 changes: 8 additions & 7 deletions src/main/java/com/iexec/core/worker/WorkerController.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,33 @@ public ResponseEntity<String> getToken(@RequestParam(name = "walletAddress") Str
}

@PostMapping(path = "/workers/register")
public ResponseEntity<Worker> registerWorker(@RequestHeader("Authorization") String bearerToken,
@RequestBody WorkerModel model) {
String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
public ResponseEntity<Worker> registerWorker(@RequestHeader("Authorization") final String bearerToken,
@RequestBody final WorkerModel model) {
final String workerWalletAddress = jwtTokenProvider.getWalletAddressFromBearerToken(bearerToken);
if (workerWalletAddress.isEmpty()) {
WorkerUtils.emitWarnOnUnAuthorizedAccess("");
return ResponseEntity.status(HttpStatus.UNAUTHORIZED).build();
}

// if it is a GPU worker, it can process only 1 task at a time, otherwise it can process cpuNb
int maxNbTasks = model.isGpuEnabled() ? 1 : model.getCpuNb();
final int maxNbTasks = model.isGpuEnabled() ? 1 : model.getCpuNb();

Worker worker = Worker.builder()
final Worker worker = Worker.builder()
.name(model.getName())
.walletAddress(workerWalletAddress)
.os(model.getOs())
.cpu(model.getCpu())
.cpuNb(model.getCpuNb())
.maxNbTasks(maxNbTasks)
.memorySize(model.getMemorySize())
.teeEnabled(model.isTeeEnabled())
.gpuEnabled(model.isGpuEnabled())
.teeEnabled(model.isTeeEnabled())
.tdxEnabled(model.isTdxEnabled())
.participatingChainTaskIds(new ArrayList<>())
.computingChainTaskIds(new ArrayList<>())
.build();

Worker savedWorker = workerService.addWorker(worker);
final Worker savedWorker = workerService.addWorker(worker);
log.info("Worker ready [worker:{}]", savedWorker);
return ok(savedWorker);
}
Expand Down
40 changes: 13 additions & 27 deletions src/main/java/com/iexec/core/worker/WorkerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,6 @@ public List<Worker> getAliveWorkers() {
.toList();
return workerRepository.findByWalletAddressIn(aliveWorkers);
}

public boolean canAcceptMoreWorks(Worker worker) {
int workerMaxNbTasks = worker.getMaxNbTasks();
int runningReplicateNb = worker.getComputingChainTaskIds().size();

if (runningReplicateNb >= workerMaxNbTasks) {
log.debug("Worker asking for too many replicates [walletAddress:{}, runningReplicateNb:{}, workerMaxNbTasks:{}]",
worker.getWalletAddress(), runningReplicateNb, workerMaxNbTasks);
return false;
}

return true;
}
// endregion

// region Read-and-write methods
Expand Down Expand Up @@ -257,21 +244,20 @@ public Optional<Worker> addChainTaskIdToWorker(String chainTaskId, String wallet
}

private Optional<Worker> addChainTaskIdToWorkerWithoutThreadSafety(String chainTaskId, String walletAddress) {
final Optional<Worker> optional = workerRepository.findByWalletAddress(walletAddress);
if (optional.isPresent()) {
final Worker worker = optional.get();
if (!canAcceptMoreWorks(worker)) {
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerAddress:{}]",
chainTaskId, walletAddress);
return Optional.empty();
}
worker.addChainTaskId(chainTaskId);
log.info("Added chainTaskId to worker [chainTaskId:{}, workerAddress:{}]", chainTaskId, walletAddress);
return Optional.of(workerRepository.save(worker));
final Worker worker = workerRepository.findByWalletAddress(walletAddress).orElse(null);
if (worker == null) {
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerAddress:{}]",
chainTaskId, walletAddress);
return Optional.empty();
}
if (worker.hasNoRemainingComputingSlot()) {
log.warn("Can't add chainTaskId to worker when already full [chainTaskId:{}, workerAddress:{}]",
chainTaskId, walletAddress);
return Optional.empty();
}
log.warn("Can't add chainTaskId to worker when unknown worker [chainTaskId:{}, workerAddress:{}]",
chainTaskId, walletAddress);
return Optional.empty();
worker.addChainTaskId(chainTaskId);
log.info("Added chainTaskId to worker [chainTaskId:{}, workerAddress:{}]", chainTaskId, walletAddress);
return Optional.of(workerRepository.save(worker));
}

public Optional<Worker> removeChainTaskIdFromWorker(String chainTaskId, String walletAddress) {
Expand Down
Loading