Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added `__contains__` and `keys()` to `Element` in `gremlin-python`.
* Added `subgraph()` support for `gremlin-python` so that results are stored in a detached `Graph` object.
* Added support for remote transactions to the `gremlin-server` through `TransactionManager` and `UnmanagedTransaction`.
* Added support for transactions to `gremlin-driver` using the new `HttpRemoteTransaction`.
* Modified grammar to make `discard()` usage more consistent as a filter step where it can now be used to chain additional traversal steps and be used anonymously.
* Removed `Meta` field from `ResponseResult` struct in `gremlin-go`.
* Removed deprecated elements of the Java-based process testing suite: `ProcessStandardSuite`, `ProcessComputerSuite`, `ProcessLimitedSuite` and associated tests.
Expand All @@ -38,6 +39,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added `RequestInterceptor` to `gremlin-go` with `auth` reference implementations to replace `authInfo`.
* Refactored GraphBinary serializers to use `io.Writer` and `io.Reader` instead of `*bytes.Buffer` for streaming capacities.
* Refactored `httpProtocol` and `httpTransport` in `gremlin-go` into single `connection.go` that handles HTTP request and response.
* Refactored result handling in `gremlin-driver` by merging `ResultQueue` into `ResultSet`.
* Replace `Bytecode` with `GremlinLang` in `gremlin-dotnet`.
* Replace `WebSocket` with `HTTP` (non-streaming) in `gremlin-dotnet`.
* Added `MimeType` to `IMessageSerializer` and split client option to allow separate request and response serialization in `gremlin-dotnet`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,14 +703,19 @@ public <S> GraphTraversal<S, S> io(final String file) {

/**
* Proxies calls through to the underlying {@link Graph#tx()} or to the {@link RemoteConnection#tx()}.
* <p>
* When a remote connection is present, this method delegates to the connection's
* {@link RemoteConnection#tx()} method, which returns an appropriate transaction
* implementation for the remote connection type (e.g., {@code HttpRemoteTransaction}
* for HTTP-based connections).
*
* @return A {@link Transaction} for managing transactional operations
*/
public Transaction tx() {
if (null == this.connection)
return this.graph.tx();
else {
throw new UnsupportedOperationException("TinkerPop 4 does not yet support remote transactions");
}

else
return this.connection.tx();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ abstract class AbstractChannelizer extends ChannelInitializer<SocketChannel> imp
protected Connection connection;
protected Cluster cluster;
protected SslHandler sslHandler;
private AtomicReference<ResultQueue> pending;
private AtomicReference<ResultSet> pending;

protected static final String PIPELINE_GREMLIN_HANDLER = "gremlin-handler";
protected static final String PIPELINE_SSL_HANDLER = "gremlin-ssl-handler";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class Client {
public abstract class Client implements RequestSubmitter, RequestSubmitterAsync {

private static final Logger logger = LoggerFactory.getLogger(Client.class);
public static final String TOO_MANY_IN_FLIGHT_REQUESTS = "Number of active requests (%s) exceeds pool size (%s). " +
Expand Down Expand Up @@ -239,6 +239,7 @@ public CompletableFuture<ResultSet> submitAsync(final String gremlin, final Requ
options.getLanguage().ifPresent(lang -> request.addLanguage(lang));
options.getMaterializeProperties().ifPresent(mp -> request.addMaterializeProperties(mp));
options.getBulkResults().ifPresent(bulked -> request.addBulkResults(Boolean.parseBoolean(bulked)));
options.getTransactionId().ifPresent(transactionId -> request.addTransactionId(transactionId));

return submitAsync(request.create());
}
Expand Down Expand Up @@ -286,6 +287,13 @@ public Cluster getCluster() {
return cluster;
}

protected Host chooseRandomHost() {
cluster.init();
final List<Host> hosts = new ArrayList<>(cluster.allHosts());
final int ix = random.nextInt(hosts.size());
return hosts.get(ix);
}

/**
* A {@code Client} implementation. Requests are sent to multiple servers given a {@link LoadBalancingStrategy}.
* Transactions are automatically committed (or rolled-back on error) after each request.
Expand Down Expand Up @@ -358,12 +366,6 @@ protected Connection chooseConnection(final RequestMessage msg) throws TimeoutEx
return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}

private Host chooseRandomHost() {
final List<Host> hosts = new ArrayList<>(cluster.allHosts());
final int ix = random.nextInt(hosts.size());
return hosts.get(ix);
}

/**
* Initializes the connection pools on all hosts.
*/
Expand Down Expand Up @@ -561,5 +563,58 @@ public Client alias(final String graphOrTraversalSource) {
if (close.isDone()) throw new IllegalStateException("Client is closed");
return new AliasClusteredClient(client, graphOrTraversalSource);
}

}

/**
* A {@link Client} that pins all requests to a single {@link Host}. Used internally by transactions
* to ensure all requests within a transaction go to the same server.
* <p>
* This client is not intended to be used directly — obtain a {@link org.apache.tinkerpop.gremlin.structure.Transaction}
* via {@link Cluster#transact()} or {@link Cluster#transact(String)} instead.
*/
public static class PinnedClient extends Client {
private final ClusteredClient clusteredClient;
private final Host pinnedHost;
private final AtomicReference<CompletableFuture<Void>> closing = new AtomicReference<>(null);

PinnedClient(final Cluster cluster) {
super(cluster);
this.pinnedHost = chooseRandomHost();
this.clusteredClient = cluster.connect();
}

public Host getPinnedHost() {
return pinnedHost;
}

@Override
protected void initializeImplementation() {
this.clusteredClient.init();
initialized = true;
}

@Override
protected Connection chooseConnection(final RequestMessage msg) throws TimeoutException, ConnectionException {
final ConnectionPool pool = clusteredClient.hostConnectionPools.get(pinnedHost);
if (pool == null) throw new NoHostAvailableException();
return pool.borrowConnection(cluster.connectionPoolSettings().maxWaitForConnection, TimeUnit.MILLISECONDS);
}

@Override
public boolean isClosing() {
return closing.get() != null;
}

/**
* Marks this client as closed. The underlying pool is owned by {@link ClusteredClient} and is not closed here.
*/
@Override
public synchronized CompletableFuture<Void> closeAsync() {
if (closing.get() != null) return closing.get();

closing.set(clusteredClient.closeAsync());
return closing.get();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.tinkerpop.gremlin.driver.auth.Auth;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.interceptor.PayloadSerializingInterceptor;
import org.apache.tinkerpop.gremlin.driver.remote.HttpRemoteTransaction;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.util.MessageSerializer;
import org.apache.tinkerpop.gremlin.util.message.RequestMessage;
import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV4;
Expand Down Expand Up @@ -63,6 +66,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -94,49 +98,31 @@ public synchronized void init() {
}

/**
* Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to
* a single server (randomly selected from the cluster), where the same bindings will be available on each request.
* Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a
* single request. The transactions are managed by the user and must be committed or rolled-back manually.
* <p/>
* Note that calling this method does not imply that a connection is made to the server itself at this point.
* Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an
* error will not be raised at this point. Connections get initialized in the {@link Client} when a request is
* submitted or can be directly initialized via {@link Client#init()}.
*
* @param sessionId user supplied id for the session which should be unique (a UUID is ideal).
* Creates a new {@link Client} based on the settings provided.
*/
public <T extends Client> T connect(final String sessionId) {
throw new UnsupportedOperationException("not implemented");
public <T extends Client> T connect() {
final Client client = new Client.ClusteredClient(this);
manager.trackClient(client);
return (T) client;
}

/**
* Creates a SessionedClient instance to this {@code Cluster}, meaning requests will be routed to
* a single server (randomly selected from the cluster), where the same bindings will be available on each request.
* Requests are bound to the same thread on the server and thus transactions may extend beyond the bounds of a
* single request. If {@code manageTransactions} is set to {@code false} then transactions are managed by the
* user and must be committed or rolled-back manually. When set to {@code true} the transaction is committed or
* rolled-back at the end of each request.
* <p/>
* Note that calling this method does not imply that a connection is made to the server itself at this point.
* Therefore, if there is only one server specified in the {@code Cluster} and that server is not available an
* error will not be raised at this point. Connections get initialized in the {@link Client} when a request is
* submitted or can be directly initialized via {@link Client#init()}.
*
* @param sessionId user supplied id for the session which should be unique (a UUID is ideal).
* @param manageTransactions enables auto-transactions when set to true
* Creates a new {@link Transaction} using the server's default traversal source.
* The server will bind to "g" by default when no traversal source is specified.
*/
public <T extends Client> T connect(final String sessionId, final boolean manageTransactions) {
throw new UnsupportedOperationException("not implemented");
public RemoteTransaction transact() {
return transact(null);
}

/**
* Creates a new {@link Client} based on the settings provided.
* Creates a new {@link Transaction} bound to the specified graph or traversal source.
*
* @param graphOrTraversalSource the graph/traversal source alias, or null to use the server default
*/
public <T extends Client> T connect() {
final Client client = new Client.ClusteredClient(this);
manager.trackClient(client);
return (T) client;
public RemoteTransaction transact(final String graphOrTraversalSource) {
final Client.PinnedClient pinnedClient = new Client.PinnedClient(this);
manager.trackClient(pinnedClient);
return new HttpRemoteTransaction(pinnedClient, graphOrTraversalSource);
}

@Override
Expand Down Expand Up @@ -365,6 +351,14 @@ public Collection<Host> allHosts() {
return Collections.unmodifiableCollection(manager.allHosts());
}

public void trackTransaction(final HttpRemoteTransaction tx) {
manager.trackTransaction(tx);
}

public void untrackTransaction(final HttpRemoteTransaction tx) {
manager.untrackTransaction(tx);
}

Factory getFactory() {
return manager.factory;
}
Expand Down Expand Up @@ -936,6 +930,7 @@ Future<?> shutdown() {

class Manager {
private final ConcurrentMap<InetSocketAddress, Host> hosts = new ConcurrentHashMap<>();
private final Set<HttpRemoteTransaction> openTransactions = ConcurrentHashMap.newKeySet();
private boolean initialized;
private final List<InetSocketAddress> contactPoints;
private final Factory factory;
Expand Down Expand Up @@ -1081,6 +1076,14 @@ void trackClient(final Client client) {
openedClients.add(new WeakReference<>(client));
}

void trackTransaction(final HttpRemoteTransaction tx) {
openTransactions.add(tx);
}

void untrackTransaction(final HttpRemoteTransaction tx) {
openTransactions.remove(tx);
}

public Host add(final InetSocketAddress address) {
final Host newHost = new Host(address, Cluster.this);
final Host previous = hosts.putIfAbsent(address, newHost);
Expand All @@ -1096,6 +1099,17 @@ synchronized CompletableFuture<Void> close() {
if (closeFuture.get() != null)
return closeFuture.get();

// best-effort rollback of any open transactions before closing
// snapshot to avoid concurrent modification since rollback() calls untrackTransaction()
new ArrayList<>(openTransactions).forEach(tx -> {
try {
tx.rollback();
} catch (Exception e) {
logger.warn("Failed to rollback transaction on cluster close", e);
}
});
openTransactions.clear();

final List<CompletableFuture<Void>> clientCloseFutures = new ArrayList<>(openedClients.size());
for (WeakReference<Client> openedClient : openedClients) {
final Client client = openedClient.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.net.URI;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -56,7 +55,7 @@ final class Connection {

private final Channel channel;
private final URI uri;
private final AtomicReference<ResultQueue> pending = new AtomicReference<>();
private final AtomicReference<ResultSet> pending = new AtomicReference<>();
private final Cluster cluster;
private final Client client;
private final ConnectionPool pool;
Expand Down Expand Up @@ -155,7 +154,7 @@ Cluster getCluster() {
return cluster;
}

AtomicReference<ResultQueue> getPending() {
AtomicReference<ResultSet> getPending() {
return pending;
}

Expand Down Expand Up @@ -184,9 +183,9 @@ public synchronized CompletableFuture<Void> closeAsync() {
return future;
}

public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultQueueSetup) {
// once there is a completed write, then create a traverser for the result set and complete
// the promise so that the client knows that that it can start checking for results.
public ChannelPromise write(final RequestMessage requestMessage, final CompletableFuture<ResultSet> resultSetFuture) {
// once there is a completed write, then create a ResultSet and complete
// the promise so that the client knows that it can start checking for results.
final Connection thisConnection = this;

final ChannelPromise requestPromise = channel.newPromise()
Expand All @@ -198,12 +197,11 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab

handleConnectionCleanupOnError(thisConnection);

cluster.executor().submit(() -> resultQueueSetup.completeExceptionally(f.cause()));
cluster.executor().submit(() -> resultSetFuture.completeExceptionally(f.cause()));
} else {
final LinkedBlockingQueue<Result> resultLinkedBlockingQueue = new LinkedBlockingQueue<>();
final CompletableFuture<Void> readCompleted = new CompletableFuture<>();
final ResultSet resultSet = new ResultSet(cluster.executor(), requestMessage, pool.host);

readCompleted.whenCompleteAsync((v, t) -> {
resultSet.getReadCompleted().whenCompleteAsync((v, t) -> {
if (t != null) {
// the callback for when the read failed. a failed read means the request went to the server
// and came back with a server-side error of some sort. it means the server is responsive
Expand All @@ -212,7 +210,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
logger.debug("Error while processing request on the server {}.", this, t);
handleConnectionCleanupOnError(thisConnection);
} else {
// the callback for when the read was successful, meaning that ResultQueue.markComplete()
// the callback for when the read was successful, meaning that ResultSet.markComplete()
// was called
thisConnection.returnToPool();
}
Expand All @@ -223,15 +221,12 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
tryShutdown();
}, cluster.executor());

final ResultQueue handler = new ResultQueue(resultLinkedBlockingQueue, readCompleted);
// pending.put(requestMessage.getRequestId(), handler);
pending.set(handler);
pending.set(resultSet);

// resultQueueSetup should only be completed by a worker since the application code might have sync
// resultSetFuture should only be completed by a worker since the application code might have sync
// completion stages attached to it which and we do not want the event loop threads to process those
// stages.
cluster.executor().submit(() -> resultQueueSetup.complete(
new ResultSet(handler, cluster.executor(), readCompleted, requestMessage, pool.host)));
cluster.executor().submit(() -> resultSetFuture.complete(resultSet));
}
});
channel.writeAndFlush(requestMessage, requestPromise);
Expand Down
Loading
Loading