Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 39 additions & 4 deletions crates/partition-store/src/vqueue_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,30 @@ pub use input::InputPayloadKey;
pub use metadata::*;

use anyhow::Context;
use bilrost::{Message, OwnedMessage};
use bilrost::{BorrowedMessage, Message, OwnedMessage};
use bytes::BytesMut;
use rocksdb::{DBRawIteratorWithThreadMode, ReadOptions};
use tracing::error;

use restate_rocksdb::Priority;
use restate_storage_api::StorageError;
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates};
use restate_storage_api::vqueue_table::ScanVQueueMetaTable;
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef, VQueueMetaUpdates};
use restate_storage_api::vqueue_table::{
EntryKey, EntryMetadata, EntryStatusHeader, EntryValue, LazyEntryStatus, ReadVQueueTable,
ScanVQueueTable, Stage, Status, WriteVQueueTable, stats::EntryStatistics,
};
use restate_types::identifiers::PartitionKey;
use restate_types::sharding::{KeyRange, PartitionKey};
use restate_types::vqueues::{EntryId, Seq, VQueueId};

use self::entry::{LazyEntryStatusHolder, OwnedEntryStatusHeader, StatusHeaderRawRef};
use crate::keys::{DecodeTableKey, EncodeTableKey, EncodeTableKeyPrefix, KeyKind};
use crate::scan::TableScan;
use crate::vqueue_table::input::InputPayloadKeyRef;
use crate::{PartitionDb, PartitionStoreTransaction, Result, StorageAccess, TableKind};
use crate::{
PartitionDb, PartitionStore, PartitionStoreTransaction, Result, StorageAccess, TableKind,
break_on_err,
};

impl ScanVQueueTable for PartitionDb {
fn scan_active_vqueues(
Expand Down Expand Up @@ -439,6 +445,35 @@ impl ReadVQueueTable for PartitionStoreTransaction<'_> {
}
}

impl ScanVQueueMetaTable for PartitionStore {
fn for_each_vqueue_meta<
F: for<'a> FnMut((&'a VQueueId, &'a VQueueMetaRef<'a>)) -> std::ops::ControlFlow<()>
+ Send
+ Sync
+ 'static,
>(
&self,
range: KeyRange,
mut f: F,
) -> Result<impl Future<Output = Result<()>> + Send> {
self.iterator_for_each(
"df-vqueue-meta",
Priority::Low,
TableScan::FullScanPartitionKeyRange::<MetaKey>(range),
move |(mut key, value)| {
let meta_key = break_on_err(MetaKey::deserialize_from(&mut key))?;
let meta = break_on_err(
VQueueMetaRef::decode_borrowed(value).map_err(StorageError::BilrostDecode),
)?;

let (vqueue_id,) = meta_key.split();
f((&vqueue_id, &meta)).map_break(Ok)
},
)
.map_err(|_| StorageError::OperationalError)
}
}

// ## Safety
// The iterator is guaranteed to be dropped before the database is dropped, we hold to the
// PartitionDb in this struct for as long as the iterator is alive.
Expand Down
1 change: 1 addition & 0 deletions crates/storage-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ restate-workspace-hack = { workspace = true }
restate-clock = { workspace = true }
restate-limiter = { workspace = true, features = ["bilrost"] }
restate-memory = { workspace = true }
restate-sharding = { workspace = true }
restate-types = { workspace = true }
restate-util-string = { workspace = true, features = ["bilrost", "bytes"] }

Expand Down
143 changes: 143 additions & 0 deletions crates/storage-api/src/vqueue_table/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ pub struct VQueueStatistics {
/// Note that this only tracks entries that were not killed/cancelled or failed/paused.
#[bilrost(tag(15))]
pub(crate) avg_end_to_end_duration_ms: u64,
/// Exponential moving average (EMA) of time the head item spent blocked on
/// user-defined concurrency rules before entering `Running`. Sampled on
/// every Inbox → Running transition (every run attempt, including retries).
#[bilrost(tag(16))]
pub(crate) avg_blocked_on_concurrency_rules_ms: u64,
/// Exponential moving average (EMA) of time the head item spent in
/// node-level invoker throttling before entering `Running`. Sampled on
/// every Inbox → Running transition (every run attempt, including retries).
#[bilrost(tag(17))]
pub(crate) avg_blocked_on_invoker_throttling_ms: u64,
}

impl VQueueStatistics {
Expand All @@ -100,6 +110,8 @@ impl VQueueStatistics {
avg_run_duration_ms: 0,
avg_suspension_duration_ms: 0,
avg_end_to_end_duration_ms: 0,
avg_blocked_on_concurrency_rules_ms: 0,
avg_blocked_on_invoker_throttling_ms: 0,
}
}

Expand All @@ -123,6 +135,16 @@ impl VQueueStatistics {
self.avg_end_to_end_duration_ms = Self::ema(self.avg_end_to_end_duration_ms, latency_ms);
}

