Skip to content

Commit 00d5a02

Browse files
committed
[VQueues] Introduce sys_vqueues datafusion table
Expose vqueue entries as a single DataFusion table with stage-aware scanning. When the query filters `stage`, only matching stage key kinds are scanned; without a stage filter, all inbox stages are scanned and merged. Also project the latest entry metadata for observability (status plus EntryStatistics timestamps and counters), and add targeted tests for stage predicate extraction and sys_vqueues stage filtering behavior.
1 parent 42f7147 commit 00d5a02

13 files changed

Lines changed: 906 additions & 4 deletions

File tree

crates/partition-store/src/tests/vqueue_table_test/mod.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
2626
use restate_clock::time::MillisSinceEpoch;
2727
use restate_storage_api::Transaction;
28+
use restate_storage_api::vqueue_table::ScanVQueueInboxStages;
2829
use restate_storage_api::vqueue_table::{
2930
EntryKey, EntryMetadata, EntryValue, Stage, Status, VQueueCursor, VQueueStore,
3031
WriteVQueueTable, stats::EntryStatistics,
3132
};
3233
use restate_types::clock::UniqueTimestamp;
3334
use restate_types::identifiers::PartitionKey;
35+
use restate_types::sharding::KeyRange;
3436
use restate_types::vqueues::{EntryId, EntryKind, VQueueId};
3537

3638
use crate::PartitionStore;
@@ -837,6 +839,67 @@ async fn concurrent_enqueue_and_delete(rocksdb: &mut PartitionStore) {
837839
);
838840
}
839841

842+
/// Test: Stage scan reads only the requested stage key kind.
843+
///
844+
/// This validates the datafusion-oriented scan API and ensures stage-specific
845+
/// scans do not leak rows from adjacent stage key kinds or partition keys.
846+
async fn stage_scan_is_filtered_by_stage(rocksdb: &mut PartitionStore) {
847+
let target_partition_key = PartitionKey::from(9_300u64);
848+
let other_partition_key = PartitionKey::from(9_301u64);
849+
let target_qid = VQueueId::custom(target_partition_key, "scan-target");
850+
let other_qid = VQueueId::custom(other_partition_key, "scan-target");
851+
852+
let stages = [
853+
Stage::Inbox,
854+
Stage::Running,
855+
Stage::Suspended,
856+
Stage::Paused,
857+
Stage::Finished,
858+
];
859+
860+
{
861+
let mut txn = rocksdb.transaction();
862+
for (index, stage) in stages.into_iter().enumerate() {
863+
let entry_id = 100 + index as u8;
864+
let target_entry = default_entry(entry_id);
865+
let other_entry = default_entry(entry_id + 10);
866+
867+
txn.put_vqueue_inbox(&target_qid, stage, &target_entry.0, &target_entry.1);
868+
txn.put_vqueue_inbox(&other_qid, stage, &other_entry.0, &other_entry.1);
869+
}
870+
txn.commit().await.expect("commit should succeed");
871+
}
872+
873+
let range = KeyRange::from(target_partition_key..=target_partition_key);
874+
875+
for (index, stage) in stages.into_iter().enumerate() {
876+
let expected_key = default_entry(100 + index as u8).0;
877+
let rows = std::sync::Arc::new(std::sync::Mutex::new(Vec::new()));
878+
let rows_for_scan = rows.clone();
879+
880+
rocksdb
881+
.for_each_vqueue_inbox_entry(range, stage, move |(qid, got_stage, key, _)| {
882+
rows_for_scan
883+
.lock()
884+
.expect("stage scan lock should not be poisoned")
885+
.push((qid.clone(), got_stage, *key));
886+
std::ops::ControlFlow::Continue(())
887+
})
888+
.expect("stage scan setup should succeed")
889+
.await
890+
.expect("stage scan should succeed");
891+
892+
let rows = rows
893+
.lock()
894+
.expect("stage scan lock should not be poisoned")
895+
.clone();
896+
assert_eq!(rows.len(), 1, "stage {stage} should return one row");
897+
assert_eq!(rows[0].0, target_qid, "stage {stage} returned wrong qid");
898+
assert_eq!(rows[0].1, stage, "stage {stage} returned wrong stage");
899+
assert_eq!(rows[0].2, expected_key, "stage {stage} returned wrong key");
900+
}
901+
}
902+
840903
pub(crate) async fn run_tests(mut rocksdb: PartitionStore) {
841904
let mut txn = rocksdb.transaction();
842905

@@ -863,6 +926,8 @@ pub(crate) async fn run_tests(mut rocksdb: PartitionStore) {
863926
verify_waiting_cursor_boundary_is_respected(db);
864927
verify_waiting_cursor_partition_prefix_boundary_is_respected(db);
865928

929+
stage_scan_is_filtered_by_stage(&mut rocksdb).await;
930+
866931
// Tailing iterator tests - these need mutable access to rocksdb for writes
867932
tailing_iterator_sees_new_items_on_reseek(&mut rocksdb).await;
868933
reseek_shows_new_higher_order_items(&mut rocksdb).await;

crates/partition-store/src/vqueue_table/mod.rs

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ mod running_reader;
1818
mod waiting_reader;
1919

2020
use std::io::Cursor;
21+
use std::pin::Pin;
2122

2223
pub use entry::{EntryStatusKey, EntryStatusKeyRef, StatusHeaderRaw};
2324
pub use inbox::InboxKey;
@@ -32,12 +33,12 @@ use tracing::error;
3233

3334
use restate_rocksdb::Priority;
3435
use restate_storage_api::StorageError;
35-
use restate_storage_api::vqueue_table::ScanVQueueMetaTable;
3636
use restate_storage_api::vqueue_table::metadata::{VQueueMeta, VQueueMetaRef, VQueueMetaUpdates};
3737
use restate_storage_api::vqueue_table::{
3838
EntryKey, EntryMetadata, EntryStatusHeader, EntryValue, LazyEntryStatus, ReadVQueueTable,
3939
ScanVQueueTable, Stage, Status, WriteVQueueTable, stats::EntryStatistics,
4040
};
41+
use restate_storage_api::vqueue_table::{ScanVQueueInboxStages, ScanVQueueMetaTable};
4142
use restate_types::sharding::{KeyRange, PartitionKey};
4243
use restate_types::vqueues::{EntryId, Seq, VQueueId};
4344

@@ -474,6 +475,118 @@ impl ScanVQueueMetaTable for PartitionStore {
474475
}
475476
}
476477

478+
fn stage_from_key_kind(key_kind: KeyKind) -> Option<Stage> {
479+
match key_kind {
480+
KeyKind::VQueueInboxStage => Some(Stage::Inbox),
481+
KeyKind::VQueueRunningStage => Some(Stage::Running),
482+
KeyKind::VQueueSuspendedStage => Some(Stage::Suspended),
483+
KeyKind::VQueuePausedStage => Some(Stage::Paused),
484+
KeyKind::VQueueFinishedStage => Some(Stage::Finished),
485+
_ => None,
486+
}
487+
}
488+
489+
fn scan_vqueue_inbox_stage<'store, K, F>(
490+
partition_store: &'store PartitionStore,
491+
scanner_name: &'static str,
492+
range: KeyRange,
493+
stage: Stage,
494+
mut f: F,
495+
) -> Result<Pin<Box<dyn Future<Output = Result<()>> + Send + 'store>>>
496+
where
497+
K: EncodeTableKeyPrefix + 'store,
498+
F: for<'row> FnMut(
499+
(&'row VQueueId, Stage, &'row EntryKey, &'row EntryValue),
500+
) -> std::ops::ControlFlow<()>
501+
+ Send
502+
+ Sync
503+
+ 'static,
504+
{
505+
let future = partition_store
506+
.iterator_for_each(
507+
scanner_name,
508+
Priority::Low,
509+
TableScan::FullScanPartitionKeyRange::<K>(range),
510+
move |(mut key, mut value)| {
511+
let key_kind = break_on_err(KeyKind::deserialize(&mut key))?;
512+
let key_stage = break_on_err(
513+
stage_from_key_kind(key_kind).ok_or(StorageError::DataIntegrityError),
514+
)?;
515+
if key_stage != stage {
516+
return std::ops::ControlFlow::Break(Err(StorageError::DataIntegrityError));
517+
}
518+
519+
let qid: VQueueId = break_on_err(crate::keys::deserialize(&mut key))?;
520+
let entry_key: EntryKey = break_on_err(crate::keys::deserialize(&mut key))?;
521+
let entry = break_on_err(
522+
EntryValue::decode(&mut value).map_err(StorageError::BilrostDecode),
523+
)?;
524+
525+
f((&qid, stage, &entry_key, &entry)).map_break(Ok)
526+
},
527+
)
528+
.map_err(|_| StorageError::OperationalError)?;
529+
530+
Ok(Box::pin(future))
531+
}
532+
533+
impl ScanVQueueInboxStages for PartitionStore {
534+
fn for_each_vqueue_inbox_entry<
535+
F: for<'a> FnMut(
536+
(&'a VQueueId, Stage, &'a EntryKey, &'a EntryValue),
537+
) -> std::ops::ControlFlow<()>
538+
+ Send
539+
+ Sync
540+
+ 'static,
541+
>(
542+
&self,
543+
range: KeyRange,
544+
stage: Stage,
545+
f: F,
546+
) -> Result<impl Future<Output = Result<()>> + Send> {
547+
match stage {
548+
Stage::Unknown => Err(StorageError::Generic(anyhow::anyhow!(
549+
"Unknown stage can't be scanned"
550+
))),
551+
Stage::Inbox => scan_vqueue_inbox_stage::<inbox::InboxKey, _>(
552+
self,
553+
"df-vqueue-inbox",
554+
range,
555+
stage,
556+
f,
557+
),
558+
Stage::Running => scan_vqueue_inbox_stage::<inbox::RunningKey, _>(
559+
self,
560+
"df-vqueue-running",
561+
range,
562+
stage,
563+
f,
564+
),
565+
Stage::Suspended => scan_vqueue_inbox_stage::<inbox::SuspendedKey, _>(
566+
self,
567+
"df-vqueue-suspended",
568+
range,
569+
stage,
570+
f,
571+
),
572+
Stage::Paused => scan_vqueue_inbox_stage::<inbox::PausedKey, _>(
573+
self,
574+
"df-vqueue-paused",
575+
range,
576+
stage,
577+
f,
578+
),
579+
Stage::Finished => scan_vqueue_inbox_stage::<inbox::FinishedKey, _>(
580+
self,
581+
"df-vqueue-finished",
582+
range,
583+
stage,
584+
f,
585+
),
586+
}
587+
}
588+
}
589+
477590
// ## Safety
478591
// The iterator is guaranteed to be dropped before the database is dropped, we hold to the
479592
// PartitionDb in this struct for as long as the iterator is alive.

