Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

public class DefaultPersistenceInstanceHandlers extends PersistenceInstanceHandlers {

private final PersistenceInstanceStore store;

public static DefaultPersistenceInstanceHandlers from(PersistenceInstanceStore store) {
return new DefaultPersistenceInstanceHandlers(
new DefaultPersistenceInstanceWriter(store),
new DefaultPersistenceInstanceReader(store),
store);
}

private DefaultPersistenceInstanceHandlers(
PersistenceInstanceWriter writer,
PersistenceInstanceReader reader,
PersistenceInstanceStore store) {
super(writer, reader);
this.store = store;
}

@Override
public void close() {
safeClose(store);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowInstance;
import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;

public class DefaultPersistenceInstanceReader implements PersistenceInstanceReader {

private final PersistenceInstanceStore store;

protected DefaultPersistenceInstanceReader(PersistenceInstanceStore store) {
this.store = store;
}

@Override
public Stream<WorkflowInstance> scan(
WorkflowDefinition definition, Collection<String> instanceIds) {
PersistenceInstanceTransaction transaction = store.begin();
return instanceIds.stream()
.map(id -> read(transaction, definition, id))
.flatMap(Optional::stream)
.onClose(() -> transaction.commit());
}

@Override
public Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId) {
PersistenceInstanceTransaction transaction = store.begin();
try {
return read(transaction, definition, instanceId);
} catch (Exception ex) {
transaction.rollback();
throw ex;
}
}

private Optional<WorkflowInstance> read(
PersistenceInstanceTransaction t, WorkflowDefinition definition, String instanceId) {
return t.readWorkflowInfo(definition, instanceId)
.map(i -> new WorkflowPersistenceInstance(definition, i));
}

@Override
public Stream<WorkflowInstance> scanAll(WorkflowDefinition definition) {
PersistenceInstanceTransaction transaction = store.begin();
return transaction
.scanAll(definition)
.onClose(() -> transaction.commit())
.map(v -> new WorkflowPersistenceInstance(definition, v));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowStatus;
import java.util.function.Consumer;

public class DefaultPersistenceInstanceWriter implements PersistenceInstanceWriter {

private final PersistenceInstanceStore store;

protected DefaultPersistenceInstanceWriter(PersistenceInstanceStore store) {
this.store = store;
}

@Override
public void started(WorkflowContextData workflowContext) {
doTransaction(t -> t.writeInstanceData(key(workflowContext), workflowContext));
}

@Override
public void completed(WorkflowContextData workflowContext) {
removeProcessInstance(workflowContext);
}

@Override
public void failed(WorkflowContextData workflowContext, Throwable ex) {
removeProcessInstance(workflowContext);
}

@Override
public void aborted(WorkflowContextData workflowContext) {
removeProcessInstance(workflowContext);
}

protected void removeProcessInstance(WorkflowContextData workflowContext) {
doTransaction(
t -> {
String key = key(workflowContext);
t.removeInstanceData(key, workflowContext);
t.removeStatus(key, workflowContext);
t.removeTasks(key);
});
}

@Override
public void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext) {
// not recording
}

@Override
public void taskRetried(WorkflowContextData workflowContext, TaskContextData taskContext) {
doTransaction(t -> t.writeRetryTask(key(workflowContext), workflowContext, taskContext));
}

@Override
public void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext) {
doTransaction(t -> t.writeCompletedTask(key(workflowContext), workflowContext, taskContext));
}

@Override
public void suspended(WorkflowContextData workflowContext) {
doTransaction(
t -> t.writeStatus(key(workflowContext), WorkflowStatus.SUSPENDED, workflowContext));
}

@Override
public void resumed(WorkflowContextData workflowContext) {
doTransaction(t -> t.removeStatus(key(workflowContext), workflowContext));
}

private void doTransaction(Consumer<PersistenceInstanceTransaction> operations) {
PersistenceInstanceTransaction transaction = store.begin();
try {
operations.accept(transaction);
transaction.commit();
} catch (Exception ex) {
transaction.rollback();
throw ex;
}
}

protected String key(WorkflowContextData workflowContext) {
return workflowContext.instanceData().id();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@
*/
package io.serverlessworkflow.impl.persistence;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;
public class PersistenceInstanceHandlers implements AutoCloseable {

public abstract class PersistenceInstanceHandlers implements AutoCloseable {
private final PersistenceInstanceWriter writer;
private final PersistenceInstanceReader reader;

protected final PersistenceInstanceWriter writer;
protected final PersistenceInstanceReader reader;

protected PersistenceInstanceHandlers(
public PersistenceInstanceHandlers(
PersistenceInstanceWriter writer, PersistenceInstanceReader reader) {
this.writer = writer;
this.reader = reader;
Expand All @@ -37,8 +35,5 @@ public PersistenceInstanceReader reader() {
}

@Override
public void close() {
safeClose(writer);
safeClose(reader);
}
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence.bigmap;
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.WorkflowModel;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowInstance;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

public interface PersistenceInstanceReader extends AutoCloseable {
Map<String, WorkflowInstance> readAll(WorkflowDefinition definition);
public interface PersistenceInstanceReader {

Map<String, WorkflowInstance> read(WorkflowDefinition definition, Collection<String> instanceIds);
Stream<WorkflowInstance> scanAll(WorkflowDefinition definition);

Optional<WorkflowInstance> read(WorkflowDefinition definition, String instanceId);
Stream<WorkflowInstance> scan(WorkflowDefinition definition, Collection<String> instanceIds);

@Override
default void close() {}
Optional<WorkflowInstance> find(WorkflowDefinition definition, String instanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence.bigmap;
package io.serverlessworkflow.impl.persistence;

public interface BigMapInstanceStore<K, V, T, S> extends AutoCloseable {
BigMapInstanceTransaction<K, V, T, S> begin();
public interface PersistenceInstanceStore extends AutoCloseable {
PersistenceInstanceTransaction begin();

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.persistence;

import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowStatus;
import java.util.Optional;
import java.util.stream.Stream;

public interface PersistenceInstanceTransaction {

void commit();

void rollback();

void writeInstanceData(String instanceId, WorkflowContextData workflowContext);

void writeRetryTask(
String instanceId, WorkflowContextData workflowContext, TaskContextData taskContext);

void writeCompletedTask(
String instanceId, WorkflowContextData workflowContext, TaskContextData taskContext);

void writeStatus(
String instanceId, WorkflowStatus suspended, WorkflowContextData workflowContext);

void removeInstanceData(String instanceId, WorkflowContextData workflowContext);

void removeStatus(String instanceId, WorkflowContextData workflowContext);

void removeTasks(String instanceId);

Stream<PersistenceWorkflowInfo> scanAll(WorkflowDefinition definition);

Optional<PersistenceWorkflowInfo> readWorkflowInfo(
WorkflowDefinition definition, String instanceId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;

public interface PersistenceInstanceWriter extends AutoCloseable {
public interface PersistenceInstanceWriter {

void started(WorkflowContextData workflowContext);

Expand All @@ -37,7 +37,4 @@ public interface PersistenceInstanceWriter extends AutoCloseable {
void taskStarted(WorkflowContextData workflowContext, TaskContextData taskContext);

void taskCompleted(WorkflowContextData workflowContext, TaskContextData taskContext);

@Override
default void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package io.serverlessworkflow.impl.persistence;

import static io.serverlessworkflow.impl.WorkflowUtils.safeClose;

import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent;
import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent;
Expand Down Expand Up @@ -80,8 +78,4 @@ public void onTaskCompleted(TaskCompletedEvent ev) {
public void onTaskRetried(TaskRetriedEvent ev) {
persistenceWriter.taskRetried(ev.workflowContext(), ev.taskContext());
}

public void close() {
safeClose(persistenceWriter);
}
}
Loading