Skip to content

Commit 5776af0

Browse files
committed
[VQueues] Introduce sys_vqueue_stats datafusion table
1 parent 2492b5d commit 5776af0

8 files changed

Lines changed: 382 additions & 5 deletions

File tree

crates/partition-store/src/vqueue_table/mod.rs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,26 @@ mod running_reader;
1818
mod waiting_reader;
1919

2020
use std::io::Cursor;
21+
use std::ops::RangeInclusive;
2122

2223
pub use entry::{EntryStateKey, StateHeaderRaw};
2324
pub use inbox::{ActiveKey, InboxKey};
2425
pub use items::ItemsKey;
2526
pub use metadata::*;
2627

2728
use anyhow::Context;
28-
use bilrost::{Message, OwnedMessage};
29+
use bilrost::{BorrowedMessage, Message, OwnedMessage};
2930
use bytes::BytesMut;
3031
use rocksdb::{DBRawIteratorWithThreadMode, ReadOptions};
3132
use tracing::error;
3233

34+
use restate_rocksdb::Priority;
3335
use restate_storage_api::StorageError;
34-
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates};
36+
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef, VQueueMetaUpdates};
3537
use restate_storage_api::vqueue_table::{
3638
EntryKey, EntryMetadata, EntryState, EntryStateHeader, EntryStatistics, EntryValue,
37-
IdentifiesEntry, LazyEntryState, ReadVQueueTable, ScanVQueueTable, Stage, WriteVQueueTable,
39+
IdentifiesEntry, LazyEntryState, ReadVQueueTable, ScanVQueueMetaTable, ScanVQueueTable, Stage,
40+
WriteVQueueTable,
3841
};
3942
use restate_types::clock::UniqueTimestamp;
4043
use restate_types::identifiers::PartitionKey;
@@ -43,7 +46,11 @@ use restate_types::vqueues::{EntryId, VQueueId};
4346
use self::entry::{LazyEntryStateHolder, OwnedEntryStateHeader, StateHeaderRawRef};
4447
use self::key_codec::HasLock;
4548
use crate::keys::{DecodeTableKey, EncodeTableKey, EncodeTableKeyPrefix, KeyKind};
46-
use crate::{PartitionDb, PartitionStoreTransaction, Result, StorageAccess, TableKind};
49+
use crate::scan::TableScan;
50+
use crate::{
51+
PartitionDb, PartitionStore, PartitionStoreTransaction, Result, StorageAccess, TableKind,
52+
break_on_err,
53+
};
4754

4855
impl ScanVQueueTable for PartitionDb {
4956
fn scan_active_vqueues(
@@ -468,6 +475,35 @@ impl ReadVQueueTable for PartitionStoreTransaction<'_> {
468475
}
469476
}
470477

478+
impl ScanVQueueMetaTable for PartitionStore {
479+
fn for_each_vqueue_meta<
480+
F: for<'a> FnMut((&'a VQueueId, &'a VQueueMetaRef<'a>)) -> std::ops::ControlFlow<()>
481+
+ Send
482+
+ Sync
483+
+ 'static,
484+
>(
485+
&self,
486+
range: RangeInclusive<PartitionKey>,
487+
mut f: F,
488+
) -> Result<impl Future<Output = Result<()>> + Send> {
489+
self.iterator_for_each(
490+
"df-vqueue-meta",
491+
Priority::Low,
492+
TableScan::FullScanPartitionKeyRange::<MetaKey>(range),
493+
move |(mut key, value)| {
494+
let meta_key = break_on_err(MetaKey::deserialize_from(&mut key))?;
495+
let meta = break_on_err(
496+
VQueueMetaRef::decode_borrowed(value).map_err(StorageError::BilrostDecode),
497+
)?;
498+
499+
let (vqueue_id,) = meta_key.split();
500+
f((&vqueue_id, &meta)).map_break(Ok)
501+
},
502+
)
503+
.map_err(|_| StorageError::OperationalError)
504+
}
505+
}
506+
471507
// ## Safety
472508
// The iterator is guaranteed to be dropped before the database is dropped, we hold to the
473509
// PartitionDb in this struct for as long as the iterator is alive.

crates/storage-api/src/vqueue_table/tables.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::ops::RangeInclusive;
12+
1113
use restate_types::clock::UniqueTimestamp;
1214
use restate_types::identifiers::PartitionKey;
1315
use restate_types::vqueues::VQueueId;
1416

