Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ jobs:
with:
skip-test: ${{ github.event.inputs.skip-test == 'true' }}
kestra-version: ${{ github.event.inputs.kestra-version }}
java-version: '25'
secrets: inherit
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ tasks.withType(JavaCompile) {
options.compilerArgs.add("-parameters")
}

final targetJavaVersion = JavaVersion.VERSION_21
final targetJavaVersion = JavaVersion.VERSION_25
group "io.kestra.plugin"

allprojects {
Expand Down Expand Up @@ -121,6 +121,7 @@ subprojects {
testImplementation group: "io.kestra", name: "repository-memory", version: kestraVersion
testImplementation group: "io.kestra", name: "runner-memory", version: kestraVersion
testImplementation group: "io.kestra", name: "storage-local", version: kestraVersion
testImplementation group: "io.kestra", name: "indexer", version: kestraVersion

// test
testImplementation "org.junit.jupiter:junit-jupiter-engine"
Expand Down
3 changes: 2 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
version=1.5.2-SNAPSHOT
kestraVersion=1.3.0
kestraVersion=2.0.0-SNAPSHOT
org.gradle.jvmargs=-Xmx2g
7 changes: 7 additions & 0 deletions plugin-transform-grok/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,10 @@ kestra:
type: local
local:
base-path: /tmp/unittest
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost

6 changes: 6 additions & 0 deletions plugin-transform-json/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: local
local:
base-path: /tmp/unittest
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.kestra.plugin.transform;

import io.kestra.core.exceptions.InternalException;
import io.kestra.core.junit.annotations.ExecuteFlow;
import io.kestra.core.junit.annotations.KestraTest;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.executions.TaskRun;
import io.kestra.core.models.flows.State;
import io.kestra.core.services.TaskOutputService;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Test;

import java.util.List;
Expand All @@ -16,14 +19,17 @@

@KestraTest(startRunner = true)
class MapFlowTest {
@Inject
private TaskOutputService taskOutputService;

@Test
@ExecuteFlow("flows/map_flow.yaml")
void executesFlow(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("map");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) outputs.get("records");

assertThat(records.size(), is(1));
Expand All @@ -35,12 +41,12 @@ void executesFlow(Execution execution) {

@Test
@ExecuteFlow("flows/map_flow_store.yaml")
void executesStoreFlow(Execution execution) {
void executesStoreFlow(Execution execution) throws InternalException {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("map");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);

assertThat(outputs.containsKey("records"), is(false));
Object uri = outputs.get("uri");
Expand All @@ -55,7 +61,7 @@ void executesUnnestStoreFlow(Execution execution) {

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("explode");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);

assertThat(outputs.containsKey("records"), is(false));
Object uri = outputs.get("uri");
Expand All @@ -70,7 +76,7 @@ void executesFilterStoreFlow(Execution execution) {

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("filter");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);

assertThat(outputs.containsKey("records"), is(false));
Object uri = outputs.get("uri");
Expand All @@ -85,7 +91,7 @@ void executesAggregateStoreFlow(Execution execution) {

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("aggregate");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);

assertThat(outputs.containsKey("records"), is(false));
Object uri = outputs.get("uri");
Expand All @@ -100,7 +106,7 @@ void executesZipFlow(Execution execution) {

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("zip");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) outputs.get("records");

assertThat(records.size(), is(2));
Expand All @@ -116,7 +122,7 @@ void executesZipStoreFlow(Execution execution) {

List<TaskRun> taskRuns = execution.findTaskRunsByTaskId("zip");
TaskRun taskRun = taskRuns.getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);

assertThat(outputs.containsKey("records"), is(false));
Object uri = outputs.get("uri");
Expand All @@ -130,7 +136,7 @@ void executesSelectFlow(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) outputs.get("records");

assertThat(records, hasSize(1));
Expand All @@ -147,14 +153,14 @@ void executesSelectStoreFlow(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

TaskRun selectRun = execution.findTaskRunsByTaskId("select").getFirst();
Map<String, Object> selectOutputs = (Map<String, Object>) selectRun.getOutputs();
Map<String, Object> selectOutputs = (Map<String, Object>) outputsOf(selectRun);
assertThat(selectOutputs.containsKey("records"), is(false));
Object uri = selectOutputs.get("uri");
assertThat(uri != null, is(true));
assertThat(uri.toString().startsWith("kestra://"), is(true));

TaskRun readBackRun = execution.findTaskRunsByTaskId("read_back").getFirst();
Map<String, Object> readBackOutputs = (Map<String, Object>) readBackRun.getOutputs();
Map<String, Object> readBackOutputs = (Map<String, Object>) outputsOf(readBackRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) readBackOutputs.get("records");
assertThat(records, hasSize(2));
assertThat(((Number) records.getFirst().get("a")).longValue(), is(1L));
Expand All @@ -169,13 +175,13 @@ void executesSelectBinaryStoreFlow(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

TaskRun selectRun = execution.findTaskRunsByTaskId("select").getFirst();
Map<String, Object> selectOutputs = (Map<String, Object>) selectRun.getOutputs();
Map<String, Object> selectOutputs = (Map<String, Object>) outputsOf(selectRun);
Object uri = selectOutputs.get("uri");
assertThat(uri != null, is(true));
assertThat(uri.toString().startsWith("kestra://"), is(true));

TaskRun readBackRun = execution.findTaskRunsByTaskId("read_back").getFirst();
Map<String, Object> readBackOutputs = (Map<String, Object>) readBackRun.getOutputs();
Map<String, Object> readBackOutputs = outputsOf(readBackRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) readBackOutputs.get("records");
assertThat(records, hasSize(1));
assertThat(((Number) records.getFirst().get("a")).longValue(), is(1L));
Expand All @@ -188,7 +194,7 @@ void executesSelectLengthMismatchSkipFlow(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) outputs.get("records");

assertThat(records, hasSize(1));
Expand All @@ -202,10 +208,18 @@ void executesSelectOnErrorKeepFlow(Execution execution) {
assertThat(execution.getState().getCurrent(), is(State.Type.SUCCESS));

TaskRun taskRun = execution.findTaskRunsByTaskId("select").getFirst();
Map<String, Object> outputs = (Map<String, Object>) taskRun.getOutputs();
Map<String, Object> outputs = outputsOf(taskRun);
List<Map<String, Object>> records = (List<Map<String, Object>>) outputs.get("records");

assertThat(records, hasSize(1));
assertThat(records.getFirst().get("total_spent"), is("not-a-number"));
}

protected Map<String, Object> outputsOf(TaskRun taskRun) {
try {
return taskOutputService.getOutputs(taskRun);
} catch (InternalException e) {
throw new RuntimeException(e);
}
}
}
6 changes: 6 additions & 0 deletions plugin-transform-records/src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ kestra:
type: local
local:
base-path: /tmp/unittest
worker:
controllers:
type: STATIC
static:
endpoints:
- host: localhost
Loading