feat: Bounded Stream Supervisor #19372
Conversation
```java
exclusiveStartSequenceNumberPartitions,
generateSequenceName(
    unfilteredStartingSequencesForSequenceName == null
        ? startingSequences
```
abhishekrb19 left a comment
Thanks @aho135 - took a quick glance and the approach looks good to me overall!
I’m still going through some of the main files and just checkpointing my review. Do you think it would be possible to add a simple embedded test with the new config for some end-to-end coverage?
```java
this.startSequenceNumbers = Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers");
this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers, "endSequenceNumbers");

// Validation
```
---
As a guard rail, I think it'll be good to have stricter checks for each partition so there's no unintended behavior with incorrect ranges specified:
`startSequenceNumbers < endSequenceNumbers`
---
This validation is a bit difficult to do within BoundedStreamConfig because of the generic typing. But I do see there is already validation for this in the Kafka task IOConfig.
---
The validation there isn't strict enough, though, because startOffset can equal endOffset. In that scenario the supervisor spins up a task that consumes nothing and then shuts down. Since no data was consumed, there is no metadata update, so the supervisor gets stuck in a loop, repeatedly spinning up tasks. I added additional validation to handle this scenario in this commit
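A per-partition check along these lines would catch both inverted and empty ranges for exclusive-end streams. This is a minimal sketch, not the actual Druid code: the class name `BoundedRangeValidator` and the use of `IllegalArgumentException` are illustrative assumptions.

```java
import java.util.Map;

public final class BoundedRangeValidator
{
  // Hypothetical helper: rejects inverted and empty per-partition ranges.
  // With exclusive end offsets (e.g. Kafka), start == end consumes nothing,
  // so without this check the supervisor loops forever on no-op tasks.
  public static <P, S extends Comparable<S>> void validate(
      Map<P, S> startSequenceNumbers,
      Map<P, S> endSequenceNumbers
  )
  {
    for (Map.Entry<P, S> entry : startSequenceNumbers.entrySet()) {
      S end = endSequenceNumbers.get(entry.getKey());
      if (end == null) {
        throw new IllegalArgumentException(
            "No end sequence number for partition[" + entry.getKey() + "]"
        );
      }
      if (entry.getValue().compareTo(end) >= 0) {
        throw new IllegalArgumentException(
            "start[" + entry.getValue() + "] must be strictly less than end["
            + end + "] for partition[" + entry.getKey() + "]"
        );
      }
    }
  }
}
```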
```diff
@@ -4255,6 +4418,23 @@ private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition
}
```
---
[P1] Bounded starts can be ignored when metadata exists
getOffsetFromStorageForPartition only falls back to boundedStreamConfig.startSequenceNumbers when no metadata/checkpoint offset exists. If a supervisor is reset or reconfigured with a requested bounded start while metadata storage still has an older offset, the stored metadata wins and the task starts from the stale position instead of the user-supplied bounded start. That can skip the requested backfill range or process a different interval than configured. Bounded mode should either clear/namespace old metadata for the run or explicitly prefer the configured start when initializing the bounded task group.
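The precedence described above can be condensed into a small sketch. The names `StartOffsetResolver` and `resolveStart` are hypothetical, and the real method consults more sources (checkpoints, pending resets) than this simplification shows; the point is only that stored metadata shadows the configured bounded start.

```java
import java.util.Map;

public final class StartOffsetResolver
{
  // Hypothetical condensation of the lookup order described above:
  // stored metadata always wins, so a configured bounded start is only
  // honored for a partition that has no committed offset at all.
  public static <P, S> S resolveStart(
      P partition,
      Map<P, S> storedMetadataOffsets,  // committed offsets from metadata storage
      Map<P, S> boundedStartOffsets     // boundedStreamConfig.startSequenceNumbers
  )
  {
    S stored = storedMetadataOffsets.get(partition);
    if (stored != null) {
      return stored; // stale metadata shadows the requested bounded start
    }
    return boundedStartOffsets.get(partition);
  }
}
```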
---
Thanks for the review @FrankChen021!
> Bounded mode should either clear/namespace old metadata for the run
I'm a bit wary of automatic cleanup of metadata. I'm thinking through the scenario where a cluster operator has a running supervisor. They want to re-ingest some older data, so they resubmit the exact same spec (forgetting to update the id); the bounded supervisor succeeds, but the previously committed offset gets lost.
> explicitly prefer the configured start when initializing the bounded task group.
This falls into the same issue as above: if the operator forgets to set the id to something different from the running supervisor's, the previously committed offset is lost forever.
I'm leaning towards adding validation so that if metadata already exists for the id, we throw an exception and suggest that the operator resubmit with a different id or reset the supervisor. Curious to hear your thoughts on this approach. Thanks again!
---
That validation approach sounds right to me. The important part is preventing a bounded supervisor from accidentally mixing explicit bounded start offsets with existing committed metadata for the same id; failing fast with guidance to use a different id or reset the supervisor would avoid the silent stale-offset behavior without deleting or overwriting a running supervisor's offsets.
---
@FrankChen021 Thinking through this one a bit more, the validation approach is a bit tricky. It's not straightforward to tell if the existing metadata is from the bounded Supervisor itself (in which case starting from the metadata would be the correct behavior) or if it's from a previous Supervisor.
One approach we can take is that if the metadata offset falls within the configured start/end offsets then use that, otherwise fall back to startOffset. This does run the risk of partial ingestion of the specified range though.
---
I agree the source of the metadata is the hard part here, but I would avoid using "metadata is within the configured start/end range" as the deciding rule. If the stored offset is inside [start, end), starting there can still silently skip the prefix [start, storedOffset), which is the same class of surprise as the original issue. I think the safer behavior is still to fail fast when bounded mode finds existing metadata for the configured bounded partitions unless there is an explicit signal that this is a resume of the same bounded run.
---
Thanks @FrankChen021! I took a stab at this in the most recent commit
Please let me know your thoughts when you get the chance
---
Thanks, this looks like the right direction to me. Persisting the bounded config in the datasource metadata and rejecting existing metadata whose bounded config is missing or different addresses the silent stale-offset case, while still allowing a supervisor to resume metadata from the same bounded run.
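The accepted approach can be sketched as a fail-fast check at startup. This is illustrative only: the class name `BoundedMetadataGuard`, the exception type, and the use of plain object equality are assumptions; the actual change persists the bounded config inside the datasource metadata object and compares it there.

```java
import java.util.Objects;

public final class BoundedMetadataGuard
{
  // Illustrative fail-fast: existing committed metadata is only reused when
  // it was written by the same bounded run, i.e. it carries an identical
  // bounded config. Otherwise the operator must reset the supervisor or
  // resubmit with a different id.
  public static <T> void checkResumable(
      T storedBoundedConfig,     // bounded config persisted with the metadata, may be null
      T requestedBoundedConfig   // bounded config from the submitted supervisor spec
  )
  {
    if (storedBoundedConfig == null || !Objects.equals(storedBoundedConfig, requestedBoundedConfig)) {
      throw new IllegalStateException(
          "Existing metadata was not written by this bounded run. "
          + "Reset the supervisor or resubmit with a different id."
      );
    }
  }
}
```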
```java
/**
 * Handle bounded processing completion by shutting down the supervisor.
 * At this point, all task groups are already empty (verified by isBoundedWorkComplete),
 * so we just need to mark the supervisor as completed.
 */
private void handleBoundedCompletion()
{
  log.info("Bounded processing complete for supervisor[%s]. Marking as COMPLETED.", supervisorId);
  stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED);
```
---
Should this call stop() with this COMPLETED state so things get unregistered and the executor is removed?
---
One workflow I was testing was to submit a bounded supervisor and let it run to completion. I then adjusted the start/end offsets, resubmitted the spec, and did a hard reset to clear the metadata so it could ingest the new offset range. For this kind of workflow the executor needs to keep running even after the initial completion.
FrankChen021 left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 1 |
| P2 | 0 |
| P3 | 0 |
| Total | 1 |
This is an automated review by Codex GPT-5
```java
for (PartitionIdType partition : partitionsInGroup) {
  SequenceOffsetType start = startOffsets.get(partition);
  SequenceOffsetType end = endOffsets.get(partition);
  if (!isOffsetAtOrBeyond(start, end)) {
```
---
[P1] Kinesis bounded ranges with start == end are skipped
The new empty-range check treats start >= end as completed for all bounded supervisors before creating any task. That is valid for Kafka's exclusive end offsets, but Kinesis declares bounded end offsets as inclusive and its task runner returns isEndOffsetExclusive() == false, so a Kinesis bounded ingestion for a single record where startSequenceNumbers equals endSequenceNumbers is marked COMPLETED without reading that record. This should be provider-aware, for example only treating equality as empty when the end offset is exclusive, while still rejecting/handling start > end appropriately.
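A provider-aware version of the check might look like the following sketch. The names `BoundedRangeCompletion` and `isRangeEmpty` are hypothetical, and the plumbing of the exclusivity flag from `isEndOffsetExclusive()` is simplified; only the comparison logic is the point here.

```java
public final class BoundedRangeCompletion
{
  // Sketch of the fix suggested above: equality only means "empty range"
  // when the end offset is exclusive (Kafka). With inclusive ends (Kinesis),
  // start == end still covers exactly one record and must not be skipped.
  public static <S extends Comparable<S>> boolean isRangeEmpty(
      S start,
      S end,
      boolean endOffsetExclusive
  )
  {
    int cmp = start.compareTo(end);
    if (cmp > 0) {
      return true; // start past end is always empty (ideally rejected earlier)
    }
    return cmp == 0 && endOffsetExclusive;
  }
}
```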
FrankChen021 left a comment
I have reviewed the code for correctness, edge cases, concurrency, and integration risks; no issues found.
This is an automated review by Codex GPT-5
Description
Introduces a new property in the Stream Supervisor spec IOConfig, boundedStreamConfig, which allows operators to specify start and end offset ranges for short-lived supervised ingestion. This property modifies the main supervisor run loop to only ingest from and monitor the partitions specified in the boundedStreamConfig. After the offset range has been consumed, the supervisor transitions into a terminal state (COMPLETED). The motivation for this PR came out of #19191, which submits backfill tasks that are unsupervised. Once this change is merged, #19191 can be enhanced to use the boundedStreamConfig so that the backfill tasks are supervised.
Release note
Adds a property called boundedStreamConfig to the SeekableStreamSupervisorIOConfig which allows operators to spin up a Supervisor that consumes only a specified offset range.
Key changed/added classes in this PR
- SeekableStreamSupervisorIOConfig
- BoundedStreamConfig
- SeekableStreamSupervisor