Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"java.compile.nullAnalysis.mode": "automatic",
"java.configuration.updateBuildConfiguration": "automatic"
}
13 changes: 7 additions & 6 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
</licenses>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jackson.version>[2.7, 3.0)</jackson.version> <!-- 2.7+ is needed for JDK8 data type serialization -->
<jackson.version>[2.18.2, 2.18.3)</jackson.version> <!-- 2.18.2+ (stable) -->
<dagger.version>2.55</dagger.version> <!-- 2.55+ adds support for injecting jakarta.inject.Provider -->
<lombok.version>1.18.36</lombok.version> <!-- only compile-time dependency -->
<lombok.version>1.18.42</lombok.version> <!-- only compile-time dependency -->
<appengine.target.version>[2.0.4, 3.0)</appengine.target.version> <!-- now only for tests -->
</properties>

Expand All @@ -42,6 +42,7 @@
<configuration>
<source>17</source>
<target>17</target>

</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -83,7 +84,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>26.53.0</version>
<version>26.73.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -143,13 +144,13 @@
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20231013</version>
<version>20251224</version>
</dependency>
<!-- https://mvnrepository.com/artifact/it.unimi.dsi/fastutil-core -->
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil-core</artifactId>
<version>[8.5.1, 8.6.0)</version>
<version>[8.5.15, 8.7.0)</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
Expand Down Expand Up @@ -247,7 +248,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.15.2</version>
<version>5.21.0</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,44 @@
package com.google.appengine.tools.txn;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.stream.Collectors;

import com.google.appengine.tools.pipeline.impl.backend.PipelineTaskQueue;
import com.google.appengine.tools.pipeline.impl.tasks.PipelineTask;
import com.google.cloud.datastore.*;
import com.google.cloud.datastore.AggregationQuery;
import com.google.cloud.datastore.AggregationResults;
import com.google.cloud.datastore.Datastore;
import com.google.cloud.datastore.Entity;
import com.google.cloud.datastore.FullEntity;
import com.google.cloud.datastore.Key;
import com.google.cloud.datastore.Query;
import com.google.cloud.datastore.QueryResults;
import com.google.cloud.datastore.Transaction;
import com.google.cloud.datastore.models.ExplainOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.extern.java.Log;

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.stream.Collectors;

