Skip to content

feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397

Open
clintropolis wants to merge 7 commits intoapache:masterfrom
clintropolis:async-cursor-factory
Open

feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397
clintropolis wants to merge 7 commits intoapache:masterfrom
clintropolis:async-cursor-factory

Conversation

@clintropolis
Copy link
Copy Markdown
Member

Description

This PR adds an async variant of CursorFactory.makeCursorHolder and migrates MSQ frame processors to use it, so that future cursor factories backed by partial / lazy-loaded storage can perform I/O without blocking the processing
thread. This PR introduces no partial-loading behavior on its own, instead it just establishes the integration shape for more upcoming partial-segment work that will follow this PR. Every existing cursor factory remains synchronous via the default implementation which just calls Futures.immediateFuture(makeCursorHolder(spec)).

Worth noting, I am planning some changes in a separate PR for V10 segments so that a partial segment can provide a TimeBoundaryInspector for usage by GroupByPreShuffleFrameProcessor without needing to download any column data so we can avoid making an async variant of it.

changes:

  • add CursorFactory.makeCursorHolderAsync(CursorBuildSpec) for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning Futures.immediateFuture(makeCursorHolder(spec)) so existing implementations remain async-correct without changes
  • add GroupingEngine.processAsync returning ListenableFuture<Sequence<ResultRow>> that uses makeCursorHolderAsync, extracting shared processWithCursorHolder helper from GroupingEngine.process()
  • migrate ScanQueryFrameProcessor.runWithSegment to call makeCursorHolderAsync and yield via ReturnOrAwait.awaitAllFutures while the future is pending
  • migrate GroupByPreShuffleFrameProcessor.runWithSegment cursor path to call GroupingEngine.processAsync and yield via ReturnOrAwait.awaitAllFutures

… use it

