Skip to content

Commit dec339d

Browse files
committed
[VQueues] Move scheduler status types to worker-api
This is to allow datafusion storage engine to access these types without direct dependency on restate-vqueues.
1 parent bcb1c86 commit dec339d

11 files changed

Lines changed: 136 additions & 112 deletions

File tree

Cargo.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/vqueues/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ restate-serde-util = { workspace = true }
2323
restate-storage-api = { workspace = true }
2424
restate-types = { workspace = true }
2525
restate-util-string = { workspace = true }
26+
restate-worker-api = { workspace = true }
2627

2728
arrayvec = { workspace = true }
2829
bilrost = { workspace = true, features = ["smallvec"] }

crates/vqueues/src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,10 @@ mod util;
1616
// Re-exports
1717
pub use cache::{VQueuesMeta, VQueuesMetaCache};
1818
pub use metric_definitions::describe_metrics;
19-
pub use scheduler::{
20-
ResourceManager, SchedulerService, SchedulingStatus, ThrottleScope, VQueueSchedulerStatus,
19+
pub use restate_worker_api::{
20+
ResourceKind, SchedulingStatus, ThrottleScope, VQueueSchedulerStatus,
2121
};
22+
pub use scheduler::{ResourceManager, SchedulerService};
2223
use smallvec::SmallVec;
2324
use tracing::debug;
2425
pub use util::*;

crates/vqueues/src/scheduler.rs

Lines changed: 9 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,18 @@ use std::pin::Pin;
1515
use std::future::poll_fn;
1616
use std::task::Poll;
1717

18-
use restate_clock::RoughTimestamp;
1918
use restate_storage_api::StorageError;
2019
use restate_storage_api::vqueue_table::scheduler::{RunAction, SchedulerAction, YieldAction};
21-
use restate_storage_api::vqueue_table::{EntryKey, ScanVQueueTable, VQueueStore, stats::WaitStats};
22-
use restate_types::time::MillisSinceEpoch;
20+
use restate_storage_api::vqueue_table::{EntryKey, ScanVQueueTable, VQueueStore};
2321
use restate_types::vqueues::VQueueId;
22+
use restate_worker_api::{SchedulingStatus, VQueueSchedulerStatus};
2423

2524
use crate::VQueueEvent;
2625
use crate::VQueuesMetaCache;
2726
use crate::metric_definitions::publish_scheduler_decision_metrics;
2827

2928
use self::drr::DRRScheduler;
30-
use self::resource_manager::{PermitBuilder, ReservedResources, ResourceKind};
29+
use self::resource_manager::{PermitBuilder, ReservedResources};
3130
use self::vqueue_state::DetailedEligibility;
3231

3332
mod clock;
@@ -46,79 +45,16 @@ type UnconfirmedAssignments = hashbrown::HashMap<EntryKey, PermitBuilder>;
4645

4746
slotmap::new_key_type! { pub(crate) struct VQueueHandle; }
4847

49-
/// A public view of the scheduler's status of a single vqueue.
50-
///
51-
/// This struct provides introspection into the current scheduling state, and
52-
/// wait statistics for a vqueue.
53-
#[derive(Debug, Clone, Default)]
54-
pub struct VQueueSchedulerStatus {
55-
/// Statistics about the wait time experienced by the head item in the vqueue.
56-
pub wait_stats: WaitStats,
57-
/// Number of items remaining in the running stage.
58-
pub remaining_running: u32,
59-
/// Number of items waiting in the inbox stage.
60-
pub waiting_inbox: u64,
61-
/// The current scheduling status of this vqueue.
62-
pub status: SchedulingStatus,
63-
}
64-
65-
/// The current scheduling status of a vqueue.
66-
///
67-
/// This enum represents the various states a vqueue can be in from the
68-
/// scheduler's perspective.
69-
#[derive(Debug, Clone, Default, PartialEq, Eq)]
70-
pub enum SchedulingStatus {
71-
#[default]
72-
/// The vqueue is not tracked by the scheduler (e.g., it has no items).
73-
Dormant,
74-
/// The vqueue is empty.
75-
Empty,
76-
/// The vqueue head is ready to be scheduled and it's in the inbox/running stage.
77-
Ready,
78-
/// The vqueue is throttled until the specified time.
79-
Throttled {
80-
/// When the throttle expires.
81-
until: MillisSinceEpoch,
82-
/// The scope of throttling (global or per-vqueue).
83-
scope: ThrottleScope,
84-
},
85-
/// The vqueue is scheduled to be woken up at the given time because the head
86-
/// item is scheduled to run at that time.
87-
Scheduled {
88-
/// When the head item becomes visible.
89-
at: RoughTimestamp,
90-
},
91-
/// The vqueue is blocked on invoker global capacity.
92-
BlockedOn(ResourceKind),
93-
/// The vqueue is waiting to acquire a lock of a VO.
94-
BlockedOnLock,
95-
/// The vqueue is waiting for concurrency tokens. Concurrency tokens are released
96-
/// when currently running items are completed or (in some cases) when running items
97-
/// are parked.
98-
WaitingConcurrencyTokens,
99-
}
100-
101-
impl From<DetailedEligibility> for SchedulingStatus {
102-
fn from(value: DetailedEligibility) -> Self {
103-
match value {
104-
DetailedEligibility::EligibleRunning | DetailedEligibility::EligibleInbox => {
105-
SchedulingStatus::Ready
106-
}
107-
DetailedEligibility::Scheduled(ts) => SchedulingStatus::Scheduled { at: ts },
108-
DetailedEligibility::Empty => SchedulingStatus::Empty,
48+
fn status_from_detailed_eligibility(value: DetailedEligibility) -> SchedulingStatus {
49+
match value {
50+
DetailedEligibility::EligibleRunning | DetailedEligibility::EligibleInbox => {
51+
SchedulingStatus::Ready
10952
}
53+
DetailedEligibility::Scheduled(ts) => SchedulingStatus::Scheduled { at: ts },
54+
DetailedEligibility::Empty => SchedulingStatus::Empty,
11055
}
11156
}
11257

113-
/// The scope at which throttling is applied.
114-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115-
pub enum ThrottleScope {
116-
/// Throttling is applied globally across all vqueues.
117-
Global,
118-
/// Throttling is applied to a specific vqueue.
119-
VQueue,
120-
}
121-
12258
#[derive(Default, Debug)]
12359
pub struct Decisions {
12460
// Vqueue ids are ordered by partition key, this enables us to group

crates/vqueues/src/scheduler/drr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,9 @@ mod tests {
458458
use restate_types::sharding::KeyRange;
459459
use restate_types::vqueues::VQueueId;
460460
use restate_types::vqueues::{EntryId, EntryKind};
461+
use restate_worker_api::ResourceKind;
461462

462463
use crate::cache::VQueuesMetaCache;
463-
use crate::scheduler::resource_manager::ResourceKind;
464464
use crate::{SchedulingStatus, VQueue, VQueueEvent};
465465

466466
use super::*;

crates/vqueues/src/scheduler/eligible.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ use tracing::trace;
2020
use restate_storage_api::StorageError;
2121
use restate_storage_api::vqueue_table::VQueueStore;
2222
use restate_types::time::MillisSinceEpoch;
23+
use restate_worker_api::{ResourceKind, SchedulingStatus, ThrottleScope};
2324

25+
use super::VQueueHandle;
2426
use super::clock::SchedulerClock;
25-
use super::resource_manager::ResourceKind;
2627
use super::vqueue_state::VQueueState;
27-
use super::{SchedulingStatus, ThrottleScope, VQueueHandle};
2828
use crate::scheduler::vqueue_state::Eligibility;
2929

3030
#[derive(Debug, Copy, Clone)]
@@ -80,7 +80,7 @@ impl EligibilityTracker {
8080
pub fn get_status<S: VQueueStore>(&self, qstate: &VQueueState<S>) -> SchedulingStatus {
8181
match self.states.get(qstate.handle) {
8282
None | Some(State::NeedsPoll) | Some(State::Ready) => {
83-
SchedulingStatus::from(qstate.check_eligibility())
83+
super::status_from_detailed_eligibility(qstate.check_eligibility())
8484
}
8585
Some(State::Throttled { wake_up, scope }) => SchedulingStatus::Throttled {
8686
until: wake_up.ts,

crates/vqueues/src/scheduler/resource_manager.rs

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,13 @@ use tokio::sync::mpsc;
2525
use tracing::trace;
2626

2727
use restate_futures_util::concurrency::Concurrency;
28-
use restate_limiter::{Level, LimitKey, RuleHandle};
2928
use restate_memory::{MemoryPool, NonZeroByteCount};
3029
use restate_storage_api::StorageError;
3130
use restate_storage_api::lock_table::LoadLocks;
3231
use restate_storage_api::vqueue_table::{EntryKey, EntryMetadata};
3332
use restate_types::vqueues::EntryKind;
3433
use restate_types::{LockName, Scope};
35-
use restate_util_string::ReString;
34+
use restate_worker_api::ResourceKind;
3635

3736
use self::invoker::InvokerConcurrencyLimiter;
3837
use self::invoker_memory::InvokerMemoryLimiter;
@@ -62,33 +61,6 @@ pub struct ResourceManager {
6261
tx: mpsc::UnboundedSender<ResourceManagerUpdate>,
6362
}
6463

65-
#[derive(Debug, Clone, PartialEq, Eq)]
66-
pub enum ResourceKind {
67-
/// Waiting to acquire a lock of a VO.
68-
Lock {
69-
scope: Option<Scope>,
70-
lock_name: LockName,
71-
},
72-
/// Waiting to acquire invoker concurrency capacity
73-
InvokerConcurrency,
74-
/// Waiting to acquire invoker being throttled
75-
InvokerThrottling,
76-
/// Invoker needs to allocate memory for an invocation
77-
InvokerMemory,
78-
/// Waiting for deployment-level concurrency tokens to be available
79-
DeploymentConcurrency,
80-
/// Waiting for user-defined concurrency to be acquired.
81-
/// Carries routing info so the eligibility tracker can return it for waiter removal.
82-
LimitKeyConcurrency {
83-
scope: Scope,
84-
limit_key: LimitKey<ReString>,
85-
blocked_level: Level,
86-
/// Handle to the blocking rule. Resolve via the rules store for display.
87-
/// May be stale if the rule was removed since blocking.
88-
blocked_rule: Option<RuleHandle>,
89-
},
90-
}
91-
9264
#[allow(dead_code)]
9365
enum ResourceManagerUpdate {
9466
PermitReleased(SmallVec<[UserPermitKind; 1]>),

crates/vqueues/src/scheduler/vqueue_state.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use restate_clock::RoughTimestamp;
1818
use restate_storage_api::StorageError;
1919
use restate_storage_api::vqueue_table::{EntryKey, EntryValue, VQueueStore, stats::WaitStats};
2020
use restate_types::vqueues::VQueueId;
21+
use restate_worker_api::{ResourceKind, ThrottleScope};
2122

2223
use crate::metric_definitions::{
2324
VQUEUE_CONCURRENCY_RULES_WAIT_MS, VQUEUE_DEPLOYMENT_CONCURRENCY_WAIT_MS,
@@ -30,10 +31,8 @@ use super::clock::SchedulerClock;
3031
use super::queue::Queue;
3132
use super::queue_meta::MetaLiteUpdate;
3233
pub use super::queue_meta::VQueueMetaLite;
33-
use super::resource_manager::{AcquireOutcome, PermitBuilder, ResourceKind};
34-
use super::{
35-
ResourceManager, RunAction, ThrottleScope, UnconfirmedAssignments, VQueueHandle, YieldAction,
36-
};
34+
use super::resource_manager::{AcquireOutcome, PermitBuilder};
35+
use super::{ResourceManager, RunAction, UnconfirmedAssignments, VQueueHandle, YieldAction};
3736

3837
const QUANTUM: i32 = 1;
3938

crates/worker-api/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,12 @@ publish = false
1010
[dependencies]
1111
restate-workspace-hack = { workspace = true }
1212

13+
restate-clock = { workspace = true }
1314
restate-core = { workspace = true }
15+
restate-limiter = { workspace = true }
16+
restate-storage-api = { workspace = true }
1417
restate-types = { workspace = true }
18+
restate-util-string = { workspace = true }
1519

1620
thiserror = { workspace = true }
1721
tokio = { workspace = true }

crates/worker-api/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010

1111
mod partition_processor_manager;
1212
mod partition_processor_rpc_client;
13+
mod scheduler_status;
1314

1415
pub use partition_processor_manager::*;
1516
pub use partition_processor_rpc_client::*;
17+
pub use scheduler_status::*;

0 commit comments

Comments
 (0)