Skip to content

Commit 3efccff

Browse files
committed
WIP
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent 1f4aaa3 commit 3efccff

File tree

2 files changed

+81
-75
lines changed

2 files changed

+81
-75
lines changed

impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/FileDescriptorReader.java

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,20 @@
1515
*/
1616
package io.serverlessworkflow.impl.executors.grpc;
1717

18-
import com.github.os72.protocjar.Protoc;
1918
import com.google.protobuf.DescriptorProtos;
2019
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
20+
import io.serverlessworkflow.impl.scripts.ScriptUtils;
21+
2122
import java.io.IOException;
2223
import java.io.InputStream;
2324
import java.io.UncheckedIOException;
2425
import java.nio.file.Files;
2526
import java.nio.file.Path;
2627
import java.util.Optional;
27-
import java.util.stream.Stream;
28-
import org.slf4j.Logger;
29-
import org.slf4j.LoggerFactory;
3028

3129
public class FileDescriptorReader {
3230

33-
private static final Logger LOGGER = LoggerFactory.getLogger(FileDescriptorReader.class);
34-
35-
public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
31+
public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
3632
Path grpcDir =
3733
tryCreateTempGrpcDir()
3834
.orElseThrow(
@@ -93,34 +89,16 @@ private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path de
9389
};
9490

9591
try {
96-
int status = Protoc.runProtoc(protocArgs);
92+
93+
ProcessBuilder processBuilder = new ProcessBuilder(protocArgs);
94+
int status = ScriptUtils.uncheckedStart(processBuilder).waitFor();
9795
if (status != 0) {
9896
throw new RuntimeException(
9997
"Unable to generate file descriptor, 'protoc' execution failed with status " + status);
10098
}
101-
} catch (IOException e) {
102-
throw new UncheckedIOException("Unable to generate file descriptor", e);
10399
} catch (InterruptedException e) {
104100
Thread.currentThread().interrupt();
105101
throw new RuntimeException("Unable to generate file descriptor", e);
106-
} finally {
107-
cleanUp(grpcDir);
108-
}
109-
}
110-
111-
private static void cleanUp(Path grpcDir) {
112-
try (Stream<Path> pathStream = Files.walk(grpcDir)) {
113-
pathStream
114-
.map(Path::toFile)
115-
.forEach(
116-
file -> {
117-
boolean deleted = file.delete();
118-
if (!deleted) {
119-
LOGGER.warn("Unable to deleted the file: {}", file.getName());
120-
}
121-
});
122-
} catch (IOException ignored) {
123-
LOGGER.warn("Unable to clean temporary directory: {}", grpcDir);
124102
}
125103
}
126104
}

impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutorBuilder.java

Lines changed: 75 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -150,53 +150,17 @@ public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePo
150150
.build(),
151151
CallOptions.DEFAULT.withWaitForReady());
152152

153-
Map<String, Object> parameters = Map.of();
154-
155-
if (methodType == MethodDescriptor.MethodType.CLIENT_STREAMING) {
156-
JsonNode jsonNode =
157-
ProtobufMessageUtils.asyncStreamingCall(
158-
parameters,
159-
methodDescriptor,
160-
responseObserver ->
161-
ClientCalls.asyncClientStreamingCall(call, responseObserver),
162-
nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0));
163-
return workflowContext.definition().application().modelFactory().fromAny(jsonNode);
164-
165-
} else if (methodType == MethodDescriptor.MethodType.BIDI_STREAMING) {
166-
return workflowContext
167-
.definition()
168-
.application()
169-
.modelFactory()
170-
.fromAny(
171-
ProtobufMessageUtils.asyncStreamingCall(
172-
parameters,
173-
methodDescriptor,
174-
responseObserver ->
175-
ClientCalls.asyncBidiStreamingCall(call, responseObserver),
176-
v -> {
177-
Collection<JsonNode> nodes = v;
178-
List<JsonNode> list = new ArrayList<>(nodes);
179-
return JsonUtils.fromValue(list);
180-
}));
181-
} else if (methodType == MethodDescriptor.MethodType.SERVER_STREAMING) {
182-
Message.Builder builder =
183-
ProtobufMessageUtils.buildMessage(methodDescriptor, parameters);
184-
List<JsonNode> nodes = new ArrayList<>();
185-
ClientCalls.blockingServerStreamingCall(call, builder.build())
186-
.forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message)));
187-
return workflowContext.definition().application().modelFactory().fromAny(nodes);
188-
} else {
189-
190-
Message.Builder builder =
191-
ProtobufMessageUtils.buildMessage(methodDescriptor, parameters);
192-
193-
Message message = ClientCalls.blockingUnaryCall(call, builder.build());
194-
return workflowContext
195-
.definition()
196-
.application()
197-
.modelFactory()
198-
.fromAny(ProtobufMessageUtils.convert(message));
199-
}
153+
return switch (methodType) {
154+
case CLIENT_STREAMING ->
155+
handleClientStreaming(workflowContext, arguments, methodDescriptor, call);
156+
case BIDI_STREAMING ->
157+
handleBidiStreaming(workflowContext, arguments, methodDescriptor, call);
158+
case SERVER_STREAMING ->
159+
handleServerStreaming(workflowContext, methodDescriptor, arguments, call);
160+
case UNARY, UNKNOWN ->
161+
handleBlockingUnary(workflowContext, methodDescriptor, arguments, call);
162+
};
163+
200164
} catch (Descriptors.DescriptorValidationException
201165
| InvalidProtocolBufferException
202166
| JsonProcessingException e) {
@@ -205,6 +169,70 @@ public void init(CallGRPC task, WorkflowDefinition definition, WorkflowMutablePo
205169
};
206170
}
207171

