Skip to content

feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397

Open
clintropolis wants to merge 3 commits intoapache:masterfrom
clintropolis:async-cursor-factory
Open

feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397
clintropolis wants to merge 3 commits intoapache:masterfrom
clintropolis:async-cursor-factory

Conversation

@clintropolis
Copy link
Copy Markdown
Member

Description

This PR adds an async variant of CursorFactory.makeCursorHolder and migrates MSQ frame processors to use it, so that future cursor factories backed by partial / lazy-loaded storage can perform I/O without blocking the processing
thread. This PR introduces no partial-loading behavior on its own, instead it just establishes the integration shape for more upcoming partial-segment work that will follow this PR. Every existing cursor factory remains synchronous via the default implementation which just calls Futures.immediateFuture(makeCursorHolder(spec)).

Worth noting, I am planning some changes in a separate PR for V10 segments so that a partial segment can provide a TimeBoundaryInspector for usage by GroupByPreShuffleFrameProcessor without needing to download any column data so we can avoid making an async variant of it.

changes:

  • add CursorFactory.makeCursorHolderAsync(CursorBuildSpec) for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning Futures.immediateFuture(makeCursorHolder(spec)) so existing implementations remain async-correct without changes
  • add GroupingEngine.processAsync returning ListenableFuture<Sequence<ResultRow>> that uses makeCursorHolderAsync, extracting shared processWithCursorHolder helper from GroupingEngine.process()
  • migrate ScanQueryFrameProcessor.runWithSegment to call makeCursorHolderAsync and yield via ReturnOrAwait.awaitAllFutures while the future is pending
  • migrate GroupByPreShuffleFrameProcessor.runWithSegment cursor path to call GroupingEngine.processAsync and yield via ReturnOrAwait.awaitAllFutures

… use it

changes:
* add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `Futures.immediateFuture(makeCursorHolder(spec))` so existing implementations remain async-correct without changes
* add `GroupingEngine.processAsync` returning `ListenableFuture<Sequence<ResultRow>>` that uses `makeCursorHolderAsync`, extracting shared `processWithCursorHolder` helper from `GroupingEngine.process()`
* migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait.awaitAllFutures` while the future is  pending
* migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.processAsync` and yield via `ReturnOrAwait.awaitAllFutures`
@github-actions github-actions Bot added Area - Batch Ingestion Area - Segment Format and Ser/De Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels May 1, 2026
* Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from
* {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O
* (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures}
* until the cursor holder is ready.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This javadoc feels a bit too specific to me. It's enough to say that this is an asynchronous variant of process that uses CursorFactory#makeCursorHolderAsync to avoid blocking on acquisition of CursorHolder.

final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics);
return FutureUtils.transform(
cursorFactory.makeCursorHolderAsync(buildSpec),
cursorHolder -> processWithCursorHolder(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this end up doing the main processing in whatever thread happened to resolve the makeCursorHolderAsync future? (Because FutureUtils.transform uses a direct executor.) Possibly that'd be in a virtual storage loader thread.

I think we'll need to either adjust this method to accept a ListenableExecutorService that will be used to run processWithCursorHolder, or break it up so callers first call groupingEngine.makeCursorHolderAsync and then call groupingEngine.processCursorHolder.

bufferHolder = bufferPool.take();
}
catch (Throwable e) {
CloseableUtils.closeAndWrapExceptions(cursorHolder);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder) will properly retain exceptions from closing cursorHolder as suppressed exceptions on e.

Although, there's probably some way of structuring this code to use the Closer to handle this better. Like, create the Closer first, register cursorHolder right after it's created, start the main try, then register bufferHolder after it's acquired. If the buffer fails to be acquired then the catch will close the Closer and release the cursorHolder.

);
}

cursorHolderFuture = cursorFactory.makeCursorHolderAsync(
Copy link
Copy Markdown
Contributor

@gianm gianm May 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling futures that return closeable things is tricky. Maybe we can improve it by changing the return of makeCursorHolderAsync from ListenableFuture<CursorHolder> to AsyncCursorHolder that is closeable and has methods get() (blocks if not ready), close() (closes the resource no matter where it is in its lifecycle), and addReadyCallback(Runnable) (used by nonblocking callers to learn when get is ready).

The problem with the future approach is that once this call site gets the cursorHolderFuture, it's responsible for monitoring the future and closing cursorHolder if the future resolves successfully. This has to be done even if the processor is canceled before it has a chance to run through normally. It requires extra carefulness and is easy to mess up.

One way it can be handled is by attaching a callback in cleanup that closes the holder in onSuccess, like:

if (cursorHolderFuture != null) {
  Futures.addCallback(
    cursorHolderFuture,
    new FutureCallback<>() {
      void onSuccess(CursorHolder holder) { holder.close(); }
      void onFailure(Throwable t) { /* nothing */ }
    }
  );
}

But even with this structure, it's important watch out for pitfalls. A big one is that you can never cancel a future that returns a closeable thing. Cancellation of the future can cause the object to be orphaned and eventually GCed without being closed (if the object is created before cancellation has a chance to interrupt whatever was creating it).

If we can avoid these problems by returning something directly closeable (like this AsyncCursorHolder idea) rather than future-of-closeable, then the caller code becomes simpler.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Severity Findings
P0 0
P1 0
P2 1
P3 0
Total 1

This is an automated review by Codex GPT-5

CursorBuildSpec buildSpec
)
{
final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Close cursorHolder if query config override throws

processWithCursorHolder now receives an already-created CursorHolder, but computes querySpecificConfig before entering any cleanup-protected block. If withOverrides throws, for example due to an invalid groupBy query context value, the CursorHolder returned by makeCursorHolderAsync/makeCursorHolder is never closed. Move this config resolution before acquiring the holder, or wrap it so cursorHolder is closed on every pre-sequence failure.

Copy link
Copy Markdown
Member

@FrankChen021 FrankChen021 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.


This is an automated review by Codex GPT-5

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 Area - Segment Format and Ser/De

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants