Skip to content

Commit 1f4aaa3

Browse files
committed
WIP
Signed-off-by: Matheus Cruz <matheuscruz.dev@gmail.com>
1 parent 217bae0 commit 1f4aaa3

File tree

5 files changed

+351
-307
lines changed

5 files changed

+351
-307
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.executors.grpc;
17+
18+
import com.github.os72.protocjar.Protoc;
19+
import com.google.protobuf.DescriptorProtos;
20+
import io.serverlessworkflow.impl.resources.ExternalResourceHandler;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.UncheckedIOException;
24+
import java.nio.file.Files;
25+
import java.nio.file.Path;
26+
import java.util.Optional;
27+
import java.util.stream.Stream;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
public class FileDescriptorReader {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(FileDescriptorReader.class);
34+
35+
public FileDescriptorContext readDescriptor(ExternalResourceHandler externalResourceHandler) {
36+
Path grpcDir =
37+
tryCreateTempGrpcDir()
38+
.orElseThrow(
39+
() -> new IllegalStateException("Could not create temporary gRPC directory"));
40+
41+
try (InputStream inputStream = externalResourceHandler.open()) {
42+
43+
Path protoFile = grpcDir.resolve(externalResourceHandler.name());
44+
45+
Files.copy(inputStream, protoFile);
46+
47+
Path descriptorOutput = grpcDir.resolve("descriptor.protobin");
48+
49+
try {
50+
51+
generateFileDescriptor(grpcDir, protoFile, descriptorOutput);
52+
53+
DescriptorProtos.FileDescriptorSet fileDescriptorSet =
54+
DescriptorProtos.FileDescriptorSet.newBuilder()
55+
.mergeFrom(Files.readAllBytes(descriptorOutput))
56+
.build();
57+
58+
return new FileDescriptorContext(fileDescriptorSet, externalResourceHandler.name());
59+
60+
} catch (IOException e) {
61+
throw new UncheckedIOException(
62+
"Unable to read external resource handler: " + externalResourceHandler.name(), e);
63+
}
64+
} catch (IOException e) {
65+
throw new RuntimeException(e);
66+
}
67+
}
68+
69+
private Optional<Path> tryCreateTempGrpcDir() {
70+
try {
71+
return Optional.of(Files.createTempDirectory("serverless-workflow-"));
72+
} catch (IOException e) {
73+
throw new UncheckedIOException("Error while creating temporary gRPC directory", e);
74+
}
75+
}
76+
77+
/**
78+
* Calls protoc binary with <code>--descriptor_set_out=</code> option set.
79+
*
80+
* @param grpcDir a temporary directory
81+
* @param protoFile the .proto file used by <code>protoc</code> to generate the file descriptor
82+
* @param descriptorOutput the output directory where the descriptor file will be generated
83+
*/
84+
private static void generateFileDescriptor(Path grpcDir, Path protoFile, Path descriptorOutput) {
85+
String[] protocArgs =
86+
new String[] {
87+
"protoc",
88+
"--include_imports",
89+
"--descriptor_set_out=" + descriptorOutput.toAbsolutePath(),
90+
"-I",
91+
grpcDir.toAbsolutePath().toString(),
92+
protoFile.toAbsolutePath().toString()
93+
};
94+
95+
try {
96+
int status = Protoc.runProtoc(protocArgs);
97+
if (status != 0) {
98+
throw new RuntimeException(
99+
"Unable to generate file descriptor, 'protoc' execution failed with status " + status);
100+
}
101+
} catch (IOException e) {
102+
throw new UncheckedIOException("Unable to generate file descriptor", e);
103+
} catch (InterruptedException e) {
104+
Thread.currentThread().interrupt();
105+
throw new RuntimeException("Unable to generate file descriptor", e);
106+
} finally {
107+
cleanUp(grpcDir);
108+
}
109+
}
110+
111+
private static void cleanUp(Path grpcDir) {
112+
try (Stream<Path> pathStream = Files.walk(grpcDir)) {
113+
pathStream
114+
.map(Path::toFile)
115+
.forEach(
116+
file -> {
117+
boolean deleted = file.delete();
118+
if (!deleted) {
119+
LOGGER.warn("Unable to deleted the file: {}", file.getName());
120+
}
121+
});
122+
} catch (IOException ignored) {
123+
LOGGER.warn("Unable to clean temporary directory: {}", grpcDir);
124+
}
125+
}
126+
}

impl/grpc/src/main/java/io/serverlessworkflow/impl/executors/grpc/GrpcExecutor.java

Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -15,46 +15,19 @@
1515
*/
1616
package io.serverlessworkflow.impl.executors.grpc;
1717

