Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.flamingock.internal.core.external.store.lock.LockException;
import io.flamingock.internal.core.pipeline.execution.ExecutableStage;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import io.flamingock.internal.core.pipeline.run.PipelineRun;
import io.flamingock.internal.util.log.FlamingockLoggerFactory;
import org.slf4j.Logger;

Expand Down Expand Up @@ -78,7 +79,8 @@ public CloudExecutionPlanner(RunnerId runnerId,
}

@Override
public ExecutionPlan getNextExecution(List<AbstractLoadedStage> loadedStages) throws LockException {
public ExecutionPlan getNextExecution(PipelineRun pipelineRun) throws LockException {
List<AbstractLoadedStage> loadedStages = pipelineRun.getLoadedStages();

AuditMarkSnapshot snapshot = buildAuditMarkSnapshot();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.flamingock.internal.core.plan.ExecutionPlan;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import io.flamingock.internal.core.pipeline.loaded.stage.DefaultLoadedStage;
import io.flamingock.internal.core.pipeline.run.PipelineRun;
import io.flamingock.internal.util.TimeService;
import io.flamingock.internal.util.id.RunnerId;
import io.flamingock.core.cloud.changes._001__CloudChange1;
Expand Down Expand Up @@ -110,7 +111,7 @@ void shouldReturnAbortPlanWhenServerReturnsAbort() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

ExecutionPlan plan = planner.getNextExecution(stages);
ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(stages));

assertTrue(plan.isAborted());
assertThrows(ManualInterventionRequiredException.class, plan::validate);
Expand All @@ -131,7 +132,7 @@ void shouldReturnAbortPlanWhenServerReturnsAbortWithNoMIChanges() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

ExecutionPlan plan = planner.getNextExecution(stages);
ExecutionPlan plan = planner.getNextExecution(PipelineRun.of(stages));

assertTrue(plan.isAborted());
assertThrows(io.flamingock.internal.common.core.error.FlamingockException.class, plan::validate);
Expand Down Expand Up @@ -163,7 +164,7 @@ void shouldIncludeAuditMarksInExecutionRequest() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));

planner.getNextExecution(stages);
planner.getNextExecution(PipelineRun.of(stages));

ArgumentCaptor<ExecutionPlanRequest> requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class);
verify(client).createExecution(requestCaptor.capture(), any(), anyLong());
Expand Down Expand Up @@ -191,7 +192,7 @@ void shouldSendNoneStatusWhenNoMarks() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

planner.getNextExecution(stages);
planner.getNextExecution(PipelineRun.of(stages));

ArgumentCaptor<ExecutionPlanRequest> requestCaptor = ArgumentCaptor.forClass(ExecutionPlanRequest.class);
verify(client).createExecution(requestCaptor.capture(), any(), anyLong());
Expand Down Expand Up @@ -221,7 +222,7 @@ void shouldClearMarksWhenResponseHasSynchronizedMarksTrue() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));

planner.getNextExecution(stages);
planner.getNextExecution(PipelineRun.of(stages));

verify(marker1).clearMark(change1.getId());
verify(marker2).clearMark(change2.getId());
Expand All @@ -243,7 +244,7 @@ void shouldNotClearMarksWhenResponseHasSynchronizedMarksFalse() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

planner.getNextExecution(stages);
planner.getNextExecution(PipelineRun.of(stages));

verify(marker1, never()).clearMark(any());
}
Expand All @@ -264,7 +265,7 @@ void shouldClearMarksRegardlessOfResponseAction() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Collections.singletonList(change1)));

planner.getNextExecution(stages);
planner.getNextExecution(PipelineRun.of(stages));

