feat: Add scan transform to unnest arrays for realtime ingestion #19379

Open

abhishekrb19 wants to merge 10 commits into apache:master from abhishekrb19:scan_transform_realtime

Conversation

@abhishekrb19 (Contributor) commented Apr 27, 2026

This PR adds a scan query transform to Druid's transformSpec that can unnest array-valued columns into individual rows during streaming ingestion, and can support other operations that are feasible with scan query semantics. This is similar to the UNNEST support in SQL with MSQ batch ingestion, but for realtime ingestion.

  • New ScanTransform (type: "scan"): a multi-row transform that wraps an embedded scan query and reuses the existing ScanQueryEngine, UnnestDataSource, and UnnestCursorFactory machinery to explode arrays at ingest time. Each input row is wrapped in a temporary single-row segment, the configured scan query runs against it, and the resulting rows are emitted downstream.

  • The scan query's data source uses "input" as the base table, with an unnest data source to specify which column to explode and an optional unnestFilter to filter array elements.

  • Supports string arrays, object arrays, and nested arrays; objects are preserved through the unnest.

This simplifies queries by avoiding UNNEST at query time and improves query performance since unnesting is done once at ingest time rather than repeatedly at query time.
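For illustration, constructing the transform might look like the following Java sketch. This is inferred only from the constructor fragment quoted in the review comments below; ScanTransform's actual signature, argument order, and whether it instead accepts a full embedded scan query are assumptions, not the final API.

import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.filter.DimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.transform.Transform;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;

// Virtual column exposing the array-valued input column "tags"; each exploded
// element is written to the output column "tag". All names are illustrative.
VirtualColumn unnestColumn = new ExpressionVirtualColumn(
    "tag",
    "\"tags\"",
    ColumnType.STRING_ARRAY,
    ExprMacroTable.nil()
);

// Optional filter applied to individual array elements during the unnest.
DimFilter unnestFilter = new SelectorDimFilter("tag", "spam", null);

// Hypothetical constructor shape, based on the review snippets below.
Transform transform = new ScanTransform("tag", unnestColumn, unnestFilter);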

Release note

Scan transform for ingestion-time array unnesting. Added a new "scan" transform type in transformSpec that unnests array-valued columns into individual rows during streaming ingestion (similar to existing UNNEST functionality with Druid SQL and the MSQ engine). The scan transform wraps a scan query with an unnest data source to explode arrays at ingest time.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@JsonProperty("unnestFilter") @Nullable final DimFilter unnestFilter
)
{
this.name = name;
[P2] Validate scan transform output name

ScanTransform stores name separately from unnestColumn.getOutputName(), but the transform framework treats Transform.getName() as the generated field name for collision checks and input-column pruning. If a spec sets name: "tag" but the virtual column outputs "elt", Druid accepts it, writes elt, and silently leaves references to tag unresolved while also missing collisions on elt. Reject mismatches or derive the transform name from the virtual column output.
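A minimal sketch of the suggested check, assuming the constructor also receives the unnest virtual column as unnestColumn (IAE is org.apache.druid.java.util.common.IAE):

// Sketch only: fail fast when the declared transform name diverges from the
// virtual column's output name, so collision checks and input-column pruning
// operate on the column that is actually written.
if (!name.equals(unnestColumn.getOutputName())) {
  throw new IAE(
      "Scan transform name[%s] must match unnest column output name[%s]",
      name,
      unnestColumn.getOutputName()
  );
}
this.name = name;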


private List<String> resolveColumnsForRow(final InputRow inputRow)
{
final Set<String> columns = new LinkedHashSet<>();
[P1] Preserve non-dimension raw columns in scan output

resolveColumnsForRow only emits __time, inputRow.getDimensions(), scan virtual columns, and unnest output columns. In ingestion, fields used by metricsSpec are commonly excluded from dimensions, so after a scan transform expands a row those raw metric fields disappear from the MapBasedInputRow event and downstream aggregators read null/zero instead of the original value. The previous implementation explicitly included MapBasedInputRow event keys, and the incremental diff removes the test that covered this case. Include all raw event keys, where available, when constructing scan output rows.
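One way to do this, assuming the common streaming row type is MapBasedInputRow (the follow-up diff quoted further down takes this shape):

// Sketch: also include every raw event key so fields consumed by metricsSpec
// (typically excluded from dimensions) survive row expansion.
if (inputRow instanceof MapBasedInputRow) {
  columns.addAll(((MapBasedInputRow) inputRow).getEvent().keySet());
}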

cursor.reset();
}

if (cursor == null || cursor.isDone()) {

[P2] Do not pass through rows rejected by scan filters

When the scan cursor is done, the transformer always returns buildPassthroughRow(inputRow). That makes a scan query filter unable to drop rows: a row rejected by query.getFilter() is still ingested once with unnest outputs set to null, as the updated test now asserts. This diverges from normal TransformSpec filtering and from scan/unnest filtering semantics, and it inflates row counts for specs that use scan filters to exclude input or unnested rows. Distinguish empty/null unnest passthrough from filter rejection and return an empty list when the filter rejects the row.
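One way to separate the two cases, sketched with a hypothetical rowRejectedByFilter flag (set when query.getFilter() matched nothing for the input row) and the buildPassthroughRow helper referenced above:

if (cursor == null || cursor.isDone()) {
  if (rowRejectedByFilter) {
    // Match TransformSpec filter semantics: drop the row entirely.
    return Collections.emptyList();
  }
  // Null/empty array case: emit the original row once, unnest outputs null.
  return Collections.singletonList(buildPassthroughRow(inputRow));
}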

@jtuglu1 (Contributor) commented Apr 28, 2026

Have we done performance profiling on this? Generally I've found transforms to really slow down throughput on streaming ingests.

@FrankChen021 (Member) left a comment

Severity | Findings
P0       | 0
P1       | 0
P2       | 1
P3       | 0
Total    | 1

This is an automated review by Codex GPT-5

// to dimensionExclusions. Without this, fixed-dimension ingestions with metrics would read null for
// their metric source fields in expanded rows.
if (inputRow instanceof MapBasedInputRow) {
columns.addAll(((MapBasedInputRow) inputRow).getEvent().keySet());

[P2] Keep metric source fields out of dimensions

Adding every MapBasedInputRow event key to the resolved scan columns also feeds those keys into resolveDimensionColumns, which promotes metric input fields such as bytes_sent into row.getDimensions() after expansion. IncrementalIndex treats row.getDimensions() as authoritative and will discover/store those metric source fields as dimensions even though DataSchema excluded them for metricsSpec. Preserve the raw field in the event map for aggregators, but do not add non-dimension event fields to the output dimension list; add a test that verifies the metric source is not ingested as a dimension.
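A sketch of the suggested split; buildEventMap is a hypothetical helper, while resolveDimensionColumns and MapBasedInputRow appear in the code above:

// Raw event fields (e.g. bytes_sent) stay in the event map for aggregators,
// but only declared dimension columns are reported via row.getDimensions().
final Map<String, Object> event = buildEventMap(inputRow, columns);
final List<String> dimensions = resolveDimensionColumns(inputRow);
return new MapBasedInputRow(inputRow.getTimestamp(), dimensions, event);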

