Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ subprojects {
}

test {
jvmArgs = [ "-javaagent:${configurations.agent.singleFile}" ]
jvmArgs "-javaagent:${configurations.agent.singleFile}"
}
}

Expand Down
7 changes: 7 additions & 0 deletions plugin-transform-json/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
project.description = 'Kestra Plugin Transformation for Json.'

// -Xss512k matches the constrained stack that triggered the production crash (Windows worker
// default is ~256 KB; 512k is slightly above the HotSpot minimum and safely above the Kestra
// framework needs). Without this, the Linux default thread stack (~8 MB) is far too large to reproduce the overflow.
test {
    // Appended via the jvmArgs(...) method (not assigned with '=') so the root build's
    // -javaagent jvmArg is preserved alongside the reduced stack size.
    jvmArgs '-Xss512k'
}

jar {
manifest {
attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
import lombok.experimental.SuperBuilder;

import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import io.kestra.core.models.enums.MonacoLanguages;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -32,16 +37,52 @@
public abstract class Transform<T extends Output> extends Task implements JSONataInterface, RunnableTask<T> {

private static final ObjectMapper MAPPER = JacksonMapper.ofJson();
// 4 MB: fits default maxDepth=50 × ~8 JVM frames/level with large headroom.
// Also isolates StackOverflowError inside the eval thread so the worker thread never crashes.
private static final long EVAL_THREAD_STACK_SIZE = 4 * 1024 * 1024;

@PluginProperty(language = MonacoLanguages.JAVASCRIPT, group = "advanced")
private Property<String> expression;

// Default 50: each JSONata recursion level pushes ~8 JVM frames; 256 KB worker stacks
// (~300 usable frames) overflow before maxDepth fires at 200. 50 × 8 = 400 frames — safe.
// Users needing deeper recursion should increase both this value and the JVM stack size.
@Builder.Default
private Property<Integer> maxDepth = Property.ofValue(200);
private Property<Integer> maxDepth = Property.ofValue(50);

@Getter(AccessLevel.PRIVATE)
private Jsonata parsedExpression;

// Lazy-initialized; lifecycle managed by evalExecutor() / shutdownEvalExecutor().
// Assumption: Flux pipelines in subclasses are sequential (no parallel()/publishOn).
@Getter(AccessLevel.NONE)
@ToString.Exclude
@EqualsAndHashCode.Exclude
private transient ExecutorService evalExecutor;

/**
 * Returns the lazily-created single-thread executor used for JSONata evaluation.
 * The thread is a daemon named "jsonata-eval" with an explicit {@code EVAL_THREAD_STACK_SIZE}
 * stack, and is reused for every record in the task run. Unsynchronized lazy creation is
 * safe only under the documented sequential-pipeline assumption (no concurrent callers).
 */
private ExecutorService evalExecutor() {
    if (this.evalExecutor != null) {
        return this.evalExecutor;
    }
    this.evalExecutor = Executors.newSingleThreadExecutor(runnable -> {
        var evalThread = new Thread(null, runnable, "jsonata-eval", EVAL_THREAD_STACK_SIZE);
        evalThread.setDaemon(true); // never keep the JVM alive on its account
        return evalThread;
    });
    return this.evalExecutor;
}

/**
 * Stops the lazily-created eval executor (if any) and clears the reference so a later
 * run starts fresh. Waits briefly for in-flight work, then forcibly interrupts the
 * eval thread so the "jsonata-eval" daemon cannot linger after the task completes.
 */
protected void shutdownEvalExecutor() {
    if (this.evalExecutor == null) {
        return;
    }
    this.evalExecutor.shutdown();
    try {
        if (!this.evalExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
            // Grace period elapsed (e.g. a runaway evaluation still recursing):
            // interrupt the eval thread rather than abandoning it silently.
            this.evalExecutor.shutdownNow();
        }
    } catch (InterruptedException e) {
        this.evalExecutor.shutdownNow();
        Thread.currentThread().interrupt(); // preserve interrupt status for callers
    } finally {
        this.evalExecutor = null;
    }
}

public void init(RunContext runContext) throws Exception {
var exprString = runContext.render(this.expression).as(String.class).orElseThrow();
try {
Expand All @@ -62,12 +103,43 @@ protected JsonNode evaluateExpression(RunContext runContext, JsonNode jsonNode)
var frame = this.parsedExpression.createFrame();
frame.setRuntimeBounds(timeoutInMilli, rMaxDepth);

var result = this.parsedExpression.evaluate(data, frame);
if (result == null) {
return NullNode.getInstance();
var resultRef = new AtomicReference<JsonNode>();
var errorRef = new AtomicReference<Throwable>();

// Eval runs on a dedicated executor thread (4 MB stack) that is reused across all records
// in the same task run. This serves two purposes:
// 1. Normal case: worker stack size (e.g. 256 KB on Windows) cannot constrain the evaluator.
// 2. Edge case (user sets very high maxDepth): if a StackOverflowError occurs in the eval
// thread, it is contained there. The worker thread reads the stored error and throws a
// clean RuntimeException — the worker never crashes.
// The catch is intentionally Throwable: this is a throwaway-thread sandbox, so every escape
// (including Errors like StackOverflowError and OutOfMemoryError) must land in errorRef.
// A narrower catch would let some Errors escape, leaving both refs null and producing a
// silent-null return after future.get().
var future = evalExecutor().submit(() -> {
try {
var result = this.parsedExpression.evaluate(data, frame);
resultRef.set(result != null ? MAPPER.valueToTree(result) : NullNode.getInstance());
} catch (Throwable t) {
Comment thread
Malaydewangan09 marked this conversation as resolved.
errorRef.set(t);
}
return null;
});

try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException("Failed to evaluate expression", e.getCause());
}

if (errorRef.get() != null) {
throw new RuntimeException("Failed to evaluate expression", errorRef.get());
}
return MAPPER.valueToTree(result);
} catch (JException | IllegalVariableEvaluationException e) {
return resultRef.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("JSONata evaluation interrupted", e);
} catch (IllegalVariableEvaluationException e) {
throw new RuntimeException("Failed to evaluate expression", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,40 +120,44 @@ public Output run(RunContext runContext) throws Exception {

init(runContext);

final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());

try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)) {
Flux<JsonNode> flux = FileSerde.readAll(reader, new TypeReference<>() {
});
final Path outputFilePath = runContext.workingDir().createTempFile(".ion");
try (Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(outputFilePath)))) {

// transform
Flux<JsonNode> values = flux.map(node -> this.evaluateExpression(runContext, node));

if (runContext.render(explodeArray).as(Boolean.class).orElseThrow()) {
values = values.flatMap(jsonNode -> {
if (jsonNode.isArray()) {
Iterable<JsonNode> iterable = jsonNode::elements;
return Flux.fromStream(StreamSupport.stream(iterable.spliterator(), false));
}
return Mono.just(jsonNode);
});
try {
final URI from = new URI(runContext.render(this.from).as(String.class).orElseThrow());

try (Reader reader = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)), FileSerde.BUFFER_SIZE)) {
Flux<JsonNode> flux = FileSerde.readAll(reader, new TypeReference<>() {
});
final Path outputFilePath = runContext.workingDir().createTempFile(".ion");
try (Writer writer = new BufferedWriter(new OutputStreamWriter(Files.newOutputStream(outputFilePath)))) {

// transform
Flux<JsonNode> values = flux.map(node -> this.evaluateExpression(runContext, node));

if (runContext.render(explodeArray).as(Boolean.class).orElseThrow()) {
values = values.flatMap(jsonNode -> {
if (jsonNode.isArray()) {
Iterable<JsonNode> iterable = jsonNode::elements;
return Flux.fromStream(StreamSupport.stream(iterable.spliterator(), false));
}
return Mono.just(jsonNode);
});
}

Long processedItemsTotal = FileSerde.writeAll(writer, values).block();

URI uri = runContext.storage().putFile(outputFilePath.toFile());

// output
return Output
.builder()
.uri(uri)
.processedItemsTotal(processedItemsTotal)
.build();
} finally {
Files.deleteIfExists(outputFilePath); // ensure temp file is deleted in case of error
}

Long processedItemsTotal = FileSerde.writeAll(writer, values).block();

URI uri = runContext.storage().putFile(outputFilePath.toFile());

// output
return Output
.builder()
.uri(uri)
.processedItemsTotal(processedItemsTotal)
.build();
} finally {
Files.deleteIfExists(outputFilePath); // ensure temp file is deleted in case of error
}
} finally {
shutdownEvalExecutor();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,17 @@ public class TransformValue extends Transform<TransformValue.Output> implements
/**
 * Parses the rendered {@code from} JSON, evaluates the JSONata expression against it,
 * and returns the transformed value. The dedicated eval executor is always torn down
 * in the finally block, even when parsing or evaluation throws.
 */
public Output run(RunContext runContext) throws Exception {
    init(runContext);

    try {
        final JsonNode source = parseJson(runContext.render(this.from).as(String.class).orElseThrow());

        // transform
        final JsonNode transformed = evaluateExpression(runContext, source);

        // output
        return Output.builder().value(transformed).build();
    } finally {
        // Release the "jsonata-eval" thread regardless of success or failure.
        shutdownEvalExecutor();
    }
}

private static JsonNode parseJson(String from) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,38 @@ void shouldGetSingleRecordForValidExprReturningArrayGivenExplodeFalse() throws E
Assertions.assertEquals(2, transformationResult.getFirst().size());
}

@Test
void shouldReuseEvalThreadAcrossRecords() throws Exception {
    // After run() returns, shutdownEvalExecutor() has awaited termination, so no
    // "jsonata-eval" thread may remain alive. This guards the single-executor design
    // against a regression back to one throwaway thread per record, where a freshly
    // started thread could still be alive when we count.
    RunContext runContext = runContextFactory.of();
    final Path inputFile = runContext.workingDir().createTempFile(".ion");
    try (final Writer writer = new OutputStreamWriter(Files.newOutputStream(inputFile))) {
        FileSerde.writeAll(writer, Flux.just(
            Map.of("v", 1),
            Map.of("v", 2),
            Map.of("v", 3)
        )).block();
        writer.flush();
    }
    URI storedUri = runContext.storage().putFile(inputFile.toFile());

    TransformItems task = TransformItems.builder()
        .from(Property.ofValue(storedUri.toString()))
        .expression(Property.ofValue("$"))
        .build();

    task.run(runContext);

    long survivingEvalThreads = Thread.getAllStackTraces().keySet().stream()
        .map(Thread::getName)
        .filter("jsonata-eval"::equals)
        .count();

    Assertions.assertEquals(0, survivingEvalThreads, "jsonata-eval thread should be terminated after run()");
}

@Test
void shouldTransformJsonInputWithDefaultIonMapper() throws Exception {
// Given
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.transform.jsonata;

import com.dashjoin.jsonata.JException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.kestra.core.junit.annotations.KestraTest;
Expand All @@ -9,8 +10,11 @@
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@KestraTest
class TransformValueTest {
Expand Down Expand Up @@ -133,4 +137,93 @@ void shouldHandleNestedArrayExpressionFromIssue40() throws Exception {
assertThat(result.get(1).isArray()).isTrue();
assertThat(result.get(1).get(2).asText()).isEqualTo("8796977843745/8796995341857/8796999765537");
}

// Regression tests for StackOverflow protection in evaluateExpression().
//
// Root cause: each JSONata recursion level pushes ~8 JVM frames. On 256 KB worker stacks
// (~300 usable frames), even maxDepth=200 allows 200 × 8 = 1600 frames — far past overflow.
//
// Fix (two layers):
// 1. Default maxDepth lowered to 50 (50 × 8 = 400 frames — safe on 256 KB stacks).
// Bounds check fires and throws JException before any stack risk.
// 2. Evaluation runs on a dedicated thread with a 4 MB stack. If the user sets a high
// maxDepth that allows overflow, the StackOverflowError is caught as Throwable inside
// the throwaway eval thread. The worker thread reads the stored error and throws a
// clean RuntimeException — the worker never crashes.
//
// Production crash: Windows worker default stack ~256 KB, crashed at depth=999.
// Test JVM is pinned to -Xss512k (see build.gradle).
// "+ 0" makes the expression non-tail-recursive, preventing TCO, so frames stay live.

@ParameterizedTest
@ValueSource(ints = {50, 200, 500, 1000})
void shouldNeverThrowStackOverflowForCommonMaxDepthValues(int maxDepth) throws Exception {
    // For every common maxDepth value, the recursion bounds check fires (JException)
    // on the 4 MB eval thread well before the stack could overflow, so the failure is
    // always a clean RuntimeException wrapping JException — never a StackOverflowError.
    RunContext runContext = runContextFactory.of();
    TransformValue task = TransformValue.builder()
        .from(Property.ofValue("{}"))
        .expression(Property.ofValue("($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(10000))"))
        .maxDepth(Property.ofValue(maxDepth))
        .build();

    assertThatThrownBy(() -> task.run(runContext))
        .isInstanceOf(RuntimeException.class)
        .hasMessageContaining("Failed to evaluate expression")
        .hasCauseInstanceOf(JException.class);
}

@Test
void shouldContinueWorkingAfterStackOverflowError() throws Exception {
    // A StackOverflowError in one run must not poison later runs: each call to run()
    // builds a fresh executor and tears it down in a finally block, so the second,
    // simple task always starts from clean state.
    RunContext firstContext = runContextFactory.of();
    TransformValue overflowingTask = TransformValue.builder()
        .from(Property.ofValue("{}"))
        .expression(Property.ofValue("($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(49999))"))
        .maxDepth(Property.ofValue(50000))
        .build();

    assertThatThrownBy(() -> overflowingTask.run(firstContext))
        .isInstanceOf(RuntimeException.class)
        .hasCauseInstanceOf(StackOverflowError.class);

    // A subsequent run with a trivial expression must succeed.
    RunContext secondContext = runContextFactory.of();
    TransformValue simpleTask = TransformValue.builder()
        .from(Property.ofValue("{\"x\": 42}"))
        .expression(Property.ofValue("x"))
        .build();

    TransformValue.Output output = simpleTask.run(secondContext);
    assertThat(output.getValue()).isNotNull();
    assertThat(output.getValue().toString()).isEqualTo("42");
}

@Test
void shouldIsolateStackOverflowInEvalThreadWhenMaxDepthExceedsStackCapacity() throws Exception {
    // With maxDepth set so high that the bounds check never fires first, $f(49999)
    // exhausts the 4 MB eval-thread stack. The resulting StackOverflowError must be
    // contained in the eval thread and surface to the caller as a RuntimeException
    // with the error as its cause — the worker thread itself never crashes.
    RunContext runContext = runContextFactory.of();
    TransformValue task = TransformValue.builder()
        .from(Property.ofValue("{}"))
        .expression(Property.ofValue("($f := function($n) { $n > 0 ? $f($n - 1) + 0 : 0 }; $f(49999))"))
        .maxDepth(Property.ofValue(50000))
        .build();

    assertThatThrownBy(() -> task.run(runContext))
        .isInstanceOf(RuntimeException.class)
        .hasMessageContaining("Failed to evaluate expression")
        .hasCauseInstanceOf(StackOverflowError.class);
}
}
Loading