Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 227 additions & 21 deletions config/schema/artifacts/datastore_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1236,6 +1236,9 @@ index_templates:
type: integer
nested_fields2|the_seasons:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1310,6 +1313,9 @@ index_templates:
type: integer
widget_options|colors:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1476,6 +1482,9 @@ index_templates:
type: integer
fees|amount_cents:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1527,6 +1536,9 @@ indices:
type: integer
shapes|coordinates:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1593,6 +1605,9 @@ indices:
type: integer
owner_ids:
type: integer
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1625,6 +1640,9 @@ indices:
type: keyword
__typename:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1654,6 +1672,9 @@ indices:
type: integer
manufacturer_id:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1690,6 +1711,9 @@ indices:
type: keyword
nationality:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1722,6 +1746,9 @@ indices:
type: keyword
manufacturer_id:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1753,6 +1780,9 @@ indices:
type: keyword
__typename:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand All @@ -1778,6 +1808,9 @@ indices:
format: strict_date
active:
type: boolean
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand All @@ -1803,6 +1836,9 @@ indices:
type: keyword
name:
type: keyword
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -1835,6 +1871,9 @@ indices:
created_at:
type: date
format: strict_date_time
__nested_sourced_data:
type: object
dynamic: 'false'
__sources:
type: keyword
__versions:
Expand Down Expand Up @@ -2156,13 +2195,116 @@ scripts:

// No timestamp values matched the params, so return `false`.
return false;
update_index_data_b9e2b105d736d8d16ae269ab6ff81e4d:
update_index_data_079bafcf4d739acd8659a631377fa9c8:
context: update
script:
lang: painless
source: |-
// --- Helper Functions --- //
void setup(Map source, String relationship, Map counts) {
// ============================================================
// Helper Functions
// ============================================================

// Serializes a list of strings into one key using length-prefix framing.
// Each element is written as "<length>:<value>" and the pieces are concatenated with no
// separator, e.g. ["ab", "c"] becomes "2:ab1:c". Because every value announces its own length,
// values may contain any characters (including ':' and digits) without making the key ambiguous.
// An empty list produces the empty string.
String encodeKey(List parts) {
  StringBuilder encoded = new StringBuilder();
  for (int idx = 0; idx < parts.size(); idx++) {
    String value = (String) parts.get(idx);
    encoded.append(value.length()).append(':').append(value);
  }
  return encoded.toString();
}

// Decodes a length-prefixed key (the format produced by `encodeKey`) back into its list of parts.
// The key is read left to right as repeated "<length>:<value>" records; an empty key yields an empty list.
//
// Throws IllegalArgumentException when the key is not well-formed: a record has no ':' or an empty
// length prefix, the length is not a number (NumberFormatException, a subtype of IllegalArgumentException),
// the length is negative, or the declared length runs past the end of the key. Failing with a message
// that names the key makes a corrupt stored key diagnosable instead of surfacing as an opaque
// StringIndexOutOfBoundsException.
List decodeKey(String key) {
  List parts = new ArrayList();
  int i = 0;
  while (i < key.length()) {
    int colonPos = key.indexOf(":", i);
    // `indexOf` returns -1 when no ':' remains; `colonPos == i` means the length prefix is empty.
    if (colonPos <= i) {
      throw new IllegalArgumentException(
        "Malformed encoded key (missing length prefix at offset " + i + "): " + key
      );
    }

    int length = Integer.parseInt(key.substring(i, colonPos));
    int valueStart = colonPos + 1;
    // Compare against the remaining character count (rather than computing valueStart + length first)
    // so that a huge declared length cannot overflow `int` and slip past the check.
    if (length < 0 || length > key.length() - valueStart) {
      throw new IllegalArgumentException(
        "Malformed encoded key (declared length " + length + " at offset " + i + " is out of range): " + key
      );
    }

    int valueEnd = valueStart + length;
    parts.add(key.substring(valueStart, valueEnd));
    i = valueEnd;
  }
  return parts;
}

// Builds the encoded key that identifies the nested element targeted by an event.
//
// `nestedSourcedPaths` maps a relationship name to its list of path segments. For each segment:
//   - a segment whose "type" is "list" contributes the identifier found in `pathIdentifiers`
//     under the segment's "sourceField";
//   - any other segment contributes its "field" name.
// The collected parts are encoded with `encodeKey`.
//
// Returns "" when no path segments are configured for `relationship` (including when
// `nestedSourcedPaths` itself is null), i.e. the event is not a nested sourced event.
// Throws IllegalArgumentException when a list segment has no identifier in `pathIdentifiers`;
// without this check the null would only fail later, inside `encodeKey`, with an unhelpful
// NullPointerException.
String buildNestedElementKey(String relationship, Map nestedSourcedPaths, Map pathIdentifiers) {
  // A missing paths map means the same thing as "nothing configured for this relationship".
  if (nestedSourcedPaths == null) {
    return "";
  }

  List pathSegments = (List) nestedSourcedPaths.get(relationship);
  if (pathSegments == null) {
    return "";
  }

  List parts = new ArrayList();
  for (Map segment : pathSegments) {
    if ("list".equals(segment.get("type"))) {
      def sourceField = segment.get("sourceField");
      def identifier = null;
      if (pathIdentifiers != null) {
        identifier = pathIdentifiers.get(sourceField);
      }
      if (identifier == null) {
        throw new IllegalArgumentException(
          "Cannot build nested element key for " + relationship + ": " +
          "no path identifier was provided for `" + sourceField + "`."
        );
      }
      parts.add(identifier);
    } else {
      parts.add(segment.get("field"));
    }
  }
  return encodeKey(parts);
}

// Derives the key under which an event's versions are tracked.
// Top-level events (empty `nestedElementKey`) are tracked under the bare relationship name.
// Nested events are tracked under an encoded key whose first part is the relationship name,
// followed by the decoded parts of `nestedElementKey`.
String buildVersionsKey(String relationship, String nestedElementKey) {
  if (!nestedElementKey.isEmpty()) {
    List qualifiedParts = new ArrayList();
    qualifiedParts.add(relationship);
    qualifiedParts.addAll(decodeKey(nestedElementKey));
    return encodeKey(qualifiedParts);
  }
  return relationship;
}

// Returns the first element of `elements` whose `matchField` value equals `matchValue`,
// or null when there is no such element.
// A null `elements` list and null entries inside the list are treated as "no match" rather than
// failing, since a document can hold an explicit null where a list (or list item) is expected.
def findInList(List elements, String matchField, String matchValue) {
  if (elements == null) {
    return null;
  }

  for (def element : elements) {
    if (element == null) {
      continue;
    }
    if (matchValue.equals(((Map) element).get(matchField))) {
      return element;
    }
  }
  return null;
}

// Walks from `source` through `pathSegments` to the nested element addressed by `keyParts`.
//
// Segment i is resolved against the current map using its "field":
//   - when the segment's "type" is "list", the field's value is searched (via `findInList`) for the
//     element whose "matchField" value equals keyParts[i];
//   - otherwise the field's value is used directly as the next map.
//
// Returns the element reached after the last segment, or null when any step cannot be resolved:
// the field is absent, the field holds an explicit null, or no list element matches.
// The explicit-null case matters for list segments: without the check below a null list value
// would be handed to `findInList` and iterated.
def navigateToNestedElement(Map source, List pathSegments, List keyParts) {
  Map current = source;

  for (int i = 0; i < pathSegments.size(); i++) {
    Map segment = (Map) pathSegments.get(i);
    String field = (String) segment.get("field");

    // `get` yields null both for an absent field and for a field stored as null;
    // either way there is nothing to descend into.
    def value = current.get(field);
    if (value == null) {
      return null;
    }

    if ("list".equals(segment.get("type"))) {
      current = (Map) findInList((List) value, (String) segment.get("matchField"), (String) keyParts.get(i));
    } else {
      current = (Map) value;
    }

    if (current == null) {
      return null;
    }
  }

  return current;
}


// ============================================================
// Main Functions
// ============================================================

// Initializes internal bookkeeping structures (__sources, __versions, __counts, __nested_sourced_data).
void setup(Map source, String versionsKey, String relationship, String nestedElementKey, Map counts) {
if (source.__sources == null) {
source.__sources = [];
}
Expand All @@ -2171,32 +2313,42 @@ scripts:
source.__versions = [:];
}

if (source.__versions[relationship] == null) {
source.__versions[relationship] = [:];
if (source.__versions[versionsKey] == null) {
source.__versions[versionsKey] = [:];
}

if (!nestedElementKey.isEmpty()) {
if (source.__nested_sourced_data == null) {
source.__nested_sourced_data = [:];
}
if (source.__nested_sourced_data[relationship] == null) {
source.__nested_sourced_data[relationship] = [:];
}
}

if (counts != null && source.__counts == null) {
source.__counts = [:];
}
}

void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion) {
Map relationshipVersionsMap = source.__versions.get(relationship);
List previousSourceIdsForRelationship = relationshipVersionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList());
// Validates that this event is allowed: no relationship mutation and no stale version.
void validateSource(Map source, String id, String relationship, String sourceId, long eventVersion, String versionsKey) {
Map versionsMap = source.__versions[versionsKey];

if (previousSourceIdsForRelationship.size() > 0) {
// Check that no other source ID has previously written to this target.
List previousSourceIds = versionsMap.keySet().stream().filter(key -> key != sourceId).collect(Collectors.toList());
if (previousSourceIds.size() > 0) {
throw new IllegalArgumentException(
"Cannot update document " + id + " " +
"with data from related " + relationship + " " + sourceId + " " +
"because the related " + relationship + " has apparently changed (was: " + previousSourceIdsForRelationship + "), " +
"because the related " + relationship + " has apparently changed (was: " + previousSourceIds + "), " +
"but mutations of relationships used with `sourced_from` are not supported because " +
"allowing it could break ElasticGraph's out-of-order processing guarantees."
);
);
}

Number maybeDocVersion = relationshipVersionsMap.get(sourceId);

// Our JSON schema requires event versions to be non-negative, so we can safely use Long.MIN_VALUE as a stand-in when the value is null.
// Check that the event version is newer than what we've already seen.
Number maybeDocVersion = versionsMap.get(sourceId);
long docVersion = maybeDocVersion == null ? Long.MIN_VALUE : maybeDocVersion.longValue();

if (docVersion >= eventVersion) {
Expand All @@ -2207,6 +2359,7 @@ scripts:
}
}

// Applies top-level fields to the document via putAll, and merges __counts.
void applyTopLevelFields(Map source, String id, Map topLevelFields, Map counts) {
source.id = id;
source.putAll(topLevelFields);
Expand All @@ -2216,8 +2369,50 @@ scripts:
}
}

void recordSource(Map source, String relationship, String sourceId, long eventVersion) {
source.__versions[relationship][sourceId] = eventVersion;
// Buffers an event's nested sourced fields under
// source.__nested_sourced_data[relationship][nestedElementKey] so they can be applied later.
// Any fields previously buffered for the same relationship and element key are replaced.
//
// Does nothing when there is nothing to buffer:
//   - `nestedSourcedFields` is null or empty;
//   - `nestedElementKey` is empty, meaning the event does not address a nested element. `setup` only
//     creates the buffer for non-empty element keys, so dereferencing it here would fail.
void storeNestedSourcedData(Map source, String relationship, Map nestedSourcedFields, String nestedElementKey) {
  if (nestedSourcedFields == null || nestedSourcedFields.isEmpty()) {
    return;
  }
  if (nestedElementKey.isEmpty()) {
    return;
  }

  Map bufferByRelationship = (Map) source.get("__nested_sourced_data");
  ((Map) bufferByRelationship.get(relationship)).put(nestedElementKey, nestedSourcedFields);
}

// Copies every buffered set of nested sourced fields onto the nested element it belongs to.
//
// The buffer (source.__nested_sourced_data) maps relationship -> encoded element key -> fields.
// For each relationship, the path segments are looked up in `nestedSourcedPaths`; each element key
// is decoded and used to navigate from `source` to the target element, and the buffered fields are
// merged into that element with putAll.
//
// Entries are skipped (left buffered, not applied) when the relationship has no configured path,
// when the decoded key has a different number of parts than the path has segments, or when the
// target element cannot currently be found in the document.
//
// Intended to run after every event so that, once a self-event's putAll has overwritten the nested
// arrays, the buffered data is applied to them again.
void applyNestedSourcedData(Map source, Map nestedSourcedPaths) {
  Map bufferByRelationship = (Map) source.get("__nested_sourced_data");
  if (bufferByRelationship == null) {
    return;
  }

  for (def relationshipEntry : bufferByRelationship.entrySet()) {
    String relationship = (String) relationshipEntry.getKey();
    Map fieldsByElementKey = (Map) relationshipEntry.getValue();
    List pathSegments = (List) nestedSourcedPaths.get(relationship);

    if (pathSegments != null && fieldsByElementKey != null) {
      for (def bufferedEntry : fieldsByElementKey.entrySet()) {
        List keyParts = decodeKey((String) bufferedEntry.getKey());

        // Only keys with exactly one part per path segment can address an element.
        if (keyParts.size() == pathSegments.size()) {
          Map target = (Map) navigateToNestedElement(source, pathSegments, keyParts);
          if (target != null) {
            target.putAll((Map) bufferedEntry.getValue());
          }
        }
      }
    }
  }
}

// Records the event version in __versions and adds the relationship to __sources.
void recordSource(Map source, String versionsKey, String relationship, String sourceId, long eventVersion) {
source.__versions[versionsKey][sourceId] = eventVersion;

// Record the relationship in `__sources` if it's not already there. We maintain it as an append-only set using a sorted list.
// This ensures deterministic ordering of its elements regardless of event ingestion order, and lets us check membership in O(log N) time.
Expand All @@ -2234,15 +2429,26 @@ scripts:
}
}

