Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,62 @@
import org.apache.nifi.web.api.dto.StateMapDTO;
import org.apache.nifi.web.api.entity.ComponentStateEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public abstract class AbstractStateKeyDropIT extends NiFiSystemIT {

private static final Logger logger = LoggerFactory.getLogger(AbstractStateKeyDropIT.class);
private static final long RETRY_TIMEOUT_SECONDS = 30;
private static final long RETRY_DELAY_MILLIS = 500;

/**
* Retrieves the state for a given processor.
* Retrieves the state for a given processor, retrying on transient cluster errors.
*
* @param processorId the ID of the processor
* @param scope the scope of the state to retrieve (LOCAL or CLUSTER)
* @return a map containing the cluster state key-value pairs
* @throws IOException IO exception
* @throws NiFiClientException NiFi Client Exception
* @return a map containing the state key-value pairs for the given scope
* @throws NiFiClientException NiFi Client Exception
* @throws IOException IO exception
* @throws InterruptedException if the thread is interrupted while retrying
*/
protected Map<String, String> getProcessorState(final String processorId, final Scope scope) throws NiFiClientException, IOException {
final ComponentStateDTO componentState = getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState();
final Map<String, String> state = new HashMap<>();

switch (scope) {
case LOCAL:
if (componentState != null && componentState.getLocalState() != null && componentState.getLocalState().getState() != null) {
componentState.getLocalState().getState().forEach(entry -> state.put(entry.getKey(), entry.getValue()));
protected Map<String, String> getProcessorState(final String processorId, final Scope scope) throws NiFiClientException, IOException, InterruptedException {
final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(RETRY_TIMEOUT_SECONDS);

while (true) {
try {
final ComponentStateDTO componentState = getNifiClient().getProcessorClient().getProcessorState(processorId).getComponentState();
final Map<String, String> state = new HashMap<>();

switch (scope) {
case LOCAL:
if (componentState != null && componentState.getLocalState() != null && componentState.getLocalState().getState() != null) {
componentState.getLocalState().getState().forEach(entry -> state.put(entry.getKey(), entry.getValue()));
}
break;
case CLUSTER:
if (componentState != null && componentState.getClusterState() != null && componentState.getClusterState().getState() != null) {
componentState.getClusterState().getState().forEach(entry -> state.put(entry.getKey(), entry.getValue()));
}
break;
}
break;
case CLUSTER:
if (componentState != null && componentState.getClusterState() != null && componentState.getClusterState().getState() != null) {
componentState.getClusterState().getState().forEach(entry -> state.put(entry.getKey(), entry.getValue()));

return state;
} catch (final NiFiClientException e) {
if (!isTransientClusterError(e) || System.currentTimeMillis() > maxTime) {
throw e;
}
break;
logger.info("Transient cluster error getting processor state for {}, retrying: {}", processorId, e.getMessage());
Thread.sleep(RETRY_DELAY_MILLIS);
}
}

return state;
}

/**
Expand All @@ -69,8 +89,8 @@ protected Map<String, String> getProcessorState(final String processorId, final
* @param newState a map containing the new state key-value pairs, or null to
* drop the state without setting a new one
* @return the response from the NiFi API
* @throws IOException IO exception
* @throws NiFiClientException NiFi Client Exception
* @throws IOException IO exception
*/
protected ComponentStateEntity dropProcessorState(final String processorId, final Map<String, String> newState) throws NiFiClientException, IOException {
final ComponentStateEntity entity = new ComponentStateEntity();
Expand All @@ -95,15 +115,34 @@ protected ComponentStateEntity dropProcessorState(final String processorId, fina
}

/**
* Runs a processor once and waits for it to stop.
* Runs a processor once and waits for it to stop, retrying on transient cluster errors.
*
* @param processor the processor entity to run
* @throws NiFiClientException if there is an error with the NiFi client
* @throws IOException if there is an IO error
* @throws InterruptedException if the thread is interrupted while waiting
*/
protected void runProcessorOnce(final ProcessorEntity processor) throws NiFiClientException, IOException, InterruptedException {
getNifiClient().getProcessorClient().runProcessorOnce(processor);
final long maxTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(RETRY_TIMEOUT_SECONDS);

while (true) {
try {
getNifiClient().getProcessorClient().runProcessorOnce(processor);
break;
} catch (final NiFiClientException e) {
if (!isTransientClusterError(e) || System.currentTimeMillis() > maxTime) {
throw e;
}
logger.info("Transient cluster error running processor {} once, retrying: {}", processor.getId(), e.getMessage());
Thread.sleep(RETRY_DELAY_MILLIS);
}
}

getClientUtil().waitForStoppedProcessor(processor.getId());
}

private boolean isTransientClusterError(final NiFiClientException exception) {
final String message = exception.getMessage();
return message != null && message.contains("not connected");
}
}
Loading