Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ private void synchronizeChildGroups(final ProcessGroup group, final VersionedPro
}

private void synchronizeControllerServices(final ProcessGroup group, final VersionedProcessGroup proposed, final Map<String, ControllerServiceNode> servicesByVersionedId,
final ProcessGroup topLevelGroup) {
final ProcessGroup topLevelGroup) throws FlowSynchronizationException {
// Controller Services have to be handled a bit differently than other components. This is because Processors and Controller
// Services may reference other Controller Services. Since we may be adding Service A, which depends on Service B, before adding
// Service B, we need to ensure that we create all Controller Services first and then call updateControllerService for each
Expand Down Expand Up @@ -745,17 +745,47 @@ private void synchronizeControllerServices(final ProcessGroup group, final Versi
updateControllerService(addedService, proposedService, topLevelGroup);
}

// Update all of the Controller Services to match the VersionedControllerService
// Update all Controller Services to match the VersionedControllerService.
// Services may still be ENABLED here because not all callers disable them before sync-ing.
// We must disable before calling updateControllerService, which calls setProperties
// which calls verifyModifiable and throws IllegalStateException on ENABLED services.
for (final Map.Entry<ControllerServiceNode, VersionedControllerService> entry : services.entrySet()) {
final ControllerServiceNode service = entry.getKey();
final VersionedControllerService proposedService = entry.getValue();

if (updatedVersionedComponentIds.contains(proposedService.getIdentifier())) {
updateControllerService(service, proposedService, topLevelGroup);
// Any existing component that is modified during synchronization may have its properties reverted to a pre-migration state,
// so we then add it to the set to allow migrateProperties to be called again to get it back to the migrated state
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
LOG.info("Updated {}", service);
final long stopTimeout = System.currentTimeMillis() + syncOptions.getComponentStopTimeout().toMillis();
final Set<ComponentNode> referencesToRestart = new HashSet<>();
final Set<ControllerServiceNode> servicesToRestart = new HashSet<>();

try {
try {
stopControllerService(service, proposedService, stopTimeout,
syncOptions.getComponentStopTimeoutAction(),
referencesToRestart, servicesToRestart, syncOptions);
} catch (final TimeoutException e) {
throw new FlowSynchronizationException("Failed to stop Controller Service " + service + " in preparation for update", e);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new FlowSynchronizationException("Interrupted while stopping Controller Service " + service, e);
}
updateControllerService(service, proposedService, topLevelGroup);
createdAndModifiedExtensions.add(new CreatedOrModifiedExtension(service, getPropertyValues(service)));
LOG.info("Updated {}", service);
} finally {
// Re-enable services and restart components that were stopped for the update,
// restoring the controller to its pre-update running state.
if (proposedService.getScheduledState() != org.apache.nifi.flow.ScheduledState.DISABLED) {
// Use the component scheduler (not the provider directly) which has
// already been paused, avoiding a race with the enable loop below.
context.getComponentScheduler().enableControllerServicesAsync(servicesToRestart);
notifyScheduledStateChange(servicesToRestart, syncOptions, org.apache.nifi.flow.ScheduledState.ENABLED);
context.getControllerServiceProvider().scheduleReferencingComponents(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also call notifyScheduledStateChange for servicesToRestart (ENABLED) and referencesToRestart (RUNNING) here, to match the per-service synchronize(ControllerServiceNode, …) overload?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: d4666a4

service, referencesToRestart, context.getComponentScheduler());
referencesToRestart.forEach(componentNode ->
notifyScheduledStateChange(componentNode, syncOptions, org.apache.nifi.flow.ScheduledState.RUNNING));
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,75 @@ public void testTerminateReferenceOnTimeout() throws FlowSynchronizationExceptio
verify(service).setName("Hello");
}

private ControllerServiceNode createMappableControllerService(final ProcessGroup processGroup) {
final ControllerServiceNode service = createMockControllerService();
when(service.getProcessGroup()).thenReturn(processGroup);
when(service.getVersionedComponentId()).thenReturn(Optional.of(UUID.randomUUID().toString()));
when(service.getCanonicalClassName()).thenReturn("org.apache.nifi.services.Test");
when(service.getReferences()).thenReturn(Mockito.mock(ControllerServiceReference.class));
return service;
}

@Test
public void testGroupSynchronizeDisablesEnabledControllerServiceBeforeUpdate() {
final ProcessGroup processGroup = createMockProcessGroup();
final ControllerServiceNode service = createMappableControllerService(processGroup);
when(service.isActive()).thenReturn(true);
when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
when(processGroup.getControllerServices(false)).thenReturn(Set.of(service));

when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.emptyMap());
when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));

final VersionedControllerService versionedService = createMinimalVersionedControllerService();
versionedService.setIdentifier(service.getVersionedComponentId().orElse(service.getIdentifier()));
versionedService.setProperties(Collections.singletonMap("abc", "updated-value"));
versionedService.setScheduledState(ScheduledState.ENABLED);

final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
versionedGroup.setIdentifier("pg-v1");
versionedGroup.setControllerServices(Set.of(versionedService));

final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(versionedGroup);

assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions));

verify(controllerServiceProvider).disableControllerServicesAsync(anyCollection());
verify(service).setProperties(anyMap(), anyBoolean(), anySet());
verify(componentScheduler).enableControllerServicesAsync(anySet());
}

@Test
public void testGroupSynchronizeDoesNotReEnableControllerServiceWhenProposedStateDisabled() {
final ProcessGroup processGroup = createMockProcessGroup();
final ControllerServiceNode service = createMappableControllerService(processGroup);
when(service.isActive()).thenReturn(true);
when(service.getState()).thenReturn(ControllerServiceState.ENABLED);
when(processGroup.getControllerServices(false)).thenReturn(Set.of(service));

when(controllerServiceProvider.unscheduleReferencingComponents(service)).thenReturn(Collections.emptyMap());
when(controllerServiceProvider.disableControllerServicesAsync(anyCollection())).thenReturn(CompletableFuture.completedFuture(null));

final VersionedControllerService versionedService = createMinimalVersionedControllerService();
versionedService.setIdentifier(service.getVersionedComponentId().orElse(service.getIdentifier()));
versionedService.setProperties(Collections.singletonMap("abc", "updated-value"));
versionedService.setScheduledState(ScheduledState.DISABLED);

final VersionedProcessGroup versionedGroup = new VersionedProcessGroup();
versionedGroup.setIdentifier("pg-v1");
versionedGroup.setControllerServices(Set.of(versionedService));

final VersionedExternalFlow externalFlow = new VersionedExternalFlow();
externalFlow.setFlowContents(versionedGroup);

assertDoesNotThrow(() -> synchronizer.synchronize(processGroup, externalFlow, synchronizationOptions));

verify(controllerServiceProvider).disableControllerServicesAsync(anyCollection());
verify(service).setProperties(anyMap(), anyBoolean(), anySet());
verify(controllerServiceProvider, never()).enableControllerServicesAsync(anySet());
}

@Test
public void testCreatingParameterContext() throws FlowSynchronizationException, InterruptedException, TimeoutException {
final VersionedParameterContext proposed = createVersionedParameterContext(CONTEXT_NAME_1, INITIAL_PARAMETERS, SENSITIVE_PARAM_NAMES);
Expand Down
Loading