feat: Add scan transform to unnest arrays for realtime ingestion #19379
abhishekrb19 wants to merge 10 commits into apache:master
Conversation
    @JsonProperty("unnestFilter") @Nullable final DimFilter unnestFilter
  )
  {
    this.name = name;
[P2] Validate scan transform output name
ScanTransform stores name separately from unnestColumn.getOutputName(), but the transform framework treats Transform.getName() as the generated field name for collision checks and input-column pruning. If a spec sets name: "tag" but the virtual column outputs "elt", Druid accepts it, writes elt, and silently leaves references to tag unresolved while also missing collisions on elt. Reject mismatches or derive the transform name from the virtual column output.
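For illustration, a spec fragment like the following (the JSON layout here is a hypothetical sketch, not necessarily the PR's exact schema) would be accepted today even though `name` and the virtual column's output name disagree:

```json
{
  "type": "scan",
  "name": "tag",
  "unnestColumn": {
    "type": "expression",
    "name": "elt",
    "expression": "\"tags\"",
    "outputType": "STRING"
  }
}
```

Here Druid would write `elt` while references to `tag` go unresolved, which is the mismatch the finding proposes rejecting at construction time.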
  private List<String> resolveColumnsForRow(final InputRow inputRow)
  {
    final Set<String> columns = new LinkedHashSet<>();
[P1] Preserve non-dimension raw columns in scan output
resolveColumnsForRow only emits __time, inputRow.getDimensions(), scan virtual columns, and unnest output columns. In ingestion, fields used by metricsSpec are commonly excluded from dimensions, so after a scan transform expands a row those raw metric fields disappear from the MapBasedInputRow event and downstream aggregators read null/zero instead of the original value. The previous implementation explicitly included MapBasedInputRow event keys, and the incremental diff removes the test that covered this case. Include all raw event keys, where available, when constructing scan output rows.
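A minimal, self-contained sketch of the suggested fix, using plain `java.util` types as stand-ins for Druid's `InputRow` (the method and variable names here are hypothetical): keep `__time` and the declared dimensions, then add every raw event key so metric source fields survive expansion.

```java
import java.util.*;

public class ResolveColumnsSketch
{
  // Hypothetical stand-in for resolveColumnsForRow: keep __time, declared
  // dimensions, and every raw event key so metric source fields survive.
  static List<String> resolveColumns(List<String> dimensions, Map<String, Object> event)
  {
    final Set<String> columns = new LinkedHashSet<>();
    columns.add("__time");
    columns.addAll(dimensions);
    // Without this line, fields excluded from dimensions (e.g. metric
    // inputs like bytes_sent) would be dropped from expanded rows.
    columns.addAll(event.keySet());
    return new ArrayList<>(columns);
  }

  public static void main(String[] args)
  {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("tags", Arrays.asList("a", "b"));
    event.put("bytes_sent", 123L);  // metric source, not a dimension

    List<String> columns = resolveColumns(Collections.singletonList("tags"), event);
    // The metric source field is retained for downstream aggregators.
    assert columns.contains("bytes_sent");
  }
}
```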
      cursor.reset();
    }

    if (cursor == null || cursor.isDone()) {
[P2] Do not pass through rows rejected by scan filters
When the scan cursor is done, the transformer always returns buildPassthroughRow(inputRow). That makes a scan query filter unable to drop rows: a row rejected by query.getFilter() is still ingested once with unnest outputs set to null, as the updated test now asserts. This diverges from normal TransformSpec filtering and from scan/unnest filtering semantics, and it inflates row counts for specs that use scan filters to exclude input or unnested rows. Distinguish empty/null unnest passthrough from filter rejection and return an empty list when the filter rejects the row.
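A toy model of the proposed semantics (self-contained; the `transform` method, `tags`/`tag` fields, and `Predicate` filter are hypothetical stand-ins for the scan machinery): a row rejected by the filter yields zero output rows, while a row whose array is null or empty still passes through once with a null unnest output.

```java
import java.util.*;
import java.util.function.Predicate;

public class ScanFilterSketch
{
  // Toy model of scan-transform row expansion. A filter rejection returns
  // an empty list (row dropped entirely), while a null/empty array still
  // yields one passthrough row with a null unnest output.
  static List<Map<String, Object>> transform(
      Map<String, Object> event,
      Predicate<Map<String, Object>> filter
  )
  {
    if (!filter.test(event)) {
      return Collections.emptyList();  // rejected: do NOT pass through
    }
    Object tags = event.get("tags");
    if (!(tags instanceof List) || ((List<?>) tags).isEmpty()) {
      Map<String, Object> passthrough = new LinkedHashMap<>(event);
      passthrough.put("tag", null);  // empty/null array: single passthrough row
      return Collections.singletonList(passthrough);
    }
    List<Map<String, Object>> out = new ArrayList<>();
    for (Object tag : (List<?>) tags) {
      Map<String, Object> row = new LinkedHashMap<>(event);
      row.put("tag", tag);  // one output row per array element
      out.add(row);
    }
    return out;
  }

  public static void main(String[] args)
  {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("user", "alice");
    row.put("tags", Arrays.asList("a", "b"));

    // Filter rejects the row: zero output rows, not one null-filled row.
    assert transform(row, e -> false).isEmpty();
    // Filter accepts: one row per array element.
    assert transform(row, e -> true).size() == 2;
  }
}
```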
Have we done performance profiles on this? Generally I've found transforms to really slow down throughput on streaming ingests.
FrankChen021 left a comment
| Severity | Findings |
|---|---|
| P0 | 0 |
| P1 | 0 |
| P2 | 1 |
| P3 | 0 |
| Total | 1 |
This is an automated review by Codex GPT-5
    // to dimensionExclusions. Without this, fixed-dimension ingestions with metrics would read null for
    // their metric source fields in expanded rows.
    if (inputRow instanceof MapBasedInputRow) {
      columns.addAll(((MapBasedInputRow) inputRow).getEvent().keySet());
[P2] Keep metric source fields out of dimensions
Adding every MapBasedInputRow event key to the resolved scan columns also feeds those keys into resolveDimensionColumns, which promotes metric input fields such as bytes_sent into row.getDimensions() after expansion. IncrementalIndex treats row.getDimensions() as authoritative and will discover/store those metric source fields as dimensions even though DataSchema excluded them for metricsSpec. Preserve the raw field in the event map for aggregators, but do not add non-dimension event fields to the output dimension list; add a test that verifies the metric source is not ingested as a dimension.
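A self-contained sketch of that separation (method and field names hypothetical): raw fields stay in the event map for aggregators, but only declared dimensions and unnest outputs are promoted into the dimension list.

```java
import java.util.*;

public class DimensionResolutionSketch
{
  // Keep raw fields in the event map for aggregators, but only promote
  // declared dimensions and unnest outputs into row.getDimensions().
  static List<String> resolveDimensions(
      List<String> declaredDimensions,
      Set<String> unnestOutputs,
      Map<String, Object> event
  )
  {
    final List<String> dims = new ArrayList<>();
    for (String key : event.keySet()) {
      if (declaredDimensions.contains(key) || unnestOutputs.contains(key)) {
        dims.add(key);
      }
    }
    return dims;
  }

  public static void main(String[] args)
  {
    Map<String, Object> event = new LinkedHashMap<>();
    event.put("tag", "a");          // unnest output column
    event.put("host", "web-1");     // declared dimension
    event.put("bytes_sent", 123L);  // metric source: stays in the event only

    List<String> dims = resolveDimensions(
        Collections.singletonList("host"),
        Collections.singleton("tag"),
        event
    );
    assert dims.contains("host") && dims.contains("tag");
    assert !dims.contains("bytes_sent");  // not discovered as a dimension
  }
}
```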
This PR adds a scan-query-based transform to Druid's transformSpec that can unnest array-valued columns into individual rows during streaming ingestion, and can potentially do other things that are feasible with scan query semantics. This is similar to the UNNEST support in SQL with MSQ batch ingestion, but for realtime ingestion.
- New ScanTransform (type: "scan") — a multi-row transform that wraps an embedded scan query and reuses the existing ScanQueryEngine, UnnestDataSource and UnnestCursorFactory machinery to explode arrays at ingest time. Each input row is wrapped in a temporary single-row segment, the configured scan query runs against it, and the resulting rows are emitted downstream.
- The scan query's data source uses "input" as the base table, with an unnest data source to specify which column to explode and an optional unnestFilter to filter array elements.
- Supports string arrays, object arrays, and nested arrays - objects are preserved through unnest.
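Putting the pieces above together, a transformSpec using the new type might look roughly like this (a sketch only: the exact field names of the new transform are assumptions, though the unnest data source shape follows Druid's existing native query format):

```json
{
  "transformSpec": {
    "transforms": [
      {
        "type": "scan",
        "name": "tag",
        "query": {
          "queryType": "scan",
          "dataSource": {
            "type": "unnest",
            "base": { "type": "table", "name": "input" },
            "virtualColumn": {
              "type": "expression",
              "name": "tag",
              "expression": "\"tags\"",
              "outputType": "STRING"
            },
            "unnestFilter": { "type": "selector", "dimension": "tag", "value": "a" }
          }
        }
      }
    ]
  }
}
```

With this spec, a row whose `tags` array holds `["a", "b"]` would be expanded into per-element rows at ingest time, with the optional `unnestFilter` dropping elements that don't match.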
This simplifies queries by avoiding UNNEST at query time and improves query performance since unnesting is done once at ingest time rather than repeatedly at query time.
Release note
Scan transform for ingestion-time array unnesting. Added a new "scan" transform type in transformSpec that unnests array-valued columns into individual rows during streaming ingestion (similar to existing UNNEST functionality with Druid SQL and the MSQ engine). The scan transform wraps a scan query with an unnest data source to explode arrays at ingest time.
This PR has: