Skip to content

Commit a92c7df

Browse files
committed
[VQueues] Introduce sys_vqueue_stats datafusion table
1 parent dc156c7 commit a92c7df

8 files changed

Lines changed: 381 additions & 4 deletions

File tree

crates/partition-store/src/vqueue_table/mod.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,23 @@ mod running_reader;
1818
mod waiting_reader;
1919

2020
use std::io::Cursor;
21+
use std::ops::RangeInclusive;
2122

2223
pub use entry::{EntryStatusKey, StatusHeaderRaw};
2324
pub use inbox::InboxKey;
2425
pub use items::InputPayloadKey;
2526
pub use metadata::*;
2627

2728
use anyhow::Context;
28-
use bilrost::{Message, OwnedMessage};
29+
use bilrost::{BorrowedMessage, Message, OwnedMessage};
2930
use bytes::BytesMut;
3031
use rocksdb::{DBRawIteratorWithThreadMode, ReadOptions};
3132
use tracing::error;
3233

34+
use restate_rocksdb::Priority;
3335
use restate_storage_api::StorageError;
34-
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaUpdates};
36+
use restate_storage_api::vqueue_table::ScanVQueueMetaTable;
37+
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef, VQueueMetaUpdates};
3538
use restate_storage_api::vqueue_table::{
3639
EntryKey, EntryMetadata, EntryStatusHeader, EntryValue, LazyEntryStatus, ReadVQueueTable,
3740
ScanVQueueTable, Stage, Status, WriteVQueueTable, stats::EntryStatistics,
@@ -41,7 +44,11 @@ use restate_types::vqueues::{EntryId, Seq, VQueueId};
4144

4245
use self::entry::{LazyEntryStatusHolder, OwnedEntryStatusHeader, StatusHeaderRawRef};
4346
use crate::keys::{DecodeTableKey, EncodeTableKey, EncodeTableKeyPrefix, KeyKind};
44-
use crate::{PartitionDb, PartitionStoreTransaction, Result, StorageAccess, TableKind};
47+
use crate::scan::TableScan;
48+
use crate::{
49+
PartitionDb, PartitionStore, PartitionStoreTransaction, Result, StorageAccess, TableKind,
50+
break_on_err,
51+
};
4552

4653
impl ScanVQueueTable for PartitionDb {
4754
fn scan_active_vqueues(
@@ -438,6 +445,35 @@ impl ReadVQueueTable for PartitionStoreTransaction<'_> {
438445
}
439446
}
440447

448+
impl ScanVQueueMetaTable for PartitionStore {
449+
fn for_each_vqueue_meta<
450+
F: for<'a> FnMut((&'a VQueueId, &'a VQueueMetaRef<'a>)) -> std::ops::ControlFlow<()>
451+
+ Send
452+
+ Sync
453+
+ 'static,
454+
>(
455+
&self,
456+
range: RangeInclusive<PartitionKey>,
457+
mut f: F,
458+
) -> Result<impl Future<Output = Result<()>> + Send> {
459+
self.iterator_for_each(
460+
"df-vqueue-meta",
461+
Priority::Low,
462+
TableScan::FullScanPartitionKeyRange::<MetaKey>(range),
463+
move |(mut key, value)| {
464+
let meta_key = break_on_err(MetaKey::deserialize_from(&mut key))?;
465+
let meta = break_on_err(
466+
VQueueMetaRef::decode_borrowed(value).map_err(StorageError::BilrostDecode),
467+
)?;
468+
469+
let (vqueue_id,) = meta_key.split();
470+
f((&vqueue_id, &meta)).map_break(Ok)
471+
},
472+
)
473+
.map_err(|_| StorageError::OperationalError)
474+
}
475+
}
476+
441477
// ## Safety
442478
// The iterator is guaranteed to be dropped before the database is dropped, we hold to the
443479
// PartitionDb in this struct for as long as the iterator is alive.

crates/storage-api/src/vqueue_table/tables.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@
88
// the Business Source License, use of this software will be governed
99
// by the Apache License, Version 2.0.
1010

11+
use std::ops::RangeInclusive;
12+
1113
use restate_types::identifiers::PartitionKey;
1214
use restate_types::vqueues::{Seq, VQueueId};
1315

1416
use super::Status;
15-
use super::metadata::VQueueMeta;
17+
use super::metadata::{VQueueMeta, VQueueMetaRef};
1618
use super::{
1719
EntryId, EntryKey, EntryMetadata, EntryStatusHeader, EntryValue, LazyEntryStatus,
1820
stats::EntryStatistics,
@@ -207,3 +209,17 @@ pub trait ScanVQueueTable {
207209
on_item: impl FnMut(VQueueId, super::metadata::VQueueMeta),
208210
) -> Result<()>;
209211
}
212+
213+
pub trait ScanVQueueMetaTable {
214+
/// Used for data-fusion queries
215+
fn for_each_vqueue_meta<
216+
F: for<'a> FnMut((&'a VQueueId, &'a VQueueMetaRef<'a>)) -> std::ops::ControlFlow<()>
217+
+ Send
218+
+ Sync
219+
+ 'static,
220+
>(
221+
&self,
222+
range: RangeInclusive<PartitionKey>,
223+
f: F,
224+
) -> Result<impl Future<Output = Result<()>> + Send>;
225+
}

crates/storage-query-datafusion/src/context.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,13 @@ where
238238
self.partition_store_manager.clone(),
239239
&self.remote_scanner_manager,
240240
)?;
241+
// VQueues Tables
242+
crate::vqueue_stats::register_self(
243+
ctx,
244+
self.partition_selector.clone(),
245+
self.partition_store_manager.clone(),
246+
&self.remote_scanner_manager,
247+
)?;
241248

242249
ctx.datafusion_context.sql(SYS_INVOCATION_VIEW).await?;
243250

