Skip to content

Commit c9e6e79

Browse files
[fix](runtime-filter) Fix race condition in Set operator runtime filter processing (#62434)
This pull request addresses a race condition in the Set operator (`INTERSECT`/`EXCEPT`) when using the `IN_FILTER` runtime filter type. The main fix ensures that the hash table size used for runtime filter processing is captured before the probe side can modify the hash table, preventing errors during query execution. Additionally, regression tests are added to verify the fix and prevent future regressions. **Fixes for race condition in Set operator:** * The hash table size is now snapshotted in `SetSinkOperatorX<is_intersect>::sink()` before enabling the probe pipeline, and this saved value is used for runtime filter processing in `SetSinkLocalState<is_intersect>::close()`. Because `close()` no longer re-reads the live hash table, a shrink performed by the probe side after `set_ready()` can no longer change the size seen by runtime filter processing, avoiding possible `InternalError` failures. [[1]](diffhunk://#diff-7658815d4a9a8268b4ef4ad00a93998af07a421b538ad608e3f3a0e20ef805cdR103-R107) [[2]](diffhunk://#diff-7658815d4a9a8268b4ef4ad00a93998af07a421b538ad608e3f3a0e20ef805cdL48-R47) [[3]](diffhunk://#diff-1a8671c9fbc97801353bffeffe99e2189a16d275d3f865714e2cc6750ee8c54fR65-R68) * Added a new member variable `_build_hash_table_size` to `SetSinkLocalState` to store the snapshotted hash table size. * Overrode `finishdependency()` in `SetSinkLocalState` to return `_finish_dependency`, so the pipeline task waits for `sync_filter_size` completion. **Testing and regression coverage:** * Added a new regression test suite `set_operator_in_filter.groovy` and its output file to reproduce and verify the race condition, ensuring the fix is tested for both `INTERSECT` and `EXCEPT` queries with `IN_FILTER` type and small `max_in_num`. [[1]](diffhunk://#diff-2ff8fca2d88b291375196153f6f9377e6a0d80a2b8222d63ef67d2dc307cb332R1-R144) [[2]](diffhunk://#diff-795c4c2d53c199ee4187b5aef698e62a6c993d2aad99014063c0c3540553caf3R1-R23) **Code cleanup:** * Removed unnecessary `#include "common/compile_check_begin.h"` and `#include "common/compile_check_end.h"` lines from several files for better code clarity.
[[1]](diffhunk://#diff-7658815d4a9a8268b4ef4ad00a93998af07a421b538ad608e3f3a0e20ef805cdL27) [[2]](diffhunk://#diff-90e895f6474975cbb1d9b2fc276bc6d9e1a22675f5126b836fb83c13fd148eb5L31) [[3]](diffhunk://#diff-90e895f6474975cbb1d9b2fc276bc6d9e1a22675f5126b836fb83c13fd148eb5L57) [[4]](diffhunk://#diff-a8e4192eb0bf59708c56d0a59c46f601684a554628105f70c50211784a1c5271L26) --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent c491c8a commit c9e6e79

File tree

4 files changed

+172
-1
lines changed

4 files changed

+172
-1
lines changed

be/src/exec/operator/set_sink_operator.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status exec_s
4444
if (!_terminated && _runtime_filter_producer_helper && !state->is_cancelled()) {
4545
try {
4646
RETURN_IF_ERROR(_runtime_filter_producer_helper->process(
47-
state, &_shared_state->build_block, _shared_state->get_hash_table_size()));
47+
state, &_shared_state->build_block, _build_hash_table_size));
4848
} catch (Exception& e) {
4949
return Status::InternalError(
5050
"rf process meet error: {}, _terminated: {}, _finish_dependency: {}",
@@ -100,6 +100,11 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block* in_block
100100
COUNTER_SET(local_state._hash_table_size, (int64_t)hash_table_size);
101101
COUNTER_SET(local_state._valid_element_in_hash_table, valid_element_in_hash_tbl);
102102

103+
// Snapshot hash table size before enabling the probe pipeline. The probe side
104+
// can shrink the hash table via _refresh_hash_table() after set_ready(), so
105+
// close() must use this saved value for runtime filter processing.
106+
local_state._build_hash_table_size = hash_table_size;
107+
103108
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
104109
->set_ready();
105110
DCHECK_GT(_child_quantity, 1);

be/src/exec/operator/set_sink_operator.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {
4545
Status open(RuntimeState* state) override;
4646
Status terminate(RuntimeState* state) override;
4747
Status close(RuntimeState* state, Status exec_status) override;
48+
Dependency* finishdependency() override { return _finish_dependency.get(); }
4849

4950
private:
5051
friend class SetSinkOperatorX<is_intersect>;
@@ -60,6 +61,10 @@ class SetSinkLocalState final : public PipelineXSinkLocalState<SetSharedState> {
6061

6162
std::shared_ptr<RuntimeFilterProducerHelperSet> _runtime_filter_producer_helper;
6263
std::shared_ptr<CountedFinishDependency> _finish_dependency;
64+
// Snapshot of hash table size taken in sink(eos) before set_ready(). The probe side can
65+
// modify the hash table via _refresh_hash_table() after set_ready(), so close() must use
66+
// this saved value instead of calling get_hash_table_size() again.
67+
uint64_t _build_hash_table_size = 0;
6368
};
6469

6570
template <bool is_intersect>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !intersect_in_rf --
3+
1
4+
11
5+
13
6+
15
7+
3
8+
5
9+
7
10+
9
11+
12+
-- !except_in_rf --
13+
14+
-- !intersect_in_rf_zero --
15+
1
16+
11
17+
13
18+
15
19+
3
20+
5
21+
7
22+
9
23+
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Regression test for race condition in Set operator (INTERSECT/EXCEPT) with
19+
// IN_FILTER runtime filter type.
20+
//
21+
// Before the fix, when runtime_filter_type=1 (IN) and runtime_filter_max_in_num
22+
// was small, a race between the build sink's close() and the probe's hash table
23+
// refresh could cause RuntimeFilterWrapper::insert() to fail with InternalError.
24+
// The fix snapshots hash_table_size in sink(eos) before set_ready() and uses the
25+
// saved value in close() for RF processing. It also overrides finishdependency() so
26+
// the pipeline task properly waits for sync_filter_size completion.
27+
28+
suite("set_operator_in_filter") {
29+
sql "DROP TABLE IF EXISTS set_rf_t1"
30+
sql "DROP TABLE IF EXISTS set_rf_t2"
31+
sql "DROP TABLE IF EXISTS set_rf_t3"
32+
33+
sql """
34+
CREATE TABLE set_rf_t1 (
35+
k1 INT NOT NULL,
36+
v1 VARCHAR(64) NOT NULL
37+
)
38+
DUPLICATE KEY(k1)
39+
DISTRIBUTED BY HASH(k1) BUCKETS 3
40+
PROPERTIES("replication_num" = "1");
41+
"""
42+
43+
sql """
44+
CREATE TABLE set_rf_t2 (
45+
k1 INT NOT NULL,
46+
v1 VARCHAR(64) NOT NULL
47+
)
48+
DUPLICATE KEY(k1)
49+
DISTRIBUTED BY HASH(k1) BUCKETS 3
50+
PROPERTIES("replication_num" = "1");
51+
"""
52+
53+
sql """
54+
CREATE TABLE set_rf_t3 (
55+
k1 INT NOT NULL,
56+
v1 VARCHAR(64) NOT NULL
57+
)
58+
DUPLICATE KEY(k1)
59+
DISTRIBUTED BY HASH(k1) BUCKETS 3
60+
PROPERTIES("replication_num" = "1");
61+
"""
62+
63+
// Insert >10 unique values into t1 so that with max_in_num=10 the IN filter
64+
// would overflow. This is the key condition that triggers the original bug.
65+
sql """
66+
INSERT INTO set_rf_t1 VALUES
67+
(1,'a'),(2,'b'),(3,'c'),(4,'d'),(5,'e'),
68+
(6,'f'),(7,'g'),(8,'h'),(9,'i'),(10,'j'),
69+
(11,'k'),(12,'l'),(13,'m'),(14,'n'),(15,'o'),
70+
(16,'p'),(17,'q'),(18,'r'),(19,'s'),(20,'t');
71+
"""
72+
73+
// t2 shares the odd values 1..19 with t1 (kept by INTERSECT, removed by EXCEPT) and adds 21 and 22, which t1 lacks
74+
sql """
75+
INSERT INTO set_rf_t2 VALUES
76+
(1,'a'),(3,'c'),(5,'e'),(7,'g'),(9,'i'),
77+
(11,'k'),(13,'m'),(15,'o'),(17,'q'),(19,'s'),
78+
(21,'u'),(22,'v');
79+
"""
80+
81+
// t3 is the table joined against the set-operation result (the right-hand side of the join below); that join triggers RF generation
82+
sql """
83+
INSERT INTO set_rf_t3 VALUES
84+
(1,'x'),(3,'x'),(5,'x'),(7,'x'),(9,'x'),
85+
(11,'x'),(13,'x'),(15,'x'),(100,'x'),(200,'x');
86+
"""
87+
88+
sql "sync"
89+
90+
// Use pure IN_FILTER type with a small max_in_num to reproduce the conditions
91+
// that triggered the race condition bug.
92+
sql "set runtime_filter_type = 1"
93+
sql "set runtime_filter_max_in_num = 10"
94+
sql "set disable_join_reorder = true"
95+
96+
// INTERSECT with join: the INTERSECT probe shrinks the hash table, which
97+
// previously could race with RF processing in the build sink's close().
98+
order_qt_intersect_in_rf """
99+
SELECT T.k1 FROM (
100+
SELECT k1 FROM set_rf_t1
101+
INTERSECT
102+
SELECT k1 FROM set_rf_t2
103+
) T JOIN set_rf_t3 ON T.k1 = set_rf_t3.k1;
104+
"""
105+
106+
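// EXCEPT with join: similar scenario but with EXCEPT semantics. The expected result is empty, because every t3 key that exists in t1 also exists in t2 and is therefore removed by EXCEPT before the join.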
107+
order_qt_except_in_rf """
108+
SELECT T.k1 FROM (
109+
SELECT k1 FROM set_rf_t1
110+
EXCEPT
111+
SELECT k1 FROM set_rf_t2
112+
) T JOIN set_rf_t3 ON T.k1 = set_rf_t3.k1;
113+
"""
114+
115+
// Run INTERSECT query multiple times to increase coverage of the timing
116+
// window. Before the fix, this would intermittently fail.
117+
for (int i = 0; i < 5; i++) {
118+
sql """
119+
SELECT T.k1 FROM (
120+
SELECT k1 FROM set_rf_t1
121+
INTERSECT
122+
SELECT k1 FROM set_rf_t2
123+
) T JOIN set_rf_t3 ON T.k1 = set_rf_t3.k1
124+
ORDER BY T.k1;
125+
"""
126+
}
127+
128+
// Also test with max_in_num=0 (always overflow immediately)
129+
sql "set runtime_filter_max_in_num = 0"
130+
131+
order_qt_intersect_in_rf_zero """
132+
SELECT T.k1 FROM (
133+
SELECT k1 FROM set_rf_t1
134+
INTERSECT
135+
SELECT k1 FROM set_rf_t2
136+
) T JOIN set_rf_t3 ON T.k1 = set_rf_t3.k1;
137+
"""
138+
}

0 commit comments

Comments
 (0)