15-
use super::metadata::VQueueMeta;
17+
use super::metadata::{VQueueMeta, VQueueMetaRef};
1618
use super::{
1719
EntryId, EntryKey, EntryMetadata, EntryState, EntryStateHeader, EntryStatistics, EntryValue,
1820
IdentifiesEntry, LazyEntryState,
@@ -189,3 +191,17 @@ pub trait ScanVQueueTable {
189191
on_item: impl FnMut(VQueueId, super::metadata::VQueueMeta),
190192
) -> Result<()>;
191193
}
194+
195+
pub trait ScanVQueueMetaTable {
196+
/// Used for data-fusion queries
197+
fn for_each_vqueue_meta<
198+
F: for<'a> FnMut((&'a VQueueId, &'a VQueueMetaRef<'a>)) -> std::ops::ControlFlow<()>
199+
+ Send
200+
+ Sync
201+
+ 'static,
202+
>(
203+
&self,
204+
range: RangeInclusive<PartitionKey>,
205+
f: F,
206+
) -> Result<impl Future<Output = Result<()>> + Send>;
207+
}

crates/storage-query-datafusion/src/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,13 @@ where
238238
self.partition_store_manager.clone(),
239239
&self.remote_scanner_manager,
240240
)?;
241+
// VQueues Tables
242+
crate::vqueue_stats::register_self(
243+
ctx,
244+
self.partition_selector.clone(),
245+
self.partition_store_manager.clone(),
246+
&self.remote_scanner_manager,
247+
)?;
241248

242249
ctx.datafusion_context.sql(SYS_INVOCATION_VIEW).await?;
243250

