Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions crates/invoker-api/src/effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ pub enum EffectKind {
End,
/// This is sent when the invoker exhausted all its attempts to make progress on the specific invocation.
Failed(InvocationError),
/// This is sent when the invoker exhausted all its retry attempts and the on_max_attempts policy is Kill.
/// Unlike [`Self::Failed`], this carries the last error as a killed journal event so the UI can
/// distinguish a kill-after-max-retries from a plain failure.
KilledAfterMaxAttempts {
killed_event: RawEvent,
},
// New journal entry v2 which only carries the raw entry.
// Introduced in v1.6.0
// Start writing in v1.7.0
Expand Down
38 changes: 35 additions & 3 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ use restate_types::invocation::InvocationTarget;
use restate_types::journal::EntryIndex;
use restate_types::journal::enriched::EnrichedRawEntry;
use restate_types::journal_events::raw::RawEvent;
use restate_types::journal_events::{Event, PausedEvent, TransientErrorEvent};
use restate_types::journal_events::{Event, KilledEvent, PausedEvent, TransientErrorEvent};
use restate_types::journal_v2::raw::{RawCommand, RawNotification};
use restate_types::journal_v2::{CommandIndex, EntryMetadata, NotificationId};
use restate_types::live::{Live, LiveLoad};
Expand Down Expand Up @@ -1661,13 +1661,45 @@ where
"Error when executing the invocation, not going to retry.");
self.status_store.on_end(&partition, &invocation_id);

let journal_v2_related_command_type =
if let InvokerError::SdkV2(SdkInvocationErrorV2 {
related_command: Some(ref related_entry),
..
}) = error
{
related_entry
.related_entry_type
.and_then(|e| e.try_as_command_ref().copied())
} else {
None
};
let invocation_error_report = error.into_invocation_error_report();
let killed_event = KilledEvent {
last_failure: Some(TransientErrorEvent {
error_code: invocation_error_report.err.code(),
error_message: invocation_error_report.err.message().to_owned(),
error_stacktrace: invocation_error_report
.err
.stacktrace()
.map(|s| s.to_owned()),
restate_doc_error_code: invocation_error_report
.doc_error_code
.map(|c| c.code().to_owned()),
related_command_index: invocation_error_report.related_entry_index,
related_command_name: invocation_error_report.related_entry_name.clone(),
related_command_type: journal_v2_related_command_type,
}),
};

let _ = self
.invocation_state_machine_manager
.resolve_partition_sender(partition)
.expect("Partition should be registered")
.send(Box::new(Effect {
invocation_id,
kind: EffectKind::Failed(error.into_invocation_error()),
kind: EffectKind::KilledAfterMaxAttempts {
killed_event: RawEvent::from(Event::Killed(killed_event)),
},
}))
.await;
}
Expand Down Expand Up @@ -2858,7 +2890,7 @@ mod tests {
*effect,
pat!(Effect {
invocation_id: eq(invocation_id),
kind: pat!(EffectKind::Failed(_))
kind: pat!(EffectKind::KilledAfterMaxAttempts { .. })
})
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ define_table!(sys_journal_events (
/// When the entry was appended to the journal.
appended_at: TimestampMillisecond,

/// The event type.
/// The event type. Possible values: `TransientError`, `Paused`, `Killed`.
event_type: DataType::LargeUtf8,

/// The event serialized as a JSON string.
Expand Down
4 changes: 4 additions & 0 deletions crates/types/protobuf/restate/journal_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,8 @@ message TransientErrorEvent {

message PausedEvent {
TransientErrorEvent last_failure = 1;
}

message KilledEvent {
TransientErrorEvent last_failure = 1;
}
10 changes: 10 additions & 0 deletions crates/types/src/journal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub enum EventType {
Unknown = 0,
TransientError = 1,
Paused = 2,
Killed = 3,
}

#[derive(
Expand All @@ -43,6 +44,7 @@ pub enum EventType {
pub enum Event {
TransientError(TransientErrorEvent),
Paused(PausedEvent),
Killed(KilledEvent),
/// This is used when it's not possible to parse in this Restate version the event.
Unknown,
}
Expand Down Expand Up @@ -71,3 +73,11 @@ pub struct PausedEvent {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_failure: Option<TransientErrorEvent>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct KilledEvent {
/// The last transient error before being killed, if any.
/// Empty when killed via admin API without prior retry failures.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub last_failure: Option<TransientErrorEvent>,
}
23 changes: 23 additions & 0 deletions crates/types/src/journal_events/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ fn decode(ty: EventType, value: Bytes) -> Result<Event, EventDecodingError> {
pb::TransientErrorEvent::decode(value)?.try_into()?,
)),
EventType::Paused => Ok(Event::Paused(pb::PausedEvent::decode(value)?.try_into()?)),
EventType::Killed => Ok(Event::Killed(pb::KilledEvent::decode(value)?.try_into()?)),
EventType::Unknown => Ok(Event::Unknown),
}
}
Expand All @@ -91,6 +92,10 @@ fn encode(event: Event) -> RawEvent {
EventType::Paused,
pb::PausedEvent::from(e).encode_to_vec().into(),
),
Event::Killed(e) => RawEvent::new(
EventType::Killed,
pb::KilledEvent::from(e).encode_to_vec().into(),
),
Event::Unknown => RawEvent::unknown(),
}
}
Expand Down Expand Up @@ -233,4 +238,22 @@ mod pb {
})
}
}

