Skip to content

[Invoker] Wire up ReservedResource to replace the old Permit#4649

Open
AhmedSoliman wants to merge 8 commits intomainfrom
pr4649
Open

[Invoker] Wire up ReservedResource to replace the old Permit#4649
AhmedSoliman wants to merge 8 commits intomainfrom
pr4649

Conversation

I'm not loving this, but it works well enough to unblock.
NOTE: sys_user_limits is a *stub* table, the shape and contents of this table will almost definitely
change in the very near future. Consider this as a placeholder until scope+limit-keys are wired up completely.
Expose vqueue entries as a single DataFusion table with stage-aware scanning.
When the query filters `stage`, only matching stage key kinds are scanned;
without a stage filter, all inbox stages are scanned and merged.

Also project the latest entry metadata for observability (status plus
EntryStatistics timestamps and counters), and add targeted tests for stage
predicate extraction and sys_vqueues stage filtering behavior.
Baby steps towards unification and changing the ownership model of the invoker
With this change, user-limits acquisition correctly tracks the lifecycle of the invocation state machine on the invoker.
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: b6bd1d192b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +137 to +141
if self
.notified_head
.is_some_and(|notified| notified == vqueue)
{
return Poll::Pending;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Advance throttling head when notified waiter doesn't acquire

poll_head keeps the throttling head in-place and then suppresses further wakeups while notified_head == vqueue, but the acquire path checks other resources (e.g. lock/user limits) before throttling (resource_manager.rs), so a woken queue can get blocked elsewhere and never call poll_acquire to consume/remove its reservation. In that state this branch returns Pending forever for the same head, queues behind it stop making progress, and reserved throttling capacity is stranded until the head happens to re-enter throttling, which can severely stall scheduling under lock+throttle contention.

Useful? React with 👍 / 👎.

@github-actions
Copy link
Copy Markdown

Test Results

  8 files  ±0    8 suites  ±0   2m 9s ⏱️ +14s
 50 tests ±0   50 ✅ ±0  0 💤 ±0  0 ❌ ±0 
218 runs  ±0  218 ✅ ±0  0 💤 ±0  0 ❌ ±0 

Results for commit b6bd1d1. ± Comparison against base commit bd7dc3c.

@github-actions
Copy link
Copy Markdown

Test Results

 5 files   -   3   5 suites   - 3   1m 37s ⏱️ -18s
27 tests  -  23  27 ✅  -  23  0 💤 ±0  0 ❌ ±0 
54 runs   - 164  54 ✅  - 164  0 💤 ±0  0 ❌ ±0 

Results for commit b6bd1d1. ± Comparison against base commit bd7dc3c.

This pull request removes 50 and adds 27 tests. Note that renamed tests count towards both.
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[1]
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[2]
dev.restate.sdktesting.tests.CallOrdering ‑ ordering(boolean[], Client)[3]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[1]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[2]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromAdminAPI(BlockingOperation, Client, URI)[3]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[1]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[2]
dev.restate.sdktesting.tests.Cancellation ‑ cancelFromContext(BlockingOperation, Client)[3]
dev.restate.sdktesting.tests.Combinators ‑ awakeableOrTimeoutUsingAwaitAny(Client)
…
dev.restate.sdktesting.tests.AwakeableIngressEndpointTest ‑ completeWithFailure(Client)
dev.restate.sdktesting.tests.AwakeableIngressEndpointTest ‑ completeWithSuccess(Client)
dev.restate.sdktesting.tests.AwakeableLeaderTransferTest ‑ awakeableCompletionsAreNotLostDuringLeaderTransfer(Client, ContainerHandle)
dev.restate.sdktesting.tests.IngressTest ‑ idempotentInvokeSend(Client)
dev.restate.sdktesting.tests.IngressTest ‑ idempotentInvokeService(Client)
dev.restate.sdktesting.tests.IngressTest ‑ idempotentInvokeVirtualObject(Client)
dev.restate.sdktesting.tests.IngressTest ‑ idempotentSendThenAttachWIthIdempotencyKey(Client)
dev.restate.sdktesting.tests.IngressTest ‑ privateService(URI, Client)
dev.restate.sdktesting.tests.InvokerMemoryTest ‑ allInvocationsCompleteUnderMemoryPressure(Client, URI, int)
dev.restate.sdktesting.tests.InvokerMemoryTest ‑ allStatefulInvocationsCompleteUnderMemoryPressure(Client, URI, int)
…

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 24, 2026

Test Results

  7 files   -  1    7 suites   - 1   2m 38s ⏱️ +43s
 47 tests  -  3   47 ✅  -  3  0 💤 ±0  0 ❌ ±0 
200 runs   - 18  200 ✅  - 18  0 💤 ±0  0 ❌ ±0 

Results for commit b6bd1d1. ± Comparison against base commit bd7dc3c.

This pull request removes 7 and adds 4 tests. Note that renamed tests count towards both.
dev.restate.sdktesting.tests.Combinators ‑ awakeableOrTimeoutUsingAwaitAny(Client)
dev.restate.sdktesting.tests.Combinators ‑ firstSuccessfulCompletedAwakeable(Client)
dev.restate.sdktesting.tests.Custom ‑ run(CustomTestConfig, URI, URI)[1]
dev.restate.sdktesting.tests.UserErrors ‑ failSeveralTimesWithMetadata(URI)
dev.restate.sdktesting.tests.UserErrors ‑ internalCallFailurePropagationWithMetadata(URI)
dev.restate.sdktesting.tests.UserErrors ‑ invokeTerminallyFailingCallWithMetadata(URI)
dev.restate.sdktesting.tests.UserErrors ‑ sideEffectWithTerminalErrorWithMetadata(URI)
dev.restate.sdktesting.tests.KafkaIngress ‑ handleEventInCounterService(URI, int, Client)
dev.restate.sdktesting.tests.KafkaIngress ‑ handleEventInEventHandler(URI, int, Client)
dev.restate.sdktesting.tests.UpgradeWithInFlightInvocation ‑ inFlightInvocation(Client, URI)
dev.restate.sdktesting.tests.UpgradeWithNewInvocation ‑ executesNewInvocationWithLatestServiceRevisions(Client, URI)

♻️ This comment has been updated with latest results.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant