Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
482 changes: 482 additions & 0 deletions PRDs/20251217-parallel-constraint-validation/PRD.md

Large diffs are not rendered by default.

1,377 changes: 1,377 additions & 0 deletions PRDs/20251217-parallel-constraint-validation/implementation-plan.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class DynamicContext { // NOPMD - intentional data class
private final SharedState sharedState;
@Nullable
private final FocusContext focusContext;
@NonNull
private final Deque<IExpression> executionStack;

/**
* Construct a new dynamic context with a default static context.
Expand All @@ -72,6 +74,7 @@ public DynamicContext(@NonNull StaticContext staticContext) {
this.letVariableMap = new ConcurrentHashMap<>();
this.sharedState = new SharedState(staticContext);
this.focusContext = null;
this.executionStack = new ArrayDeque<>();
}

private DynamicContext(@NonNull DynamicContext context) {
Expand All @@ -82,6 +85,8 @@ private DynamicContext(@NonNull DynamicContext context, @Nullable FocusContext f
this.letVariableMap = new ConcurrentHashMap<>(context.letVariableMap);
this.sharedState = context.sharedState;
this.focusContext = focusContext;
// Copy parent's stack so error traces show full call chain
this.executionStack = new ArrayDeque<>(context.executionStack);
}

private static class SharedState {
Expand All @@ -98,8 +103,6 @@ private static class SharedState {
@NonNull
private final IMutableConfiguration<MetapathEvaluationFeature<?>> configuration;
@NonNull
private final Deque<IExpression> executionStack = new ArrayDeque<>();
@NonNull
private ZoneId implicitTimeZone;

public SharedState(@NonNull StaticContext staticContext) {
Expand All @@ -110,7 +113,7 @@ public SharedState(@NonNull StaticContext staticContext) {
this.implicitTimeZone = ObjectUtils.notNull(clock.getZone());

this.currentDateTime = ObjectUtils.notNull(ZonedDateTime.now(clock));
this.availableDocuments = new HashMap<>();
this.availableDocuments = new ConcurrentHashMap<>();
this.functionResultCache = ObjectUtils.notNull(Caffeine.newBuilder()
.maximumSize(5000)
.expireAfterAccess(10, TimeUnit.MINUTES)
Expand Down Expand Up @@ -416,7 +419,7 @@ public DynamicContext bindVariableValue(@NonNull IEnhancedQName name, @NonNull I
* the expression to push
*/
public void pushExecutionStack(@NonNull IExpression expression) {
this.sharedState.executionStack.push(expression);
this.executionStack.push(expression);
}

/**
Expand All @@ -426,7 +429,7 @@ public void pushExecutionStack(@NonNull IExpression expression) {
* the expected expression to be popped
*/
public void popExecutionStack(@NonNull IExpression expression) {
IExpression popped = this.sharedState.executionStack.pop();
IExpression popped = this.executionStack.pop();
if (!expression.equals(popped)) {
throw new IllegalStateException("Popped expression does not match expected expression");
}
Expand All @@ -439,7 +442,7 @@ public void popExecutionStack(@NonNull IExpression expression) {
*/
@NonNull
public Deque<IExpression> getExecutionStack() {
return new ArrayDeque<>(this.sharedState.executionStack);
return new ArrayDeque<>(this.executionStack);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import gov.nist.secauto.metaschema.core.metapath.item.node.IDefinitionNodeItem;
import gov.nist.secauto.metaschema.core.metapath.item.node.IFieldNodeItem;
import gov.nist.secauto.metaschema.core.metapath.item.node.IFlagNodeItem;
import gov.nist.secauto.metaschema.core.metapath.item.node.IModelNodeItem;
import gov.nist.secauto.metaschema.core.metapath.item.node.IModuleNodeItem;
import gov.nist.secauto.metaschema.core.metapath.item.node.INodeItem;
import gov.nist.secauto.metaschema.core.model.IAssemblyDefinition;
Expand All @@ -36,11 +37,13 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -51,7 +54,8 @@
/**
* Used to perform constraint validation over one or more node items.
* <p>
* This class is not thread safe.
* This class is thread-safe and can be used with parallel constraint
* validation.
*/
@SuppressWarnings({
"PMD.CouplingBetweenObjects",
Expand All @@ -62,7 +66,7 @@ public class DefaultConstraintValidator
private static final Logger LOGGER = LogManager.getLogger(DefaultConstraintValidator.class);

@NonNull
private final Map<INodeItem, ValueStatus> valueMap = new LinkedHashMap<>(); // NOPMD - intentional
private final Map<INodeItem, ValueStatus> valueMap = new ConcurrentHashMap<>();
@NonNull
private final Map<String, IIndex> indexNameToIndexMap = new ConcurrentHashMap<>();
@NonNull
Expand All @@ -71,17 +75,34 @@ public class DefaultConstraintValidator
private final IConstraintValidationHandler handler;
@NonNull
private final IMutableConfiguration<ValidationFeature<?>> configuration;
@NonNull
private final ParallelValidationConfig parallelConfig;

/**
* Construct a new constraint validation instance.
* Construct a new constraint validation instance with sequential execution.
*
* @param handler
* the validation handler to use for handling constraint violations
*/
public DefaultConstraintValidator(
@NonNull IConstraintValidationHandler handler) {
this(handler, ParallelValidationConfig.SEQUENTIAL);
}

/**
* Construct a new constraint validation instance with configurable parallelism.
*
* @param handler
* the validation handler to use for handling constraint violations
* @param parallelConfig
* the parallel execution configuration
*/
public DefaultConstraintValidator(
@NonNull IConstraintValidationHandler handler,
@NonNull ParallelValidationConfig parallelConfig) {
this.handler = handler;
this.configuration = new DefaultConfiguration<>();
this.parallelConfig = parallelConfig;
}

/**
Expand Down Expand Up @@ -667,11 +688,11 @@ private void validateIndexHasKey(
@NonNull ISequence<? extends INodeItem> targets) {
String indexName = constraint.getIndexName();

List<KeyRef> keyRefItems = indexNameToKeyRefMap.get(indexName);
if (keyRefItems == null) {
keyRefItems = new LinkedList<>();
indexNameToKeyRefMap.put(indexName, keyRefItems);
}
// Use computeIfAbsent for thread-safe lazy initialization
// The list is wrapped in synchronizedList to ensure thread-safe add operations
List<KeyRef> keyRefItems = indexNameToKeyRefMap.computeIfAbsent(
indexName,
k -> Collections.synchronizedList(new ArrayList<>()));

keyRefItems.add(new KeyRef(constraint, node, new ArrayList<>(targets)));
}
Expand Down Expand Up @@ -831,14 +852,8 @@ protected void updateValueStatus(
@NonNull INodeItem targetItem,
@NonNull IAllowedValuesConstraint allowedValues,
@NonNull IDefinitionNodeItem<?, ?> node) throws ConstraintValidationException {
// constraint.getAllowedValues().containsKey(value)

@Nullable
ValueStatus valueStatus = valueMap.get(targetItem);
if (valueStatus == null) {
valueStatus = new ValueStatus(targetItem);
valueMap.put(targetItem, valueStatus);
}
// Use computeIfAbsent for thread-safe lazy initialization
ValueStatus valueStatus = valueMap.computeIfAbsent(targetItem, ValueStatus::new);

valueStatus.registerAllowedValues(allowedValues, node);
}
Expand Down Expand Up @@ -927,23 +942,38 @@ private void validateKeyRef(
}
}

@SuppressWarnings("PMD.AvoidUsingVolatile") // Required for thread-safe visibility across threads
private class ValueStatus {
@NonNull
private final List<Pair<IAllowedValuesConstraint, IDefinitionNodeItem<?, ?>>> constraints = new LinkedList<>();
private final List<Pair<IAllowedValuesConstraint, IDefinitionNodeItem<?, ?>>> constraints
= Collections.synchronizedList(new ArrayList<>());
@NonNull
private final String value;
@NonNull
private final INodeItem item;
private boolean allowOthers = true;
private volatile boolean allowOthers = true;
@NonNull
private IAllowedValuesConstraint.Extensible extensible = IAllowedValuesConstraint.Extensible.EXTERNAL;
private volatile IAllowedValuesConstraint.Extensible extensible = IAllowedValuesConstraint.Extensible.EXTERNAL;

public ValueStatus(@NonNull INodeItem item) {
this.item = item;
this.value = item.toAtomicItem().asString();
}

public void registerAllowedValues(
/**
* Register allowed values constraint for this item.
* <p>
* This method is synchronized to ensure thread-safe updates to the
* extensibility and allowOthers state.
*
* @param allowedValues
* the allowed values constraint
* @param node
* the definition node
* @throws ConstraintValidationException
* if constraint registration fails
*/
public synchronized void registerAllowedValues(
@NonNull IAllowedValuesConstraint allowedValues,
@NonNull IDefinitionNodeItem<?, ?> node) throws ConstraintValidationException {
IAllowedValuesConstraint.Extensible newExtensible = allowedValues.getExtensible();
Expand Down Expand Up @@ -983,11 +1013,19 @@ public void registerAllowedValues(
}

public void validate(@NonNull DynamicContext dynamicContext) {
if (!constraints.isEmpty()) {
// Take a snapshot of the state for thread-safe validation
final boolean localAllowOthers;
final List<Pair<IAllowedValuesConstraint, IDefinitionNodeItem<?, ?>>> localConstraints;
synchronized (this) {
localAllowOthers = this.allowOthers;
localConstraints = new ArrayList<>(this.constraints);
}

if (!localConstraints.isEmpty()) {
boolean match = false;
List<IAllowedValuesConstraint> failedConstraints = new LinkedList<>();
List<IAllowedValuesConstraint> failedConstraints = new ArrayList<>();
IConstraintValidationHandler handler = getConstraintValidationHandler();
for (Pair<IAllowedValuesConstraint, IDefinitionNodeItem<?, ?>> pair : constraints) {
for (Pair<IAllowedValuesConstraint, IDefinitionNodeItem<?, ?>> pair : localConstraints) {
IAllowedValuesConstraint allowedValues = pair.getLeft();
IDefinitionNodeItem<?, ?> node = ObjectUtils.notNull(pair.getRight());
IAllowedValue matchingValue = allowedValues.getAllowedValue(value);
Expand All @@ -1005,7 +1043,7 @@ public void validate(@NonNull DynamicContext dynamicContext) {
}

// it's not a failure if allow others is true
if (!match && !allowOthers) {
if (!match && !localAllowOthers) {
handler.handleAllowedValuesViolation(failedConstraints, item, dynamicContext);
}
}
Expand All @@ -1015,6 +1053,11 @@ public void validate(@NonNull DynamicContext dynamicContext) {
class Visitor
extends AbstractNodeItemVisitor<DynamicContext, Void> {

/**
* Minimum number of model children required to enable parallel traversal.
*/
private static final int PARALLEL_THRESHOLD = 4;

@NonNull
private DynamicContext handleLetStatements(
@NonNull INodeItem focus,
Expand Down Expand Up @@ -1079,10 +1122,88 @@ public Void visitAssembly(@NonNull IAssemblyNodeItem item, DynamicContext contex
} catch (ConstraintValidationException ex) {
throw ExceptionUtils.wrap(ex);
}
super.visitAssembly(item, effectiveContext);

// Parallel or sequential child traversal
if (parallelConfig.isParallel() && shouldParallelize(item)) {
visitFlags(item, effectiveContext);
visitChildrenParallel(item, effectiveContext);
} else {
super.visitAssembly(item, effectiveContext);
}

return null;
}

/**
* Check if the item has enough children to benefit from parallel traversal.
*
* @param item
* the assembly item to check
* @return true if the item has at least PARALLEL_THRESHOLD model children
*/
private boolean shouldParallelize(@NonNull IAssemblyNodeItem item) {
return item.modelItems().count() >= PARALLEL_THRESHOLD;
}

/**
* Visit model children in parallel using the configured executor.
*
* @param item
* the parent assembly item
* @param context
* the dynamic context
*/
private void visitChildrenParallel(
@NonNull IAssemblyNodeItem item,
@NonNull DynamicContext context) {

ExecutorService executor = parallelConfig.getExecutor();
List<? extends IModelNodeItem<?, ?>> children = item.modelItems()
.collect(Collectors.toList());

List<Future<?>> futures = new ArrayList<>(children.size());
for (IModelNodeItem<?, ?> child : children) {
futures.add(executor.submit(() -> {
// Each parallel task gets its own subContext for isolated execution stack
DynamicContext childContext = context.subContext();
child.accept(this, childContext);
return null;
}));
}

// Wait for all children and propagate exceptions
try {
for (Future<?> future : futures) {
future.get();
}
} catch (ExecutionException e) {
cancelRemainingFutures(futures);
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw ExceptionUtils.wrap(new ConstraintValidationException("Error during parallel validation", cause));
} catch (InterruptedException e) {
cancelRemainingFutures(futures);
Thread.currentThread().interrupt();
throw ExceptionUtils.wrap(new ConstraintValidationException("Validation interrupted", e));
}
}

/**
* Cancel any futures that are still running.
*
* @param futures
* the list of futures to cancel
*/
private void cancelRemainingFutures(@NonNull List<Future<?>> futures) {
for (Future<?> future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
}

@Override
public Void visitMetaschema(@NonNull IModuleNodeItem item, DynamicContext context) {
throw new UnsupportedOperationException("Method not used.");
Expand Down
Loading
Loading