feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397
feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397clintropolis wants to merge 7 commits intoapache:masterfrom
Conversation
… use it changes: * add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `Futures.immediateFuture(makeCursorHolder(spec))` so existing implementations remain async-correct without changes * add `GroupingEngine.processAsync` returning `ListenableFuture<Sequence<ResultRow>>` that uses `makeCursorHolderAsync`, extracting shared `processWithCursorHolder` helper from `GroupingEngine.process()` * migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait.awaitAllFutures` while the future is pending * migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.processAsync` and yield via `ReturnOrAwait.awaitAllFutures`
| * Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from | ||
| * {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O | ||
| * (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures} | ||
| * until the cursor holder is ready. |
There was a problem hiding this comment.
This javadoc feels a bit too specific to me. It's enough to say that this is an asynchronous variant of process that uses CursorFactory#makeCursorHolderAsync to avoid blocking on acquisition of CursorHolder.
| final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics); | ||
| return FutureUtils.transform( | ||
| cursorFactory.makeCursorHolderAsync(buildSpec), | ||
| cursorHolder -> processWithCursorHolder( |
There was a problem hiding this comment.
Won't this end up doing the main processing in whatever thread happened to resolve the makeCursorHolderAsync future? (Because FutureUtils.transform uses a direct executor.) Possibly that'd be in a virtual storage loader thread.
I think we'll need to either adjust this method to accept a ListenableExecutorService that will be used to run processWithCursorHolder, or break it up so callers first call groupingEngine.makeCursorHolderAsync and then call groupingEngine.processCursorHolder.
| bufferHolder = bufferPool.take(); | ||
| } | ||
| catch (Throwable e) { | ||
| CloseableUtils.closeAndWrapExceptions(cursorHolder); |
There was a problem hiding this comment.
throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder) will properly retain exceptions from closing cursorHolder as suppressed exceptions on e.
Although, there's probably some way of structuring this code to use the Closer to handle this better. Like, create the Closer first, register cursorHolder right after it's created, start the main try, then register bufferHolder after it's acquired. If the buffer fails to be acquired then the catch will close the Closer and release the cursorHolder.
| ); | ||
| } | ||
|
|
||
| cursorHolderFuture = cursorFactory.makeCursorHolderAsync( |
There was a problem hiding this comment.
Handling futures that return closeable things is tricky. Maybe we can improve it by changing the return of makeCursorHolderAsync from ListenableFuture<CursorHolder> to AsyncCursorHolder that is closeable and has methods get() (blocks if not ready), close() (closes the resource no matter where it is in its lifecycle), and addReadyCallback(Runnable) (used by nonblocking callers to learn when get is ready).
The problem with the future approach is that once this call site gets the cursorHolderFuture, it's responsible for monitoring the future and closing cursorHolder if the future resolves successfully. This has to be done even if the processor is canceled before it has a chance to run through normally. It requires extra carefulness and is easy to mess up.
One way it can be handled is by attaching a callback in cleanup that closes the holder in onSuccess, like:
if (cursorHolderFuture != null) {
Futures.addCallback(
cursorHolderFuture,
new FutureCallback<>() {
void onSuccess(CursorHolder holder) { holder.close(); }
void onFailure(Throwable t) { /* nothing */ }
}
);
}
But even with this structure, it's important watch out for pitfalls. A big one is that you can never cancel a future that returns a closeable thing. Cancellation of the future can cause the object to be orphaned and eventually GCed without being closed (if the object is created before cancellation has a chance to interrupt whatever was creating it).
If we can avoid these problems by returning something directly closeable (like this AsyncCursorHolder idea) rather than future-of-closeable, then the caller code becomes simpler.
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
This is an automated review by Codex GPT-5
| CursorBuildSpec buildSpec | ||
| ) | ||
| { | ||
| final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query); |
There was a problem hiding this comment.
P2 Close cursorHolder if query config override throws
processWithCursorHolder now receives an already-created CursorHolder, but computes querySpecificConfig before entering any cleanup-protected block. If withOverrides throws, for example due to an invalid groupBy query context value, the CursorHolder returned by makeCursorHolderAsync/makeCursorHolder is never closed. Move this config resolution before acquiring the holder, or wrap it so cursorHolder is closed on every pre-sequence failure.
FrankChen021
left a comment
There was a problem hiding this comment.
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
This is an automated review by Codex GPT-5
| * regardless of where the underlying load is in its lifecycle. | ||
| * <p> | ||
| * The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable) | ||
| * makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future |
| * can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the | ||
| * load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves. | ||
| * <p> | ||
| * Typical usage from a non-blocking caller (e.g. an MSQ frame processor): |
There was a problem hiding this comment.
No reason to include (e.g. an MSQ frame processor) here. I would also edit the code sample to be less MSQ specific.
There was a problem hiding this comment.
The example code got a bit weird trying to get rid of ReturnOrAwait, so instead of tried to frame it that the example is using 'ReturnOrAwait' to show how you're supposed to use this thing, since the important part is the yield-then-resume when ready pattern, which i think makes it feel less like this is a thing for MSQ than the last iteration did?
| } | ||
|
|
||
| private final ListenableFuture<CursorHolder> future; | ||
| private boolean closed = false; |
There was a problem hiding this comment.
use @GuardedBy on closed and disposed, since synchronization is crucial given how they're used.
| * <li>Already closed: no-op.</li> | ||
| * </ul> | ||
| * Note that closing does NOT cancel an in-flight load — cancelling a future-of-Closeable is the exact lifecycle | ||
| * hazard this class exists to prevent. The load completes normally and the resulting holder is closed promptly. |
There was a problem hiding this comment.
Really we should structure this to be capable of canceling the in-flight load. I think to make it work we'll want to not base this class on futures at all. Something like:
- Accept a
Runnable cancelerin the constructor - Replace the
futurefield withEither<Throwable, T> - Expose
setandsetExceptionmethods that returnboolean(true for set accepted, false for set not accepted) - In
close, callcancelerif the set methods haven't been called yet - Whatever provides the actual
CursorHoldershould close it ifsetreturns false
There was a problem hiding this comment.
This is nicer, though I ended up not using Either for the field since it didn't save me much and was kind of awkward since internally we are always dealing with one or the other (i did use it to make it easier to share a set method internally though)
| cb.run(); | ||
| } | ||
| catch (Throwable ignored) { | ||
| // Best-effort; one bad callback shouldn't break others. |
There was a problem hiding this comment.
log.warn something, otherwise a callback failure will be completely silent, and this could make debugging something difficult down the road. Especially since the likely outcome of an readiness callback failing would be that things just get stalled out.
| } | ||
| if (error != null) { | ||
| // pass through as is | ||
| if (error instanceof RuntimeException runtime) { |
| * <p> | ||
| * Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks. | ||
| */ | ||
| public boolean set(CursorHolder holder) |
There was a problem hiding this comment.
This method should reject null holder with a defensive exception.
| * itself. Throws {@link DruidException} if the load was already completed (from prior calls to this method or | ||
| * {@link #setException}). | ||
| * <p> | ||
| * Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks. |
There was a problem hiding this comment.
What does this comment mean? Seems like there's only one lock (this) and it should be fine to re-enter it.
There was a problem hiding this comment.
oh, i guess bad wording; like its more like avoiding problems like if the callback needs to hold some other lock we don't have any weird ordering issues like if that lock is some lock shared between callback thread and producing thread, will try to clarify
| if (result == null) { | ||
| throw DruidException.defensive("AsyncCursorHolder is not ready yet"); | ||
| } | ||
| disposed = true; |
There was a problem hiding this comment.
set result to null here to allow GC? The AsyncCursorHolder can live longer than the reference from release() if the AsyncCursorHolder is added to a high-level Closer or withBaggage or something like that.
| final Runnable cancelerToRun; | ||
| synchronized (this) { | ||
| if (closed) { | ||
| return; |
There was a problem hiding this comment.
Instead of idempotent close, how about throwing a defensive error here? Double-close can be indicative of resource management problems, and an exception would help us identify those potential issues.
If you do this then the javadoc should be updated too.
| return; | ||
| } | ||
| if (result != null) { | ||
| // Result is here and no one has released it; we close it. |
There was a problem hiding this comment.
I think this logic would end up being simpler if you had nulled out result in release. It would then just be: always close result if it is nonnull, because if it's nonnull, we own it.
There was a problem hiding this comment.
I added the nulling out of result in release(), however i still have the disposed flag because otherwise calling release would allow something to be able to call set/setException again which seems unchill, so I've kept this short-circuit on disposed here as is and haven't really changed this part.
We do essentially always call close on the result if we get to this part where we check not null, stuff maybe looks a bit more funny because we actually call close and the canceler outside of the synchronized block?
Description
This PR adds an async variant of
CursorFactory.makeCursorHolderand migrates MSQ frame processors to use it, so that future cursor factories backed by partial / lazy-loaded storage can perform I/O without blocking the processingthread. This PR introduces no partial-loading behavior on its own, instead it just establishes the integration shape for more upcoming partial-segment work that will follow this PR. Every existing cursor factory remains synchronous via the default implementation which just calls
Futures.immediateFuture(makeCursorHolder(spec)).Worth noting, I am planning some changes in a separate PR for V10 segments so that a partial segment can provide a
TimeBoundaryInspectorfor usage byGroupByPreShuffleFrameProcessorwithout needing to download any column data so we can avoid making an async variant of it.changes:
CursorFactory.makeCursorHolderAsync(CursorBuildSpec)for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returningFutures.immediateFuture(makeCursorHolder(spec))so existing implementations remain async-correct without changesGroupingEngine.processAsyncreturningListenableFuture<Sequence<ResultRow>>that usesmakeCursorHolderAsync, extracting sharedprocessWithCursorHolderhelper fromGroupingEngine.process()ScanQueryFrameProcessor.runWithSegmentto callmakeCursorHolderAsyncand yield viaReturnOrAwait.awaitAllFutureswhile the future is pendingGroupByPreShuffleFrameProcessor.runWithSegmentcursor path to callGroupingEngine.processAsyncand yield viaReturnOrAwait.awaitAllFutures