NIFI-14861: Add EnrichGraphRecord processor #10208

Open
mattyb149 wants to merge 6 commits into apache:main from mattyb149:NIFI-14861

Conversation

@mattyb149
Contributor

Summary

NIFI-14861: This PR adds an EnrichGraphRecord processor that uses records to match nodes or edges and to set properties on those components from record fields. Unlike ExecuteGraphQuery(Record), this processor requires no graph query language knowledge.

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, such as NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

Contributor

@exceptionfactory left a comment

@mattyb149 Given the lack of interaction on this pull request over the last number of months, it seems like it would be best to close it for now and revisit if there is interest on the Jira issue.

@mattyb149
Contributor Author

I am actively working on it and have a reviewer (although they are not a committer). I will get this updated this week. Please keep it open.

@exceptionfactory
Contributor

Thanks for the update. If there are comments incoming, that is good to know. However, if no current committers are engaged, it doesn't seem like it will be in a position to go forward. I will leave it untouched for now.

@hleonps left a comment

Thank you @mattyb149 for working on this.

Most of the new processor worked fine in my tests, but I found some inconsistencies in the mapping and have added a couple of suggestions.

Let me know if you need more details.

// Add all dynamic properties at the top level except the identifier field
List<String> fieldNames = record.getSchema().getFieldNames();
for (String fieldName : fieldNames) {
    if (fieldName.equals(identifierField)) {

This condition will never be met, because identifierField contains the raw value set in the processor's properties (the RecordPath) rather than a field name.
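To illustrate the mismatch, here is a minimal sketch that uses only plain Java collections instead of the NiFi record API. The class `IdentifierFieldSketch` and its helpers `topLevelFieldName` and `withoutIdentifier` are hypothetical names, and the sketch assumes the property holds a plain top-level path such as `/id`. In the processor itself the configured RecordPath would more likely need to be evaluated against the record to find the selected field.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class IdentifierFieldSketch {

    // Hypothetical helper (assumption): understands only a plain top-level path
    // such as "/id". Any other value is returned unchanged.
    static String topLevelFieldName(String recordPath) {
        if (recordPath != null && recordPath.matches("/[A-Za-z_][A-Za-z0-9_]*")) {
            return recordPath.substring(1);
        }
        return recordPath;
    }

    // Copies every field except the one named by the identifier path.
    static Map<String, Object> withoutIdentifier(Map<String, Object> fields, String identifierPath) {
        String identifierName = topLevelFieldName(identifierPath);
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            if (!entry.getKey().equals(identifierName)) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("id", 42);
        fields.put("name", "Alice");

        // Comparing a field name with the raw property value never matches.
        System.out.println("id".equals("/id")); // prints false
        // After normalizing the path, the identifier field is skipped.
        System.out.println(withoutIdentifier(fields, "/id").keySet()); // prints [name]
    }
}
```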

Comment on lines +276 to +299
if (RecordFieldType.ARRAY.equals(rawValueType)) {
    DataType arrayElementType = ((ArrayDataType) rawDataType).getElementType();
    if (RecordFieldType.RECORD.getDataType().equals(arrayElementType)) {
        Object[] rawValueArray = (Object[]) rawValue;
        Object[] mappedValueArray = new Object[rawValueArray.length];
        for (int i = 0; i < rawValueArray.length; i++) {
            MapRecord mapRecord = (MapRecord) rawValueArray[i];
            mappedValueArray[i] = mapRecord.toMap(true);
        }
        dynamicPropertyMap.put(fieldName, mappedValueArray);
    }
} else if (RecordFieldType.RECORD.equals(rawValueType)) {
    MapRecord mapRecord = (MapRecord) rawValue;
    dynamicPropertyMap.put(fieldName, mapRecord.toMap(true));
} else if (RecordFieldType.STRING.equals(rawValueType)) {
    // Escape single quotes
    String stringValue = (String) rawValue;
    if (rawValue != null) {
        stringValue = stringValue.replace('\'', '\\');
        dynamicPropertyMap.put(fieldName, stringValue);
    }
} else {
    dynamicPropertyMap.put(fieldName, rawValue);
}

The mapping is working fine, but there are combinations where it might not work. For example, if the ARRAY type is not a top-level attribute (i.e. the array is inside a record), it won't map correctly.

I suggest moving the mapping block to its own method and then processing each attribute recursively, like a tree. This way each nested attribute is mapped as expected.
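A minimal sketch of that recursive approach is shown here with plain Java stand-ins rather than the NiFi record classes. `RecursiveMappingSketch`, `NestedRecord`, and `mapValue` are hypothetical names introduced for illustration. In the processor, the record branch would presumably check for a NiFi record value instead, and the existing string handling would sit in the fall-through branch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RecursiveMappingSketch {

    // Hypothetical stand-in for a nested record value; this is not a NiFi class.
    static final class NestedRecord {
        final Map<String, Object> fields = new LinkedHashMap<>();

        NestedRecord with(String name, Object value) {
            fields.put(name, value);
            return this;
        }
    }

    // Maps one value, descending into records and arrays at any depth.
    static Object mapValue(Object rawValue) {
        if (rawValue instanceof NestedRecord) {
            // A record becomes a map whose values are mapped recursively.
            Map<String, Object> mapped = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : ((NestedRecord) rawValue).fields.entrySet()) {
                mapped.put(entry.getKey(), mapValue(entry.getValue()));
            }
            return mapped;
        }
        if (rawValue instanceof Object[]) {
            // An array is mapped element by element, so arrays of records work too.
            Object[] rawArray = (Object[]) rawValue;
            Object[] mappedArray = new Object[rawArray.length];
            for (int i = 0; i < rawArray.length; i++) {
                mappedArray[i] = mapValue(rawArray[i]);
            }
            return mappedArray;
        }
        // Scalars (and null) pass through unchanged.
        return rawValue;
    }

    public static void main(String[] args) {
        // An array of records nested inside a record: the case described above.
        NestedRecord address = new NestedRecord().with("city", "Lima");
        NestedRecord person = new NestedRecord()
                .with("name", "Ana")
                .with("addresses", new Object[] {address});

        Map<?, ?> mapped = (Map<?, ?>) mapValue(person);
        Object[] addresses = (Object[]) mapped.get("addresses");
        System.out.println(mapped.get("name")); // prints Ana
        System.out.println(addresses[0]);       // prints {city=Lima}
    }
}
```

Because every branch funnels through the same method, the top-level loop and the RecordPath-based dynamic properties can both call it and get the same result for the same input.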

    throw new IOException("Dynamic property field(s) not found in record (check the RecordPath Expression), sending this record to failure");
}

dynamicPropertyMap.put(entry, propertyValues.getFirst().getValue());

Add call to the suggested mapping method for consistent results.

3 participants