impl From<event::KilledEvent> for KilledEvent {
fn from(event::KilledEvent { last_failure }: event::KilledEvent) -> Self {
KilledEvent {
last_failure: last_failure.map(Into::into),
}
}
}

impl TryFrom<KilledEvent> for event::KilledEvent {
type Error = anyhow::Error;

fn try_from(KilledEvent { last_failure }: KilledEvent) -> Result<Self, Self::Error> {
Ok(event::KilledEvent {
last_failure: last_failure.map(|f| f.try_into()).transpose()?,
})
}
}
}
66 changes: 65 additions & 1 deletion crates/worker/src/partition/state_machine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use restate_storage_api::invocation_status_table::{
WriteInvocationStatusTable,
};
use restate_storage_api::invocation_status_table::{InvocationStatus, ScheduledInvocation};
use restate_storage_api::journal_events::WriteJournalEventsTable;
use restate_storage_api::journal_events::{EventView, WriteJournalEventsTable};
use restate_storage_api::journal_table::ReadJournalTable;
use restate_storage_api::journal_table::{JournalEntry, WriteJournalTable};
use restate_storage_api::journal_table_v2;
Expand Down Expand Up @@ -95,6 +95,8 @@ use restate_types::journal::enriched::{
AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader,
};
use restate_types::journal::raw::{EntryHeader, RawEntryCodec, RawEntryCodecError};
use restate_types::journal_events::raw::RawEvent;
use restate_types::journal_events::{Event, KilledEvent};
use restate_types::journal_v2::command::{OutputCommand, OutputResult};
use restate_types::journal_v2::raw::RawEntry;
use restate_types::journal_v2::{
Expand Down Expand Up @@ -1980,6 +1982,12 @@ impl<S> StateMachineApplyContext<'_, S> {
+ WriteVQueueTable
+ WriteJournalEventsTable,
{
let after_journal_entry_index = metadata
.journal_metadata
.length
.checked_sub(1)
.unwrap_or_default();

self.kill_child_invocations(&invocation_id, metadata.journal_metadata.length, &metadata)
.await?;

Expand All @@ -1989,6 +1997,18 @@ impl<S> StateMachineApplyContext<'_, S> {
Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)),
)
.await?;

// Write after end_invocation so journal cleanup (do_drop_journal) doesn't delete it
self.storage.put_journal_event(
invocation_id,
EventView {
append_time: self.record_created_at,
after_journal_entry_index,
event: RawEvent::from(Event::Killed(KilledEvent { last_failure: None })),
},
self.record_lsn.as_u64(),
)?;

self.do_send_abort_invocation_to_invoker(invocation_id);
Ok(())
}
Expand Down Expand Up @@ -2016,6 +2036,12 @@ impl<S> StateMachineApplyContext<'_, S> {
+ WriteVQueueTable
+ WriteJournalEventsTable,
{
let after_journal_entry_index = metadata
.journal_metadata
.length
.checked_sub(1)
.unwrap_or_default();

self.kill_child_invocations(&invocation_id, metadata.journal_metadata.length, &metadata)
.await?;

Expand All @@ -2025,6 +2051,18 @@ impl<S> StateMachineApplyContext<'_, S> {
Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)),
)
.await?;

// Write after end_invocation so journal cleanup (do_drop_journal) doesn't delete it
self.storage.put_journal_event(
invocation_id,
EventView {
append_time: self.record_created_at,
after_journal_entry_index,
event: RawEvent::from(Event::Killed(KilledEvent { last_failure: None })),
},
self.record_lsn.as_u64(),
)?;

