feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397
feat: add async CursorFactory API and migrate MSQ frame processors to use it#19397clintropolis wants to merge 3 commits intoapache:masterfrom
Conversation
… use it changes: * add `CursorFactory.makeCursorHolderAsync(CursorBuildSpec)` for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returning `Futures.immediateFuture(makeCursorHolder(spec))` so existing implementations remain async-correct without changes * add `GroupingEngine.processAsync` returning `ListenableFuture<Sequence<ResultRow>>` that uses `makeCursorHolderAsync`, extracting shared `processWithCursorHolder` helper from `GroupingEngine.process()` * migrate `ScanQueryFrameProcessor.runWithSegment` to call `makeCursorHolderAsync` and yield via `ReturnOrAwait.awaitAllFutures` while the future is pending * migrate `GroupByPreShuffleFrameProcessor.runWithSegment` cursor path to call `GroupingEngine.processAsync` and yield via `ReturnOrAwait.awaitAllFutures`
| * Asynchronous variant of {@link #process} that obtains the {@link CursorHolder} from | ||
| * {@link CursorFactory#makeCursorHolderAsync} so callers running on threads that must not block on I/O | ||
| * (e.g. MSQ frame processors) can yield via {@link org.apache.druid.frame.processor.ReturnOrAwait#awaitAllFutures} | ||
| * until the cursor holder is ready. |
There was a problem hiding this comment.
This javadoc feels a bit too specific to me. It's enough to say that this is an asynchronous variant of process that uses CursorFactory#makeCursorHolderAsync to avoid blocking on acquisition of CursorHolder.
| final CursorBuildSpec buildSpec = makeCursorBuildSpec(query, groupByQueryMetrics); | ||
| return FutureUtils.transform( | ||
| cursorFactory.makeCursorHolderAsync(buildSpec), | ||
| cursorHolder -> processWithCursorHolder( |
There was a problem hiding this comment.
Won't this end up doing the main processing in whatever thread happened to resolve the makeCursorHolderAsync future? (Because FutureUtils.transform uses a direct executor.) Possibly that'd be in a virtual storage loader thread.
I think we'll need to either adjust this method to accept a ListenableExecutorService that will be used to run processWithCursorHolder, or break it up so callers first call groupingEngine.makeCursorHolderAsync and then call groupingEngine.processCursorHolder.
| bufferHolder = bufferPool.take(); | ||
| } | ||
| catch (Throwable e) { | ||
| CloseableUtils.closeAndWrapExceptions(cursorHolder); |
There was a problem hiding this comment.
throw CloseableUtils.closeAndWrapInCatch(e, cursorHolder) will properly retain exceptions from closing cursorHolder as suppressed exceptions on e.
Although, there's probably some way of structuring this code to use the Closer to handle this better. Like, create the Closer first, register cursorHolder right after it's created, start the main try, then register bufferHolder after it's acquired. If the buffer fails to be acquired then the catch will close the Closer and release the cursorHolder.
| ); | ||
| } | ||
|
|
||
| cursorHolderFuture = cursorFactory.makeCursorHolderAsync( |
There was a problem hiding this comment.
Handling futures that return closeable things is tricky. Maybe we can improve it by changing the return of makeCursorHolderAsync from ListenableFuture<CursorHolder> to AsyncCursorHolder that is closeable and has methods get() (blocks if not ready), close() (closes the resource no matter where it is in its lifecycle), and addReadyCallback(Runnable) (used by nonblocking callers to learn when get is ready).
The problem with the future approach is that once this call site gets the cursorHolderFuture, it's responsible for monitoring the future and closing cursorHolder if the future resolves successfully. This has to be done even if the processor is canceled before it has a chance to run through normally. It requires extra carefulness and is easy to mess up.
One way it can be handled is by attaching a callback in cleanup that closes the holder in onSuccess, like:
if (cursorHolderFuture != null) {
Futures.addCallback(
cursorHolderFuture,
new FutureCallback<>() {
void onSuccess(CursorHolder holder) { holder.close(); }
void onFailure(Throwable t) { /* nothing */ }
}
);
}
But even with this structure, it's important watch out for pitfalls. A big one is that you can never cancel a future that returns a closeable thing. Cancellation of the future can cause the object to be orphaned and eventually GCed without being closed (if the object is created before cancellation has a chance to interrupt whatever was creating it).
If we can avoid these problems by returning something directly closeable (like this AsyncCursorHolder idea) rather than future-of-closeable, then the caller code becomes simpler.
FrankChen021
left a comment
There was a problem hiding this comment.
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
This is an automated review by Codex GPT-5
| CursorBuildSpec buildSpec | ||
| ) | ||
| { | ||
| final GroupByQueryConfig querySpecificConfig = configSupplier.get().withOverrides(query); |
There was a problem hiding this comment.
P2 Close cursorHolder if query config override throws
processWithCursorHolder now receives an already-created CursorHolder, but computes querySpecificConfig before entering any cleanup-protected block. If withOverrides throws, for example due to an invalid groupBy query context value, the CursorHolder returned by makeCursorHolderAsync/makeCursorHolder is never closed. Move this config resolution before acquiring the holder, or wrap it so cursorHolder is closed on every pre-sequence failure.
FrankChen021
left a comment
There was a problem hiding this comment.
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
This is an automated review by Codex GPT-5
Description
This PR adds an async variant of
CursorFactory.makeCursorHolderand migrates MSQ frame processors to use it, so that future cursor factories backed by partial / lazy-loaded storage can perform I/O without blocking the processingthread. This PR introduces no partial-loading behavior on its own, instead it just establishes the integration shape for more upcoming partial-segment work that will follow this PR. Every existing cursor factory remains synchronous via the default implementation which just calls
Futures.immediateFuture(makeCursorHolder(spec)).Worth noting, I am planning some changes in a separate PR for V10 segments so that a partial segment can provide a
TimeBoundaryInspectorfor usage byGroupByPreShuffleFrameProcessorwithout needing to download any column data so we can avoid making an async variant of it.changes:
CursorFactory.makeCursorHolderAsync(CursorBuildSpec)for cursor factories backed by partial downloads can do I/O without blocking processing threads, with a default implementation returningFutures.immediateFuture(makeCursorHolder(spec))so existing implementations remain async-correct without changesGroupingEngine.processAsyncreturningListenableFuture<Sequence<ResultRow>>that usesmakeCursorHolderAsync, extracting sharedprocessWithCursorHolderhelper fromGroupingEngine.process()ScanQueryFrameProcessor.runWithSegmentto callmakeCursorHolderAsyncand yield viaReturnOrAwait.awaitAllFutureswhile the future is pendingGroupByPreShuffleFrameProcessor.runWithSegmentcursor path to callGroupingEngine.processAsyncand yield viaReturnOrAwait.awaitAllFutures