Skip to content

Commit e864d2d

Browse files
committed
[DataFusion] Unify partition leader status and leader query protocol
1 parent 6da3278 commit e864d2d

19 files changed

Lines changed: 787 additions & 240 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sharding/src/key_range.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ use crate::PartitionKey;
2626
#[repr(transparent)]
2727
pub struct KeyRange(std::range::RangeInclusive<PartitionKey>);
2828

29+
/// Partial ordering for `KeyRange`; it delegates to the `Ord` implementation,
/// so `partial_cmp` always returns `Some(_)`.
impl PartialOrd for KeyRange {
30+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
31+
Some(self.cmp(other))
32+
}
33+
}
34+
35+
/// Total ordering for `KeyRange`, defined by comparing the `(start, end)`
/// pairs as tuples: ranges are ordered by `start()` first, and by `end()` when
/// the starts compare equal.
impl Ord for KeyRange {
36+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
37+
(self.start(), self.end()).cmp(&(other.start(), other.end()))
38+
}
39+
}
40+
2941
impl NetSerde for KeyRange {}
3042

3143
impl KeyRange {

crates/storage-query-datafusion/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ restate-service-protocol-v4 = { workspace = true, features = ["entry-codec"] }
2323
restate-sharding = { workspace = true }
2424
restate-storage-api = { workspace = true }
2525
restate-types = { workspace = true }
26+
restate-worker-api = { workspace = true }
2627

2728
ahash = { workspace = true } # Required due to a yanked version used by datafusion
2829
anyhow = { workspace = true }

crates/storage-query-datafusion/src/context.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use codederror::CodedError;
3030
use restate_core::{Metadata, TaskCenter};
3131
use restate_invoker_api::StatusHandle;
3232
use restate_partition_store::PartitionStoreManager;
33+
use restate_sharding::KeyRange;
3334
use restate_types::cluster::cluster_state::LegacyClusterState;
3435
use restate_types::config::QueryEngineOptions;
3536
use restate_types::errors::GenericError;
@@ -39,6 +40,7 @@ use restate_types::partition_table::Partition;
3940
use restate_types::partitions::state::PartitionReplicaSetStates;
4041
use restate_types::schema::deployment::DeploymentResolver;
4142
use restate_types::schema::service::ServiceMetadataResolver;
43+
use restate_worker_api::SchedulerStatusEntry;
4244

4345
use crate::node_fan_out::NodeWarnings;
4446
use crate::remote_query_scanner_manager::RemoteScannerManager;
@@ -130,6 +132,28 @@ pub trait RegisterTable: Send + Sync + 'static {
130132
fn register(&self, ctx: &QueryContext) -> impl Future<Output = Result<(), BuildError>>;
131133
}
132134

