diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 963ad54f5c0..b1456d0ec1a 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -69,6 +69,8 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima ** Reconciled the `validationRequest` default: the builder default is now `g.inject(0)` to match the `Settings` default (it was previously `''`). * Removed `Transaction.open()` in favor of `begin()`, which is now the single transaction-start primitive across embedded and remote contexts. * Changed `begin()` and `close()` to be idempotent and calling it when a transaction is already in that state no longer throws. +* Added `maxTransactionLifetime` setting to Gremlin Server, an absolute cap on the total age of an HTTP transaction that interrupts a running operation and rolls the transaction back when it fires (default 600000ms, set to `0` to disable). +* Changed the Gremlin Server HTTP transaction idle timer to suspend while an operation is running (so a long-running operation is bounded by `evaluationTimeout` rather than the idle timeout) and to honor `0` as "disable idle reclamation"; the `idleTransactionTimeout` default is now 60000ms. * Added configurable CORS `allowedOrigins` setting to Gremlin Server; warns when wildcard origin is used alongside authentication. * Standardized `gremlin-go` connection options per the TinkerPop 4.x GLV proposal: ** Moved `BasicAuth`/`SigV4Auth`/`SigV4AuthWithCredentials` out of package `gremlingo` into a new `auth` sub-package as `auth.Basic`/`auth.SigV4`/`auth.SigV4WithCredentials`. The flat `gremlingo` functions have been removed; use the `auth` sub-package. diff --git a/docs/src/dev/provider/index.asciidoc b/docs/src/dev/provider/index.asciidoc index a09c67b9154..ec642cbfa14 100644 --- a/docs/src/dev/provider/index.asciidoc +++ b/docs/src/dev/provider/index.asciidoc @@ -1430,14 +1430,21 @@ rejects it with HTTP 400. This prevents cross-graph operations within a single t ==== Transaction Timeout and Idle Reclamation -Servers implement a configurable transaction timeout (`transactionTimeout`, default 600000ms). The timeout represents -how long a transaction can sit idle with no requests before the server forcibly rolls it back and removes it. The -timeout resets on each request received for that transaction, so active transactions are not affected. After a timeout -fires, any subsequent request with that transaction ID receives a 404 response. +A transaction can be bounded at three independent scopes, each disabled with `0`. The `evaluationTimeout` bounds a +single operation. The `idleTransactionTimeout` (default 60000ms) bounds the idle gaps between operations: how long a +transaction may remain idle, with no operation running or queued, before it is rolled back and removed; once it is +removed, a subsequent request with that transaction ID receives a 404. The `maxTransactionLifetime` (default 600000ms) +bounds the total age of the transaction regardless of activity, and so may end a transaction while an operation is still +running; the in-flight request then receives a 504 (`TransactionException`) and subsequent requests receive a 404. + +The defaults bound a transaction without any operator configuration; disabling them is a deliberate choice, and a +per-request `timeoutMs` is honored as sent rather than overridden by the server. How promptly a running operation is +actually interrupted when a bound fires is a property of the provider's execution and traversal machinery, not +guaranteed by these settings. ==== Transaction Capacity Limits -Servers enforce a configurable maximum number of concurrent open transactions (`maxConcurrentTransactions`, default +Servers may enforce a configurable maximum number of concurrent open transactions (`maxConcurrentTransactions`, default 1000). When the limit is reached, new begin requests are rejected with HTTP 503. Slots are freed when transactions close via commit, rollback, or timeout. diff --git a/docs/src/reference/gremlin-applications.asciidoc b/docs/src/reference/gremlin-applications.asciidoc index 6494972cdfa..eaa9950b697 100644 --- a/docs/src/reference/gremlin-applications.asciidoc +++ b/docs/src/reference/gremlin-applications.asciidoc @@ -930,6 +930,7 @@ The following table describes the various YAML configuration options that Gremli |gremlinPool |The number of "Gremlin" threads available to execute actual scripts in a `ScriptEngine`. This pool represents the workers available to handle blocking operations in Gremlin Server. When set to `0`, Gremlin Server will use the value provided by `Runtime.availableProcessors()`. |0 |host |The name of the host to bind the server to. |localhost |idleConnectionTimeout |Time in milliseconds that the server will allow a channel to not receive requests from a client before it automatically closes. If enabled, the value provided should typically exceed the amount of time given to `keepAliveInterval`. Note that while this value is to be provided as milliseconds it will resolve to second precision. Set this value to `0` to disable this feature. |0 +|idleTransactionTimeout |Time in milliseconds that a transaction can remain idle (no operation running or queued) before the server forcibly rolls it back and removes it. The idle timer is suspended while an operation is running, so a long-running operation does not trip it (its duration is instead bounded by `evaluationTimeout`); the timer is armed only once the transaction returns to idle. Set to `0` to disable idle reclamation. |60000 |keepAliveInterval |Time in milliseconds that the server will allow a channel to not send responses to a client before it sends a "ping" to see if it is still present. If it is present, the client should respond with a "pong" which will thus reset the `idleConnectionTimeout` and keep the channel open. If enabled, this number should be smaller than the value provided to the `idleConnectionTimeout`. Note that while this value is to be provided as milliseconds it will resolve to second precision. Set this value to `0` to disable this feature. |0 |maxAccumulationBufferComponents |Maximum number of request components that can be aggregated for a message. |1024 |maxChunkSize |The maximum length of the content or each chunk. If the content length exceeds this value, the transfer encoding of the decoded request will be converted to 'chunked' and the content will be split into multiple `HttpContent` objects. If the transfer encoding of the HTTP request is 'chunked' already, each chunk will be split into smaller chunks if the length of the chunk exceeds this value. |8192 @@ -937,6 +938,7 @@ The following table describes the various YAML configuration options that Gremli |maxHeaderSize |The maximum length of all headers. |8192 |maxInitialLineLength |The maximum length of the initial line (e.g. "GET / HTTP/1.0") processed in a request, which essentially controls the maximum length of the submitted URI. |4096 |maxParameters |The maximum number of parameters that can be passed on a request. Larger numbers may impact performance for scripts. This configuration only applies to the `HttpChannelizer`. |16 +|maxTransactionLifetime |Absolute cap in milliseconds on the total age of a transaction regardless of activity. Unlike `idleTransactionTimeout`, it fires even while an operation is running, interrupting it and rolling the transaction back. Set to `0` to disable the cap. |600000 |maxRequestContentLength |The maximum length of the aggregated content for a request message. Works in concert with `maxChunkSize` where chunked requests are accumulated back into a single message. A request exceeding this size will return a `413 - Request Entity Too Large` status code. |10485760 |maxWorkQueueSize |The maximum size the general processing queue can grow before the `gremlinPool` starts to reject requests. |8192 |metrics.consoleReporter.enabled |Turns on console reporting of metrics. |false @@ -983,7 +985,6 @@ The following table describes the various YAML configuration options that Gremli |strictTransactionManagement |Set to `true` to require `aliases` to be submitted on every requests, where the `aliases` become the scope of transaction management. |false |threadPoolBoss |The number of threads available to Gremlin Server for accepting connections. Should always be set to `1`. |1 |threadPoolWorker |The number of threads available to Gremlin Server for processing non-blocking reads and writes. |1 -|transactionTimeout |Time in milliseconds that a transaction can sit idle (no requests) before the server forcibly rolls it back and removes it. The timeout resets on each request received for that transaction. Set to `0` to disable this feature. |600000 |useEpollEventLoop |Try to use epoll event loops (works only on Linux os) instead of netty NIO. |false |writeBufferHighWaterMark | If the number of bytes in the network send buffer exceeds this value then the channel is no longer writeable, accepting no additional writes until buffer is drained and the `writeBufferLowWaterMark` is met. |65536 |writeBufferLowWaterMark | Once the number of bytes queued in the network send buffer exceeds the `writeBufferHighWaterMark`, the channel will not become writeable again until the buffer is drained and it drops below this value. |32768 @@ -2259,16 +2260,26 @@ IMPORTANT: Not all graph implementations support explicit transactions (for exam Use `TinkerTransactionGraph` or another graph implementation that supports explicit transactions. Attempting to begin an explicit transaction on a graph that does not support them will result in an error. -Two settings in the Gremlin Server YAML control transaction resource usage: - -* `transactionTimeout` (default: 600000ms) -- How long a transaction can sit idle before the server forcibly rolls it - back. The timeout resets on each request received for that transaction. -* `maxConcurrentTransactions` (default: 1000) -- The maximum number of open transactions allowed. When the limit is - reached, new begin requests are rejected with HTTP 503. +Several settings in the Gremlin Server YAML control transaction resource usage. The `idleTransactionTimeout` +(default 60000ms) governs how long a transaction can remain idle (no operation running or queued) before the +server forcibly rolls it back. The idle timer is suspended while an operation is running, so a long-running operation +does not trip it; it is armed only once the transaction returns to idle. Set it to `0` to disable idle reclamation. The +`maxTransactionLifetime` (default 600000ms) is an absolute cap on the total age of a transaction regardless of +activity. Unlike `idleTransactionTimeout`, it fires even while an operation is running, interrupting it and rolling the +transaction back. It bounds transaction lifetime and concurrency-slot occupancy absolutely; like `evaluationTimeout`, +its ability to free the underlying worker thread depends on the operation reaching an interruptible point. Set it to +`0` to disable the cap. Finally, `maxConcurrentTransactions` (default 1000) caps the number of open transactions +allowed; when the limit is reached, new begin requests are rejected with HTTP 503. + +These compose with the per-operation `evaluationTimeout` (and its per-request `timeoutMs` override) to bound a +transaction at three independent scopes: a single operation (`evaluationTimeout`), the gaps between operations +(`idleTransactionTimeout`), and the transaction as a whole (`maxTransactionLifetime`). The defaults keep a transaction +bounded out of the box; disabling all of them is a deliberate operator choice, and a per-request `timeoutMs` is always +honored as sent. Each open transaction consumes a dedicated thread on the server to maintain thread-local transaction state for the -underlying graph. Ensure that clients close transactions promptly and that the `transactionTimeout` is set to reclaim -abandoned ones. The `transactions` gauge metric can be used to monitor usage. +underlying graph. Ensure that clients close transactions promptly and that the `idleTransactionTimeout` is set to +reclaim abandoned ones. The `transactions` gauge metric can be used to monitor usage. In load-balanced deployments, all requests within a transaction must reach the same server instance because transaction state is local to the server that created it. The `X-Transaction-Id` header is available for load diff --git a/gremlin-server/conf/gremlin-server-transaction.yaml b/gremlin-server/conf/gremlin-server-transaction.yaml index 6b61f40a255..6db62912a94 100644 --- a/gremlin-server/conf/gremlin-server-transaction.yaml +++ b/gremlin-server/conf/gremlin-server-transaction.yaml @@ -31,6 +31,8 @@ metrics: { jmxReporter: {enabled: true}, slf4jReporter: {enabled: true, interval: 180000}} strictTransactionManagement: false +idleTransactionTimeout: 60000 +maxTransactionLifetime: 600000 idleConnectionTimeout: 0 keepAliveInterval: 0 maxInitialLineLength: 4096 diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java index 51373d77410..598514ed6c0 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java @@ -57,6 +57,10 @@ public class Context { private final Object timeoutExecutorLock = new Object(); private String transactionId; // initially null for non-transactional requests and begin() calls; set after transaction creation. private Map parameters = new HashMap<>(); // only available after string parameters are parsed by grammar. + // Set by the transaction's lifetime cap (on the scheduler thread) before it interrupts this request's operation, and + // read as the interrupt unwinds the operation (on the transaction worker thread) to report an accurate + // transaction-timeout error rather than a generic evaluation timeout. volatile for cross-thread visibility. + private volatile boolean closedByLifetimeCap = false; public Context(final RequestMessage requestMessage, final ChannelHandlerContext ctx, final Settings settings, final GraphManager graphManager, @@ -143,6 +147,22 @@ public void setTransactionId(final String transactionId) { this.transactionId = transactionId; } + /** + * Marks this request's operation as having been interrupted because its transaction hit its absolute lifetime cap. + * Set by the transaction's lifetime cap before it interrupts the operation. + */ + public void setClosedByLifetimeCap(final boolean closedByLifetimeCap) { + this.closedByLifetimeCap = closedByLifetimeCap; + } + + /** + * Returns {@code true} if this request's operation was interrupted by its transaction's absolute lifetime cap, in + * which case the resulting interrupt should be reported as a transaction timeout rather than an evaluation timeout. + */ + public boolean isClosedByLifetimeCap() { + return closedByLifetimeCap; + } + public Map getParameters() { return this.parameters; } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java index 8d6111ef27e..c8f718a774f 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Settings.java @@ -185,10 +185,13 @@ public Settings() { public boolean strictTransactionManagement = false; /** - * Time in milliseconds that a transaction can remain idle before it is automatically rolled back. - * This prevents resource leaks from abandoned transactions. Default is 600000 (10 minutes). + * Time in milliseconds that a transaction can remain idle (no operation running or queued) before it is + * automatically rolled back. This prevents resource leaks from abandoned transactions. The idle timer is suspended + * while an operation is in progress, so a long-running operation does not trip it (its duration is instead bounded + * by {@link #evaluationTimeout}). Set to {@code 0} to disable idle reclamation entirely. Default is 60000 + * (1 minute). */ - public long transactionTimeout = 600000L; + public long idleTransactionTimeout = 60000L; /** * Time in milliseconds to wait for a transaction commit or rollback operation to complete. @@ -202,6 +205,17 @@ public Settings() { */ public int maxConcurrentTransactions = 1000; + /** + * Absolute ceiling, in milliseconds, on the total age of a transaction regardless of activity. Unlike + * {@link #idleTransactionTimeout} (which only reclaims idle transactions), this cap fires even while an operation is + * running, interrupting it and rolling the transaction back, so it bounds how long a single transaction can hold its + * dedicated worker thread and concurrency slot. The bound on transaction lifetime and slot occupancy is absolute; + * the bound on thread occupancy is best-effort in the same way {@link #evaluationTimeout} is, since interrupting a + * running operation only takes effect when it reaches an interruptible point. Set to {@code 0} to disable the cap. + * Default is 600000 (10 minutes). + */ + public long maxTransactionLifetime = 600000L; + /** * The full class name of the {@link Channelizer} to use in Gremlin Server. */ diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java index ae540582b3d..eed9428c1d9 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandler.java @@ -184,6 +184,14 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r ctx.channel().attr(StateKey.RESPONSE_COORDINATOR).set(coordinator); final Timer.Context timerContext = evalOpTimer.time(); + + // Resolve the target transaction once for a transactional (non-begin) request and reuse it at submit below, so + // the work runs against exactly the transaction resolved here. Empty for begins / non-transactional requests, + // and also when the id is unknown (the submit path turns that into a 404). + final boolean isTransactionalOp = (requestCtx.getTransactionId() != null) && !requestCtx.isTransactionBegin(); + final Optional txForRequest = + isTransactionalOp ? transactionManager.get(requestCtx.getTransactionId()) : Optional.empty(); + // timeout override - handle both deprecated and newly named configuration. earlier logic should prevent // both configurations from being submitted at the same time final Long timeoutMs = requestMessage.getField(Tokens.TIMEOUT_MS); @@ -277,7 +285,7 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r coordinator.writeHeader(createResponseHeaders(ctx, serializer, requestCtx).toArray(CharSequence[]::new)); sendHttpContents(ctx, requestCtx, coordinator); } catch (Throwable t) { - coordinator.writeError(formErrorResponseMessage(t, requestMessage)); + coordinator.writeError(formErrorResponseMessage(t, requestMessage, requestCtx)); } finally { // Idempotent terminal backstop: if the data or error path already terminated the response, complete() // is a no-op via its COMPLETED short-circuit. It runs in finally — not at the end of the try — so the @@ -301,9 +309,11 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r }); try { - final boolean isBeginTransactionRequest = requestCtx.isTransactionBegin(); - final Future executionFuture = ((requestCtx.getTransactionId() != null) && !isBeginTransactionRequest) ? - transactionManager.get(requestCtx.getTransactionId()).get().submit(evalFuture) : + // Reuse the transaction resolved above (txForRequest) rather than looking it up again. For a transactional + // op an empty Optional means the id is unknown/reclaimed: get() throws NoSuchElementException, caught below + // and reported as a 404, preserving the prior behavior. + final Future executionFuture = isTransactionalOp ? + txForRequest.get().submit(evalFuture, requestCtx) : requestCtx.getGremlinExecutor().getExecutorService().submit(evalFuture); if (seto > 0) { // Schedule a timeout in the thread pool for future execution. The coordinator's monitor guarantees @@ -311,7 +321,12 @@ public void channelRead0(final ChannelHandlerContext ctx, final RequestMessage r // first wins, and the other's write becomes a no-op. requestCtx.setTimeoutExecutor(requestCtx.getScheduledExecutorService().schedule(() -> { executionFuture.cancel(true); - coordinator.writeError(GremlinError.timeout(requestMessage)); + // If the lifetime cap fired for this same operation (it flags the Context before interrupting), + // report the cap's 504 even when this eval-timeout task is the one that writes - so a cap-kill is + // never mislabeled as a generic "increase evaluationTimeout" 500 just because of writer ordering. + coordinator.writeError(requestCtx.isClosedByLifetimeCap() + ? GremlinError.transactionTimeout(requestCtx.getTransactionId(), "execute") + : GremlinError.timeout(requestMessage)); }, seto, TimeUnit.MILLISECONDS)); } } catch (RejectedExecutionException ree) { @@ -360,7 +375,7 @@ private void sendHttpContents(final ChannelHandlerContext ctx, final Context req } } - private GremlinError formErrorResponseMessage(Throwable t, RequestMessage requestMessage) { + GremlinError formErrorResponseMessage(Throwable t, RequestMessage requestMessage, final Context requestCtx) { if (t instanceof UndeclaredThrowableException) t = t.getCause(); // if any exception in the chain is TemporaryException or Failure then we should respond with the @@ -385,6 +400,13 @@ private GremlinError formErrorResponseMessage(Throwable t, RequestMessage reques return GremlinError.longFrame(t); } if (t instanceof InterruptedException || t instanceof TraversalInterruptedException) { + // An interrupt here is normally an evaluation timeout, but it is also how a transaction's absolute lifetime + // cap stops a running operation. In the cap case the transaction flagged this request's Context before + // interrupting, so report an accurate transaction-timeout (504) rather than the generic "increase + // evaluationTimeout" error (500), whose advice would be misleading for a lifetime-cap kill. + if (requestCtx != null && requestCtx.isClosedByLifetimeCap()) { + return GremlinError.transactionTimeout(requestCtx.getTransactionId(), "execute"); + } return GremlinError.timeout(requestMessage); } if (t instanceof TimedInterruptTimeoutException) { @@ -509,7 +531,7 @@ private void doBegin(final Context ctx) throws Exception { txCtx.submit(new FutureTask<>(() -> { graph.tx().begin(); return null; - })).get(5000, TimeUnit.MILLISECONDS); // Not an option for now, but 5s should be plenty. + }), ctx).get(5000, TimeUnit.MILLISECONDS); // Not an option for now, but 5s should be plenty. } catch (IllegalStateException ise) { throw new ProcessingException(GremlinError.maxTransactionsExceeded(ise.getMessage())); } catch (IllegalArgumentException iae) { diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java index 6901664c153..6144b5fb527 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManager.java @@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static com.codahale.metrics.MetricRegistry.name; @@ -41,9 +43,14 @@ public class TransactionManager { private static final Logger logger = LoggerFactory.getLogger(TransactionManager.class); private final ConcurrentMap transactions = new ConcurrentHashMap<>(); + // Absolute-lifetime timers, one per transaction with the cap enabled, keyed by transaction id. The manager owns the + // lifetime cap because it is scoped to the transaction's existence in the registry (a single fixed schedule), unlike + // the activity-driven idle timer that the transaction must own to see its executor's running/idle transitions. + private final ConcurrentMap> lifetimeTimers = new ConcurrentHashMap<>(); private final ScheduledExecutorService scheduledExecutorService; private final GraphManager graphManager; - private final long transactionTimeoutMs; + private final long idleTransactionTimeoutMs; + private final long maxTransactionLifetimeMs; private final int maxConcurrentTransactions; private final long perGraphCloseMs; @@ -52,23 +59,26 @@ public class TransactionManager { * * @param scheduledExecutorService Scheduler for timeout management * @param graphManager The graph manager for accessing traversal sources - * @param transactionTimeoutMs Timeout in milliseconds before auto-rollback + * @param idleTransactionTimeoutMs Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it + * @param maxTransactionLifetimeMs Absolute cap in milliseconds on total transaction age; {@code 0} disables it * @param maxConcurrentTransactions Maximum number of concurrent transactions allowed */ public TransactionManager(final ScheduledExecutorService scheduledExecutorService, final GraphManager graphManager, - final long transactionTimeoutMs, + final long idleTransactionTimeoutMs, + final long maxTransactionLifetimeMs, final int maxConcurrentTransactions, final long perGraphCloseMs) { this.scheduledExecutorService = scheduledExecutorService; this.graphManager = graphManager; - this.transactionTimeoutMs = transactionTimeoutMs; + this.idleTransactionTimeoutMs = idleTransactionTimeoutMs; + this.maxTransactionLifetimeMs = maxTransactionLifetimeMs; this.maxConcurrentTransactions = maxConcurrentTransactions; this.perGraphCloseMs = perGraphCloseMs; MetricManager.INSTANCE.getGauge(transactions::size, name(GremlinServer.class, "transactions")); - logger.info("TransactionManager initialized with timeout={}ms, maxTransactions={}", - transactionTimeoutMs, maxConcurrentTransactions); + logger.info("TransactionManager initialized with idleTransactionTimeout={}ms, maxTransactionLifetime={}ms, maxTransactions={}", + idleTransactionTimeoutMs, maxTransactionLifetimeMs, maxConcurrentTransactions); } /** @@ -105,6 +115,11 @@ public UnmanagedTransaction create(final String traversalSourceName) { */ void destroy(final String id) { transactions.remove(id); + // Cancel this transaction's lifetime cap (if any) so the one-shot cannot fire after the transaction is gone. + // Covers every close path uniformly (commit, rollback, idle reclaim, and the cap firing itself, where + // cancelling the already-running one-shot is a harmless no-op). + final ScheduledFuture lifetimeTimer = lifetimeTimers.remove(id); + if (lifetimeTimer != null) lifetimeTimer.cancel(false); } /** @@ -112,23 +127,35 @@ void destroy(final String id) { * {@link UnmanagedTransaction} is inserted into the transactions map. */ private UnmanagedTransaction createTransactionContext(final String traversalSourceName, final Graph graph) { - String txId; - UnmanagedTransaction ctx; + String transactionId; + UnmanagedTransaction transaction; do { - txId = UUID.randomUUID().toString(); - ctx = new UnmanagedTransaction( - txId, + transactionId = UUID.randomUUID().toString(); + transaction = new UnmanagedTransaction( + transactionId, this, traversalSourceName, graph, scheduledExecutorService, - transactionTimeoutMs, + idleTransactionTimeoutMs, perGraphCloseMs ); - } while (transactions.putIfAbsent(txId, ctx) != null); + } while (transactions.putIfAbsent(transactionId, transaction) != null); + + // Schedule the absolute lifetime cap only AFTER the transaction is registered above. The cap's teardown + // (onLifetimeCap -> close(false)) early-returns if the manager does not yet know about the transaction, so + // scheduling it before registration could let a pathologically small cap fire into nothing and leave an + // unreclaimable transaction holding a worker thread and slot. Scheduling after registration guarantees the cap + // can always tear the transaction down; destroy() cancels it on every close path. The clock effectively starts + // at construction (registration follows within microseconds), so it bounds total transaction age including begin. + if (maxTransactionLifetimeMs > 0) { + final UnmanagedTransaction registered = transaction; // effectively-final copy for the scheduled method ref + lifetimeTimers.put(transactionId, scheduledExecutorService.schedule( + registered::onLifetimeCap, maxTransactionLifetimeMs, TimeUnit.MILLISECONDS)); + } - return ctx; + return transaction; } /** @@ -174,6 +201,10 @@ public void shutdown() { }); transactions.clear(); + // Each close(false) above already cancelled its lifetime timer via destroy(); cancel any stragglers (e.g. a + // transaction whose close threw) so no scheduled cap outlives the manager. + lifetimeTimers.values().forEach(timer -> timer.cancel(false)); + lifetimeTimers.clear(); logger.info("TransactionManager shutdown complete"); } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java index d08c20a47e3..25bd36ee5cc 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransaction.java @@ -18,17 +18,19 @@ */ package org.apache.tinkerpop.gremlin.server.transaction; +import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.structure.Graph; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -42,6 +44,12 @@ * the complete request lifecycle (graph operation, error handling, response writing), * following the same pattern as the non-transactional HTTP path and the legacy * {@code SessionOpProcessor}. + *

+ * The single-threaded executor is a {@link SingleThreadTransactionExecutor} (a {@code ThreadPoolExecutor} with one + * core/max thread) rather than {@link java.util.concurrent.Executors#newSingleThreadExecutor}. It is behaviorally + * identical for task execution but exposes the {@code beforeExecute}/{@code afterExecute} lifecycle hooks and the task + * queue, which the idle-timer management relies on to tell "an operation is running" apart from "the worker is idle + * with an empty queue". The {@code Executors} factory hides those behind a sealed wrapper. */ public class UnmanagedTransaction { private static final Logger logger = LoggerFactory.getLogger(UnmanagedTransaction.class); @@ -51,16 +59,25 @@ public class UnmanagedTransaction { private final TransactionManager manager; private final Graph graph; private final ScheduledExecutorService scheduledExecutorService; - private final long timeout; + private final long idleTimeout; private final long perGraphClose; - private final AtomicReference> timeoutFuture = new AtomicReference<>(); + private final AtomicReference> idleFuture = new AtomicReference<>(); + /** + * The operation currently executing on the worker thread (or most recently submitted) paired with its request + * {@link Context}, held as a single immutable {@link Running} so the lifetime cap reads a consistent pair — it can + * never flag one operation's {@code Context} while interrupting another's {@link Future}. The future is the exact + * same object the per-request evaluation timeout cancels in the handler. Set in {@link #submit} and compare-and- + * cleared in {@link SingleThreadTransactionExecutor#afterExecute} so a fast next operation is not un-tracked by the + * previous one's completion. + */ + private final AtomicReference current = new AtomicReference<>(); // Controls whether the executor is still accepting tasks. private final AtomicBoolean accepting = new AtomicBoolean(true); /** * Single-threaded executor ensures all operations for this transaction run on * the same thread, preserving the ThreadLocal nature of Graph transactions. */ - private final ExecutorService executor; + private final SingleThreadTransactionExecutor executor; /** * Creates a new {@code UnmanagedTransaction} for managing an HTTP transaction. @@ -70,14 +87,14 @@ public class UnmanagedTransaction { * @param traversalSourceName The traversal source name bound at begin time * @param graph The graph instance for this transaction * @param scheduledExecutorService Scheduler for timeout management - * @param transactionTimeout Timeout in milliseconds before auto-rollback + * @param idleTransactionTimeout Inactivity timeout in milliseconds before auto-rollback; {@code 0} disables it */ public UnmanagedTransaction(final String transactionId, final TransactionManager transactionManager, final String traversalSourceName, final Graph graph, final ScheduledExecutorService scheduledExecutorService, - final long transactionTimeout, + final long idleTransactionTimeout, final long perGraphClose) { logger.debug("New transaction context established for {}", transactionId); this.transactionId = transactionId; @@ -85,12 +102,15 @@ public UnmanagedTransaction(final String transactionId, this.manager = transactionManager; this.graph = graph; this.scheduledExecutorService = scheduledExecutorService; - this.timeout = transactionTimeout; + this.idleTimeout = idleTransactionTimeout; this.perGraphClose = perGraphClose; - // Create single-threaded executor with named thread for debugging - this.executor = Executors.newSingleThreadExecutor( - r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length())))); + // Create single-threaded executor with named thread for debugging. A ThreadPoolExecutor(1,1) is used (rather + // than Executors.newSingleThreadExecutor) so the before/afterExecute hooks and the task queue are accessible + // for idle-timer management; see SingleThreadTransactionExecutor. + final ThreadFactory threadFactory = + r -> new Thread(r, "tx-" + transactionId.substring(0, Math.min(8, transactionId.length()))); + this.executor = new SingleThreadTransactionExecutor(threadFactory); } /** @@ -107,36 +127,6 @@ public String getTraversalSourceName() { return traversalSourceName; } - /** - * Resets the timeout for this transaction. Called on each request. - */ - public void touch() { - timeoutFuture.updateAndGet(future -> { - if (future != null) future.cancel(false); - return scheduledExecutorService.schedule(() -> { - logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, timeout); - close(false); - }, timeout, TimeUnit.MILLISECONDS); - }); - } - - /** - * Opens the underlying graph transaction and starts the inactivity timeout. - * Should be called on the transaction's single-threaded executor to preserve - * ThreadLocal affinity. On failure the exception is re-thrown and the caller - * is responsible for cleanup (e.g. via {@link #close(boolean)}). - */ - public void open() { - try { - graph.tx().begin(); - touch(); - logger.debug("Transaction {} opened", transactionId); - } catch (Exception e) { - logger.warn("Failed to begin transaction {}: {}", transactionId, e.getMessage()); - throw e; - } - } - /** * Closes this transaction and releases its resources. When {@code force} is {@code false}, * any open graph transaction is rolled back before shutdown. When {@code force} is {@code true}, @@ -183,7 +173,7 @@ public synchronized void close(boolean force) { // reorder these two statements. manager.destroy(transactionId); executor.shutdown(); - Optional.ofNullable(timeoutFuture.get()).ifPresent(f -> f.cancel(true)); + Optional.ofNullable(idleFuture.get()).ifPresent(f -> f.cancel(true)); logger.debug("Transaction {} closed", transactionId); } @@ -193,13 +183,155 @@ public synchronized void close(boolean force) { * error handling, and response writing. * * @param task The FutureTask to execute on the transaction thread + * @param context The request context driving this task, recorded so the lifetime cap can flag it before + * interrupting; may be {@code null} for internal operations (such as the begin's tx open) where no + * user-facing error needs to be tailored * @return Future that can be used for timeout cancellation * @throws IllegalStateException if the transaction is closed */ - public Future submit(final FutureTask task) { + public Future submit(final FutureTask task, final Context context) { if (!accepting.get()) throw new IllegalStateException("Transaction " + transactionId + " is closed"); - touch(); - return executor.submit(task); + // Insurance backstop: cancel (do NOT arm) the idle timer on submit. Arming is the executor's job, done in + // afterExecute once the worker parks with an empty queue. beforeExecute will also cancel when the task starts; + // cancelling here too closes the small window between accepting a task and the worker picking it up. + cancelIdleTimer(); + + // Track the running operation BEFORE dispatching it, so the lifetime cap can never miss a worker that starts + // running between dispatch and tracking. The submitted FutureTask is itself the Future we track and return: + // cancel(true) on it interrupts the real work (the same future the handler's evaluation timeout cancels), and + // afterExecute receives this same object to compare-and-clear. Pairing the future with its Context in one + // immutable Running means the cap always reads a consistent pair (never flags op1 while interrupting op2). + current.set(new Running(task, context)); + executor.execute(task); + return task; + } + + /** + * Suspends the inactivity timer because an operation is running (or about to run) on the transaction thread. + * Invoked from {@link SingleThreadTransactionExecutor#beforeExecute} and, as a backstop, from {@link #submit}. + *

+ * A long-running operation must not trip the idle timeout: while an operation is in progress the idle timer is + * simply not armed (the operation's own duration is bounded by the per-request {@code evaluationTimeout} instead). + */ + private void cancelIdleTimer() { + idleFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return null; + }); + } + + /** + * (Re)arms the inactivity timer, but only when the transaction is genuinely idle. Invoked from + * {@link SingleThreadTransactionExecutor#afterExecute} once an operation has finished and the worker is about to + * look for more work. + *

+ * "Idle" means: still {@link #accepting} new work (not closing), the executor queue is empty (no sibling request is + * already waiting — on a single thread there is a brief instant between one task finishing and the next starting), + * and the idle timeout is enabled ({@code idleTimeout > 0}; {@code 0} disables idle reclamation entirely). When all + * hold, a fresh {@code close(false)} is scheduled {@code idleTimeout} ms out, replacing any previously scheduled one. + */ + private void maybeScheduleIdleTimer() { + if (!accepting.get()) return; // closing/closed: never re-arm a dying transaction + if (idleTimeout <= 0) return; // 0 (or negative) disables idle reclamation + if (!executor.getQueue().isEmpty()) return; // a sibling task is already queued -> not idle yet + + idleFuture.updateAndGet(future -> { + if (future != null) future.cancel(false); + return scheduledExecutorService.schedule(() -> { + logger.info("Transaction {} timed out after {} ms of inactivity", transactionId, idleTimeout); + close(false); + }, idleTimeout, TimeUnit.MILLISECONDS); + }); + + // The accepting check above and the arm below are not atomic: a concurrent close() could have flipped + // accepting=false and cancelled idleFuture in between, leaving the timer we just armed orphaned (it would fire + // ~idleTimeout later and call close() on an already-gone transaction). Re-check after arming and cancel if so, + // so the "never re-arm a dying transaction" invariant actually holds. + if (!accepting.get()) cancelIdleTimer(); + } + + /** + * Forcibly tears the transaction down because it has reached its absolute lifetime cap. Invoked by the + * {@link TransactionManager}'s lifetime timer (the manager owns scheduling and cancelling that timer; this method is + * the behavior it triggers). Unlike the idle timer, the cap fires regardless of activity, so it may interrupt an + * operation that is still running. + *

+ * It flags the running operation's {@link Context} before interrupting it so that, as the interrupt unwinds + * the operation on the worker thread, the error it reports is an accurate transaction-timeout (504) rather than the + * generic evaluation timeout. It then interrupts only the currently-running operation via + * {@link Future#cancel(boolean) cancel(true)} (any siblings already queued behind it continue to fail fast with a + * 404 via the destroy-before-shutdown guard in {@link #close(boolean)}), and finally runs {@code close(false)} to + * roll back and tear the transaction down. Logged at {@code warn} because this is a forced teardown of active work. + */ + void onLifetimeCap() { + // Read the running (future, context) pair once, as a unit, so the Context we flag always belongs to the same + // operation whose future we interrupt. + final Running running = current.get(); + if (running != null) { + if (running.context != null) running.context.setClosedByLifetimeCap(true); // flag BEFORE interrupting + running.future.cancel(true); // interrupt only the running op + } + + logger.warn("Transaction {} exceeded its maximum lifetime and is being closed", transactionId); + close(false); + } + + /** + * Compare-and-clears the tracked running operation once it completes. Only clears when the completed future is still + * the one tracked, so a fast next operation submitted between this one finishing and this clearing is not lost. + */ + private void clearCurrentExecution(final Future completed) { + current.updateAndGet(running -> (running != null && running.future == completed) ? null : running); + } + + /** + * An in-flight operation paired with the request {@link Context} that drove it, tracked as one immutable unit so the + * lifetime cap reads a consistent pair. {@code context} may be {@code null} for internal operations (e.g. begin's tx + * open) that need no tailored client error. + */ + private static final class Running { + private final Future future; + private final Context context; + + private Running(final Future future, final Context context) { + this.future = future; + this.context = context; + } + } + + /** + * A single-threaded {@link ThreadPoolExecutor} (one core and max thread) that runs all operations for a single + * transaction on the same worker thread, preserving the ThreadLocal nature of graph transactions. + *

+ * It is used in place of {@link java.util.concurrent.Executors#newSingleThreadExecutor} solely to expose the + * {@link #beforeExecute}/{@link #afterExecute} lifecycle hooks (and, via {@link #getQueue()}, the pending-task + * queue), which the enclosing {@link UnmanagedTransaction} needs to distinguish "an operation is running" from + * "the worker is idle with nothing queued". Task-execution semantics are otherwise identical to a single-thread + * executor: one worker, FIFO ordering. Submitted {@link FutureTask}s are returned unwrapped so callers can + * {@code cancel(true)} the real work (e.g. the per-request evaluation timeout interrupting a running operation). + */ + private final class SingleThreadTransactionExecutor extends ThreadPoolExecutor { + private SingleThreadTransactionExecutor(final ThreadFactory threadFactory) { + super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), threadFactory); + } + + @Override + protected void beforeExecute(final Thread t, final Runnable r) { + super.beforeExecute(t, r); + cancelIdleTimer(); + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + super.afterExecute(r, t); + // For operations submitted via submit(), r is the FutureTask that submit() executed and tracked in + // `current`, so compare-and-clear by identity un-tracks the operation that just finished without disturbing + // a faster sibling that may already have replaced it. Other tasks that complete here (e.g. close()'s + // rollback, which ThreadPoolExecutor also wraps in a Future) simply will not match the tracked future, so + // the compare-and-clear is a safe no-op for them. + if (r instanceof Future) clearCurrentExecution((Future) r); + maybeScheduleIdleTimer(); + } } } diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java index 3995337566a..b8ff46ab59b 100644 --- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java +++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/util/ServerGremlinExecutor.java @@ -306,7 +306,8 @@ public ServerGremlinExecutor(final Settings settings, final ExecutorService grem transactionManager = new TransactionManager( scheduledExecutorService, graphManager, - settings.transactionTimeout, + settings.idleTransactionTimeout, + settings.maxTransactionLifetime, settings.maxConcurrentTransactions, settings.perGraphCloseTimeout ); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java index 855acdde2c6..bbdd9bb69a7 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinDriverTransactionIntegrateTest.java @@ -86,10 +86,10 @@ public Settings overrideSettings(final Settings settings) { case "shouldTimeoutIdleTransaction": case "shouldTimeoutIdleTransactionWithNoOperations": case "shouldRejectLateCommitAfterTimeout": - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldTimeoutOnlyIdleTransactionNotActiveOne": - settings.transactionTimeout = 2000; + settings.idleTransactionTimeout = 2000; break; } return settings; diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java index 95895e66b46..4d1ec785c3d 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/GremlinServerHttpTransactionIntegrateTest.java @@ -93,17 +93,32 @@ public Settings overrideSettings(final Settings settings) { break; case "shouldTimeoutFreeSlotUnderMaxConcurrentTransactions": settings.maxConcurrentTransactions = 1; - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldTimeoutIdleTransactionWithNoOperations": - settings.transactionTimeout = 500; + settings.idleTransactionTimeout = 500; break; case "shouldTimeoutAndRejectLateCommit": case "shouldTrackTransactionCountAccurately": - settings.transactionTimeout = 1000; + settings.idleTransactionTimeout = 1000; break; case "shouldRollbackAbandonedTransaction": - settings.transactionTimeout = 300; + settings.idleTransactionTimeout = 300; + break; + case "shouldNotIdleTimeoutLongRunningOperation": + // Short idle timeout, but a single long operation must NOT trip it (idle suspended while busy). + settings.idleTransactionTimeout = 500; + break; + case "shouldReclaimTransactionExceedingMaxLifetime": + // Short absolute cap with the idle timer disabled, so only the lifetime cap can reclaim the transaction. + settings.idleTransactionTimeout = 0; + settings.maxTransactionLifetime = 800; + break; + case "shouldHonorPerRequestTimeoutMsZeroInTransaction": + // Both transaction timers disabled: a per-request timeoutMs of 0 is honored (not silently overridden), + // so the operation is bounded only by its own request, exactly as on the non-transactional path. + settings.idleTransactionTimeout = 0; + settings.maxTransactionLifetime = 0; break; case "shouldRejectMismatchedGraphAliasInTransaction": { final Settings.GraphSettings gs = new Settings.GraphSettings(); @@ -528,6 +543,98 @@ public void shouldRollbackAbandonedTransaction() throws Exception { } } + @Test + public void shouldNotIdleTimeoutLongRunningOperation() throws Exception { + // With a short idle timeout (500ms), a single operation that runs LONGER than the idle timeout must still + // succeed -- the idle timer is suspended while an operation is in progress, so a long-running op is not + // reclaimed mid-execution (it is instead bounded by evaluationTimeout, left at its default here). + final String txId = beginTx(client, GTX); + + // Seed two vertices and an edge so repeat(both()) has something to traverse and keeps the executor busy. Each + // response body is fully consumed before the next request is sent: the chunked stream is only complete once the + // server has finished processing, so consuming guarantees these requests are strictly ordered. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 1)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV().property(T.id, 2)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V(1).addE('self').to(__.V(2))", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // A traversal that runs well past the 500ms idle timeout. Under the old arm-on-arrival behavior the idle timer + // would have fired mid-execution; under suspend-while-busy it does not. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().repeat(both()).times(2000)", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // The transaction is still alive and usable after the long op (it was not reclaimed mid-flight). + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + assertEquals(2, extractCount(r)); // extractCount fully reads the body + } + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + } + + @Test + public void shouldReclaimTransactionExceedingMaxLifetime() throws Exception { + // With a short absolute cap (800ms) and the idle timer disabled, a transaction that simply stays open past the + // cap must be reclaimed: the lifetime cap rolls it back and removes it, so a later request gets a 404. The + // deterministic mid-operation interrupt and the 504 it yields are covered by the unit tests; here we assert the + // guarantee we actually make end-to-end -- the transaction (and its slot) is reclaimed on time. + final String txId = beginTx(client, GTX); + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.addV()", GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + + // wait for the absolute cap to fire + Thread.sleep(1500); + + // The transaction is gone: a subsequent request on it is rejected as not found. + try (final CloseableHttpResponse r = submitInTx(client, txId, "g.V().count()", GTX)) { + assertEquals(404, r.getStatusLine().getStatusCode()); + assertTrue(extractStatusMessage(r).contains("Transaction not found")); + } + + // and the addV was rolled back, not persisted. + try (final CloseableHttpResponse r = submitNonTx(client, "g.V().count()", GTX)) { + assertEquals(0, extractCount(r)); + } + } + + @Test + public void shouldHonorPerRequestTimeoutMsZeroInTransaction() throws Exception { + // With both transaction timers disabled, a per-request timeoutMs of 0 is honored rather than overridden: the + // server does not reject the begin or silently substitute another timeout - the operation runs as requested. + // (Server-side defaults keep transactions bounded out of the box; disabling them is a deliberate operator + // choice and must not turn into a client-facing failure.) + final String txId = beginTx(client, GTX); + + try (final CloseableHttpResponse r = postJson(client, + "{\"gremlin\":\"g.addV()\",\"g\":\"" + GTX + + "\",\"transactionId\":\"" + txId + "\",\"timeoutMs\":\"0\"}")) { + assertEquals(200, r.getStatusLine().getStatusCode()); + // The operation runs and returns a normal result body, with no timeout error: the request was honored, not + // bounded by a substituted timeout or rejected at begin. + final String body = EntityUtils.toString(r.getEntity()); + assertFalse(body.toLowerCase().contains("timeout")); + } + + try (final CloseableHttpResponse r = commitTx(client, txId, GTX)) { + assertEquals(200, r.getStatusLine().getStatusCode()); + EntityUtils.consume(r.getEntity()); + } + } + @Test public void shouldRejectMismatchedGraphAliasInTransaction() throws Exception { final String txId = beginTx(client, GTX); diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java index 28b2cf50fda..98365f7c448 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/SettingsTest.java @@ -144,4 +144,23 @@ public void lifecycleHooksDefaultsToEmptyListWhenAbsentFromYaml() throws Excepti assertThat(settings.lifecycleHooks, is(notNullValue())); assertThat(settings.lifecycleHooks.isEmpty(), is(true)); } + + @Test + public void transactionTimeoutsDefaultToReasonableValuesWhenAbsentFromYaml() throws Exception { + final Settings settings = Settings.read(getMinimalConfigStream()); + + // Out of the box a transaction is bounded without any operator configuration: idle reclamation at 1 minute and + // an absolute lifetime cap at 10 minutes. + assertEquals(60000L, settings.idleTransactionTimeout); + assertEquals(600000L, settings.maxTransactionLifetime); + } + + @Test + public void maxTransactionLifetimeParsedFromYaml() throws Exception { + final InputStream stream = SettingsTest.class.getResourceAsStream("gremlin-server-integration.yaml"); + final Settings settings = Settings.read(stream); + + // Confirms a YAML-provided value overrides the code default (600000); the resource sets it to 480000. + assertEquals(480000L, settings.maxTransactionLifetime); + } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java index 3058289f7bb..04db1163621 100644 --- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/handler/HttpGremlinEndpointHandlerTest.java @@ -21,13 +21,20 @@ import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.ReferenceCountUtil; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException; +import org.apache.tinkerpop.gremlin.server.Context; +import org.apache.tinkerpop.gremlin.server.util.GremlinError; +import org.apache.tinkerpop.gremlin.util.message.RequestMessage; import org.junit.Test; import static io.netty.util.CharsetUtil.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; /** * Unit tests for {@link HttpGremlinEndpointHandler#exceptionCaught}. When no {@link HttpResponseCoordinator} has been @@ -59,4 +66,35 @@ public void exceptionCaughtFallsBackToSendErrorWhenNoCoordinator() { ReferenceCountUtil.release(response); channel.finishAndReleaseAll(); } + + @Test + public void shouldMapInterruptToTransactionTimeoutWhenClosedByLifetimeCap() { + // When the lifetime cap interrupts an operation, it first flags the request Context, so the interrupt that + // unwinds the operation must be reported as a transaction timeout (504), not the generic evaluation timeout. + final RequestMessage message = RequestMessage.build("g.V()").create(); + final Context ctx = mock(Context.class); + when(ctx.isClosedByLifetimeCap()).thenReturn(true); + when(ctx.getTransactionId()).thenReturn("tx-1234"); + + final GremlinError error = newHandler().formErrorResponseMessage( + new TraversalInterruptedException(), message, ctx); + + assertEquals(HttpResponseStatus.GATEWAY_TIMEOUT, error.getCode()); + assertEquals("TransactionException", error.getException()); + assertTrue(error.getMessage().contains("tx-1234")); + } + + @Test + public void shouldMapInterruptToEvaluationTimeoutWhenNotClosedByLifetimeCap() { + // An ordinary evaluation-timeout interrupt (cap flag unset) must keep the existing 500 timeout behavior. + final RequestMessage message = RequestMessage.build("g.V()").create(); + final Context ctx = mock(Context.class); + when(ctx.isClosedByLifetimeCap()).thenReturn(false); + + final GremlinError error = newHandler().formErrorResponseMessage( + new InterruptedException(), message, ctx); + + assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, error.getCode()); + assertEquals("ServerTimeoutExceededException", error.getException()); + } } diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManagerTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManagerTest.java new file mode 100644 index 00000000000..390bdf17a11 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/TransactionManagerTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.transaction; + +import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource; +import org.apache.tinkerpop.gremlin.server.GraphManager; +import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link TransactionManager}'s ownership of the absolute lifetime cap: the manager schedules the cap + * timer after a transaction is registered and cancels it when the transaction is destroyed. The cap's behavior + * when it fires (interrupt + flag + close) is covered in {@code UnmanagedTransactionTest}; here a deterministic + * {@link ManualScheduledExecutorService} drives the scheduling/cancellation without wall-clock waits. + */ +public class TransactionManagerTest { + + private static final String SOURCE = "g"; + private static final long IDLE_DISABLED = 0L; + private static final long PER_GRAPH_CLOSE_MS = 10000L; + private static final long CAP_MS = 5000L; + private static final int MAX_CONCURRENT = 1000; + + private ManualScheduledExecutorService scheduler; + private GraphManager graphManager; + + @Before + public void setUp() { + scheduler = new ManualScheduledExecutorService(); + + // A traversal source whose graph supports transactions, so create() proceeds to build a transaction. + final Graph graph = mock(Graph.class, RETURNS_DEEP_STUBS); + when(graph.features().graph().supportsTransactions()).thenReturn(true); + final Transaction graphTx = mock(Transaction.class); + when(graph.tx()).thenReturn(graphTx); + when(graphTx.isOpen()).thenReturn(false); // rollback during close(false) is a no-op + + final TraversalSource ts = mock(TraversalSource.class); + when(ts.getGraph()).thenReturn(graph); + + graphManager = mock(GraphManager.class); + when(graphManager.getTraversalSource(SOURCE)).thenReturn(ts); + } + + private TransactionManager newManager(final long maxLifetimeMs) { + return new TransactionManager(scheduler, graphManager, IDLE_DISABLED, maxLifetimeMs, MAX_CONCURRENT, PER_GRAPH_CLOSE_MS); + } + + @Test + public void shouldNotScheduleLifetimeCapWhenDisabled() { + final TransactionManager manager = newManager(0L); // cap disabled + manager.create(SOURCE); + + // No lifetime timer is scheduled when the cap is disabled (idle is also disabled here, so nothing is scheduled). + assertEquals(0, scheduler.getScheduledTaskCount()); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldScheduleLifetimeCapAfterRegistrationWhenEnabled() { + final TransactionManager manager = newManager(CAP_MS); + manager.create(SOURCE); + + // The cap is scheduled exactly once, for the configured delay, as soon as the transaction is created. + assertEquals(1, scheduler.getScheduledTaskCount()); + assertEquals(1, scheduler.getPendingTaskCount()); + assertEquals(CAP_MS, scheduler.nextPendingDelayMillis()); + } + + @Test + public void shouldReclaimTransactionWhenLifetimeCapFires() { + final TransactionManager manager = newManager(CAP_MS); + final UnmanagedTransaction tx = manager.create(SOURCE); + assertEquals(1, manager.getActiveTransactionCount()); + + scheduler.advanceTimeBy(CAP_MS, TimeUnit.MILLISECONDS); + + // The cap fired and tore the transaction down: it is no longer tracked and its timer is gone. + assertEquals(0, manager.getActiveTransactionCount()); + assertEquals(0, scheduler.getPendingTaskCount()); + assertFalse(manager.get(tx.getTransactionId()).isPresent()); + } + + @Test + public void shouldCancelLifetimeCapWhenTransactionClosed() { + final TransactionManager manager = newManager(CAP_MS); + final UnmanagedTransaction tx = manager.create(SOURCE); + assertEquals(1, scheduler.getPendingTaskCount()); + + tx.close(true); // explicit close -> destroy() must cancel the pending cap + + assertEquals(0, scheduler.getPendingTaskCount()); + // Advancing past the cap must not resurrect a close on a transaction that is already gone. + scheduler.advanceTimeBy(CAP_MS * 2, TimeUnit.MILLISECONDS); + assertEquals(0, manager.getActiveTransactionCount()); + } + + @Test + public void shouldScheduleAnIndependentLifetimeCapPerTransaction() { + final TransactionManager manager = newManager(CAP_MS); + manager.create(SOURCE); + manager.create(SOURCE); + + // Each transaction gets its own one-shot cap timer. + assertEquals(2, scheduler.getScheduledTaskCount()); + assertEquals(2, scheduler.getPendingTaskCount()); + assertEquals(2, manager.getActiveTransactionCount()); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java new file mode 100644 index 00000000000..a5031d81cf5 --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/transaction/UnmanagedTransactionTest.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.transaction; + +import org.apache.tinkerpop.gremlin.server.Context; +import org.apache.tinkerpop.gremlin.server.util.ManualScheduledExecutorService; +import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.Transaction; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Unit tests for {@link UnmanagedTransaction}, driven by a deterministic {@link ManualScheduledExecutorService} so the + * inactivity-timeout behaviour can be asserted without real wall-clock waits. + *

+ * These are specification tests for the reworked idle timer (suspend-while-busy): the idle timer is armed only + * when the transaction goes idle (no operation running, empty queue) and is suspended while an operation runs. The idle + * timer is (re)armed from the executor's {@code afterExecute} hook, which runs on the transaction worker thread, so + * timer assertions poll the scheduler with a bounded wait via {@link #awaitPendingTimer(boolean)}. + */ +public class UnmanagedTransactionTest { + + private static final String TX_ID = "test-tx-0001"; + private static final long TIMEOUT_MS = 600000L; + private static final long PER_GRAPH_CLOSE_MS = 10000L; + private static final long AWAIT_MS = 5000L; + + private TransactionManager manager; + private Graph graph; + private ManualScheduledExecutorService scheduler; + private UnmanagedTransaction tx; + + @Before + public void setUp() { + manager = mock(TransactionManager.class); + graph = mock(Graph.class); + final Transaction graphTx = mock(Transaction.class); + when(graph.tx()).thenReturn(graphTx); + when(graphTx.isOpen()).thenReturn(false); // rollback path is a no-op during close(false) + + scheduler = new ManualScheduledExecutorService(); + tx = new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, TIMEOUT_MS, PER_GRAPH_CLOSE_MS); + + // close() short-circuits unless the manager still knows about the transaction. + when(manager.get(TX_ID)).thenReturn(Optional.of(tx)); + } + + /** + * Submits a no-op task and blocks until it has finished running on the worker thread. + */ + private void runOp() throws Exception { + tx.submit(new FutureTask<>(() -> null), null).get(AWAIT_MS, TimeUnit.MILLISECONDS); + } + + /** + * Waits (bounded) for the idle timer to reach the expected armed/not-armed state, since it is (re)armed on the + * worker thread from afterExecute slightly after the submitted task's Future completes. Returns once the condition + * holds or the wait elapses; the caller asserts on the final state. + */ + private void awaitPendingTimer(final boolean expectArmed) throws InterruptedException { + final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS); + while (System.nanoTime() < deadline) { + if ((scheduler.getPendingTaskCount() == 1) == expectArmed) return; + Thread.sleep(5); + } + } + + @Test + public void shouldNotScheduleAnyCloseAtConstruction() { + assertEquals(0, scheduler.getScheduledTaskCount()); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldArmIdleTimerWhenWorkerGoesIdleAfterAnOperation() throws Exception { + runOp(); + + awaitPendingTimer(true); + assertEquals("idle timer should be armed once the worker parks with an empty queue", + 1, scheduler.getPendingTaskCount()); + assertEquals(TIMEOUT_MS, scheduler.nextPendingDelayMillis()); + } + + @Test + public void shouldNotArmIdleTimerWhileAnOperationIsRunning() throws Exception { + // Hold an operation "running" and assert no idle timer is armed during that window. + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch release = new CountDownLatch(1); + final Future running = tx.submit(new FutureTask<>(() -> { + started.countDown(); + release.await(); + return null; + }), null); + + assertTrue(started.await(AWAIT_MS, MILLISECONDS)); + // While the op runs, the idle timer must not be armed (a long op must not trip the idle timeout). + assertEquals(0, scheduler.getPendingTaskCount()); + + release.countDown(); + running.get(AWAIT_MS, MILLISECONDS); + + // Once the worker goes idle, the timer arms. + awaitPendingTimer(true); + assertEquals(1, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotFireIdleCloseForALongRunningOperation() throws Exception { + // A single operation that runs longer than the idle timeout must not be reclaimed mid-execution: no timer is + // armed while it runs, so advancing the clock far past the timeout fires nothing. + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch release = new CountDownLatch(1); + final Future running = tx.submit(new FutureTask<>(() -> { + started.countDown(); + release.await(); + return null; + }), null); + assertTrue(started.await(AWAIT_MS, MILLISECONDS)); + + scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS); + + verify(manager, never()).destroy(TX_ID); + release.countDown(); + running.get(AWAIT_MS, MILLISECONDS); + } + + @Test + public void shouldCloseTransactionWhenIdleTimeoutFires() throws Exception { + runOp(); + awaitPendingTimer(true); + + scheduler.advanceTimeBy(TIMEOUT_MS, MILLISECONDS); + + // The scheduled close(false) removes the transaction from the manager. + verify(manager).destroy(TX_ID); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotCloseBeforeIdleTimeoutElapses() throws Exception { + runOp(); + awaitPendingTimer(true); + + scheduler.advanceTimeBy(TIMEOUT_MS - 1, MILLISECONDS); + + verify(manager, never()).destroy(TX_ID); + assertEquals(1, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldReArmIdleTimerAfterEachOperation() throws Exception { + runOp(); + awaitPendingTimer(true); + assertEquals(1, scheduler.getScheduledTaskCount()); + + runOp(); + awaitPendingTimer(true); + + // A second operation cancels the prior idle timer and arms a fresh one. + assertEquals(2, scheduler.getScheduledTaskCount()); + assertEquals(1, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotArmIdleTimerWhenIdleTimeoutDisabled() throws Exception { + // idleTransactionTimeout == 0 disables idle reclamation entirely: the timer is never armed. + final UnmanagedTransaction disabledTx = + new UnmanagedTransaction(TX_ID, manager, "g", graph, scheduler, 0L, PER_GRAPH_CLOSE_MS); + + disabledTx.submit(new FutureTask<>(() -> null), null).get(AWAIT_MS, TimeUnit.MILLISECONDS); + + awaitPendingTimer(false); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldCancelScheduledCloseOnExplicitClose() throws Exception { + runOp(); + awaitPendingTimer(true); + + tx.close(true); + + verify(manager).destroy(TX_ID); + // The pending inactivity close must be cancelled so it cannot fire after the transaction is gone. + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldNotReArmIdleTimerAfterClose() throws Exception { + runOp(); + awaitPendingTimer(true); + + tx.close(false); + + verify(manager).destroy(TX_ID); + // Advancing the clock must not resurrect a close on a transaction that is already gone. + scheduler.advanceTimeBy(TIMEOUT_MS * 2, MILLISECONDS); + assertEquals(0, scheduler.getPendingTaskCount()); + } + + @Test + public void shouldRunSubmittedTasksOnASingleNamedTransactionThreadInOrder() throws Exception { + final List executionOrder = new CopyOnWriteArrayList<>(); + final List threadNames = new CopyOnWriteArrayList<>(); + + Future last = null; + for (int i = 0; i < 5; i++) { + final int n = i; + last = tx.submit(new FutureTask<>(() -> { + threadNames.add(Thread.currentThread().getName()); + executionOrder.add("task-" + n); + return null; + }), null); + } + last.get(5, TimeUnit.SECONDS); // FIFO single thread: the last task completing means all ran + + assertEquals(List.of("task-0", "task-1", "task-2", "task-3", "task-4"), executionOrder); + // All ran on one thread, and that thread is the named transaction worker. + assertEquals(1, threadNames.stream().distinct().count()); + assertTrue("expected tx-* thread but was " + threadNames.get(0), + threadNames.get(0).startsWith("tx-")); + } + + @Test + public void shouldInterruptRunningTaskWhenReturnedFutureIsCancelled() throws Exception { + // Guards the "do NOT wrap submitted tasks" invariant: cancel(true) on the Future returned by submit() must + // interrupt the real work, exactly as the per-request evaluation timeout relies on in the handler. + final CountDownLatch started = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + final AtomicReference unexpected = new AtomicReference<>(); + + final Future running = tx.submit(new FutureTask<>(() -> { + started.countDown(); + try { + Thread.sleep(30000); // block until interrupted by cancel(true) + } catch (InterruptedException e) { + interrupted.set(true); + throw e; + } catch (Throwable t) { + unexpected.set(t); + } + return null; + }), null); + + assertTrue("task did not start", started.await(5, TimeUnit.SECONDS)); + running.cancel(true); + + // Give the worker a moment to observe the interrupt and record it. + final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); + while (!interrupted.get() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue("cancel(true) did not interrupt the running task", interrupted.get()); + assertEquals(null, unexpected.get()); + } + + @Test + public void shouldCloseTransactionWhenLifetimeCapFiresWhileIdle() { + // The lifetime timer itself is scheduled/cancelled by the TransactionManager (see TransactionManagerTest); this + // and the other onLifetimeCap() tests cover what the cap does when it fires, by invoking it directly as the + // manager's timer would. Here: the cap tears the transaction down even when nothing is running. + tx.onLifetimeCap(); + + verify(manager).destroy(TX_ID); + } + + @Test + public void shouldInterruptRunningOperationAndFlagContextWhenLifetimeCapFires() throws Exception { + final Context ctx = mock(Context.class); + final CountDownLatch started = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + tx.submit(new FutureTask<>(() -> { + started.countDown(); + try { + Thread.sleep(30000); // block until the cap interrupts it + } catch (InterruptedException e) { + interrupted.set(true); + } + return null; + }), ctx); + assertTrue("operation did not start", started.await(AWAIT_MS, MILLISECONDS)); + + tx.onLifetimeCap(); + + // The cap flagged the running request's Context (before interrupting) so the unwinding op reports a 504, + // interrupted the running operation, and tore the transaction down. + verify(ctx).setClosedByLifetimeCap(true); + final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS); + while (!interrupted.get() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue("lifetime cap did not interrupt the running operation", interrupted.get()); + verify(manager).destroy(TX_ID); + } + + @Test + public void shouldFlagAndInterruptTheSameOperationWhenLifetimeCapFires() throws Exception { + // Guards against a mismatched (future, context) pair: the cap must flag the Context of the very operation whose + // future it interrupts. op1 completes (clearing its tracking), then op2 runs; when the cap fires it must flag + // op2's Context and never op1's, and interrupt op2. + final Context ctx1 = mock(Context.class); + final Context ctx2 = mock(Context.class); + + tx.submit(new FutureTask<>(() -> null), ctx1).get(AWAIT_MS, MILLISECONDS); // op1 runs to completion + + final CountDownLatch started = new CountDownLatch(1); + final AtomicBoolean interrupted = new AtomicBoolean(false); + tx.submit(new FutureTask<>(() -> { + started.countDown(); + try { + Thread.sleep(30000); + } catch (InterruptedException e) { + interrupted.set(true); + } + return null; + }), ctx2); // op2 is the running op when the cap fires + assertTrue("op2 did not start", started.await(AWAIT_MS, MILLISECONDS)); + + tx.onLifetimeCap(); + + verify(ctx2).setClosedByLifetimeCap(true); + verify(ctx1, never()).setClosedByLifetimeCap(true); + final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(AWAIT_MS); + while (!interrupted.get() && System.nanoTime() < deadline) { + Thread.sleep(10); + } + assertTrue("the running op (op2) was not interrupted", interrupted.get()); + } + + @Test + public void shouldClearTrackedExecutionAfterOperationCompletesSoLaterCapFlagsNothing() throws Exception { + // Compare-and-clear guard: once an operation completes its tracking is cleared, so a cap firing while the + // transaction is idle finds no running op to flag/interrupt. The tracking is cleared on the worker thread in + // afterExecute, which races a bare get() on the completed future; the executor is FIFO with one worker, so a + // second submitted-and-awaited op guarantees the first op's afterExecute (and thus its clear) has already run. + final Context ctx = mock(Context.class); + tx.submit(new FutureTask<>(() -> null), ctx); // op1 tracks ctx + tx.submit(new FutureTask<>(() -> null), null).get(AWAIT_MS, MILLISECONDS); // op2 awaited -> op1 cleared + + // The cap still closes the transaction, but op1's Context was cleared (and op2 carried none), so nothing is + // flagged: a quiet, completed transaction reaching its cap reports no in-flight cap-kill. + tx.onLifetimeCap(); + verify(ctx, never()).setClosedByLifetimeCap(true); + verify(manager).destroy(TX_ID); + } +} diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java new file mode 100644 index 00000000000..21d93a2e08b --- /dev/null +++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/util/ManualScheduledExecutorService.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.server.util; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A deterministic, single-threaded test double for {@link ScheduledExecutorService} backed by a virtual clock. + *

+ * It exists so timer-driven behavior (such as the {@code gremlin-server} transaction idle / lifetime timeouts) can be + * exercised without {@link Thread#sleep(long)} and without real wall-clock waits, which are slow and flaky. Time only + * advances when the test calls {@link #advanceTimeBy(long, TimeUnit)} (or {@link #runDueTasks()}), at which point any + * scheduled task whose trigger time has been reached is run synchronously on the calling thread, in trigger-time order. + *

+ * Only the one-shot {@link #schedule(Runnable, long, TimeUnit)} overload is implemented, because that is all the + * transaction code under test uses. Every other {@link ScheduledExecutorService} method throws + * {@link UnsupportedOperationException} with an explanatory message so that an unsupported use is loud rather than + * silently wrong. + *

+ * It is thread-safe: the transaction idle timer is armed/cancelled on the transaction's worker thread (via the + * executor's before/afterExecute hooks) while a test thread advances the clock and reads counts. All shared state is + * guarded by {@code lock}. Fired task commands are run outside the lock — a fired idle close calls + * {@code close(false)}, which submits a rollback to the transaction executor and blocks on it; running it under the + * lock could deadlock against the worker thread re-entering {@link #schedule}. + */ +public class ManualScheduledExecutorService implements ScheduledExecutorService { + + private final Object lock = new Object(); + private final List tasks = new ArrayList<>(); + private long nowMillis = 0L; + private int scheduledCount = 0; + + /** + * Schedules a one-shot task to run when the virtual clock advances by at least {@code delay}. Returns a + * {@link ScheduledFuture} whose {@link Future#cancel(boolean)} prevents the task from running on a later advance. + */ + @Override + public ScheduledFuture schedule(final Runnable command, final long delay, final TimeUnit unit) { + synchronized (lock) { + final ScheduledTask task = new ScheduledTask(command, nowMillis + unit.toMillis(delay)); + tasks.add(task); + scheduledCount++; + return task; + } + } + + /** + * Advances the virtual clock by the given amount and runs every task that is now due (trigger time {@code <=} the + * new current time) and not cancelled, in ascending trigger-time order. + */ + public void advanceTimeBy(final long amount, final TimeUnit unit) { + synchronized (lock) { + nowMillis += unit.toMillis(amount); + } + runDueTasks(); + } + + /** + * Runs every currently-due, non-cancelled task without advancing the clock. Useful for firing a zero-delay task. + * Each due task is selected under the lock but executed outside it (see class Javadoc). + */ + public void runDueTasks() { + while (true) { + final ScheduledTask next; + synchronized (lock) { + ScheduledTask soonest = null; + // Loop because a fired task may schedule another task that is itself immediately due. + for (final ScheduledTask t : tasks) { + if (!t.cancelled && !t.done && t.triggerAtMillis <= nowMillis) { + if (soonest == null || t.triggerAtMillis < soonest.triggerAtMillis) soonest = t; + } + } + if (soonest == null) return; + soonest.done = true; // mark done under lock so it is not re-selected + next = soonest; + } + next.command.run(); // run OUTSIDE the lock + } + } + + /** + * The number of tasks that are still scheduled to run (not cancelled, not yet fired). + */ + public int getPendingTaskCount() { + synchronized (lock) { + int count = 0; + for (final ScheduledTask t : tasks) { + if (!t.cancelled && !t.done) count++; + } + return count; + } + } + + /** + * The total number of tasks ever scheduled, including ones later cancelled or already fired. Lets a test assert + * that a reschedule actually issued a fresh {@code schedule(...)} call. + */ + public int getScheduledTaskCount() { + synchronized (lock) { + return scheduledCount; + } + } + + /** + * The remaining delay (ms) until the soonest still-pending task fires, or {@code -1} if none is pending. + */ + public long nextPendingDelayMillis() { + synchronized (lock) { + long soonest = Long.MAX_VALUE; + for (final ScheduledTask t : tasks) { + if (!t.cancelled && !t.done) soonest = Math.min(soonest, t.triggerAtMillis - nowMillis); + } + return soonest == Long.MAX_VALUE ? -1L : soonest; + } + } + + private final class ScheduledTask implements ScheduledFuture { + private final Runnable command; + private final long triggerAtMillis; + private volatile boolean cancelled = false; + private volatile boolean done = false; + + private ScheduledTask(final Runnable command, final long triggerAtMillis) { + this.command = command; + this.triggerAtMillis = triggerAtMillis; + } + + @Override + public long getDelay(final TimeUnit unit) { + synchronized (lock) { + return unit.convert(triggerAtMillis - nowMillis, TimeUnit.MILLISECONDS); + } + } + + @Override + public int compareTo(final Delayed o) { + return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean cancel(final boolean mayInterruptIfRunning) { + if (done || cancelled) return false; + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return done || cancelled; + } + + @Override + public Object get() { + return null; + } + + @Override + public Object get(final long timeout, final TimeUnit unit) { + return null; + } + } + + // ---- Unsupported ScheduledExecutorService surface: fail loudly rather than behave unexpectedly. ---- + + private static UnsupportedOperationException unsupported(final String method) { + return new UnsupportedOperationException( + ManualScheduledExecutorService.class.getSimpleName() + " does not support " + method + + "; only schedule(Runnable, long, TimeUnit) is implemented for tests."); + } + + @Override + public ScheduledFuture schedule(final Callable callable, final long delay, final TimeUnit unit) { + throw unsupported("schedule(Callable, long, TimeUnit)"); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(final Runnable command, final long initialDelay, final long period, final TimeUnit unit) { + throw unsupported("scheduleAtFixedRate"); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(final Runnable command, final long initialDelay, final long delay, final TimeUnit unit) { + throw unsupported("scheduleWithFixedDelay"); + } + + @Override + public void execute(final Runnable command) { + throw unsupported("execute"); + } + + @Override + public void shutdown() { + throw unsupported("shutdown"); + } + + @Override + public List shutdownNow() { + throw unsupported("shutdownNow"); + } + + @Override + public boolean isShutdown() { + throw unsupported("isShutdown"); + } + + @Override + public boolean isTerminated() { + throw unsupported("isTerminated"); + } + + @Override + public boolean awaitTermination(final long timeout, final TimeUnit unit) { + throw unsupported("awaitTermination"); + } + + @Override + public Future submit(final Callable task) { + throw unsupported("submit(Callable)"); + } + + @Override + public Future submit(final Runnable task, final T result) { + throw unsupported("submit(Runnable, T)"); + } + + @Override + public Future submit(final Runnable task) { + throw unsupported("submit(Runnable)"); + } + + @Override + public List> invokeAll(final java.util.Collection> tasks) { + throw unsupported("invokeAll"); + } + + @Override + public List> invokeAll(final java.util.Collection> tasks, final long timeout, final TimeUnit unit) { + throw unsupported("invokeAll"); + } + + @Override + public T invokeAny(final java.util.Collection> tasks) throws ExecutionException { + throw unsupported("invokeAny"); + } + + @Override + public T invokeAny(final java.util.Collection> tasks, final long timeout, final TimeUnit unit) throws ExecutionException { + throw unsupported("invokeAny"); + } +} diff --git a/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml b/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml index be135f03c79..ff84e4a5e0b 100644 --- a/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml +++ b/gremlin-server/src/test/resources/org/apache/tinkerpop/gremlin/server/gremlin-server-integration.yaml @@ -31,6 +31,9 @@ host: 0.0.0.0 port: 45940 evaluationTimeout: 30000 +# Set below the code default (600000) so SettingsTest can confirm a YAML-provided value is honored; still far larger +# than any integration test's runtime (8 minutes), so it never fires during a test. +maxTransactionLifetime: 480000 graphs: { graph: { configuration: conf/tinkergraph-empty.properties,