changes:
* add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `Futures.immediateFuture(makeCursorHolder(spec))` so existing implementations remain async-correct without changes
* add `GroupingEngine.processAsync` returning `ListenableFuture<Sequence<ResultRow>>` that uses `makeCursorHolderAsync`, extracting shared `processWithCursorHolder` helper from `GroupingEngine.process()`
* migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait.awaitAllFutures` while the future is  pending
* migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.processAsync` and yield via `ReturnOrAwait.awaitAllFutures`
@github-actions github-actions Bot added Area - Batch Ingestion Area - Segment Format and Ser/De Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels May 1, 2026
* Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
* {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
* (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
* until the cursor holder is ready.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This javadoc feels a bit too specific to me. It's enough to say that this is an asynchronous variant of process that uses CursorFactory#makeCursorHolderAsync to avoid blocking on acquisition of CursorHolder.

final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
return FutureUtils.transform(
cursorFactory.makeCursorHolderAsync(buildSpec),
cursorHolder -> processWithCursorHolder(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this end up doing the main processing in whatever thread happened to resolve the makeCursorHolderAsync future? (Because FutureUtils.transform uses a direct executor.) Possibly that'd be in a virtual storage loader thread.

I think we'll need to either adjust this method to accept a ListenableExecutorService that will be used to run processWithCursorHolder, or break it up so callers first call groupingEngine.makeCursorHolderAsync and then call groupingEngine.processCursorHolder.

bufferHolder = bufferPool.take();
}
catch (Throwable e) {
CloseableUtils.closeAndWrapExceptions(cursorHolder);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder) will properly retain exceptions from closing cursorHolder as suppressed exceptions on e.

Although, there's probably some way of structuring this code to use the Closer to handle this better. Like, create the Closer first, register cursorHolder right after it's created, start the main try, then register bufferHolder after it's acquired. If the buffer fails to be acquired then the catch will close the Closer and release the cursorHolder.

);
}

cursorHolderFuture = cursorFactory.makeCursorHolderAsync(
Copy link
Copy Markdown
Contributor

@gianm gianm May 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling futures that return closeable things is tricky. Maybe we can improve it by changing the return of makeCursorHolderAsync from ListenableFuture<CursorHolder> to AsyncCursorHolder that is closeable and has methods get() (blocks if not ready), close() (closes the resource no matter where it is in its lifecycle), and addReadyCallback(Runnable) (used by nonblocking callers to learn when get is ready).

The problem with the future approach is that once this call site gets the cursorHolderFuture, it's responsible for monitoring the future and closing cursorHolder if the future resolves successfully. This has to be done even if the processor is canceled before it has a chance to run through normally. It requires extra carefulness and is easy to mess up.

One way it can be handled is by attaching a callback in cleanup that closes the holder in onSuccess, like:

if (cursorHolderFuture != null) {
  Futures.addCallback(
    cursorHolderFuture,
    new FutureCallback<>() {
      void onSuccess(CursorHolder holder) { holder.close(); }
      void onFailure(Throwable t) { /* nothing */ }
    }
  );
}

But even with this structure, it's important watch out for pitfalls. A big one is that you can never cancel a future that returns a closeable thing. Cancellation of the future can cause the object to be orphaned and eventually GCed without being closed (if the object is created before cancellation has a chance to interrupt whatever was creating it).

If we can avoid these problems by returning something directly closeable (like this AsyncCursorHolder idea) rather than future-of-closeable, then the caller code becomes simpler.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1

This is an automated review by Codex GPT-5

CursorBuildSpec buildSpec
)
{
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Close cursorHolder if query config override throws

processWithCursorHolder now receives an already-created CursorHolder, but computes querySpecificConfig before entering any cleanup-protected block. If withOverrides throws, for example due to an invalid groupBy query context value, the CursorHolder returned by makeCursorHolderAsync/makeCursorHolder is never closed. Move this config resolution before acquiring the holder, or wrap it so cursorHolder is closed on every pre-sequence failure.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.


This is an automated review by Codex GPT-5

* regardless of where the underlying load is in its lifecycle.
* <p>
* The hazard this exists to avoid: returning a {@code ListenableFuture<CursorHolder>} (or similar future-of-Closeable)
* makes correct cleanup error-prone, where cancelling the future or letting a caller fail before consuming the future
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

canceling (spelling)

* can orphan the produced holder, leaking the underlying resources. By exposing a Closeable that internally tracks the
* load and disposes whatever has materialized, callers don't have to write that bookkeeping themselves.
* <p>
* Typical usage from a non-blocking caller (e.g. an MSQ frame processor):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason to include (e.g. an MSQ frame processor) here. I would also edit the code sample to be less MSQ specific.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example code got a bit weird trying to get rid of ReturnOrAwait, so instead of tried to frame it that the example is using 'ReturnOrAwait' to show how you're supposed to use this thing, since the important part is the yield-then-resume when ready pattern, which i think makes it feel less like this is a thing for MSQ than the last iteration did?

Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
}

private final ListenableFuture<CursorHolder> future;
private boolean closed = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use @GuardedBy on closed and disposed, since synchronization is crucial given how they're used.

* <li>Already closed: no-op.</li>
* </ul>
* Note that closing does NOT cancel an in-flight load — cancelling a future-of-Closeable is the exact lifecycle
* hazard this class exists to prevent. The load completes normally and the resulting holder is closed promptly.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really we should structure this to be capable of canceling the in-flight load. I think to make it work we'll want to not base this class on futures at all. Something like:

  • Accept a Runnable canceler in the constructor
  • Replace the future field with Either<Throwable, T>
  • Expose set and setException methods that return boolean (true for set accepted, false for set not accepted)
  • In close, call canceler if the set methods haven't been called yet
  • Whatever provides the actual CursorHolder should close it if set returns false

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nicer, though I ended up not using Either for the field since it didn't save me much and was kind of awkward since internally we are always dealing with one or the other (i did use it to make it easier to share a set method internally though)

Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
Comment thread processing/src/main/java/org/apache/druid/segment/AsyncCursorHolder.java Outdated
cb.run();
}
catch (Throwable ignored) {
// Best-effort; one bad callback shouldn't break others.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.warn something, otherwise a callback failure will be completely silent, and this could make debugging something difficult down the road. Especially since the likely outcome of an readiness callback failing would be that things just get stalled out.

}
if (error != null) {
// pass through as is
if (error instanceof RuntimeException runtime) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or Error?

* <p>
* Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks.
*/
public boolean set(CursorHolder holder)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should reject null holder with a defensive exception.

* itself. Throws {@link DruidException} if the load was already completed (from prior calls to this method or
* {@link #setException}).
* <p>
* Callbacks registered via {@link #addReadyCallback} fire outside the lock to avoid re-entrancy deadlocks.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this comment mean? Seems like there's only one lock (this) and it should be fine to re-enter it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, i guess bad wording; like its more like avoiding problems like if the callback needs to hold some other lock we don't have any weird ordering issues like if that lock is some lock shared between callback thread and producing thread, will try to clarify

if (result == null) {
throw DruidException.defensive("AsyncCursorHolder is not ready yet");
}
disposed = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set result to null here to allow GC? The AsyncCursorHolder can live longer than the reference from release() if the AsyncCursorHolder is added to a high-level Closer or withBaggage or something like that.

final Runnable cancelerToRun;
synchronized (this) {
if (closed) {
return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of idempotent close, how about throwing a defensive error here? Double-close can be indicative of resource management problems, and an exception would help us identify those potential issues.

If you do this then the javadoc should be updated too.

return;
}
if (result != null) {
// Result is here and no one has released it; we close it.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic would end up being simpler if you had nulled out result in release. It would then just be: always close result if it is nonnull, because if it's nonnull, we own it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the nulling out of result in release(), however i still have the disposed flag because otherwise calling release would allow something to be able to call set/setException again which seems unchill, so I've kept this short-circuit on disposed here as is and haven't really changed this part.

We do essentially always call close on the result if we get to this part where we check not null, stuff maybe looks a bit more funny because we actually call close and the canceler outside of the synchronized block?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Segment Format and Ser/De

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants