feat: Bounded Stream Supervisor #19372

Open
aho135 wants to merge 44 commits into apache:master from aho135:bounded-stream-supervisor

Conversation

Contributor

@aho135 aho135 commented Apr 24, 2026

Description

Introduces a new property to the Stream Supervisor spec IOConfig called boundedStreamConfig which allows operators to specify start and end offset ranges for short-lived supervised ingestion. This property modifies the main Supervisor run loop to only ingest from and monitor partitions specified in the boundedStreamConfig. After the offset range has been consumed the Supervisor will transition into a terminal state (COMPLETED). The motivation for this PR came out of #19191 which submits backfill tasks that are unsupervised. Once this change is merged, 19191 can be enhanced to use the boundedStreamConfig so that the backfill tasks are supervised.
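As a sketch of what this could look like, a bounded Kafka supervisor spec might carry the new property roughly as below. The `startSequenceNumbers`/`endSequenceNumbers` names come from the classes changed in this PR, but the exact JSON shape (partition-to-offset maps, surrounding fields) is an assumption for illustration, not a confirmed schema:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "events",
      "boundedStreamConfig": {
        "startSequenceNumbers": { "0": 100, "1": 250 },
        "endSequenceNumbers": { "0": 500, "1": 900 }
      }
    }
  }
}
```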

Release note

Adds a property called boundedStreamConfig to the SeekableStreamSupervisorIOConfig which allows operators to spin up a Supervisor that consumes only a specified offset range.


Key changed/added classes in this PR
  • SeekableStreamSupervisorIOConfig
  • BoundedStreamConfig
  • SeekableStreamSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@aho135 aho135 changed the title from "Bounded stream supervisor" to "feat: Bounded stream supervisor", then to "feat: Bounded Stream Supervisor" Apr 24, 2026
@aho135 aho135 requested a review from abhishekrb19 April 24, 2026 17:48
exclusiveStartSequenceNumberPartitions,
generateSequenceName(
unfilteredStartingSequencesForSequenceName == null
? startingSequences
Contributor

@abhishekrb19 abhishekrb19 left a comment


Thanks @aho135 - took a quick glance and the approach looks good to me overall!
I’m still going through some of the main files and just checkpointing my review. Do you think it would be possible to add a simple embedded test with the new config for some end-to-end coverage?

this.startSequenceNumbers = Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers");
this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers, "endSequenceNumbers");

// Validation
Contributor


As a guard rail, I think it'll be good to have stricter checks for each partition so there's no unintended behavior with incorrect ranges specified:
startSequenceNumbers < endSequenceNumbers

Contributor Author


This validation is a bit difficult to do within BoundedStreamConfig because of the generic typing. But I do see there is already validation for this in the Kafka task IOConfig.

Contributor Author


The validation there isn't strict enough, though, because startOffset can equal endOffset. In that scenario the Supervisor spins up a task that consumes nothing and then shuts down. But since no data was consumed there is no metadata update, so it gets stuck in a loop where it keeps spinning up tasks. I added additional validation to handle this scenario in this commit.
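A minimal sketch of the strict per-partition check this describes, assuming comparable offsets (e.g. Kafka's Long offsets); the class and method names are illustrative, not the PR's actual code:

```java
import java.util.Map;

public class BoundedRangeValidator
{
  /**
   * Rejects ranges where start >= end. A start equal to end would create a
   * task that consumes nothing and commits no metadata, so the supervisor
   * would loop forever spinning up empty tasks.
   */
  public static <P, S extends Comparable<S>> void validate(
      Map<P, S> startOffsets,
      Map<P, S> endOffsets
  )
  {
    for (Map.Entry<P, S> entry : startOffsets.entrySet()) {
      S end = endOffsets.get(entry.getKey());
      if (end == null) {
        throw new IllegalArgumentException("Missing end offset for partition " + entry.getKey());
      }
      if (entry.getValue().compareTo(end) >= 0) {
        throw new IllegalArgumentException(
            "Start offset must be strictly less than end offset for partition " + entry.getKey()
        );
      }
    }
  }
}
```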

@@ -4255,6 +4418,23 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition
}
Member


[P1] Bounded starts can be ignored when metadata exists

getOffsetFromStorageForPartition only falls back to boundedStreamConfig.startSequenceNumbers when no metadata/checkpoint offset exists. If a supervisor is reset or reconfigured with a requested bounded start while metadata storage still has an older offset, the stored metadata wins and the task starts from the stale position instead of the user-supplied bounded start. That can skip the requested backfill range or process a different interval than configured. Bounded mode should either clear/namespace old metadata for the run or explicitly prefer the configured start when initializing the bounded task group.

Contributor Author


Thanks for the review @FrankChen021!

> Bounded mode should either clear/namespace old metadata for the run

I'm a bit wary of automatic cleanup of metadata. Consider the scenario where a cluster operator has a running Supervisor and wants to re-ingest some older data, so they resubmit the exact same spec (forgetting to update id): the bounded Supervisor succeeds, but the previously committed offset gets lost.

> explicitly prefer the configured start when initializing the bounded task group.

This falls into the same issue as above: if the operator forgets to set the id to something different from the running Supervisor, the previously committed offset is lost forever.

I'm leaning towards adding validation so that if metadata already exists for the id, we just throw an exception and suggest that the operator resubmit with a different id or reset the Supervisor. Curious to hear your thoughts on this approach. Thanks again!

Member


That validation approach sounds right to me. The important part is preventing a bounded supervisor from accidentally mixing explicit bounded start offsets with existing committed metadata for the same id; failing fast with guidance to use a different id or reset the supervisor would avoid the silent stale-offset behavior without deleting or overwriting a running supervisor's offsets.

Contributor Author


@FrankChen021 Thinking through this one a bit more, the validation approach is a bit tricky. It's not straightforward to tell if the existing metadata is from the bounded Supervisor itself (in which case starting from the metadata would be the correct behavior) or if it's from a previous Supervisor.

One approach we can take is that if the metadata offset falls within the configured start/end offsets then use that, otherwise fall back to startOffset. This does run the risk of partial ingestion of the specified range though.

Member


I agree the source of the metadata is the hard part here, but I would avoid using "metadata is within the configured start/end range" as the deciding rule. If the stored offset is inside [start, end), starting there can still silently skip the prefix [start, storedOffset), which is the same class of surprise as the original issue. I think the safer behavior is still to fail fast when bounded mode finds existing metadata for the configured bounded partitions unless there is an explicit signal that this is a resume of the same bounded run.

Contributor Author

@aho135 aho135 May 2, 2026


Thanks @FrankChen021! I took a stab at this in the most recent commit. Please let me know your thoughts when you get the chance.

Member


Thanks, this looks like the right direction to me. Persisting the bounded config in the datasource metadata and rejecting existing metadata whose bounded config is missing or different addresses the silent stale-offset case, while still allowing a supervisor to resume metadata from the same bounded run.
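The resume check described above could look roughly like the following sketch: existing metadata is only accepted when the bounded config it was persisted with matches the configured one. Class and method names are illustrative assumptions, not Druid's actual API:

```java
import java.util.Objects;

public class BoundedResumeCheck
{
  /**
   * Called only when datasource metadata already exists for the supervisor id.
   * A stored bounded config that is missing (metadata from a non-bounded run)
   * or different (metadata from a different bounded run) is rejected, so the
   * task never silently starts from a stale offset.
   */
  public static void checkMetadataCompatible(Object storedBoundedConfig, Object configuredBoundedConfig)
  {
    if (storedBoundedConfig == null || !Objects.equals(storedBoundedConfig, configuredBoundedConfig)) {
      throw new IllegalStateException(
          "Existing metadata was not written by this bounded run; "
          + "resubmit with a different supervisor id or reset the supervisor."
      );
    }
  }
}
```

A resume of the same bounded run (identical stored config) passes the check, which is the explicit signal the reviewer asked for.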

Comment on lines +4643 to +4651
/**
* Handle bounded processing completion by shutting down the supervisor.
* At this point, all task groups are already empty (verified by isBoundedWorkComplete),
* so we just need to mark the supervisor as completed.
*/
private void handleBoundedCompletion()
{
log.info("Bounded processing complete for supervisor[%s]. Marking as COMPLETED.", supervisorId);
stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED);
}
Contributor


Should this call stop() with this COMPLETED state so things get unregistered and the executor is removed?

Contributor Author

@aho135 aho135 Apr 30, 2026


One workflow I was testing out was to submit a bounded Supervisor and have it run to completion. Then I adjusted the start/end offsets and re-submitted the spec, and did a hard reset to clear the metadata so it could ingest the new offset range. For this kind of workflow, we would need the executor to continue running even after the initial completion.

Contributor Author


The latest commit handles this workflow.

Member

@FrankChen021 FrankChen021 left a comment


| Severity | Findings |
|----------|----------|
| P0       | 0        |
| P1       | 1        |
| P2       | 0        |
| P3       | 0        |
| Total    | 1        |

This is an automated review by Codex GPT-5

for (PartitionIdType partition : partitionsInGroup) {
SequenceOffsetType start = startOffsets.get(partition);
SequenceOffsetType end = endOffsets.get(partition);
if (!isOffsetAtOrBeyond(start, end)) {
Member


[P1] Kinesis bounded ranges with start == end are skipped

The new empty-range check treats start >= end as completed for all bounded supervisors before creating any task. That is valid for Kafka's exclusive end offsets, but Kinesis declares bounded end offsets as inclusive and its task runner returns isEndOffsetExclusive() == false, so a Kinesis bounded ingestion for a single record where startSequenceNumbers equals endSequenceNumbers is marked COMPLETED without reading that record. This should be provider-aware, for example only treating equality as empty when the end offset is exclusive, while still rejecting/handling start > end appropriately.
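A sketch of the provider-aware check this finding suggests, with the exclusivity flag modeled after the `isEndOffsetExclusive()` distinction mentioned above; the class and method names are illustrative, not the PR's code:

```java
public class BoundedRangeCheck
{
  /**
   * Decides whether a bounded [start, end] range has nothing left to read.
   * Kafka end offsets are exclusive, so start == end is an empty range;
   * Kinesis end sequence numbers are inclusive, so start == end still
   * covers one record and must not be treated as complete.
   */
  public static <S extends Comparable<S>> boolean isRangeEmpty(S start, S end, boolean endOffsetExclusive)
  {
    int cmp = start.compareTo(end);
    // A start beyond the end is empty (or invalid) for every provider.
    if (cmp > 0) {
      return true;
    }
    // Equality only means "empty" when the end offset is exclusive.
    return cmp == 0 && endOffsetExclusive;
  }
}
```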

Member

@FrankChen021 FrankChen021 left a comment


I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.


This is an automated review by Codex GPT-5



4 participants