Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima

* Added `__contains__` and `keys()` to `Element` in `gremlin-python`.
* Added `subgraph()` support for `gremlin-python` so that results are stored in a detached `Graph` object.
* Added support for remote transactions to the `gremlin-server` through `TransactionManager` and `UnmanagedTransaction`.
* Modified grammar to make `discard()` usage more consistent as a filter step where it can now be used to chain additional traversal steps and be used anonymously.
* Removed `Meta` field from `ResponseResult` struct in `gremlin-go`.
* Removed deprecated elements of the Java-based process testing suite: `ProcessStandardSuite`, `ProcessComputerSuite`, `ProcessLimitedSuite` and associated tests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class Context {
private ScheduledFuture<?> timeoutExecutor = null;
private boolean timeoutExecutorGrabbed = false;
private final Object timeoutExecutorLock = new Object();
private String transactionId; // initially null for non-transactional requests and begin() calls; set after transaction creation.

public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx,
final Settings settings, final GraphManager graphManager,
Expand All @@ -80,6 +81,7 @@ public Context(final RequestMessage requestMessage, final ChannelHandlerContext
this.requestState = requestState;
this.requestTimeout = determineTimeout();
this.materializeProperties = determineMaterializeProperties();
this.transactionId = requestMessage.getField(Tokens.ARGS_TRANSACTION_ID);
}

public void setTimeoutExecutor(final ScheduledFuture<?> timeoutExecutor) {
Expand Down Expand Up @@ -119,6 +121,15 @@ public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}

public String getTransactionId() {
return transactionId;
}

public void setTransactionId(final String transactionId) {
this.transactionId = transactionId;
}


/**
* Gets the current request to Gremlin Server.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ public synchronized CompletableFuture<Void> stop() {
logger.warn("Timeout waiting for boss/worker thread pools to shutdown - continuing with shutdown process.");
}

if (serverGremlinExecutor != null) {
Comment thread
Cole-Greer marked this conversation as resolved.
logger.info("Shutting down TransactionManager");
serverGremlinExecutor.getTransactionManager().shutdown();
}

// close TraversalSource and Graph instances - there aren't guarantees that closing Graph will close all
// spawned TraversalSource instances so both should be closed directly and independently.
if (serverGremlinExecutor != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,24 @@ public Settings() {
*/
public boolean strictTransactionManagement = false;

/**
* Time in milliseconds that a transaction can remain idle before it is automatically rolled back.
* This prevents resource leaks from abandoned transactions. Default is 600000 (10 minutes).
*/
public long transactionTimeout = 600000L;

/**
* Time in milliseconds to wait for a transaction commit or rollback operation to complete.
* Default is 10000 (10 seconds).
*/
public long perGraphCloseTimeout = 10000L;

/**
* Maximum number of concurrent transactions allowed on the server.
* Default is 1000.
*/
public int maxConcurrentTransactions = 1000;

/**
* The full class name of the {@link Channelizer} to use in Gremlin Server.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.tinkerpop.gremlin.server.handler.HttpRequestMessageDecoder;
import org.apache.tinkerpop.gremlin.server.handler.HttpUserAgentHandler;
import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler;
import org.apache.tinkerpop.gremlin.server.handler.TransactionManager;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
Expand All @@ -61,7 +62,8 @@ public class HttpChannelizer extends AbstractChannelizer {
@Override
public void init(final ServerGremlinExecutor serverGremlinExecutor) {
super.init(serverGremlinExecutor);
httpGremlinEndpointHandler = new HttpGremlinEndpointHandler(gremlinExecutor, graphManager, settings);
httpGremlinEndpointHandler = new HttpGremlinEndpointHandler(
gremlinExecutor, graphManager, settings, serverGremlinExecutor.getTransactionManager());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
Expand Down Expand Up @@ -57,6 +55,7 @@
import org.apache.tinkerpop.gremlin.structure.Column;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.util.CloseableIterator;
import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
Expand All @@ -82,32 +81,33 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static com.codahale.metrics.MetricRegistry.name;
import static io.netty.handler.codec.http.HttpHeaderNames.ACCEPT_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderNames.TRANSFER_ENCODING;
import static io.netty.handler.codec.http.HttpHeaderValues.CHUNKED;
import static io.netty.handler.codec.http.HttpHeaderValues.DEFLATE;
import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHED;
import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.FINISHING;
import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.NOT_STARTED;
import static org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler.RequestState.STREAMING;
import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendTrailingHeaders;
import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendHttpResponse;
import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.sendLastHttpContent;
import static org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtil.writeError;

/**
Expand Down Expand Up @@ -168,13 +168,16 @@ public class HttpGremlinEndpointHandler extends SimpleChannelInboundHandler<Requ
private final GremlinExecutor gremlinExecutor;
private final GraphManager graphManager;
private final Settings settings;
private final TransactionManager transactionManager;

public HttpGremlinEndpointHandler(final GremlinExecutor gremlinExecutor,
final GraphManager graphManager,
final Settings settings) {
final Settings settings,
final TransactionManager transactionManager) {
this.gremlinExecutor = gremlinExecutor;
this.graphManager = graphManager;
this.settings = settings;
this.transactionManager = transactionManager;
}

@Override
Expand Down Expand Up @@ -210,18 +213,36 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r
requestMessage.getGremlin());
}

// Send back the 200 OK response header here since the response is always chunk transfer encoded. Any
// failures that follow this will show up in the response body instead.
final HttpResponse responseHeader = new DefaultHttpResponse(HTTP_1_1, OK);
if (acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING))) {
responseHeader.headers().add(CONTENT_ENCODING, DEFLATE);
// These guards prevent any obvious failures from returning 200 OK early by detecting them here and
// throwing before any other processing starts so the user gets a better error code.
final String txId = requestCtx.getTransactionId();
final String gremlin = requestMessage.getGremlin();
if (isTransactionBegin(gremlin)) {
// If this is a begin transaction request then we need to create the Transaction ID first since the
// dual-transmission expectation means the response header below should contain it.

// This prevents accidentally re-opening the underlying transaction.
if (txId != null) throw new ProcessingException(GremlinError.beginHasTransactionId());

doBegin(requestCtx);
} else if (txId != null) {
// This check makes sure that the underlying Graph is already open to stop a closed transaction
// from re-opening due to the default autostart nature of transactions. This occurs in cases where a
// transactional traversal is submitted after a commit/rollback.
final Graph g = graphManager.getTraversalSource(requestMessage.getField(Tokens.ARGS_G)).getGraph();
if ((!g.tx().isOpen())) {
throw new ProcessingException(GremlinError.transactionNotFound(txId));
}
} else if ((txId == null) && (isTransactionCommit(gremlin) || isTransactionRollback(gremlin))) {
// Logically, commit/rollback should only be allowed on a transactional request.
throw new ProcessingException(GremlinError.transactionalControlRequiresTransaction());
}
responseHeader.headers().set(TRANSFER_ENCODING, CHUNKED);
responseHeader.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0());
ctx.writeAndFlush(responseHeader);
ctx.channel().attr(StateKey.HTTP_RESPONSE_SENT).set(true);

iterateScriptEvalResult(requestCtx, serializer.getValue1(), requestMessage);
// Send back the 200 OK response header here since the response is always chunk transfer encoded. Any
// failures that follow this will show up in the response body instead.
sendHttpResponse(ctx, OK, createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new));
sendHttpContents(ctx, requestCtx);
sendLastHttpContent(ctx, HttpResponseStatus.OK, "");
} catch (Throwable t) {
writeError(requestCtx, formErrorResponseMessage(t, requestMessage), serializer.getValue1());
} finally {
Expand All @@ -240,7 +261,10 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r
});

try {
final Future<?> executionFuture = requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
final boolean isBeginTransactionRequest = isTransactionBegin(requestMessage.getGremlin());
final Future<?> executionFuture = ((requestCtx.getTransactionId() != null) && !isBeginTransactionRequest) ?
transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) :
requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture);
if (seto > 0) {
// Schedule a timeout in the thread pool for future execution
requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(() -> {
Expand All @@ -252,6 +276,46 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r
}
} catch (RejectedExecutionException ree) {
writeError(requestCtx, GremlinError.rateLimiting(), serializer.getValue1());
} catch (NoSuchElementException nsee) {
writeError(requestCtx, GremlinError.transactionNotFound(requestCtx.getTransactionId()), serializer.getValue1());
}
}

private List<CharSequence> createResponseHeaders(final ChannelHandlerContext ctx,
final Pair<String, MessageSerializer<?>> serializer,
final Context requestCtx) {
final List<CharSequence> headers = new ArrayList<>();
headers.add(HttpHeaderNames.CONTENT_TYPE);
headers.add(serializer.getValue0());
if (acceptsDeflateEncoding(ctx.attr(StateKey.REQUEST_HEADERS).get().getAll(ACCEPT_ENCODING))) {
headers.add(CONTENT_ENCODING);
headers.add(DEFLATE);
}
if (requestCtx.getTransactionId() != null) {
headers.add(Tokens.Headers.TRANSACTION_ID);
headers.add(requestCtx.getTransactionId());
}
return headers;
}

private void sendHttpContents(final ChannelHandlerContext ctx, final Context requestContext) throws Exception {
final Pair<String, MessageSerializer<?>> serializer = ctx.channel().attr(StateKey.SERIALIZER).get();
final RequestMessage request = requestContext.getRequestMessage();
final String txId = requestContext.getTransactionId();
final Optional<UnmanagedTransaction> transaction = transactionManager.get(txId);

// Early guard against fake or incorrect transaction IDs.
if ((txId != null) && transaction.isEmpty()) throw new ProcessingException(GremlinError.transactionNotFound(txId));

if (isTransactionBegin(request.getGremlin())) {
runBegin(requestContext, transaction.get(), serializer);
} else if (isTransactionCommit(request.getGremlin())) {
handleGraphOp(requestContext, txId, Transaction::commit, serializer);
} else if (isTransactionRollback(requestContext.getRequestMessage().getGremlin())) {
handleGraphOp(requestContext, txId, Transaction::rollback, serializer);
} else {
// Both transactional and non-transactional traversals follow this path for response chunking.
iterateScriptEvalResult(requestContext, serializer.getValue1(), request);
}
}

Expand Down Expand Up @@ -372,6 +436,72 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau
}
}

/**
* Detects if the gremlin script is a transaction begin command.
*/
private boolean isTransactionBegin(final String gremlin) {
if (gremlin == null) return false;
return gremlin.trim().equalsIgnoreCase("g.tx().begin()");
}

/**
* Detects if the gremlin script is a transaction commit command.
*/
private boolean isTransactionCommit(final String gremlin) {
if (gremlin == null) return false;
return gremlin.trim().equalsIgnoreCase("g.tx().commit()");
}

/**
* Detects if the gremlin script is a transaction rollback command.
*/
private boolean isTransactionRollback(final String gremlin) {
if (gremlin == null) return false;
return gremlin.trim().equalsIgnoreCase("g.tx().rollback()");
}

/**
* Handle begin by creating an {@link UnmanagedTransaction} and submitting the open to its executor.
*/
private void doBegin(final Context ctx) throws Exception {
final String traversalSourceName = ctx.getRequestMessage().getField(Tokens.ARGS_G);

final UnmanagedTransaction txCtx;
try {
txCtx = transactionManager.create(traversalSourceName);
ctx.setTransactionId(txCtx.getTransactionId());
final Graph graph = graphManager.getTraversalSource(traversalSourceName).getGraph();
txCtx.submit(new FutureTask<>(() -> {
graph.tx().open();
return null;
})).get(5000, TimeUnit.MILLISECONDS); // Not an option for now, but 5s should be plenty.
} catch (IllegalStateException ise) {
throw new ProcessingException(GremlinError.maxTransactionsExceeded(ise.getMessage()));
} catch (IllegalArgumentException iae) {
throw new ProcessingException(GremlinError.binding(traversalSourceName));
} catch (UnsupportedOperationException uoe) {
throw new ProcessingException(GremlinError.transactionNotSupported(uoe));
} catch (ExecutionException | TimeoutException e) {
throw new ProcessingException(GremlinError.transactionUnableToStart(e.getMessage()));
}
}

private void runBegin(final Context ctx, UnmanagedTransaction tx, final Pair<String, MessageSerializer<?>> serializer) throws Exception {
final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, tx.getTransactionId())), false, false);
ctx.getChannelHandlerContext().writeAndFlush(new DefaultHttpContent(chunk));
}