self.do_send_abort_invocation_to_invoker(invocation_id);
Ok(())
}
Expand Down Expand Up @@ -2556,6 +2594,32 @@ impl<S> StateMachineApplyContext<'_, S> {
)
.await?;
}
InvokerEffectKind::KilledAfterMaxAttempts { killed_event } => {
let metadata = invocation_status
.into_invocation_metadata()
.expect("Must be present if status is invoked");
let after_journal_entry_index = metadata
.journal_metadata
.length
.checked_sub(1)
.unwrap_or_default();
self.end_invocation(
effect.invocation_id,
metadata,
Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)),
)
.await?;
// Write after end_invocation so journal cleanup (do_drop_journal) doesn't delete it
self.storage.put_journal_event(
effect.invocation_id,
EventView {
append_time: self.record_created_at,
after_journal_entry_index,
event: killed_event,
},
self.record_lsn.as_u64(),
)?;
}
InvokerEffectKind::Yield(ref reason) => {
let invocation_metadata = invocation_status
.into_invocation_metadata()
Expand Down
82 changes: 81 additions & 1 deletion crates/worker/src/partition/state_machine/tests/kill_cancel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,24 @@
// by the Apache License, Version 2.0.

use super::{fixtures, matchers, *};

use assert2::assert;
use assert2::let_assert;
use googletest::any;
use googletest::elements_are;
use prost::Message;
use restate_invoker_api::EffectKind as InvokerEffectKind;
use restate_storage_api::journal_table;
use restate_storage_api::journal_table::WriteJournalTable;
use restate_storage_api::timer_table::{
ReadTimerTable, Timer, TimerKey, TimerKeyKind, WriteTimerTable,
};
use restate_types::deployment::PinnedDeployment;
use restate_types::errors::KILLED_INVOCATION_ERROR;
use restate_types::identifiers::EntryIndex;
use restate_types::invocation::{IngressInvocationResponseSink, TerminationFlavor};
use restate_types::journal::enriched::EnrichedEntryHeader;
use restate_types::journal_events::raw::RawEvent;
use restate_types::journal_events::{Event, KilledEvent, TransientErrorEvent};
use restate_types::journal_v2::NotificationId;
use restate_types::service_protocol;
use rstest::rstest;
Expand Down Expand Up @@ -771,3 +775,79 @@ fn create_termination_journal(
)),
]
}

/// Admin API kill should write a KilledEvent with no last_failure.
#[restate_core::test]
async fn kill_invoked_writes_killed_journal_event() -> anyhow::Result<()> {
let mut test_env = TestEnv::create().await;
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await;

let _ = test_env
.apply(Command::TerminateInvocation(InvocationTermination {
invocation_id,
flavor: TerminationFlavor::Kill,
response_sink: None,
}))
.await;

assert_that!(
test_env.read_journal_events(invocation_id).await,
elements_are![eq(Event::Killed(KilledEvent { last_failure: None }))]
);

test_env.shutdown().await;
Ok(())
}

/// Invoker kill-after-max-attempts should write a KilledEvent with the last error
/// and complete the invocation with KILLED_INVOCATION_ERROR (not the raw service error).
#[restate_core::test]
async fn killed_after_max_attempts_writes_killed_event_with_last_failure() -> anyhow::Result<()> {
let mut test_env = TestEnv::create().await;
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await;

let last_error = TransientErrorEvent {
error_code: 500u16.into(),
error_message: "service blew up".to_string(),
error_stacktrace: None,
restate_doc_error_code: None,
related_command_index: None,
related_command_name: None,
related_command_type: None,
};

let _ = test_env
.apply(Command::InvokerEffect(Box::new(
restate_invoker_api::Effect {
invocation_id,
kind: InvokerEffectKind::KilledAfterMaxAttempts {
killed_event: RawEvent::from(Event::Killed(KilledEvent {
last_failure: Some(last_error.clone()),
})),
},
},
)))
.await;

// Journal event must carry the last_failure
assert_that!(
test_env.read_journal_events(invocation_id).await,
elements_are![eq(Event::Killed(KilledEvent {
last_failure: Some(last_error)
}))]
);

// Invocation must no longer be active
assert_that!(
test_env
.storage
.get_invocation_status(&invocation_id)
.await?,
not(pat!(InvocationStatus::Invoked { .. }))
);

test_env.shutdown().await;
Ok(())
}
2 changes: 1 addition & 1 deletion crates/worker/src/partition/state_machine/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use restate_storage_api::service_status_table::{
use restate_storage_api::state_table::{ReadStateTable, WriteStateTable};
use restate_test_util::matchers::*;
use restate_types::config::StorageOptions;
use restate_types::errors::{InvocationError, KILLED_INVOCATION_ERROR, codes};
use restate_types::errors::{InvocationError, codes};
use restate_types::identifiers::{
AwakeableIdentifier, InvocationId, PartitionId, PartitionKey, PartitionProcessorRpcRequestId,
ServiceId,
Expand Down
Loading
Loading