Skip to content

Commit 2b4e42b

Browse files
committed
[VQueues] Provisions to add user limits to datafusion
1 parent 7f40280 commit 2b4e42b

4 files changed

Lines changed: 148 additions & 0 deletions

File tree

crates/storage-query-datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod statistics;
3939
pub mod table_docs;
4040
mod table_macro;
4141
mod table_providers;
42+
mod user_limits;
4243
mod vqueue_stats;
4344
mod vqueues;
4445
pub use table_providers::Scan;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
#[allow(dead_code)]
12+
mod row;
13+
#[allow(dead_code)]
14+
pub(crate) mod schema;
15+
16+
// TODO: table.rs with register_self() — requires plumbing a handle from the
17+
// scheduler's UserLimiter to the DataFusion context, similar to the StatusHandle
18+
// pattern used by invocation_state/table.rs.
19+
//
20+
// The scan should:
21+
// 1. Call UserLimiter::scan_counters() to get CounterView entries
22+
// 2. For each entry, resolve the rule handle via UserLimiter::resolve_rule()
23+
// 3. Build UserLimitRow and append to the table builder
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use restate_types::identifiers::PartitionKey;
12+
13+
use super::schema::SysUserLimitsBuilder;
14+
15+
/// A single row of enriched counter data ready to be appended to the table.
16+
///
17+
/// This is produced by the scanner from a `CounterView` + rule resolution.
18+
#[allow(dead_code)]
19+
pub(crate) struct UserLimitRow {
20+
pub partition_key: PartitionKey,
21+
pub scope: String,
22+
pub l1: Option<String>,
23+
pub l2: Option<String>,
24+
pub level: &'static str,
25+
pub usage: u32,
26+
pub concurrency_limit: Option<u64>,
27+
pub rule_pattern: Option<String>,
28+
pub num_waiters: u64,
29+
}
30+
31+
#[inline]
32+
#[allow(dead_code)]
33+
pub(crate) fn append_user_limit_row(builder: &mut SysUserLimitsBuilder, row_data: &UserLimitRow) {
34+
let mut row = builder.row();
35+
36+
row.partition_key(row_data.partition_key);
37+
38+
if row.is_scope_defined() {
39+
row.scope(&row_data.scope);
40+
}
41+
if row.is_l1_defined()
42+
&& let Some(l1) = &row_data.l1
43+
{
44+
row.l1(l1);
45+
}
46+
if row.is_l2_defined()
47+
&& let Some(l2) = &row_data.l2
48+
{
49+
row.l2(l2);
50+
}
51+
if row.is_level_defined() {
52+
row.level(row_data.level);
53+
}
54+
if row.is_usage_defined() {
55+
row.usage(row_data.usage);
56+
}
57+
if row.is_concurrency_limit_defined()
58+
&& let Some(limit) = row_data.concurrency_limit
59+
{
60+
row.concurrency_limit(limit);
61+
}
62+
if row.is_rule_pattern_defined()
63+
&& let Some(pattern) = &row_data.rule_pattern
64+
{
65+
row.rule_pattern(pattern);
66+
}
67+
if row.is_available_defined()
68+
&& let Some(limit) = row_data.concurrency_limit
69+
{
70+
row.available(limit.saturating_sub(row_data.usage as u64));
71+
}
72+
if row.is_num_waiters_defined() {
73+
row.num_waiters(row_data.num_waiters);
74+
}
75+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use crate::table_macro::*;
12+
13+
use datafusion::arrow::datatypes::DataType;
14+
15+
define_sort_order!(sys_user_limits(partition_key, scope, l1, l2));
16+
17+
define_table!(sys_user_limits(
18+
/// Internal column that is used for partitioning. Can be ignored.
19+
partition_key: DataType::UInt64,
20+
21+
/// The scope this counter belongs to.
22+
scope: DataType::Utf8,
23+
24+
/// The level-1 key component (null for scope-level counters).
25+
l1: DataType::Utf8,
26+
27+
/// The level-2 key component (null for scope-level and L1-level counters).
28+
l2: DataType::Utf8,
29+
30+
/// The hierarchy level: "Scope", "Level1", or "Level2".
31+
level: DataType::Utf8,
32+
33+
/// Current concurrency usage at this counter.
34+
usage: DataType::UInt32,
35+
36+
/// The configured concurrency limit (null if unlimited).
37+
concurrency_limit: DataType::UInt64,
38+
39+
/// The rule pattern that defines the limit (null if unlimited).
40+
/// Resolved from the rule handle; shows "[removed]" if the rule was
41+
/// deleted since the counter was created.
42+
rule_pattern: DataType::Utf8,
43+
44+
/// Available capacity (limit - usage). Null if unlimited.
45+
available: DataType::UInt64,
46+
47+
/// Number of vqueues currently waiting behind this counter.
48+
num_waiters: DataType::UInt64,
49+
));

0 commit comments

Comments
 (0)