crates/storage-query-datafusion/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ mod statistics;
3939
pub mod table_docs;
4040
mod table_macro;
4141
mod table_providers;
42+
mod vqueue_stats;
4243
pub use table_providers::Scan;
4344
pub mod table_util;
4445

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
mod row;
12+
pub(crate) mod schema;
13+
mod table;
14+
15+
pub(crate) use table::register_self;
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use restate_storage_api::vqueue_table::metadata::VQueueMetaRef;
12+
use restate_types::vqueues::VQueueId;
13+
14+
use super::schema::SysVqueueStatsBuilder;
15+
16+
#[inline]
17+
pub(crate) fn append_vqueues_meta_row<'a>(
18+
builder: &mut SysVqueueStatsBuilder,
19+
qid: &'a VQueueId,
20+
meta: &'a VQueueMetaRef<'a>,
21+
) {
22+
let mut row = builder.row();
23+
24+
row.partition_key(qid.partition_key());
25+
if row.is_id_defined() {
26+
row.fmt_id(qid);
27+
}
28+
if row.is_scope_defined()
29+
&& let Some(scope) = meta.scope
30+
{
31+
row.scope(scope);
32+
}
33+
34+
if row.is_service_name_defined()
35+
&& let Some(service_name) = meta.service_name()
36+
{
37+
row.service_name(service_name);
38+
}
39+
40+
if row.is_is_active_defined() {
41+
row.is_active(meta.is_active());
42+
}
43+
if row.is_queue_is_paused_defined() {
44+
row.queue_is_paused(meta.queue_is_paused);
45+
}
46+
47+
if row.is_limit_key_defined() {
48+
row.fmt_limit_key(&meta.limit_key);
49+
}
50+
51+
if row.is_lock_name_defined()
52+
&& let Some(lock_name) = meta.lock_name()
53+
{
54+
row.fmt_lock_name(lock_name);
55+
}
56+
57+
if row.is_created_at_defined() {
58+
row.created_at(meta.stats.created_at().as_u64() as i64);
59+
}
60+
61+
if row.is_last_enqueued_at_defined()
62+
&& let Some(last_enqueued_at) = meta.stats.last_enqueued_at()
63+
{
64+
row.last_enqueued_at(last_enqueued_at.as_u64() as i64);
65+
}
66+
67+
if row.is_last_start_at_defined()
68+
&& let Some(last_start_at) = meta.stats.last_start_at()
69+
{
70+
row.last_start_at(last_start_at.as_u64() as i64);
71+
}
72+
73+
if row.is_last_attempt_at_defined()
74+
&& let Some(last_attempt_at) = meta.stats.last_attempt_at()
75+
{
76+
row.last_attempt_at(last_attempt_at.as_u64() as i64);
77+
}
78+
79+
if row.is_last_finish_at_defined()
80+
&& let Some(last_finish_at) = meta.stats.last_finish_at()
81+
{
82+
row.last_finish_at(last_finish_at.as_u64() as i64);
83+
}
84+
85+
if row.is_avg_queue_duration_defined() {
86+
row.avg_queue_duration(meta.stats.avg_queue_duration_ms() as i64);
87+
}
88+
89+
if row.is_avg_inbox_duration_defined() {
90+
row.avg_inbox_duration(meta.stats.avg_inbox_duration_ms() as i64);
91+
}
92+
93+
if row.is_avg_run_duration_defined() {
94+
row.avg_run_duration(meta.stats.avg_run_duration_ms() as i64);
95+
}
96+
97+
if row.is_avg_suspension_duration_defined() {
98+
row.avg_suspension_duration(meta.stats.avg_suspension_duration_ms() as i64);
99+
}
100+
101+
if row.is_avg_end_to_end_duration_defined() {
102+
row.avg_end_to_end_duration(meta.stats.avg_end_to_end_duration_ms() as i64);
103+
}
104+
105+
if row.is_num_inbox_defined() {
106+
row.num_inbox(meta.stats.num_inbox());
107+
}
108+
109+
if row.is_num_running_defined() {
110+
row.num_running(meta.stats.num_running());
111+
}
112+
113+
if row.is_num_suspended_defined() {
114+
row.num_suspended(meta.stats.num_suspended());
115+
}
116+
117+
if row.is_num_paused_defined() {
118+
row.num_paused(meta.stats.num_paused());
119+
}
120+
121+
if row.is_num_finished_defined() {
122+
row.num_finished(meta.stats.num_finished());
123+
}
124+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
// Copyright (c) 2023 - 2026 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use crate::table_macro::*;
12+
13+
use datafusion::arrow::datatypes::DataType;
14+
15+
define_sort_order!(sys_vqueue_stats(partition_key, scope, lock_name));
16+
17+
define_table!(sys_vqueue_stats(
18+
/// Internal column that is used for partitioning. Can be ignored.
19+
partition_key: DataType::UInt64,
20+
21+
/// The VQueue Identifier (vq_...)
22+
id: DataType::Utf8,
23+
24+
/// Whether this vqueue is active or not. An active vqueue
25+
/// is a vqueue that is not paused, and has non-finished
26+
/// items.
27+
is_active: DataType::Boolean,
28+
29+
/// Whether this vqueue is paused.
30+
queue_is_paused: DataType::Boolean,
31+
32+
/// Service name linked to this vqueue
33+
service_name: DataType::Utf8,
34+
35+
/// The scope of this vqueue.
36+
scope: DataType::Utf8,
37+
38+
/// The name of the limit-key assigned to this vqueue
39+
limit_key: DataType::Utf8,
40+
41+
/// The name of the lock (in the format of `service/key`)
42+
///
43+
/// This is only set if this is a vqueue for a virtual object.
44+
lock_name: DataType::Utf8,
45+
46+
/// When was this vqueue first created
47+
created_at: TimestampMillisecond,
48+
49+
/// Last timestamp an entry moved into Inbox.
50+
///
51+
/// This covers items enqueued for the first time only.
52+
last_enqueued_at: TimestampMillisecond,
53+
54+
/// Last timestamp an entry first transitioned to Run.
55+
last_start_at: TimestampMillisecond,
56+
57+
/// Last timestamp an entry transitioned to Run.
58+
last_attempt_at: TimestampMillisecond,
59+
60+
/// Last timestamp an entry transitioned to Finished.
61+
last_finish_at: TimestampMillisecond,
62+
63+
/// Exponential moving average (EMA) of first-attempt queue wait time.
64+
avg_queue_duration: DataType::Duration,
65+
66+
/// Exponential moving average (EMA) of how long entries stayed in inbox.
67+
avg_inbox_duration: DataType::Duration,
68+
69+
/// Exponential moving average (EMA) of how long entries stayed running.
70+
avg_run_duration: DataType::Duration,
71+
72+
/// Exponential moving average (EMA) of how long entries stayed suspended.
73+
avg_suspension_duration: DataType::Duration,
74+
75+
/// Exponential moving average (EMA) of end-to-end entry lifetime from first-runnable time to completion.
76+
/// Note that this only tracks entries that were not killed/cancelled or failed/paused.
77+
avg_end_to_end_duration: DataType::Duration,
78+
79+
/// The number of entries that are in the inbox. The inbox is the priority
80+
/// queue that the scheduler uses to choose which entries to run next.
81+
num_inbox: DataType::UInt64,
82+
83+
/// The number of entries that are currently running.
84+
num_running: DataType::UInt64,
85+
86+
/// The number of entries that are suspended.
87+
num_suspended: DataType::UInt64,
88+
89+
/// The number of entries that are paused.
90+
num_paused: DataType::UInt64,
91+
92+
/// The number of entries that have finished processing and are pending
93+
/// deletion or archival.
94+
num_finished: DataType::UInt64,
95+
96+
));

0 commit comments

Comments
 (0)