Skip to content

Commit 0e50e7d

Browse files
committed
[Fix #1247] Adding sync/async implementations
Signed-off-by: fjtirado <ftirados@redhat.com>
1 parent b29c722 commit 0e50e7d

8 files changed

Lines changed: 202 additions & 29 deletions

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AsyncPersistenceInstanceWriter.java renamed to impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractAsyncPersistenceExecutor.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,30 +16,26 @@
1616
package io.serverlessworkflow.impl.persistence;
1717

1818
import io.serverlessworkflow.impl.WorkflowContextData;
19-
import io.serverlessworkflow.impl.WorkflowDefinitionData;
2019
import java.util.Map;
2120
import java.util.Optional;
2221
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.ConcurrentHashMap;
2423
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.ExecutorService;
26-
import java.util.function.Consumer;
2725
import org.slf4j.Logger;
2826
import org.slf4j.LoggerFactory;
2927

30-
public abstract class AsyncPersistenceInstanceWriter extends AbstractPersistenceInstanceWriter {
28+
public abstract class AbstractAsyncPersistenceExecutor implements PersistenceExecutor {
3129

3230
private static final Logger logger =
33-
LoggerFactory.getLogger(AsyncPersistenceInstanceWriter.class);
31+
LoggerFactory.getLogger(AbstractAsyncPersistenceExecutor.class);
3432

3533
private final Map<String, CompletableFuture<Void>> futuresMap = new ConcurrentHashMap<>();
3634

3735
@Override
38-
protected CompletableFuture<Void> doTransaction(
39-
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
36+
public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
4037
final ExecutorService service =
4138
executorService().orElse(context.definition().application().executorService());
42-
final Runnable runnable = () -> doTransaction(operation, context.definition());
4339
return futuresMap.compute(
4440
context.instanceData().id(),
4541
(k, v) ->
@@ -49,16 +45,20 @@ protected CompletableFuture<Void> doTransaction(
4945
}
5046

5147
@Override
52-
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
53-
return super.removeProcessInstance(workflowContext)
54-
.thenRun(() -> futuresMap.remove(workflowContext.instanceData().id()));
48+
public CompletableFuture<Void> startInstance(Runnable runnable, WorkflowContextData context) {
49+
return SyncPersistenceExecutor.execute(runnable);
5550
}
5651

57-
protected abstract void doTransaction(
58-
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition);
59-
60-
protected Optional<ExecutorService> executorService() {
61-
return Optional.empty();
52+
@Override
53+
public CompletableFuture<Void> deleteInstance(Runnable runnable, WorkflowContextData context) {
54+
CompletableFuture<Void> completable = futuresMap.remove(context.instanceData().id());
55+
if (completable != null) {
56+
CompletableFuture<Void> result = completable.whenComplete((__, ___) -> runnable.run());
57+
completable.cancel(true);
58+
return result;
59+
} else {
60+
return CompletableFuture.completedFuture(null);
61+
}
6262
}
6363

6464
@Override
@@ -75,4 +75,6 @@ public void close() {
7575
}
7676
futuresMap.clear();
7777
}
78+
79+
protected abstract Optional<ExecutorService> executorService();
7880
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/AbstractPersistenceInstanceWriter.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public abstract class AbstractPersistenceInstanceWriter implements PersistenceIn
2525

2626
@Override
2727
public CompletableFuture<Void> started(WorkflowContextData workflowContext) {
28-
return doTransaction(t -> t.writeInstanceData(workflowContext), workflowContext);
28+
return doStartInstance(t -> t.writeInstanceData(workflowContext), workflowContext);
2929
}
3030

3131
@Override
@@ -44,7 +44,7 @@ public CompletableFuture<Void> aborted(WorkflowContextData workflowContext) {
4444
}
4545

4646
protected CompletableFuture<Void> removeProcessInstance(WorkflowContextData workflowContext) {
47-
return doTransaction(t -> t.removeProcessInstance(workflowContext), workflowContext);
47+
return doCompleteInstance(t -> t.removeProcessInstance(workflowContext), workflowContext);
4848
}
4949

5050
@Override
@@ -81,4 +81,14 @@ public void close() throws Exception {}
8181

8282
protected abstract CompletableFuture<Void> doTransaction(
8383
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context);
84+
85+
protected CompletableFuture<Void> doCompleteInstance(
86+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData workflowContext) {
87+
return doTransaction(operation, workflowContext);
88+
}
89+
90+
protected CompletableFuture<Void> doStartInstance(
91+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData workflowContext) {
92+
return doTransaction(operation, workflowContext);
93+
}
8494
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import java.util.Optional;
19+
import java.util.concurrent.ExecutorService;
20+
21+
public class AsyncPersistenceExecutor extends AbstractAsyncPersistenceExecutor {
22+
23+
private final Optional<ExecutorService> service;
24+
25+
protected AsyncPersistenceExecutor(ExecutorService service) {
26+
this.service = Optional.ofNullable(service);
27+
}
28+
29+
@Override
30+
protected Optional<ExecutorService> executorService() {
31+
return service;
32+
}
33+
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceHandlers.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,33 @@
1717

1818
import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
1919

20-
import java.util.Optional;
2120
import java.util.concurrent.ExecutorService;
2221

2322
public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {
2423

2524
public static class Builder {
2625

2726
private final PersistenceInstanceStore store;
28-
private ExecutorService executorService;
27+
private PersistenceExecutor executor;
2928

3029
private Builder(PersistenceInstanceStore store) {
3130
this.store = store;
3231
}
3332

3433
public Builder withExecutorService(ExecutorService executorService) {
35-
this.executorService = executorService;
34+
this.executor = new AsyncPersistenceExecutor(executorService);
35+
return this;
36+
}
37+
38+
public Builder withPersistenceExecutor(PersistenceExecutor executor) {
39+
this.executor = executor;
3640
return this;
3741
}
3842

3943
public PersistenceInstanceHandlers build() {
4044
return new DefaultPersistenceInstanceHandlers(
41-
new DefaultPersistenceInstanceWriter(store, Optional.ofNullable(executorService)),
45+
new DefaultPersistenceInstanceWriter(
46+
store, executor == null ? new SyncPersistenceExecutor() : executor),
4247
new DefaultPersistenceInstanceReader(store),
4348
store);
4449
}

impl/persistence/api/src/main/java/io/serverlessworkflow/impl/persistence/DefaultPersistenceInstanceWriter.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,23 @@
1616
package io.serverlessworkflow.impl.persistence;
1717

1818
import io.serverlessworkflow.impl.WorkflowDefinitionData;
19-
import java.util.Optional;
20-
import java.util.concurrent.ExecutorService;
2119
import java.util.function.Consumer;
2220
import org.slf4j.Logger;
2321
import org.slf4j.LoggerFactory;
2422

25-
public class DefaultPersistenceInstanceWriter extends AsyncPersistenceInstanceWriter {
23+
public class DefaultPersistenceInstanceWriter extends TransactedPersistenceInstanceWriter {
2624

2725
private final PersistenceInstanceStore store;
28-
private final Optional<ExecutorService> executorService;
26+
27+
private final PersistenceExecutor persistenceExecutor;
2928

3029
private static final Logger logger =
3130
LoggerFactory.getLogger(DefaultPersistenceInstanceWriter.class);
3231

3332
protected DefaultPersistenceInstanceWriter(
34-
PersistenceInstanceStore store, Optional<ExecutorService> executorService) {
35-
this.executorService = executorService;
33+
PersistenceInstanceStore store, PersistenceExecutor persistenceExecutor) {
3634
this.store = store;
35+
this.persistenceExecutor = persistenceExecutor;
3736
}
3837

3938
@Override
@@ -54,7 +53,7 @@ protected void doTransaction(
5453
}
5554

5655
@Override
57-
protected Optional<ExecutorService> executorService() {
58-
return executorService;
56+
protected PersistenceExecutor persistenceExecutor() {
57+
return persistenceExecutor;
5958
}
6059
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
public interface PersistenceExecutor extends AutoCloseable {
22+
CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context);
23+
24+
default CompletableFuture<Void> startInstance(Runnable runnable, WorkflowContextData context) {
25+
return execute(runnable, context);
26+
}
27+
28+
default CompletableFuture<Void> deleteInstance(Runnable runnable, WorkflowContextData context) {
29+
return execute(runnable, context);
30+
}
31+
32+
default void close() {}
33+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import java.util.concurrent.CompletableFuture;
20+
21+
public class SyncPersistenceExecutor implements PersistenceExecutor {
22+
23+
@Override
24+
public CompletableFuture<Void> execute(Runnable runnable, WorkflowContextData context) {
25+
return execute(runnable);
26+
}
27+
28+
public static CompletableFuture<Void> execute(Runnable runnable) {
29+
try {
30+
runnable.run();
31+
return CompletableFuture.completedFuture(null);
32+
} catch (Exception ex) {
33+
return CompletableFuture.failedFuture(ex);
34+
}
35+
}
36+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2020-Present The Serverless Workflow Specification Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.serverlessworkflow.impl.persistence;
17+
18+
import io.serverlessworkflow.impl.WorkflowContextData;
19+
import io.serverlessworkflow.impl.WorkflowDefinitionData;
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.function.Consumer;
22+
23+
public abstract class TransactedPersistenceInstanceWriter
24+
extends AbstractPersistenceInstanceWriter {
25+
26+
@Override
27+
protected CompletableFuture<Void> doTransaction(
28+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
29+
return persistenceExecutor()
30+
.execute(() -> doTransaction(operation, context.definition()), context);
31+
}
32+
33+
@Override
34+
protected CompletableFuture<Void> doStartInstance(
35+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
36+
return persistenceExecutor()
37+
.startInstance(() -> doTransaction(operation, context.definition()), context);
38+
}
39+
40+
@Override
41+
protected CompletableFuture<Void> doCompleteInstance(
42+
Consumer<PersistenceInstanceOperations> operation, WorkflowContextData context) {
43+
return persistenceExecutor()
44+
.deleteInstance(() -> doTransaction(operation, context.definition()), context);
45+
}
46+
47+
protected abstract void doTransaction(
48+
Consumer<PersistenceInstanceOperations> operation, WorkflowDefinitionData definition);
49+
50+
public void close() {
51+
persistenceExecutor().close();
52+
}
53+
54+
protected abstract PersistenceExecutor persistenceExecutor();
55+
}

0 commit comments

Comments
 (0)