172+
private static WorkflowModel handleClientStreaming(
173+
WorkflowContext workflowContext,
174+
Map<String, Object> parameters,
175+
Descriptors.MethodDescriptor methodDescriptor,
176+
ClientCall<Message, Message> call) {
177+
JsonNode jsonNode =
178+
ProtobufMessageUtils.asyncStreamingCall(
179+
parameters,
180+
methodDescriptor,
181+
responseObserver -> ClientCalls.asyncClientStreamingCall(call, responseObserver),
182+
nodes -> nodes.isEmpty() ? NullNode.instance : nodes.get(0));
183+
return workflowContext.definition().application().modelFactory().fromAny(jsonNode);
184+
}
185+
186+
private static WorkflowModel handleServerStreaming(
187+
WorkflowContext workflowContext,
188+
Descriptors.MethodDescriptor methodDescriptor,
189+
Map<String, Object> parameters,
190+
ClientCall<Message, Message> call)
191+
throws InvalidProtocolBufferException, JsonProcessingException {
192+
Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters);
193+
List<JsonNode> nodes = new ArrayList<>();
194+
ClientCalls.blockingServerStreamingCall(call, builder.build())
195+
.forEachRemaining(message -> nodes.add(ProtobufMessageUtils.convert(message)));
196+
return workflowContext.definition().application().modelFactory().fromAny(nodes);
197+
}
198+
199+
private static WorkflowModel handleBlockingUnary(
200+
WorkflowContext workflowContext,
201+
Descriptors.MethodDescriptor methodDescriptor,
202+
Map<String, Object> parameters,
203+
ClientCall<Message, Message> call)
204+
throws InvalidProtocolBufferException, JsonProcessingException {
205+
Message.Builder builder = ProtobufMessageUtils.buildMessage(methodDescriptor, parameters);
206+
207+
Message message = ClientCalls.blockingUnaryCall(call, builder.build());
208+
return workflowContext
209+
.definition()
210+
.application()
211+
.modelFactory()
212+
.fromAny(ProtobufMessageUtils.convert(message));
213+
}
214+
215+
private static WorkflowModel handleBidiStreaming(
216+
WorkflowContext workflowContext,
217+
Map<String, Object> parameters,
218+
Descriptors.MethodDescriptor methodDescriptor,
219+
ClientCall<Message, Message> call) {
220+
return workflowContext
221+
.definition()
222+
.application()
223+
.modelFactory()
224+
.fromAny(
225+
ProtobufMessageUtils.asyncStreamingCall(
226+
parameters,
227+
methodDescriptor,
228+
responseObserver -> ClientCalls.asyncBidiStreamingCall(call, responseObserver),
229+
v -> {
230+
Collection<JsonNode> nodes = v;
231+
List<JsonNode> list = new ArrayList<>(nodes);
232+
return JsonUtils.fromValue(list);
233+
}));
234+
}
235+
208236
@Override
209237
public CallableTask build() {
210238
return new GrpcExecutor(

0 commit comments

Comments
 (0)