18-
import com.fasterxml.jackson.core.JsonProcessingException;
19-
import com.fasterxml.jackson.databind.JsonNode;
20-
import com.google.protobuf.DescriptorProtos;
21-
import com.google.protobuf.Descriptors;
22-
import com.google.protobuf.DynamicMessage;
23-
import com.google.protobuf.InvalidProtocolBufferException;
24-
import com.google.protobuf.Message;
25-
import com.google.protobuf.util.JsonFormat;
26-
import io.grpc.MethodDescriptor;
27-
import io.grpc.Status;
28-
import io.grpc.stub.StreamObserver;
29-
import io.serverlessworkflow.api.WorkflowFormat;
30-
import io.serverlessworkflow.api.types.ExternalResource;
3118
import io.serverlessworkflow.impl.TaskContext;
3219
import io.serverlessworkflow.impl.WorkflowContext;
3320
import io.serverlessworkflow.impl.WorkflowModel;
3421
import io.serverlessworkflow.impl.executors.CallableTask;
35-
import java.io.IOException;
36-
import java.io.UncheckedIOException;
37-
import java.util.ArrayList;
38-
import java.util.List;
39-
import java.util.Map;
4022
import java.util.concurrent.CompletableFuture;
41-
import java.util.concurrent.CompletionException;
42-
import java.util.concurrent.ExecutionException;
43-
import java.util.concurrent.TimeUnit;
44-
import java.util.concurrent.TimeoutException;
45-
import org.slf4j.Logger;
46-
import org.slf4j.LoggerFactory;
4723

4824
public class GrpcExecutor implements CallableTask {
4925

50-
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcExecutor.class);
51-
5226
private final GrpcRequestContext requestContext;
5327
private final GrpcCallExecutor grpcCallExecutor;
5428
private final FileDescriptorContextSupplier fileDescriptorContextSupplier;
5529

5630
public GrpcExecutor(
57-
ExternalResource proto,
5831
GrpcRequestContext builder,
5932
GrpcCallExecutor grpcCallExecutor,
6033
FileDescriptorContextSupplier fileDescriptorContextSupplier) {
@@ -75,98 +48,4 @@ public CompletableFuture<WorkflowModel> apply(
7548
this.grpcCallExecutor.apply(
7649
fileDescriptorContext, this.requestContext, workflowContext, taskContext, input));
7750
}
78-
79-
public static Message.Builder buildMessage(Object object, Message.Builder builder)
80-
throws InvalidProtocolBufferException, JsonProcessingException {
81-
JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(object), builder);
82-
return builder;
83-
}
84-
85-
public static Message.Builder buildMessage(
86-
Descriptors.MethodDescriptor methodDescriptor, Map<String, Object> parameters)
87-
throws InvalidProtocolBufferException, JsonProcessingException {
88-
DynamicMessage.Builder builder = DynamicMessage.newBuilder(methodDescriptor.getInputType());
89-
JsonFormat.parser().merge(WorkflowFormat.JSON.mapper().writeValueAsString(parameters), builder);
90-
return builder;
91-
}
92-
93-
public static JsonNode convert(Message message) {
94-
StringBuilder str = new StringBuilder();
95-
try {
96-
JsonFormat.printer().appendTo(message, str);
97-
return WorkflowFormat.JSON.mapper().readTree(str.toString());
98-
} catch (IOException e) {
99-
throw new UncheckedIOException("Error converting protobuf message to JSON", e);
100-
}
101-
}
102-
103-
public static MethodDescriptor.MethodType getMethodType(
104-
com.google.protobuf.Descriptors.MethodDescriptor methodDesc) {
105-
DescriptorProtos.MethodDescriptorProto methodDescProto = methodDesc.toProto();
106-
if (methodDescProto.getClientStreaming()) {
107-
if (methodDescProto.getServerStreaming()) {
108-
return MethodDescriptor.MethodType.BIDI_STREAMING;
109-
}
110-
return MethodDescriptor.MethodType.CLIENT_STREAMING;
111-
} else if (methodDescProto.getServerStreaming()) {
112-
return MethodDescriptor.MethodType.SERVER_STREAMING;
113-
} else {
114-
return MethodDescriptor.MethodType.UNARY;
115-
}
116-
}
117-
118-
public static class WaitingStreamObserver implements StreamObserver<Message> {
119-
List<Message> responses = new ArrayList<>();
120-
CompletableFuture<List<Message>> responsesFuture = new CompletableFuture<>();
121-
private final int timeout;
122-
123-
public WaitingStreamObserver(int timeout) {
124-
this.timeout = timeout;
125-
}
126-
127-
@Override
128-
public void onNext(Message messageReply) {
129-
responses.add(messageReply);
130-
}
131-
132-
@Override
133-
public void onError(Throwable throwable) {
134-
responsesFuture.completeExceptionally(throwable);
135-
}
136-
137-
@Override
138-
public void onCompleted() {
139-
responsesFuture.complete(responses);
140-
}
141-
142-
public List<Message> get() {
143-
try {
144-
return responsesFuture.get(timeout, TimeUnit.SECONDS);
145-
} catch (InterruptedException e) {
146-
Thread.currentThread().interrupt();
147-
throw new IllegalStateException(e);
148-
} catch (TimeoutException e) {
149-
throw new IllegalStateException(
150-
String.format("gRPC call timed out after %d seconds", timeout), e);
151-
} catch (ExecutionException e) {
152-
throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause());
153-
}
154-
}
155-
156-
public void checkForServerStreamErrors() {
157-
if (responsesFuture.isCompletedExceptionally()) {
158-
try {
159-
responsesFuture.join();
160-
} catch (CompletionException e) {
161-
throw new IllegalStateException(getServerStreamErrorMessage(e.getCause()), e.getCause());
162-
}
163-
}
164-
}
165-
166-
private String getServerStreamErrorMessage(Throwable throwable) {
167-
return String.format(
168-
"Received an error through gRPC server stream with status: %s",
169-
Status.fromThrowable(throwable));
170-
}
171-
}
17251
}

0 commit comments

Comments
 (0)