crates/storage-query-datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod statistics;
3939
pub mod table_docs;
4040
mod table_macro;
4141
mod table_providers;
42+
mod vqueue_stats;
4243
pub use table_providers::Scan;
4344
pub mod table_util;
4445

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
mod row;
12+
pub(crate) mod schema;
13+
mod table;
14+
15+
pub(crate) use table::register_self;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use restate_storage_api::vqueue_table::metadata::VQueueMetaRef;
12+
use restate_types::vqueues::VQueueId;
13+
14+
use super::schema::SysVqueueStatsBuilder;
15+
16+
#[inline]
17+
pub(crate) fn append_vqueues_meta_row<'a>(
18+
builder: &mut SysVqueueStatsBuilder,
19+
qid: &'a VQueueId,
20+
meta: &'a VQueueMetaRef<'a>,
21+
) {
22+
let mut row = builder.row();
23+
24+
row.partition_key(qid.partition_key());
25+
if row.is_id_defined() {
26+
row.fmt_id(qid);
27+
}
28+
if row.is_scope_defined()
29+
&& let Some(scope) = meta.scope
30+
{
31+
row.scope(scope);
32+
}
33+
34+
if row.is_service_name_defined()
35+
&& let Some(service_name) = meta.service_name()
36+
{
37+
row.service_name(service_name);
38+
}
39+
40+
if row.is_is_active_defined() {
41+
row.is_active(meta.is_active());
42+
}
43+
if row.is_queue_is_paused_defined() {
44+
row.queue_is_paused(meta.queue_is_paused);
45+
}
46+
47+
if row.is_limit_key_defined() {
48+
row.fmt_limit_key(&meta.limit_key);
49+
}
50+
51+
if row.is_lock_name_defined()
52+
&& let Some(lock_name) = meta.lock_name()
53+
{
54+
row.fmt_lock_name(lock_name);
55+
}
56+
57+
if row.is_created_at_defined() {
58+
row.created_at(meta.stats.created_at().as_u64() as i64);
59+
}
60+
61+
if row.is_last_enqueued_at_defined()
62+
&& let Some(last_enqueued_at) = meta.stats.last_enqueued_at()
63+
{
64+
row.last_enqueued_at(last_enqueued_at.as_u64() as i64);
65+
}
66+
67+
if row.is_last_start_at_defined()
68+
&& let Some(last_start_at) = meta.stats.last_start_at()
69+
{
70+
row.last_start_at(last_start_at.as_u64() as i64);
71+
}
72+
73+
if row.is_last_attempt_at_defined()
74+
&& let Some(last_attempt_at) = meta.stats.last_attempt_at()
75+
{
76+
row.last_attempt_at(last_attempt_at.as_u64() as i64);
77+
}
78+
79+
if row.is_last_finish_at_defined()
80+
&& let Some(last_finish_at) = meta.stats.last_finish_at()
81+
{
82+
row.last_finish_at(last_finish_at.as_u64() as i64);
83+
}
84+
85+
if row.is_avg_queue_duration_defined() {
86+
row.avg_queue_duration(meta.stats.avg_queue_duration_ms() as i64);
87+
}
88+
89+
if row.is_avg_inbox_duration_defined() {
90+
row.avg_inbox_duration(meta.stats.avg_inbox_duration_ms() as i64);
91+
}
92+
93+
if row.is_avg_run_duration_defined() {
94+
row.avg_run_duration(meta.stats.avg_run_duration_ms() as i64);
95+
}
96+
97+
if row.is_avg_suspension_duration_defined() {
98+
row.avg_suspension_duration(meta.stats.avg_suspension_duration_ms() as i64);
99+
}
100+
101+
if row.is_avg_end_to_end_duration_defined() {
102+
row.avg_end_to_end_duration(meta.stats.avg_end_to_end_duration_ms() as i64);
103+
}
104+
105+
if row.is_num_inbox_defined() {
106+
row.num_inbox(meta.stats.num_inbox());
107+
}
108+
109+
if row.is_num_running_defined() {
110+
row.num_running(meta.stats.num_running());
111+
}
112+
113+
if row.is_num_suspended_defined() {
114+
row.num_suspended(meta.stats.num_suspended());
115+
}
116+
117+
if row.is_num_paused_defined() {
118+
row.num_paused(meta.stats.num_paused());
119+
}
120+
121+
if row.is_num_finished_defined() {
122+
row.num_finished(meta.stats.num_finished());
123+
}
124+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use crate::table_macro::*;
12+
13+
use datafusion::arrow::datatypes::DataType;
14+
15+
// Declares the sort order of `sys_vqueue_stats` as (partition_key, scope, lock_name).
// NOTE(review): presumably this has to agree with the order in which the
// underlying meta-table scan yields rows — confirm against the key layout.
define_sort_order!(sys_vqueue_stats(partition_key, scope, lock_name));
16+
17+
// Column declarations for the `sys_vqueue_stats` table: `name: type` pairs,
// each preceded by a `///` description of the column.
// NOTE(review): the `///` text is presumably picked up by the macro for
// generated table docs, so treat it as user-facing — confirm in `table_macro`.
define_table!(sys_vqueue_stats(
18+
/// Internal column that is used for partitioning. Can be ignored.
19+
partition_key: DataType::UInt64,
20+
21+
/// The VQueue Identifier (vq_...)
22+
id: DataType::LargeUtf8,
23+
24+
/// Whether this vqueue is active or not. An active vqueue
25+
/// is a vqueue that is not paused, and has non-finished
26+
/// items.
27+
is_active: DataType::Boolean,
28+
29+
/// Whether this vqueue is paused.
30+
queue_is_paused: DataType::Boolean,
31+
32+
/// Service name linked to this vqueue
33+
service_name: DataType::LargeUtf8,
34+
35+
/// The scope of this vqueue.
36+
scope: DataType::LargeUtf8,
37+
38+
/// The name of the limit-key assigned to this vqueue
39+
limit_key: DataType::LargeUtf8,
40+
41+
/// The name of the lock (in the format of `service/key`)
42+
///
43+
/// This is only set if this is a vqueue for a virtual object.
44+
lock_name: DataType::LargeUtf8,
45+
46+
/// When was this vqueue first created
47+
created_at: TimestampMillisecond,
48+
49+
/// Last timestamp an entry moved into Inbox.
50+
///
51+
/// This covers items enqueued for the first time only.
52+
last_enqueued_at: TimestampMillisecond,
53+
54+
/// Last timestamp an entry first transitioned to Run.
55+
last_start_at: TimestampMillisecond,
56+
57+
/// Last timestamp an entry transitioned to Run.
58+
last_attempt_at: TimestampMillisecond,
59+
60+
/// Last timestamp an entry transitioned to Finished.
61+
last_finish_at: TimestampMillisecond,
62+
63+
/// Exponential moving average (EMA) of first-attempt queue wait time.
64+
avg_queue_duration: DataType::Duration,
65+
66+
/// Exponential moving average (EMA) of how long entries stayed in inbox.
67+
avg_inbox_duration: DataType::Duration,
68+
69+
/// Exponential moving average (EMA) of how long entries stayed running.
70+
avg_run_duration: DataType::Duration,
71+
72+
/// Exponential moving average (EMA) of how long entries stayed suspended.
73+
avg_suspension_duration: DataType::Duration,
74+
75+
/// Exponential moving average (EMA) of end-to-end entry lifetime from first-runnable time to completion.
76+
/// Note that this only tracks entries that were not killed/cancelled or failed/paused.
77+
avg_end_to_end_duration: DataType::Duration,
78+
79+
/// The number of entries that are in the inbox. The inbox is the priority
80+
/// queue that the scheduler uses to choose which entries to run next.
81+
num_inbox: DataType::UInt64,
82+
83+
/// The number of entries that are currently running.
84+
num_running: DataType::UInt64,
85+
86+
/// The number of entries that are suspended.
87+
num_suspended: DataType::UInt64,
88+
89+
/// The number of entries that are paused.
90+
num_paused: DataType::UInt64,
91+
92+
/// The number of entries that have finished processing and are pending
93+
/// deletion or archival.
94+
num_finished: DataType::UInt64,
95+
96+
));

0 commit comments

Comments
 (0)