4 changes: 2 additions & 2 deletions agent/conf/agent.properties
@@ -27,9 +27,9 @@ resource=com.cloud.hypervisor.kvm.resource.LibvirtComputingResource
# The number of threads running in the agent.
workers=5
# number of worker threads dedicated to Stats commands (default: workers)
stats.workers=3
stats.workers=2
# number of worker threads dedicated to HA health-check commands (default: workers)
ha.workers=3
ha.workers=1

# The IP address of the management server.
host=localhost
201 changes: 193 additions & 8 deletions agent/src/main/java/com/cloud/agent/Agent.java
@@ -28,16 +28,21 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,8 +71,10 @@
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckOnHostCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.CronCommand;
import com.cloud.agent.api.CheckVMActivityOnStoragePoolCommand;
import com.cloud.agent.api.MaintainAnswer;
import com.cloud.agent.api.MaintainCommand;
import com.cloud.agent.api.MigrateAgentConnectionAnswer;
@@ -154,12 +161,20 @@ public int value() {

private final AtomicReference<StartupTask> startupTask = new AtomicReference<>();
private static final long DEFAULT_STARTUP_WAIT = 180;
private static final long EXECUTOR_MONITOR_INTERVAL_MS = 10000L;
private static final String ANSI_GREEN = "\u001B[92m";
private static final String ANSI_RED = "\u001B[31m";
private static final String ANSI_RESET = "\u001B[0m";
long startupWait = DEFAULT_STARTUP_WAIT;
boolean reconnectAllowed = true;
Timer executorMonitorTimer;
private final Set<String> executorMonitorContexts = ConcurrentHashMap.newKeySet();

// For time-sensitive tasks, e.g. PingTask
ThreadPoolExecutor outRequestHandler;
ExecutorService requestHandler;
ThreadPoolExecutor basicExecutor;
ThreadPoolExecutor statsExecutor;
ThreadPoolExecutor haExecutor;

Thread shutdownThread = new ShutdownThread(this);

@@ -190,11 +205,21 @@ protected String getAgentName() {
protected void setupShutdownHookAndInitExecutors() {
logger.trace("Adding shutdown hook");
Runtime.getRuntime().addShutdownHook(shutdownThread);
int statsWorkers = resolveStatsWorkers();
int haWorkers = resolveHaWorkers();
selfTaskExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Agent-SelfTask"));
outRequestHandler = new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES,
new SynchronousQueue<>(), new NamedThreadFactory("AgentOutRequest-Handler"));
requestHandler = new ThreadPoolExecutor(shell.getWorkers(), 5 * shell.getWorkers(), 1, TimeUnit.DAYS,
new LinkedBlockingQueue<>(), new NamedThreadFactory("AgentRequest-Handler"));
basicExecutor = new ThreadPoolExecutor(shell.getWorkers(), 5 * shell.getWorkers(), 10, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("Basic-Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
statsExecutor = new ThreadPoolExecutor(statsWorkers, 5 * statsWorkers, 10, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("Stats-Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
haExecutor = new ThreadPoolExecutor(haWorkers, 5 * haWorkers, 10, TimeUnit.SECONDS,
new SynchronousQueue<>(), new NamedThreadFactory("HA-Worker"), new ThreadPoolExecutor.CallerRunsPolicy());
executorMonitorTimer = new Timer("AgentTaskCheckTimer");
scheduleExecutorMonitoring("Basic-Worker", basicExecutor);
scheduleExecutorMonitoring("Stats-Worker", statsExecutor);
scheduleExecutorMonitoring("HA-Worker", haExecutor);
}

/**
@@ -290,13 +315,135 @@ public String getResourceName() {
return serverResource.getClass().getSimpleName();
}

private int resolveStatsWorkers() {
String statsWorkersConfig = shell.getProperties() != null ? shell.getProperties().getProperty("stats.workers") : null;
return Math.max(1, NumbersUtil.parseInt(statsWorkersConfig, shell.getWorkers()));
}

private int resolveHaWorkers() {
String haWorkersConfig = shell.getProperties() != null ? shell.getProperties().getProperty("ha.workers") : null;
return Math.max(1, NumbersUtil.parseInt(haWorkersConfig, shell.getWorkers()));
}

private void scheduleExecutorMonitoring(String context, ExecutorService executorService) {
if (!(executorService instanceof ThreadPoolExecutor)) {
return;
}
if (executorMonitorTimer == null) {
executorMonitorTimer = new Timer("AgentTaskCheckTimer");
}
if (!executorMonitorContexts.add(context)) {
return;
}
ThreadPoolExecutor executor = (ThreadPoolExecutor) executorService;
logAgentExecutorMetrics(context, executor);
executorMonitorTimer.scheduleAtFixedRate(new AgentExecutorMonitorTask(context, executor), EXECUTOR_MONITOR_INTERVAL_MS, EXECUTOR_MONITOR_INTERVAL_MS);
}

private void logAgentExecutorMetrics(String context, ThreadPoolExecutor executor) {
BlockingQueue<?> queue = executor.getQueue();
int queueSize = queue.size();
long taskCount = executor.getTaskCount();
long completedTasks = executor.getCompletedTaskCount();
long pendingTasks = Math.max(0, taskCount - completedTasks);
if (queueSize > 0 || executor.getActiveCount() > executor.getPoolSize()) {
logger.warn("{}Executor load warning [{}]: Workers={} | Active={} | QueueSize={} | PendingTasks={} | CompletedTasks={}{}",
ANSI_RED, context, executor.getPoolSize(), executor.getActiveCount(), queueSize, pendingTasks, completedTasks, ANSI_RESET);
logPendingTaskStacks(context, pendingTasks);
} else {
logger.info("{}Executor status [{}]: Workers={} | Active={} | QueueSize={} | PendingTasks={} | CompletedTasks={}{}",
ANSI_GREEN, context, executor.getPoolSize(), executor.getActiveCount(), queueSize, pendingTasks, completedTasks, ANSI_RESET);
if (pendingTasks > 0) {
logPendingTaskStacks(context, pendingTasks);
}
}
}

private void logPendingTaskStacks(String context, long pendingTasks) {
if (pendingTasks <= 0) {
return;
}
EnumSet<Thread.State> pendingStates = EnumSet.of(Thread.State.BLOCKED, Thread.State.TIMED_WAITING);
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
Thread thread = entry.getKey();
if (!thread.getName().startsWith(context)) {
continue;
}
if (!pendingStates.contains(thread.getState())) {
continue;
}
StringBuilder trace = new StringBuilder();
StackTraceElement[] stack = entry.getValue();
int limit = Math.min(stack.length, 8);
for (int i = 0; i < limit; i++) {
trace.append(stack[i].toString());
if (i < limit - 1) {
trace.append(" <- ");
}
}
logger.warn("{}Pending task state [{}]: State={} | Stack={}{}", ANSI_RED, thread.getName(), thread.getState(), trace, ANSI_RESET);
}
}

private ThreadPoolExecutor selectExecutorForRequest(Request request) {
if (requestContainsHaCommand(request) && haExecutor != null) {
return haExecutor;
}
if (requestContainsStatsCommand(request) && statsExecutor != null) {
return statsExecutor;
}
if (basicExecutor != null) {
return basicExecutor;
}
if (statsExecutor != null) {
return statsExecutor;
}
return haExecutor;
}

private boolean requestContainsStatsCommand(Request request) {
if (request == null) {
return false;
}
Command[] commands = request.getCommands();
if (commands == null) {
return false;
}
for (Command command : commands) {
if (command != null && command.getClass().getSimpleName().contains("StatsCommand")) {
return true;
}
}
return false;
}

private boolean requestContainsHaCommand(Request request) {
if (request == null) {
return false;
}
Command[] commands = request.getCommands();
if (commands == null) {
return false;
}
for (Command command : commands) {
if (command instanceof CheckOnHostCommand || command instanceof CheckVMActivityOnStoragePoolCommand) {
return true;
}
}
return false;
}

/**
* In case of a software-based agent restart, this method
* can help to perform explicit garbage collection of any old
* agent instances and their inner objects.
*/
private void scavengeOldAgentObjects() {
requestHandler.submit(() -> {
ExecutorService executorService = selectExecutorForRequest(null);
if (executorService == null || executorService.isShutdown()) {
return;
}
executorService.submit(() -> {
try {
Thread.sleep(2000L);
} catch (final InterruptedException ignored) {
@@ -385,9 +532,26 @@ public void stop(final String reason, final String detail) {
outRequestHandler = null;
}

if (requestHandler != null) {
requestHandler.shutdown();
requestHandler = null;
if (basicExecutor != null) {
basicExecutor.shutdown();
basicExecutor = null;
}

if (statsExecutor != null) {
statsExecutor.shutdown();
statsExecutor = null;
}

if (haExecutor != null) {
haExecutor.shutdown();
haExecutor = null;
}

if (executorMonitorTimer != null) {
executorMonitorTimer.cancel();
executorMonitorTimer.purge();
executorMonitorTimer = null;
executorMonitorContexts.clear();
}

if (selfTaskExecutor != null) {
@@ -1226,6 +1390,21 @@ public void run() {
}
}

private class AgentExecutorMonitorTask extends ManagedContextTimerTask {
private final String context;
private final ThreadPoolExecutor executor;

AgentExecutorMonitorTask(String context, ThreadPoolExecutor executor) {
this.context = context;
this.executor = executor;
}

@Override
protected void runInContext() {
logAgentExecutorMetrics(context, executor);
}
}

public class WatchTask implements Runnable {
protected Request _request;
protected Agent _agent;
@@ -1323,7 +1502,13 @@ public void doTask(final Task task) throws TaskExecutionException {
} else {
// put the requests from the management server into another thread pool, as they may take a long time to finish. Don't block the NIO main thread pool
//processRequest(request, task.getLink());
requestHandler.submit(new AgentRequestHandler(getType(), getLink(), request));
ThreadPoolExecutor executor = selectExecutorForRequest(request);
if (executor == null) {
logger.warn("No executor available to process request {}; running inline", request);
processRequest(request, task.getLink());
return;
}
executor.submit(new AgentRequestHandler(getType(), getLink(), request));
}
} catch (final ClassNotFoundException e) {
logger.error("Unable to find this request ");
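
The Basic-, Stats-, and HA-Worker pools introduced above are all sized the same way: core size equal to the configured worker count, maximum five times that, backed by a SynchronousQueue and a CallerRunsPolicy. A SynchronousQueue never holds tasks, so the pool grows thread by thread up to the maximum, and once every thread is busy the submitting thread executes the command itself rather than rejecting it. Below is a minimal standalone sketch of that sizing behavior, not part of the patch; the class name, task count, and sleep duration are illustrative only, and the agent's NamedThreadFactory is omitted.

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorSizingSketch {
    public static void main(String[] args) throws InterruptedException {
        int workers = 2; // e.g. stats.workers=2
        // Same shape as the Basic-/Stats-/HA-Worker pools: no queueing, bounded growth, caller runs on saturation
        ThreadPoolExecutor pool = new ThreadPoolExecutor(workers, 5 * workers,
                10, TimeUnit.SECONDS, new SynchronousQueue<>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 20; i++) {
            final int id = i;
            pool.submit(() -> {
                try {
                    Thread.sleep(500); // simulate a slow stats/HA command
                } catch (InterruptedException ignored) {
                }
                System.out.println("task " + id + " ran on " + Thread.currentThread().getName());
            });
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
    }
}

Running the sketch shows the first ten tasks landing on pool threads and a later submission executing on main, which is the back-pressure CallerRunsPolicy applies to the calling (NIO) thread.
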
@@ -55,14 +55,14 @@ public class AgentProperties{
* Data type: Integer.<br>
* Default value: workers
*/
public static final Property<Integer> STATS_WORKERS = new Property<>("stats.workers", 3, Integer.class);
public static final Property<Integer> STATS_WORKERS = new Property<>("stats.workers", 2, Integer.class);

/**
* The number of threads dedicated to HA health-check commands.<br>
* Data type: Integer.<br>
* Default value: workers
*/
public static final Property<Integer> HA_WORKERS = new Property<>("ha.workers", 3, Integer.class);
public static final Property<Integer> HA_WORKERS = new Property<>("ha.workers", 1, Integer.class);

/**
* The IP address of the management server.<br>
4 changes: 3 additions & 1 deletion agent/src/test/java/com/cloud/agent/AgentTest.java
@@ -21,6 +21,8 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
@@ -117,7 +119,7 @@ public void testAgentInitialization() {
agent.setupShutdownHookAndInitExecutors();
assertNotNull(agent.selfTaskExecutor);
assertNotNull(agent.outRequestHandler);
assertNotNull(agent.requestHandler);
assertNotNull(agent.basicExecutor);
}

@Test
1 change: 0 additions & 1 deletion api/src/main/java/com/cloud/event/EventTypes.java
@@ -43,7 +43,6 @@
import org.apache.cloudstack.storage.sharedfs.SharedFS;
import org.apache.cloudstack.storage.object.Bucket;
import org.apache.cloudstack.storage.object.ObjectStore;
import org.apache.cloudstack.storage.sharedfs.SharedFS;
import org.apache.cloudstack.usage.Usage;
import org.apache.cloudstack.vm.schedule.VMSchedule;

@@ -93,7 +93,6 @@
import com.cloud.upgrade.dao.Upgrade42200to42210;
import com.cloud.upgrade.dao.Upgrade420to421;
import com.cloud.upgrade.dao.Upgrade421to430;
import com.cloud.upgrade.dao.Upgrade42200to42300;
import com.cloud.upgrade.dao.Upgrade430to440;
import com.cloud.upgrade.dao.Upgrade431to440;
import com.cloud.upgrade.dao.Upgrade432to440;
@@ -239,7 +238,6 @@ public DatabaseUpgradeChecker() {
.next("4.20.1.0", new Upgrade42010to42100())
.next("4.21.0.0", new Upgrade42100to42200())
.next("4.22.0.0", new Upgrade42200to42210())
.next("4.22.0.0", new Upgrade42200to42300())
.build();
}

@@ -39,7 +39,15 @@ UPDATE `cloud`.`configuration` SET `group_id` = (SELECT `id` FROM `cloud`.`confi
UPDATE `cloud`.`configuration` SET `description`='whether volume snapshot is enabled on running instances on KVM hosts' WHERE `name`='kvm.snapshot.enabled';

-- Modify index for mshost_peer
DELETE FROM `cloud`.`mshost_peer`;
-- Only purge entries when the unique key is not yet present (prevents repeated truncation on reruns)
SET @need_mshost_peer_rebuild := (
SELECT COUNT(*) = 0
FROM information_schema.statistics
WHERE table_schema = 'cloud'
AND table_name = 'mshost_peer'
AND index_name = 'i_mshost_peer__owner_peer'
);
DELETE FROM `cloud`.`mshost_peer` WHERE @need_mshost_peer_rebuild;
CALL `cloud`.`IDEMPOTENT_DROP_FOREIGN_KEY`('cloud.mshost_peer','fk_mshost_peer__owner_mshost');
CALL `cloud`.`IDEMPOTENT_DROP_INDEX`('i_mshost_peer__owner_peer_runid','mshost_peer');
CALL `cloud`.`IDEMPOTENT_ADD_UNIQUE_KEY`('cloud.mshost_peer', 'i_mshost_peer__owner_peer', '(owner_mshost, peer_mshost)');
@@ -533,7 +533,7 @@ public TemplateProfile prepare(GetUploadParamsForIsoCmd cmd) throws ResourceAllo
BooleanUtils.toBoolean(cmd.isExtractable()), cmd.getOsTypeId(),
cmd.getZoneId(), BooleanUtils.toBoolean(cmd.isBootable()), cmd.getEntityOwnerId(),
BooleanUtils.toBoolean(cmd.isDynamicallyScalable()));
return prepareUploadParamsInternal(params);
return prepareUploadParamsInternal(cmd, params);
}

@Override
3 changes: 2 additions & 1 deletion ui/src/components/header/AutoAlertBanner.vue
@@ -568,6 +568,7 @@ export default {

const hostIndexCache = { until: 0, byIp: new Map(), byName: new Map() }
const vmIndexCache = { until: 0, byIp: new Map(), byName: new Map(), byInstanceName: new Map() }
const vmIndexReady = ref(false)

const ensureHostIndex = async () => {
const now = Date.now()
@@ -1186,7 +1187,7 @@
})
}
const hostEntityLinks = (it) => entityLinksForAlert(it).filter((x) => x.kind === 'host')
const vmEntityLinks = (it) => entityLinksForAlert(it).filter((x) => x.kind === 'vm')
const vmEntityLinks = (it) => filterKnownVmLinks(it)
const storageEntityLinks = (it) => entityLinksForAlert(it).filter((x) => x.kind === 'storage')
const cloudEntityLinks = (it) => entityLinksForAlert(it).filter((x) => x.kind === 'cloud')
const hostLinkList = (it) => hostEntityLinks(it).slice(0, MAX_LINKS)