Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.dbcp.DBCPService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
Expand Down Expand Up @@ -114,7 +115,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();

public static final String[] VALID_FAIL_STRATEGY = {"rollback", "transfer to failure"};
static final String ROLLBACK = "rollback";
static final String TRANSFER_TO_FAILURE = "transfer to failure";
public static final PropertyDescriptor FAIL_STRATEGY = new PropertyDescriptor.Builder()
.name("Failure Strategy")
.description("What to do with unhandled exceptions. If you want to manage exception by code then keep the default value `rollback`."
Expand All @@ -124,8 +126,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
+ " If the processor has no incoming connections then this parameter has no effect."
)
.required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(VALID_FAIL_STRATEGY)
.defaultValue(VALID_FAIL_STRATEGY[0])
.allowableValues(ROLLBACK, TRANSFER_TO_FAILURE)
.defaultValue(ROLLBACK)
.build();

public static final PropertyDescriptor ADD_CLASSPATH = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -444,10 +446,18 @@ private void onFailSQL(Map<String, Object> sqlMap) {

@Override
public void onTrigger(final ProcessContext context, final ProcessSession processSession) throws ProcessException {
boolean toFailureOnError = VALID_FAIL_STRATEGY[1].equals(context.getProperty(FAIL_STRATEGY).getValue());
boolean toFailureOnError = TRANSFER_TO_FAILURE.equals(context.getProperty(FAIL_STRATEGY).getValue());
//create wrapped session to control list of newly created and files got from this session.
//so transfer original input to failure will be possible
GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(processSession, toFailureOnError);
if (toFailureOnError) {
// FlowFile must be read otherwise if there is a failure before the script is executed, a
// never ending loop occurs since the GroovyProcessSessionWrap has nothing to send to the failure relationship.
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
}
Comment thread
dan-s1 marked this conversation as resolved.

Map<String, Object> ctl = new AccessMap("CTL");
Map<String, Object> sql = new AccessMap("SQL");
Expand Down Expand Up @@ -571,7 +581,8 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dynamic(true)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ public void revertReceivedTo(Relationship r, Throwable t) {
session.transfer(f);
}
}
session.commit();
session.commitAsync();
onClear();
}
/*============================================= NATIVE METHODS ================================================*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.embedded.database.EmbeddedDatabaseConnectionService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.groovyx.flow.ProcessSessionWrap;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
Expand Down Expand Up @@ -142,7 +143,6 @@ public void setup() throws Exception {
@Test
public void testReadFlowFileContentAndStoreInFlowFileAttribute() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "def flowFile = session.get(); if(!flowFile)return; flowFile.testAttr = flowFile.read().getText('UTF-8'); REL_SUCCESS << flowFile;");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");

runner.assertValid();
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
Expand Down Expand Up @@ -187,7 +187,6 @@ void testNonExistingAdditionalClasspath() {
@Test
public void test_onTrigger_groovy() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();

runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
Expand All @@ -200,7 +199,6 @@ public void test_onTrigger_groovy() {
@Test
public void test_onTriggerX_groovy() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTriggerX.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();

runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));
Expand All @@ -213,7 +211,6 @@ public void test_onTriggerX_groovy() {
@Test
public void test_onTrigger_changeContent_groovy() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContent.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();

runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
Expand All @@ -229,7 +226,6 @@ public void test_onTrigger_changeContent_groovy() {
@Test
public void test_onTrigger_changeContentX_groovy() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_onTrigger_changeContentX.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();

runner.enqueue(TEST_CSV_DATA.getBytes(StandardCharsets.UTF_8));
Expand All @@ -245,7 +241,6 @@ public void test_onTrigger_changeContentX_groovy() {
@Test
public void test_no_input_groovy() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_no_input.groovy");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");
runner.assertValid();
runner.run();
runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_SUCCESS.getName(), 1);
Expand Down Expand Up @@ -369,7 +364,6 @@ public void test_record_reader_writer_access() {
@Test
public void test_filter_01() {
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;");
//runner.setProperty(proc.FAIL_STRATEGY, "rollback");

runner.assertValid();

Expand Down Expand Up @@ -566,6 +560,33 @@ public void test_sensitive_dynamic_property() {
runner.run();
}

@Test
void invalidExpressionLanguageInDynamicProperty() {
runner.setProperty("myProperty", "${myparam:isempty()}");
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "assert true == true");
runner.setProperty(ExecuteGroovyScript.FAIL_STRATEGY, ExecuteGroovyScript.TRANSFER_TO_FAILURE);
runner.assertNotValid();
}

@Test
void validExpressionLanguageInDynamicPropertyWithVariableValueWhichCannotBeUsedWithELExpression() {
runner.setProperty("myProperty", "${test:toDate('yyyy-MM-dd')}");
runner.setProperty(ExecuteGroovyScript.SCRIPT_BODY, "assert true == true");
runner.setProperty(ExecuteGroovyScript.FAIL_STRATEGY, ExecuteGroovyScript.TRANSFER_TO_FAILURE);
runner.assertValid();

runner.setEnvironmentVariableValue("test", "cannot be converted to a date");
runner.enqueue("test content".getBytes(StandardCharsets.UTF_8));

runner.run();

runner.assertAllFlowFilesTransferred(ExecuteGroovyScript.REL_FAILURE.getName(), 1);
final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteGroovyScript.REL_FAILURE).getFirst();
flowFile.assertAttributeExists(ProcessSessionWrap.ERROR_MESSAGE);
flowFile.assertAttributeExists(ProcessSessionWrap.ERROR_STACKTRACE);
assertTrue(flowFile.getAttribute(ProcessSessionWrap.ERROR_MESSAGE).contains("IllegalAttributeException"));
}

@Test
void testMigrateProperties() {
final Map<String, String> expectedRenamed = Map.ofEntries(
Expand Down
Loading