private void handleGraphOp(final Context ctx,
final String transactionId,
final Consumer<Transaction> graphOp,
final Pair<String, MessageSerializer<?>> serializer) throws Exception {
final Graph graph = graphManager.getTraversalSource(ctx.getRequestMessage().getField(Tokens.ARGS_G)).getGraph();
graphOp.accept(graph.tx());
transactionManager.destroy(transactionId);
final ByteBuf chunk = makeChunk(ctx, serializer.getValue1(), List.of(Map.of(Tokens.ARGS_TRANSACTION_ID, transactionId)), false, false);
ctx.getChannelHandlerContext().writeAndFlush(new DefaultHttpContent(chunk));
}

private Bindings mergeBindingsFromRequest(final Context ctx, final Bindings bindings) throws ProcessingException {
// alias any global bindings to a different variable.
final RequestMessage msg = ctx.getRequestMessage();
Expand Down Expand Up @@ -431,7 +561,6 @@ private void handleIterator(final Context context, final Iterator itty, final Me
// it needs to be released here
if (chunk != null) chunk.release();
}
sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, "");
return;
}

Expand Down Expand Up @@ -518,10 +647,6 @@ private void handleIterator(final Context context, final Iterator itty, final Me
}

nettyContext.writeAndFlush(new DefaultHttpContent(chunk));

if (!hasMore) {
sendTrailingHeaders(nettyContext, HttpResponseStatus.OK, "");
}
}
} else {
final long currentTime = System.currentTimeMillis();
Expand Down
Loading
Loading