fn update_avg_blocked_on_concurrency_rules(&mut self, latency_ms: u64) {
self.avg_blocked_on_concurrency_rules_ms =
Self::ema(self.avg_blocked_on_concurrency_rules_ms, latency_ms);
}

fn update_avg_blocked_on_invoker_throttling(&mut self, latency_ms: u64) {
self.avg_blocked_on_invoker_throttling_ms =
Self::ema(self.avg_blocked_on_invoker_throttling_ms, latency_ms);
}

fn ema(previous: u64, sample_ms: u64) -> u64 {
if previous == 0 {
sample_ms
Expand Down Expand Up @@ -166,6 +188,14 @@ impl VQueueStatistics {
self.avg_end_to_end_duration_ms
}

pub const fn avg_blocked_on_concurrency_rules_ms(&self) -> u64 {
self.avg_blocked_on_concurrency_rules_ms
}

pub const fn avg_blocked_on_invoker_throttling_ms(&self) -> u64 {
self.avg_blocked_on_invoker_throttling_ms
}

pub const fn last_enqueued_at(&self) -> Option<UniqueTimestamp> {
self.last_enqueued_at
}
Expand Down Expand Up @@ -450,6 +480,17 @@ impl VQueueMeta {
self.stats.num_running += 1;
self.stats.last_attempt_at = Some(now);

// EMAs for per-run-attempt blocking time. Sampled on
// every Inbox → Running transition (including retries),
// unlike `avg_queue_duration_ms` which only tracks the
// first attempt.
self.stats.update_avg_blocked_on_concurrency_rules(
metrics.blocked_on_concurrency_rules_ms as u64,
);
self.stats.update_avg_blocked_on_invoker_throttling(
metrics.blocked_on_invoker_throttling_ms as u64,
);

if !metrics.has_started {
let first_wait_ms = now_ms.saturating_sub_ms(metrics.first_runnable_at);
self.stats.update_avg_queue_duration(first_wait_ms);
Expand Down Expand Up @@ -499,6 +540,16 @@ pub struct MoveMetrics {
/// Earliest timestamp at which the entry can realistically start.
#[bilrost(tag(3))]
pub first_runnable_at: MillisSinceEpoch,
/// Milliseconds the head item spent blocked on user-defined concurrency
/// rules during this run attempt. Only populated on Inbox → Running moves;
/// zero for every other transition. Feeds `avg_blocked_on_concurrency_rules_ms`.
#[bilrost(tag(4))]
pub blocked_on_concurrency_rules_ms: u32,
/// Milliseconds the head item spent in node-level invoker throttling
/// during this run attempt. Only populated on Inbox → Running moves; zero
/// for every other transition. Feeds `avg_blocked_on_invoker_throttling_ms`.
#[bilrost(tag(5))]
pub blocked_on_invoker_throttling_ms: u32,
}

impl VQueueMetaUpdates {
Expand Down Expand Up @@ -590,10 +641,28 @@ mod tests {
last_transition_at_ms: u64,
first_runnable_at_ms: u64,
has_started: bool,
) -> MoveMetrics {
metrics_with_wait(
last_transition_at_ms,
first_runnable_at_ms,
has_started,
0,
0,
)
}

fn metrics_with_wait(
last_transition_at_ms: u64,
first_runnable_at_ms: u64,
has_started: bool,
blocked_on_concurrency_rules_ms: u32,
blocked_on_invoker_throttling_ms: u32,
) -> MoveMetrics {
MoveMetrics {
last_transition_at: ts(last_transition_at_ms),
has_started,
blocked_on_concurrency_rules_ms,
blocked_on_invoker_throttling_ms,
first_runnable_at: MillisSinceEpoch::new(first_runnable_at_ms),
}
}
Expand Down Expand Up @@ -646,6 +715,78 @@ mod tests {
assert_eq!(meta.stats.last_attempt_at, Some(ts(BASE_TS_MS + 15_000)));
}

#[test]
fn blocking_emas_sample_every_run_attempt() {
// These two EMAs (`avg_blocked_on_concurrency_rules_ms`,
// `avg_blocked_on_invoker_throttling_ms`) are sampled on EVERY Inbox→Running
// transition, not just the first attempt like `avg_queue_duration_ms`.
// This test pins that distinction down.
let created_at = ts(BASE_TS_MS + 1_000);
let mut meta = VQueueMeta::new(created_at, None, LimitKey::None, VQueueLink::None);

// Enqueue.
meta.apply_update(&Update::new(
created_at,
Action::Move {
prev_stage: None,
next_stage: Stage::Inbox,
metrics: metrics(BASE_TS_MS + 1_000, BASE_TS_MS + 1_000, false),
},
));

// First Inbox→Running: 1_000 ms on concurrency rules, 500 ms on global
// invoker throttling. First sample — EMA equals the sample.
meta.apply_update(&Update::new(
ts(BASE_TS_MS + 2_000),
Action::Move {
prev_stage: Some(Stage::Inbox),
next_stage: Stage::Running,
metrics: metrics_with_wait(
BASE_TS_MS + 1_000,
BASE_TS_MS + 1_000,
false,
1_000,
500,
),
},
));
assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_000);
assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 500);
assert_eq!(meta.stats.avg_queue_duration_ms, 1_000);

// Yield back Running→Inbox. Neither the new EMAs nor the old
// `avg_queue_duration_ms` should move — this is not a Running arm.
meta.apply_update(&Update::new(
ts(BASE_TS_MS + 3_000),
Action::Move {
prev_stage: Some(Stage::Running),
next_stage: Stage::Inbox,
metrics: metrics(BASE_TS_MS + 2_000, BASE_TS_MS + 1_000, true),
},
));
assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_000);
assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 500);

// Second Inbox→Running (a retry, `has_started = true`): 2_000 ms /
// 0 ms. The new EMAs MUST continue sampling even though has_started.
// With α = 0.05: 1000*0.95 + 2000*0.05 = 1050, 500*0.95 + 0*0.05 = 475.
meta.apply_update(&Update::new(
ts(BASE_TS_MS + 4_000),
Action::Move {
prev_stage: Some(Stage::Inbox),
next_stage: Stage::Running,
metrics: metrics_with_wait(BASE_TS_MS + 3_000, BASE_TS_MS + 1_000, true, 2_000, 0),
},
));
assert_eq!(meta.stats.avg_blocked_on_concurrency_rules_ms, 1_050);
assert_eq!(meta.stats.avg_blocked_on_invoker_throttling_ms, 475);

// `avg_queue_duration_ms` must NOT have moved on the retry — the
// first-attempt gate (`has_started = false`) is still the existing
// behavior for that EMA.
assert_eq!(meta.stats.avg_queue_duration_ms, 1_000);
}

#[test]
fn stage_emas_update_on_transitions() {
// Inbox→Running→Suspended→Inbox→Finished exercises every tracked
Expand Down Expand Up @@ -784,6 +925,8 @@ mod tests {
avg_run_duration_ms: 12,
avg_suspension_duration_ms: 13,
avg_end_to_end_duration_ms: 14,
avg_blocked_on_concurrency_rules_ms: 15,
avg_blocked_on_invoker_throttling_ms: 16,
},
scope: Some(Scope::new("scope-a")),
limit_key: "tenant-1/user-1".parse::<LimitKey<ReString>>().unwrap(),
Expand Down
26 changes: 15 additions & 11 deletions crates/storage-api/src/vqueue_table/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ use restate_clock::{RoughTimestamp, UniqueTimestamp};
pub struct WaitStats {
/// Total milliseconds the item spent waiting on global invoker capacity
#[bilrost(tag(1))]
pub blocked_on_global_capacity_ms: u32,
/// Total milliseconds the item was throttled on vqueue's "start" token bucket
pub blocked_on_invoker_concurrency_ms: u32,
/// Total milliseconds the item was blocked on user-defined per-vqueue
/// throttling rules.
#[bilrost(tag(2))]
pub vqueue_start_throttling_ms: u32,
/// Total milliseconds the item was throttled on global "run" token bucket
pub blocked_on_throttling_rules_ms: u32,
/// Total milliseconds the item was blocked on node-level invoker throttling.
#[bilrost(tag(3))]
pub global_invoker_throttling_ms: u32,
pub blocked_on_invoker_throttling_ms: u32,
/// Total milliseconds the item spent waiting on invoker memory pool
#[bilrost(tag(4))]
pub blocked_on_invoker_memory_ms: u32,
/// Total milliseconds the item spent waiting on user-defined concurrency limits
#[bilrost(tag(5))]
pub blocked_on_concurrency_rules_ms: u32,
/// Total milliseconds the item spent waiting to acquire a virtual object lock
#[bilrost(tag(6))]
pub blocked_on_lock_ms: u32,
/// Total milliseconds the item spent blocked on deployment concurrency capacity
#[bilrost(tag(8))]
pub blocked_on_deployment_concurrency_ms: u32,
}

#[derive(Debug, Clone, bilrost::Message)]
Expand Down Expand Up @@ -64,12 +74,6 @@ pub struct EntryStatistics {
/// inflating the first-attempt wait time.
#[bilrost(tag(9))]
pub first_runnable_at: MillisSinceEpoch,
// todo:
// pub time_spent_running: u32,
// pub time_spent_parked: u32,
// pub time_spent_ready_in_inbox: u32,
// pub time_spent_waiting_for_retry: u32,
// pub last_updated_at: MillisSinceEpoch,
}

impl EntryStatistics {
Expand Down
Loading
Loading