crates/storage-api/src/vqueue_table/entry.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ impl<'a> From<&'a EntryMetadata> for EntryMetadataRef<'a> {
205205
pub struct EntryMetadata {
206206
// todo: This is temporary placeholder, type and name _will_ change.
207207
#[bilrost(tag(1))]
208-
deployment: Option<String>,
208+
pub deployment: Option<String>,
209209
}
210210

211211
#[cfg(test)]

crates/storage-api/src/vqueue_table/tables.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,3 +221,20 @@ pub trait ScanVQueueMetaTable {
221221
f: F,
222222
) -> Result<impl Future<Output = Result<()>> + Send>;
223223
}
224+
225+
pub trait ScanVQueueInboxStages {
226+
/// Used for data-fusion queries
227+
fn for_each_vqueue_inbox_entry<
228+
F: for<'a> FnMut(
229+
(&'a VQueueId, Stage, &'a EntryKey, &'a EntryValue),
230+
) -> std::ops::ControlFlow<()>
231+
+ Send
232+
+ Sync
233+
+ 'static,
234+
>(
235+
&self,
236+
range: KeyRange,
237+
stage: Stage,
238+
f: F,
239+
) -> Result<impl Future<Output = Result<()>> + Send>;
240+
}

crates/storage-query-datafusion/src/context.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,12 @@ where
288288
self.partition_store_manager.clone(),
289289
&self.remote_scanner_manager,
290290
)?;
291+
crate::vqueues::register_self(
292+
ctx,
293+
self.partition_selector.clone(),
294+
self.partition_store_manager.clone(),
295+
&self.remote_scanner_manager,
296+
)?;
291297

292298
ctx.datafusion_context.sql(SYS_INVOCATION_VIEW).await?;
293299

0 commit comments

Comments
 (0)