135+
/// A leader-state introspection handle that extends invoker status queries with
136+
/// additional query methods for future leader-owned components.
137+
pub trait PartitionLeaderStatusHandle:
138+
StatusHandle + Send + Sync + Debug + Clone + 'static
139+
{
140+
/// Item type yielded by the iterator that `read_scheduler_status` resolves to.
type SchedulerStatus;
141+
/// Iterator type that `read_scheduler_status` resolves to; it must be `Send`.
type SchedulerStatusIterator: Iterator<Item = Self::SchedulerStatus> + Send;
142+
143+
/// Item type yielded by the iterator that `read_user_limit_counters` resolves to.
type UserLimitCounter;
144+
/// Iterator type that `read_user_limit_counters` resolves to; it must be `Send`.
type UserLimitCounterIterator: Iterator<Item = Self::UserLimitCounter> + Send;
145+
146+
/// Returns a `Send` future that resolves to an iterator of
/// `Self::SchedulerStatus` items.
///
/// NOTE(review): `keys` presumably restricts the result to entries whose
/// key falls inside the given range -- confirm against the implementors.
fn read_scheduler_status(
147+
&self,
148+
keys: KeyRange,
149+
) -> impl Future<Output = Self::SchedulerStatusIterator> + Send;
150+
151+
/// Returns a `Send` future that resolves to an iterator of
/// `Self::UserLimitCounter` items.
///
/// NOTE(review): `keys` presumably restricts the result to entries whose
/// key falls inside the given range -- confirm against the implementors.
fn read_user_limit_counters(
152+
&self,
153+
keys: KeyRange,
154+
) -> impl Future<Output = Self::UserLimitCounterIterator> + Send;
155+
}
156+
133157
/// A no-op registerer that creates a minimal query context with no tables.
134158
/// Useful for nodes that only need to serve remote scanner RPCs (e.g.,
135159
/// log-server-only nodes), where only `task_ctx()` is needed.
@@ -145,7 +169,7 @@ impl RegisterTable for NoTables {
145169
pub struct UserTables<P, S, D> {
146170
partition_selector: P,
147171
partition_store_manager: Arc<PartitionStoreManager>,
148-
status: Option<S>,
172+
partition_leader_status: Option<S>,
149173
schemas: Live<D>,
150174
remote_scanner_manager: RemoteScannerManager,
151175
}
@@ -154,14 +178,14 @@ impl<P, S, D> UserTables<P, S, D> {
154178
pub fn new(
155179
partition_selector: P,
156180
partition_store_manager: Arc<PartitionStoreManager>,
157-
status: Option<S>,
181+
partition_leader_status: Option<S>,
158182
schemas: Live<D>,
159183
remote_scanner_manager: RemoteScannerManager,
160184
) -> Self {
161185
Self {
162186
partition_selector,
163187
partition_store_manager,
164-
status,
188+
partition_leader_status,
165189
schemas,
166190
remote_scanner_manager,
167191
}
@@ -171,7 +195,7 @@ impl<P, S, D> UserTables<P, S, D> {
171195
impl<P, S, D> RegisterTable for UserTables<P, S, D>
172196
where
173197
P: SelectPartitions + Clone,
174-
S: StatusHandle + Send + Sync + Debug + Clone + 'static,
198+
S: PartitionLeaderStatusHandle<SchedulerStatus = SchedulerStatusEntry>,
175199
D: DeploymentResolver + ServiceMetadataResolver + Send + Sync + Debug + Clone + 'static,
176200
{
177201
async fn register(&self, ctx: &QueryContext) -> Result<(), BuildError> {
@@ -182,7 +206,7 @@ where
182206
crate::invocation_state::register_self(
183207
ctx,
184208
self.partition_selector.clone(),
185-
self.status.clone(),
209+
self.partition_leader_status.clone(),
186210
self.partition_store_manager.clone(),
187211
&self.remote_scanner_manager,
188212
)?;
@@ -350,7 +374,9 @@ impl QueryContext {
350374
options: &QueryEngineOptions,
351375
partition_selector: impl SelectPartitions + Clone,
352376
partition_store_manager: Arc<PartitionStoreManager>,
353-
status: Option<impl StatusHandle + Send + Sync + Debug + Clone + 'static>,
377+
partition_leader_status: Option<
378+
impl PartitionLeaderStatusHandle<SchedulerStatus = SchedulerStatusEntry>,
379+
>,
354380
schemas: Live<
355381
impl DeploymentResolver + ServiceMetadataResolver + Send + Sync + Debug + Clone + 'static,
356382
>,
@@ -359,7 +385,7 @@ impl QueryContext {
359385
let tables = UserTables::new(
360386
partition_selector,
361387
partition_store_manager,
362-
status,
388+
partition_leader_status,
363389
schemas,
364390
remote_scanner_manager,
365391
);

crates/storage-query-datafusion/src/empty_invoker_status_handle.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::future::Future;
1112
use std::{future, iter};
1213

1314
use restate_invoker_api::{InvocationStatusReport, StatusHandle};
1415
use restate_types::sharding::KeyRange;
16+
use restate_worker_api::SchedulerStatusEntry;
17+
18+
use crate::context::PartitionLeaderStatusHandle;
1519

1620
#[derive(Clone, Debug)]
1721
pub struct EmptyInvokerStatusHandle;
@@ -23,3 +27,25 @@ impl StatusHandle for EmptyInvokerStatusHandle {
2327
future::ready(iter::empty())
2428
}
2529
}
30+
31+
/// Leader-state queries on the empty handle never report anything: both
/// methods ignore the requested key range and resolve immediately to an empty
/// iterator.
impl PartitionLeaderStatusHandle for EmptyInvokerStatusHandle {
32+
type SchedulerStatus = SchedulerStatusEntry;
33+
type SchedulerStatusIterator = std::iter::Empty<Self::SchedulerStatus>;
34+
35+
// No user-limit-counter item type is used here; the unit type stands in for it.
type UserLimitCounter = ();
36+
type UserLimitCounterIterator = std::iter::Empty<Self::UserLimitCounter>;
37+
38+
/// Always resolves to an empty iterator; `_keys` is unused.
fn read_scheduler_status(
39+
&self,
40+
_keys: KeyRange,
41+
) -> impl Future<Output = Self::SchedulerStatusIterator> + Send {
42+
future::ready(iter::empty())
43+
}
44+
45+
/// Always resolves to an empty iterator; `_keys` is unused.
fn read_user_limit_counters(
46+
&self,
47+
_keys: KeyRange,
48+
) -> impl Future<Output = Self::UserLimitCounterIterator> + Send {
49+
future::ready(iter::empty())
50+
}
51+
}

crates/storage-query-datafusion/src/mocks.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// by the Apache License, Version 2.0.
1010

1111
use std::fmt::Debug;
12+
use std::future::Future;
1213
use std::marker::PhantomData;
1314
use std::sync::Arc;
1415

@@ -20,7 +21,6 @@ use datafusion::common::DataFusionError;
2021
use googletest::matcher::{Matcher, MatcherResult};
2122
use serde_json::Value;
2223

23-
use restate_invoker_api::StatusHandle;
2424
use restate_invoker_api::status_handle::test_util::MockStatusHandle;
2525
use restate_partition_store::{PartitionStore, PartitionStoreManager};
2626
use restate_rocksdb::RocksDbManager;
@@ -38,9 +38,10 @@ use restate_types::schema::deployment::{Deployment, DeploymentResolver};
3838
use restate_types::schema::service::test_util::MockServiceMetadataResolver;
3939
use restate_types::schema::service::{ServiceMetadata, ServiceMetadataResolver};
4040
use restate_types::sharding::KeyRange;
41+
use restate_worker_api::SchedulerStatusEntry;
4142

4243
use super::context::QueryContext;
43-
use crate::context::SelectPartitions;
44+
use crate::context::{PartitionLeaderStatusHandle, SelectPartitions};
4445
use crate::remote_query_scanner_client::{RemoteScanner, RemoteScannerService};
4546
use crate::remote_query_scanner_manager::{
4647
PartitionLocation, PartitionLocator, RemoteScannerManager,
@@ -74,6 +75,28 @@ impl ServiceMetadataResolver for MockSchemas {
7475
}
7576
}
7677

78+
/// Leader-state queries on the mock status handle: both methods ignore the
/// requested key range and resolve immediately to an empty iterator.
impl PartitionLeaderStatusHandle for MockStatusHandle {
79+
type SchedulerStatus = SchedulerStatusEntry;
80+
type SchedulerStatusIterator = std::iter::Empty<Self::SchedulerStatus>;
81+
82+
// No user-limit-counter item type is used here; the unit type stands in for it.
type UserLimitCounter = ();
83+
type UserLimitCounterIterator = std::iter::Empty<Self::UserLimitCounter>;
84+
85+
/// Always resolves to an empty iterator; `_keys` is unused.
fn read_scheduler_status(
86+
&self,
87+
_keys: KeyRange,
88+
) -> impl Future<Output = Self::SchedulerStatusIterator> + Send {
89+
std::future::ready(std::iter::empty())
90+
}
91+
92+
/// Always resolves to an empty iterator; `_keys` is unused.
fn read_user_limit_counters(
93+
&self,
94+
_keys: KeyRange,
95+
) -> impl Future<Output = Self::UserLimitCounterIterator> + Send {
96+
std::future::ready(std::iter::empty())
97+
}
98+
}
99+
77100
impl DeploymentResolver for MockSchemas {
78101
fn resolve_latest_deployment_for_service(
79102
&self,
@@ -149,7 +172,7 @@ impl PartitionLocator for AlwaysLocalPartitionLocator {
149172

150173
impl MockQueryEngine {
151174
pub async fn create_with(
152-
status: impl StatusHandle + Send + Sync + Debug + Clone + 'static,
175+
status: impl PartitionLeaderStatusHandle<SchedulerStatus = SchedulerStatusEntry>,
153176
schemas: impl DeploymentResolver
154177
+ ServiceMetadataResolver
155178
+ Send

crates/worker-api/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ restate-workspace-hack = { workspace = true }
1212

1313
restate-clock = { workspace = true }
1414
restate-core = { workspace = true }
15+
restate-futures-util = { workspace = true }
16+
restate-invoker-api = { workspace = true }
1517
restate-limiter = { workspace = true }
1618
restate-storage-api = { workspace = true }
1719
restate-types = { workspace = true }
crates/worker-api/src/leader_query.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
#![allow(dead_code)]
12+
13+
use tokio::sync::mpsc;
14+
15+
use restate_futures_util::command::{Command, UnboundedCommandReceiver, UnboundedCommandSender};
16+
use restate_types::sharding::KeyRange;
17+
use restate_types::vqueues::VQueueId;
18+
19+
use crate::VQueueSchedulerStatus;
20+
21+
/// Queries that route through the partition processor's main `select!` loop.
22+
///
23+
/// Invoker status queries do NOT travel through this channel: DataFusion calls
24+
/// the invoker's `ChannelStatusReader` directly via the partition-leader handles
25+
/// registry. See `PartitionLeaderHandlesRegistry` in the worker crate.
26+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
27+
pub enum LeaderQueryKind {
28+
/// Kind reported by `LeaderQueryRequest::SchedulerStatus`.
SchedulerStatus,
29+
/// Kind reported by `LeaderQueryRequest::UserLimitCounters`.
UserLimitCounters,
30+
}
31+
32+
/// A leader query request; every variant carries the key range it applies to.
#[derive(Debug, Clone)]
33+
pub enum LeaderQueryRequest {
34+
/// Scheduler status query over `keys`.
SchedulerStatus { keys: KeyRange },
35+
/// User limit counters query over `keys`.
UserLimitCounters { keys: KeyRange },
36+
}
37+
38+
impl LeaderQueryRequest {
39+
/// Returns the payload-free [`LeaderQueryKind`] that corresponds to this
/// request variant; the carried key range is ignored.
pub fn kind(&self) -> LeaderQueryKind {
40+
match self {
41+
Self::SchedulerStatus { .. } => LeaderQueryKind::SchedulerStatus,
42+
Self::UserLimitCounters { .. } => LeaderQueryKind::UserLimitCounters,
43+
}
44+
}
45+
}
46+
47+
/// One scheduler status row: a vqueue id paired with that vqueue's scheduler status.
pub type SchedulerStatusEntry = (VQueueId, VQueueSchedulerStatus);
48+
49+
/// Response to a [`LeaderQueryRequest`].
///
/// NOTE(review): there is no success variant for
/// `LeaderQueryRequest::UserLimitCounters` yet -- presumably to be added later;
/// confirm with the code that answers these requests.
#[derive(Debug, Clone)]
50+
pub enum LeaderQueryResponse {
51+
/// Scheduler status entries collected for the request.
SchedulerStatus(Vec<SchedulerStatusEntry>),
52+
/// Carries the kind of a query; presumably returned when the receiving
/// partition processor is not the leader -- TODO confirm.
NotLeader(LeaderQueryKind),
53+
}
54+
55+
/// A `Command` pairing a [`LeaderQueryRequest`] with its [`LeaderQueryResponse`] type.
pub type LeaderQueryCommand = Command<LeaderQueryRequest, LeaderQueryResponse>;
56+
/// Unbounded sender half for leader query commands.
pub type LeaderQuerySender = UnboundedCommandSender<LeaderQueryRequest, LeaderQueryResponse>;
57+
/// Unbounded receiver half for leader query commands.
pub type LeaderQueryReceiver = UnboundedCommandReceiver<LeaderQueryRequest, LeaderQueryResponse>;
58+
59+
/// Creates a new leader query channel backed by `tokio::sync::mpsc::unbounded_channel`
/// and returns its `(sender, receiver)` halves.
pub fn channel() -> (LeaderQuerySender, LeaderQueryReceiver) {
60+
mpsc::unbounded_channel()
61+
}

crates/worker-api/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
mod leader_query;
1112
mod partition_processor_manager;
1213
mod partition_processor_rpc_client;
1314
mod scheduler_status;
1415

16+
pub use leader_query::*;
1517
pub use partition_processor_manager::*;
1618
pub use partition_processor_rpc_client::*;
1719
pub use scheduler_status::*;

crates/worker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ restate-memory = { workspace = true }
3737
restate-metadata-server = { workspace = true }
3838
restate-metadata-store = { workspace = true }
3939
restate-partition-store = { workspace = true }
40+
restate-platform = { workspace = true }
4041
restate-rocksdb = { workspace = true }
4142
restate-service-protocol = { workspace = true, features = ["codec", "message"] }
4243
restate-service-protocol-v4 = { workspace = true, features = ["entry-codec"] }

0 commit comments

Comments
 (0)