/**
* Transaction wrapper class that aims to mimic cross-services transactions. In this case datastore-cloud tasks.
* Transaction wrapper class that aims to mimic cross-services transactions. In
* this case datastore-cloud tasks.
*/
@Log
public class PipelineBackendTransactionImpl implements PipelineBackendTransaction {
Expand Down Expand Up @@ -64,19 +79,22 @@ public PipelineBackendTransactionImpl(@NonNull Datastore datastore, @NonNull Pip
/** Transaction interface delegate **/

public Response commit() {
//noinspection unchecked
// noinspection unchecked
try {
Response dsResponse = getDsTransaction().commit();
// log.info("commit transaction for " + Arrays.stream(Thread.currentThread().getStackTrace()).toList().get(2));
// we see more Datastore errors (contention / ) than cloud tasks enqueue errors (barely none)
// log.info("commit transaction for " +
// Arrays.stream(Thread.currentThread().getStackTrace()).toList().get(2));
// we see more Datastore errors (contention / ) than cloud tasks enqueue errors
// (barely none)
// let's only commit if the datastore txn went through
taskReferences.addAll(this.commitTasks());
return dsResponse;
} catch (Throwable t) {
rollbackTasks();
throw t;
} finally {
log.log(Level.FINE, String.format("Transaction commit %s- opened for %s", dsTransaction.getTransactionId().toStringUtf8(), stopwatch.elapsed()));
log.log(Level.FINE, String.format("Transaction commit %s- opened for %s",
dsTransaction.getTransactionId().toStringUtf8(), stopwatch.elapsed()));
}
}

Expand All @@ -99,7 +117,8 @@ public boolean rollbackIfActive() {
log.log(Level.WARNING, "Rollback of transaction failed: ", e);
} finally {
if (shouldLog) {
log.log(Level.WARNING, String.format("Transaction rollback bc still active - opened for %s", stopwatch.elapsed()));
log.log(Level.WARNING,
String.format("Transaction rollback bc still active - opened for %s", stopwatch.elapsed()));
}
}
return shouldLog;
Expand Down Expand Up @@ -150,6 +169,16 @@ public <T> QueryResults<T> run(Query<T> query, ExplainOptions explainOptions) {
return getDsTransaction().run(query, explainOptions);
}

@Override
public AggregationResults runAggregation(AggregationQuery query) {
return getDsTransaction().runAggregation(query);
}

@Override
public AggregationResults runAggregation(AggregationQuery query, ExplainOptions explainOptions) {
return getDsTransaction().runAggregation(query, explainOptions);
}

@Override
public void addWithDeferredIdAllocation(FullEntity<?>... fullEntities) {
getDsTransaction().addWithDeferredIdAllocation(fullEntities);
Expand Down Expand Up @@ -208,32 +237,36 @@ private synchronized Transaction getDsTransaction() {

private Collection<PipelineTaskQueue.TaskReference> commitTasks() {
if (!pendingTaskSpecsByQueue.isEmpty()) {
//noinspection unchecked
// noinspection unchecked
// pipeline specs
List<PipelineTaskQueue.TaskReference> taskReferences = new ArrayList<>();
pendingTaskSpecsByQueue.asMap()
.forEach((queue, tasks) -> {
// PoC: we can deal with the delay here prior to commit
Instant fixedNow = Instant.now();
Collection<PipelineTaskQueue.TaskSpec> delayedTasks = tasks.stream()
.map(task -> task.withScheduledExecutionTime(Optional.ofNullable(task.getScheduledExecutionTime()).orElse(fixedNow).plus(ENQUEUE_DELAY_FOR_SAFER_ROLLBACK)))
.collect(Collectors.toSet());

if (delayedTasks.size() != tasks.size()) {
HashSet<PipelineTaskQueue.TaskSpec> distinctTasks = new HashSet<>(tasks);
List<PipelineTaskQueue.TaskSpec> duplicatedTasks = tasks.stream()
.filter(task -> !distinctTasks.add(task))
.collect(Collectors.toList());
String message = String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", ")));
if (isCloud) {
log.log(Level.WARNING, message);
} else {
throw new IllegalStateException(String.format("Some identical pipeline tasks were enqueued. Duplicates are %s", duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", "))));
.forEach((queue, tasks) -> {
// PoC: we can deal with the delay here prior to commit
Instant fixedNow = Instant.now();
Collection<PipelineTaskQueue.TaskSpec> delayedTasks = tasks.stream()
.map(task -> task.withScheduledExecutionTime(Optional.ofNullable(task.getScheduledExecutionTime())
.orElse(fixedNow).plus(ENQUEUE_DELAY_FOR_SAFER_ROLLBACK)))
.collect(Collectors.toSet());

if (delayedTasks.size() != tasks.size()) {
HashSet<PipelineTaskQueue.TaskSpec> distinctTasks = new HashSet<>(tasks);
List<PipelineTaskQueue.TaskSpec> duplicatedTasks = tasks.stream()
.filter(task -> !distinctTasks.add(task))
.collect(Collectors.toList());
String message = String.format("Some identical pipeline tasks were enqueued. Duplicates are %s",
duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", ")));
if (isCloud) {
log.log(Level.WARNING, message);
} else {
throw new IllegalStateException(
String.format("Some identical pipeline tasks were enqueued. Duplicates are %s",
duplicatedTasks.stream().map(Object::toString).collect(Collectors.joining(", "))));
}
}
}

taskReferences.addAll(taskQueue.enqueue(queue, delayedTasks));
});
taskReferences.addAll(taskQueue.enqueue(queue, delayedTasks));
});
pendingTaskSpecsByQueue.clear();
return taskReferences;
} else {
Expand All @@ -242,16 +275,18 @@ private Collection<PipelineTaskQueue.TaskReference> commitTasks() {
}

private void rollbackTasks() {
// two cases here that should be mutually exclusive, but deal together for simplicity:
// two cases here that should be mutually exclusive, but deal together for
// simplicity:
// 1. if it was never enqueued, just clear the tasks
if (!pendingTaskSpecsByQueue.isEmpty()) {
log.log(Level.WARNING, String.format("Rollback never enqueued %d tasks", pendingTaskSpecsByQueue.asMap().values().stream().map(Collection::size).reduce(Integer::sum).orElse(-1)));
log.log(Level.WARNING, String.format("Rollback never enqueued %d tasks",
pendingTaskSpecsByQueue.asMap().values().stream().map(Collection::size).reduce(Integer::sum).orElse(-1)));
pendingTaskSpecsByQueue.clear();
}
// 2. if anything was enqueued, delete it,
if (!taskReferences.isEmpty()) {
log.log(Level.WARNING, String.format("Rollback already enqueued %d tasks: %s", taskReferences.size(),
taskReferences.stream().map(PipelineTaskQueue.TaskReference::getTaskName).collect(Collectors.joining(","))));
taskReferences.stream().map(PipelineTaskQueue.TaskReference::getTaskName).collect(Collectors.joining(","))));
taskQueue.deleteTasks(taskReferences);
taskReferences.clear();
}
Expand All @@ -266,8 +301,10 @@ private void rollbackAllServices() {
protected void finalize() throws Throwable {
try {
if (this.getDsTransaction().isActive()) {
// shouldn't happen, unless opening tnx just for read, just is kind of absurd in a strong consistency model
log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s", stopwatch.elapsed(TimeUnit.MILLISECONDS)));
// shouldn't happen, unless opening tnx just for read, just is kind of absurd in
// a strong consistency model
log.log(Level.WARNING, String.format("Finalizing PipelineBackendTransactionImpl transaction open for %s",
stopwatch.elapsed(TimeUnit.MILLISECONDS)));
}
} finally {
super.finalize();
Expand Down
Loading