Skip to content

Commit 4870381

Browse files
committed
Add gRPC support (work in progress)
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent 9f56628 commit 4870381

36 files changed

+1647
-1
lines changed

impl/grpc/pom.xml

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>io.serverlessworkflow</groupId>
5+
<artifactId>serverlessworkflow-impl</artifactId>
6+
<version>8.0.0-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>serverlessworkflow-impl-grpc</artifactId>
9+
<name>Serverless Workflow :: Impl :: gRPC</name>
10+
11+
<dependencies>
12+
<dependency>
13+
<groupId>io.serverlessworkflow</groupId>
14+
<artifactId>serverlessworkflow-impl-core</artifactId>
15+
</dependency>
16+
<dependency>
17+
<groupId>io.serverlessworkflow</groupId>
18+
<artifactId>serverlessworkflow-api</artifactId>
19+
</dependency>
20+
<dependency>
21+
<groupId>io.serverlessworkflow</groupId>
22+
<artifactId>serverlessworkflow-impl-jackson</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>io.grpc</groupId>
26+
<artifactId>grpc-stub</artifactId>
27+
</dependency>
28+
<dependency>
29+
<groupId>com.google.protobuf</groupId>
30+
<artifactId>protobuf-java</artifactId>
31+
<version>3.25.8</version>
32+
</dependency>
33+
<dependency>
34+
<groupId>com.google.protobuf</groupId>
35+
<artifactId>protobuf-java-util</artifactId>
36+
<version>3.25.8</version>
37+
</dependency>
38+
<dependency>
39+
<groupId>com.github.os72</groupId>
40+
<artifactId>protoc-jar</artifactId>
41+
<version>${version.com.github.os72.protoc.jar}</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>io.grpc</groupId>
45+
<artifactId>grpc-protobuf</artifactId>
46+
</dependency>
47+
</dependencies>
48+
</project>
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.google.protobuf.DescriptorProtos;
19+
20+
public record FileDescriptorContext(
21+
DescriptorProtos.FileDescriptorSet fileDescriptorSet, String inputProto) {}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
22+
@FunctionalInterface
23+
public interface FileDescriptorContextSupplier {
24+
25+
FileDescriptorContext get(
26+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input);
27+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.github.os72.protocjar.Protoc;
19+
import com.google.protobuf.DescriptorProtos;
20+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.UncheckedIOException;
24+
import java.nio.file.Files;
25+
import java.nio.file.Path;
26+
import java.nio.file.StandardCopyOption;
27+
import java.util.Optional;
28+
29+
public class FileDescriptorReader {
30+
31+
public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
32+
Path grpcDir =
33+
tryCreateTempGrpcDir()
34+
.orElseThrow(
35+
() -> new IllegalStateException("Could not create temporary gRPC directory"));
36+
37+
try (InputStream inputStream = externalResourceHandler.open()) {
38+
39+
Path protoFile = grpcDir.resolve(externalResourceHandler.name());
40+
if (!Files.exists(protoFile)) {
41+
Files.createDirectories(protoFile);
42+
}
43+
44+
Files.copy(inputStream, protoFile, StandardCopyOption.REPLACE_EXISTING);
45+
46+
Path descriptorOutput = grpcDir.resolve("descriptor.protobin");
47+
48+
try {
49+
50+
generateFileDescriptor(grpcDir, protoFile, descriptorOutput);
51+
52+
DescriptorProtos.FileDescriptorSet fileDescriptorSet =
53+
DescriptorProtos.FileDescriptorSet.newBuilder()
54+
.mergeFrom(Files.readAllBytes(descriptorOutput))
55+
.build();
56+
57+
return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name());
58+
59+
} catch (IOException e) {
60+
throw new UncheckedIOException(
61+
"Unable to read external resource handler: " + externalResourceHandler.name(), e);
62+
}
63+
} catch (IOException e) {
64+
throw new UncheckedIOException("Unable to read descriptor file", e);
65+
}
66+
}
67+
68+
private Optional<Path> tryCreateTempGrpcDir() {
69+
try {
70+
return Optional.of(Files.createTempDirectory("serverless-workflow-"));
71+
} catch (IOException e) {
72+
throw new UncheckedIOException("Error while creating temporary gRPC directory", e);
73+
}
74+
}
75+
76+
/**
77+
* Calls protoc binary with <code>--descriptor_set_out=</code> option set.
78+
*
79+
* @param grpcDir a temporary directory
80+
* @param protoFile the .proto file used by <code>protoc</code> to generate the file descriptor
81+
* @param descriptorOutput the output directory where the descriptor file will be generated
82+
*/
83+
private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) {
84+
String[] protocArgs =
85+
new String[] {
86+
"--include_imports",
87+
"--descriptor_set_out=" + descriptorOutput.toAbsolutePath(),
88+
"-I",
89+
grpcDir.toAbsolutePath().toString(),
90+
protoFile.toAbsolutePath().toString()
91+
};
92+
93+
try {
94+
95+
int status = Protoc.runProtoc(protocArgs);
96+
97+
// ProcessBuilder processBuilder = new ProcessBuilder(protocArgs);
98+
// int status = ScriptUtils.uncheckedStart(processBuilder).waitFor();
99+
100+
if (status != 0) {
101+
throw new RuntimeException(
102+
"Unable to generate file descriptor, 'protoc' execution failed with status " + status);
103+
}
104+
} catch (InterruptedException e) {
105+
Thread.currentThread().interrupt();
106+
throw new RuntimeException("Unable to generate file descriptor", e);
107+
} catch (IOException e) {
108+
throw new UncheckedIOException("Unable to generate file descriptor", e);
109+
}
110+
}
111+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import java.util.Map;
22+
23+
@FunctionalInterface
24+
public interface GrpcCallExecutor {
25+
26+
WorkflowModel apply(
27+
GrpcRequestContext requestContext,
28+
WorkflowContext workflowContext,
29+
TaskContext taskContext,
30+
WorkflowModel model,
31+
Map<String, Object> arguments);
32+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import io.grpc.Channel;
19+
import io.grpc.ManagedChannelBuilder;
20+
import io.serverlessworkflow.impl.TaskContext;
21+
import io.serverlessworkflow.impl.WorkflowApplication;
22+
import io.serverlessworkflow.impl.WorkflowContext;
23+
24+
public class GrpcChannelResolver {
25+
26+
public static final String GRPC_CHANNEL_PROVIDER = "grpcChannelProvider";
27+
28+
public static Channel channel(
29+
WorkflowContext workflowContext,
30+
TaskContext taskContext,
31+
GrpcRequestContext grpcRequestContext) {
32+
WorkflowApplication appl = workflowContext.definition().application();
33+
return appl.<Channel>additionalObject(GRPC_CHANNEL_PROVIDER, workflowContext, taskContext)
34+
.orElseGet(
35+
() ->
36+
ManagedChannelBuilder.forAddress(
37+
grpcRequestContext.address(), grpcRequestContext.port())
38+
.usePlaintext()
39+
.build());
40+
}
41+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import io.serverlessworkflow.impl.TaskContext;
19+
import io.serverlessworkflow.impl.WorkflowContext;
20+
import io.serverlessworkflow.impl.WorkflowModel;
21+
import io.serverlessworkflow.impl.WorkflowValueResolver;
22+
import io.serverlessworkflow.impl.executors.CallableTask;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
26+
public class GrpcExecutor implements CallableTask {
27+
28+
private final GrpcRequestContext requestContext;
29+
private final GrpcCallExecutor grpcCallExecutor;
30+
private final WorkflowValueResolver<Map<String, Object>> arguments;
31+
32+
public GrpcExecutor(
33+
GrpcRequestContext builder,
34+
GrpcCallExecutor grpcCallExecutor,
35+
WorkflowValueResolver<Map<String, Object>> arguments) {
36+
this.requestContext = builder;
37+
this.grpcCallExecutor = grpcCallExecutor;
38+
this.arguments = arguments;
39+
}
40+
41+
@Override
42+
public CompletableFuture<WorkflowModel> apply(
43+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
44+
45+
Map<String, Object> arguments = this.arguments.apply(workflowContext, taskContext, input);
46+
47+
return CompletableFuture.supplyAsync(
48+
() ->
49+
this.grpcCallExecutor.apply(
50+
requestContext, workflowContext, taskContext, input, arguments));
51+
}
52+
}

0 commit comments

Comments
 (0)