// --- Main script body --- //
// ============================================================
// Main Execution
// ============================================================

Map source = ctx._source;
String id = params.id;
String relationship = params.relationship;
String sourceId = params.sourceId;
long eventVersion = (long) params.version; // Cast to long since JSON parses numbers as doubles
long eventVersion = (long) params.version;
Map counts = params.__counts;
Map nestedSourcedFields = params.nestedSourcedFields;
Map nestedSourcedPathIdentifiers = params.nestedSourcedPathIdentifiers;
Map nestedSourcedPaths = params.nestedSourcedPaths;

String nestedElementKey = buildNestedElementKey(relationship, nestedSourcedPaths, nestedSourcedPathIdentifiers);
String versionsKey = buildVersionsKey(relationship, nestedElementKey);

setup(source, relationship, counts);
validateSource(source, id, relationship, sourceId, eventVersion);
setup(source, versionsKey, relationship, nestedElementKey, counts);
validateSource(source, id, relationship, sourceId, eventVersion, versionsKey);
applyTopLevelFields(source, id, params.topLevelFields, counts);
recordSource(source, relationship, sourceId, eventVersion);
storeNestedSourcedData(source, relationship, nestedSourcedFields, nestedElementKey);
applyNestedSourcedData(source, nestedSourcedPaths);
recordSource(source, versionsKey, relationship, sourceId, eventVersion);
Loading
Loading