verify(marker1).clearMark(change1.getId());
}
Expand All @@ -286,7 +287,7 @@ void shouldNotClearNewMarksWrittenAfterRequest() {
List<AbstractLoadedStage> stages = Collections.singletonList(
new DefaultLoadedStage("stage-1", StageType.DEFAULT, Arrays.asList(change1, change2)));

planner.getNextExecution(stages);
planner.getNextExecution(PipelineRun.of(stages));

// Only change1 should be cleared (was in snapshot), not change2
verify(marker1).clearMark(change1.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import io.flamingock.internal.common.core.error.FlamingockException;
import io.flamingock.internal.common.core.recovery.ManualInterventionRequiredException;
import io.flamingock.internal.common.core.error.PendingChangesException;
import io.flamingock.internal.common.core.response.data.ErrorInfo;
import io.flamingock.internal.common.core.response.data.ExecuteResponseData;
import io.flamingock.internal.common.core.response.data.StageResult;
import io.flamingock.internal.core.event.EventPublisher;
import io.flamingock.internal.core.event.model.impl.PipelineCompletedEvent;
import io.flamingock.internal.core.event.model.impl.PipelineFailedEvent;
Expand All @@ -30,14 +28,14 @@
import io.flamingock.internal.core.event.model.impl.StageStartedEvent;
import io.flamingock.internal.core.operation.execute.ExecuteArgs;
import io.flamingock.internal.core.operation.execute.ExecuteResult;
import io.flamingock.internal.core.operation.result.ExecutionResultBuilder;
import io.flamingock.internal.core.pipeline.execution.ExecutableStage;
import io.flamingock.internal.core.pipeline.execution.ExecutionContext;
import io.flamingock.internal.core.pipeline.execution.OrphanExecutionContext;
import io.flamingock.internal.core.pipeline.execution.StageExecutionException;
import io.flamingock.internal.core.pipeline.execution.StageExecutor;
import io.flamingock.internal.core.pipeline.loaded.LoadedPipeline;
import io.flamingock.internal.core.pipeline.loaded.stage.AbstractLoadedStage;
import io.flamingock.internal.core.pipeline.run.PipelineRun;
import io.flamingock.internal.core.plan.ExecutionPlan;
import io.flamingock.internal.core.plan.ExecutionPlanner;
import io.flamingock.internal.core.external.store.lock.Lock;
Expand Down Expand Up @@ -96,7 +94,7 @@ public ExecuteResult execute(ExecuteArgs args) {
} catch (OperationException operationException) {
throw operationException;
} catch (Throwable throwable) {
throw processAndGetFlamingockException(throwable, null);
throw processAndGetFlamingockException(throwable);
} finally {
this.finalizer.run();
}
Expand All @@ -122,11 +120,13 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx
logger.info("Flamingock execution started [stages={} changes={}]", stageCount, changeCount);

eventPublisher.publish(new PipelineStartedEvent());
ExecutionResultBuilder resultBuilder = new ExecutionResultBuilder().startTimer();

PipelineRun pipelineRun = PipelineRun.of(pipeline);
pipelineRun.start();

do {
List<AbstractLoadedStage> stages = validateAndGetExecutableStages(pipeline);
try (ExecutionPlan execution = executionPlanner.getNextExecution(stages)) {
validateAndGetExecutableStages(pipeline);
try (ExecutionPlan execution = executionPlanner.getNextExecution(pipelineRun)) {
// Validate execution plan for manual intervention requirements
// This centralized validation ensures both community and cloud paths are validated
execution.validate();
Expand All @@ -135,10 +135,8 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx
if (validateOnlyMode()) {
throw new PendingChangesException();
}
execution.applyOnEach((executionId, lock, executableStage) -> {
StageResult stageResult = runStage(executionId, lock, executableStage);
resultBuilder.addStage(stageResult);
});
execution.applyOnEach((executionId, lock, executableStage) ->
runStage(executionId, lock, executableStage, pipelineRun));
} else {
break;
}
Expand All @@ -154,15 +152,24 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx
}
break;
} catch (StageExecutionException e) {
resultBuilder.addStage(e.getResult());
resultBuilder.stopTimer().failed();
resultBuilder.error(ErrorInfo.fromThrowable(e.getCause(), e.getFailedChangeId(), e.getResult().getStageId()));
throw OperationException.fromExisting(e.getCause(), resultBuilder.build());
// Defensive: runStage normally records the failure into pipelineRun before
// rethrowing. If for some reason this exception arrives here without that
// having happened (e.g. thrown by something other than runStage), make sure
// the failure is reflected in the response.
String stageName = e.getResult() != null ? e.getResult().getStageName() : null;
if (stageName != null) {
io.flamingock.internal.core.pipeline.run.StageRun stageRun = pipelineRun.getStageRun(stageName);
if (stageRun != null && !stageRun.getState().isFailed()) {
pipelineRun.markStageFailed(stageName, e);
}
}
pipelineRun.stop();
throw OperationException.fromExisting(e.getCause(), pipelineRun.toResponse());
}
} while (true);

resultBuilder.stopTimer().success();
ExecuteResponseData result = resultBuilder.build();
pipelineRun.stop();
ExecuteResponseData result = pipelineRun.toResponse();

logger.info("Flamingock execution completed [duration={}ms applied={} skipped={}]",
result.getTotalDurationMs(), result.getAppliedChanges(), result.getSkippedChanges());
Expand All @@ -172,29 +179,31 @@ private ExecuteResponseData execute(LoadedPipeline pipeline) throws FlamingockEx
return result;
}

private StageResult runStage(String executionId, Lock lock, ExecutableStage executableStage) {
private void runStage(String executionId, Lock lock, ExecutableStage executableStage, PipelineRun pipelineRun) {
try {
return startStage(executionId, lock, executableStage);
startStage(executionId, lock, executableStage, pipelineRun);
} catch (StageExecutionException exception) {
pipelineRun.markStageFailed(executableStage.getName(), exception);
eventPublisher.publish(new StageFailedEvent(exception));
eventPublisher.publish(new PipelineFailedEvent(exception));
throw exception;
} catch (Throwable generalException) {
throw processAndGetFlamingockException(generalException, null);
throw processAndGetFlamingockException(generalException);
}
}

private StageResult startStage(String executionId, Lock lock, ExecutableStage executableStage) throws StageExecutionException {
private void startStage(String executionId, Lock lock, ExecutableStage executableStage, PipelineRun pipelineRun) throws StageExecutionException {
pipelineRun.markStageStarted(executableStage.getName());
eventPublisher.publish(new StageStartedEvent());
logger.debug("Applied state to process:\n{}", executableStage);

ExecutionContext executionContext = new ExecutionContext(executionId, orphanExecutionContext.getHostname(), orphanExecutionContext.getMetadata());
StageExecutor.Output executionOutput = stageExecutor.executeStage(executableStage, executionContext, lock);
pipelineRun.markStageCompleted(executableStage.getName(), executionOutput.getResult());
eventPublisher.publish(new StageCompletedEvent(executionOutput));
return executionOutput.getResult();
}

private FlamingockException processAndGetFlamingockException(Throwable exception, ExecutionResultBuilder resultBuilder) throws FlamingockException {
private FlamingockException processAndGetFlamingockException(Throwable exception) throws FlamingockException {
FlamingockException flamingockException;
if (exception instanceof OperationException) {
OperationException pipelineException = (OperationException) exception;
Expand Down

This file was deleted.

Loading
Loading