NIFI-14861: Add EnrichGraphRecord processor#10208
NIFI-14861: Add EnrichGraphRecord processor#10208mattyb149 wants to merge 6 commits intoapache:mainfrom
Conversation
exceptionfactory
left a comment
There was a problem hiding this comment.
@mattyb149 Given the lack of interaction on this pull request over the last number of months, it seems like it would be best to close it for now and revisit if there is interest on the Jira issue.
|
I am actively working it and have a reviewer (although they are not a committer), I will get this updated this week. Please keep it open. |
|
Thanks for the update, if there are comments incoming, that is good to know. However, if there are not any current committers engaged, it doesn't seem like it will be in a position to go forward. I will leave it untouched for now. |
hleonps
left a comment
There was a problem hiding this comment.
Thank you @mattyb149 for working on this.
Most of the new processor is working fine during my tests, but I found some inconsistency with the mapping. Added a couple of suggestions.
Let me know if you need more details.
| // Add all dynamic properties at the top level except the identifier field | ||
| List<String> fieldNames = record.getSchema().getFieldNames(); | ||
| for (String fieldName : fieldNames) { | ||
| if (fieldName.equals(identifierField)) { |
There was a problem hiding this comment.
This condition won´t meet due the identifierField contains the raw value set in the processor's properties (RecordPath)
| if (RecordFieldType.ARRAY.equals(rawValueType)) { | ||
| DataType arrayElementType = ((ArrayDataType) rawDataType).getElementType(); | ||
| if (RecordFieldType.RECORD.getDataType().equals(arrayElementType)) { | ||
| Object[] rawValueArray = (Object[]) rawValue; | ||
| Object[] mappedValueArray = new Object[rawValueArray.length]; | ||
| for (int i = 0; i < rawValueArray.length; i++) { | ||
| MapRecord mapRecord = (MapRecord) rawValueArray[i]; | ||
| mappedValueArray[i] = mapRecord.toMap(true); | ||
| } | ||
| dynamicPropertyMap.put(fieldName, mappedValueArray); | ||
| } | ||
| } else if (RecordFieldType.RECORD.equals(rawValueType)) { | ||
| MapRecord mapRecord = (MapRecord) rawValue; | ||
| dynamicPropertyMap.put(fieldName, mapRecord.toMap(true)); | ||
| } else if (RecordFieldType.STRING.equals(rawValueType)) { | ||
| // Escape single quotes | ||
| String stringValue = (String) rawValue; | ||
| if (rawValue != null) { | ||
| stringValue = stringValue.replace('\'', '\\'); | ||
| dynamicPropertyMap.put(fieldName, stringValue); | ||
| } | ||
| } else { | ||
| dynamicPropertyMap.put(fieldName, rawValue); | ||
| } |
There was a problem hiding this comment.
The mapping is working fine, but there are combinations where it might not work. For example: if the ARRAY type is not at top level attribute (i.e the array is inside a record), it won't map correctly.
I suggest to move the mapping block to its own method and then process each attribute recursively like a tree. This way each nested attribute is mapped as expected.
| throw new IOException("Dynamic property field(s) not found in record (check the RecordPath Expression), sending this record to failure"); | ||
| } | ||
|
|
||
| dynamicPropertyMap.put(entry, propertyValues.getFirst().getValue()); |
There was a problem hiding this comment.
Add call to the suggested mapping method for consistent results.
Summary
NIFI-14861 This PR adds an EnrichGraphRecord processor that uses records to match nodes or edges and set properties from record fields on those components. This processor requires no Graph Query Language knowledge unlike ExecuteGraphQuery(Record).
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000NIFI-00000Pull Request Formatting
mainbranchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-checkLicensing
LICENSEandNOTICEfilesDocumentation