
Commit 29e1215

Authored by lym953 and claude
feat(logs): [SVLS-8582] Hold logs and add durable context to durable function logs (#1053)
## Summary

If the function is a durable function, add two attributes to every log:

- `lambda.durable_function.execution_id`
- `lambda.durable_function.execution_name`

## Background

- In previous PRs (DataDog/datadog-lambda-python#728, DataDog/datadog-lambda-js#730), the tracer adds the attributes `aws_lambda.durable_function.execution_id` and `aws_lambda.durable_function.execution_name` to the `aws.lambda` span.

## Details

### Data flow

1. `TraceAgent::handle_traces()` detects an `aws.lambda` span carrying `request_id`, `durable_function_execution_id`, and `durable_function_execution_name` in its meta tags.
2. It sends a `ProcessorCommand::ForwardDurableContext { request_id, execution_id, execution_name }` to `InvocationProcessorService`.
3. `Processor::forward_durable_context()` in the lifecycle processor relays this as a `DurableContextUpdate` to the logs pipeline via an mpsc channel, using `send().await` to guarantee delivery.
4. `LogsAgent::spin()` receives the update and calls `LogsProcessor::process_durable_context_update()`, which inserts the entry into `LambdaProcessor::durable_context_map` and drains any held logs for that `request_id`.

### Log holding and draining

- After cold start, the logs processor holds all incoming logs without flushing them, because it does not yet know whether this is a durable function.
- Held logs are stored in `held_logs: HashMap<String, Vec<IntakeLog>>`, keyed by `request_id`.
- Logs without a `request_id` (e.g. in managed instance mode) are pushed directly to `ready_logs` and never held, since they cannot carry durable context.
- `durable_context_map: HashMap<String, DurableExecutionContext>` maps `request_id` to `(execution_id, execution_name)`. It has a fixed capacity (500 entries) with FIFO eviction.
- When the logs processor receives a `PlatformInitStart` event, it learns whether the function is a durable function:
  - If **not** a durable function: drain all held logs (mark them ready for aggregation and flush).
  - If **durable**: drain held logs whose `request_id` is already in `durable_context_map` (tag them with `lambda.durable_function.execution_id` and `lambda.durable_function.execution_name`); keep the rest held until their context arrives.
- When an entry is inserted into `durable_context_map`, any held logs for that `request_id` are drained immediately.

### Memory safety and resilience

- `held_logs` is capped at **50 keys** (intentionally small; see below). Insertion order is tracked in `held_logs_order: VecDeque<String>` for FIFO eviction.
- When `held_logs` is at capacity and a new `request_id` arrives, the **oldest key is evicted**: its logs are serialized and pushed to `ready_logs` without durable context tags. This ensures logs are always eventually sent to Datadog even if the tracer is not installed and context never arrives.
- The cap is kept small (50) to limit the size of the batch flushed at shutdown, reducing the risk of the final flush timing out when held logs are drained without durable context.
- At **shutdown**, after draining the telemetry channel:
  1. The `durable_context_rx` channel is drained to apply any pending context updates, maximising the chance logs are decorated before flushing.
  2. All remaining `held_logs` are drained to `ready_logs` without durable context tags, so no logs are lost.

### Types

- `DurableContextUpdate { request_id, execution_id, execution_name }`: message sent from the trace agent through the lifecycle processor to the logs pipeline.
- `DurableExecutionContext { execution_id, execution_name }`: value type stored in `durable_context_map`.

## Test plan

### Manual test

#### Steps

Build a layer, install it on a function, and invoke it.

#### Result

1. In Datadog, all the logs for this durable execution have the two new attributes.

   <img width="734" height="421" alt="image" src="https://github.com/user-attachments/assets/173e3be2-8bb1-4e08-be97-521c63679bf1" />

2. The logs query

   > source:lambda @lambda.arn:"arn:aws:lambda:us-east-2:425362996713:function:yiming-durable-py-custom-tracer" @lambda.durable_function.execution_name:c949fb3d-a8f5-4ae6-a802-b1458149a4b2

   returns all the logs for two invocations of this durable execution: 98 logs in total, 49 from the first invocation plus 49 from the second. ([query link](https://ddserverless.datadoghq.com/logs?query=source%3Alambda%20%40lambda.arn%3A%22arn%3Aaws%3Alambda%3Aus-east-2%3A425362996713%3Afunction%3Ayiming-durable%22%20%40lambda.durable_execution_name%3A2f492839-75df-4acb-9f2a-30b1b36d5c8f&agg_m=count&agg_m_source=base&agg_t=count&cols=host%2Cservice&fromUser=true&messageDisplay=inline&refresh_mode=paused&storage=flex_tier&stream_sort=time%2Cdesc&viz=stream&from_ts=1772655235033&to_ts=1772655838416&live=false))

### Unit tests

The added unit tests pass.

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
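The hold-and-evict behaviour described above can be sketched as follows. This is a minimal model, not the actual `LambdaProcessor` implementation (which is in one of the changed files not shown on this page); `IntakeLog` is simplified to a `String`, and the field names mirror the PR description.

```rust
use std::collections::{HashMap, VecDeque};

const MAX_HELD_KEYS: usize = 50;

/// Minimal model of the log-holding structure described above.
struct HeldLogs {
    held_logs: HashMap<String, Vec<String>>, // IntakeLog simplified to String
    held_logs_order: VecDeque<String>,       // insertion order, for FIFO eviction
    ready_logs: Vec<String>,                 // logs ready for aggregation/flush
}

impl HeldLogs {
    fn new() -> Self {
        Self {
            held_logs: HashMap::new(),
            held_logs_order: VecDeque::new(),
            ready_logs: Vec::new(),
        }
    }

    /// Hold a log under its request_id, evicting the oldest key at capacity.
    fn hold(&mut self, request_id: &str, log: String) {
        if !self.held_logs.contains_key(request_id) {
            if self.held_logs_order.len() >= MAX_HELD_KEYS {
                // Evict the oldest request_id: its logs go to ready_logs
                // without durable context tags, so they are never lost.
                if let Some(oldest) = self.held_logs_order.pop_front() {
                    if let Some(logs) = self.held_logs.remove(&oldest) {
                        self.ready_logs.extend(logs);
                    }
                }
            }
            self.held_logs_order.push_back(request_id.to_owned());
        }
        self.held_logs
            .entry(request_id.to_owned())
            .or_default()
            .push(log);
    }

    /// Release held logs for one request_id once its durable context arrives.
    fn drain(&mut self, request_id: &str) -> Vec<String> {
        self.held_logs_order.retain(|k| k != request_id);
        self.held_logs.remove(request_id).unwrap_or_default()
    }
}
```

Because eviction drains to `ready_logs` rather than dropping, a log held for a `request_id` whose context never arrives (e.g. no tracer installed) is still flushed, just without the durable tags.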
1 parent: faecfb8 · commit: 29e1215

File tree

13 files changed: +693 additions, −37 deletions

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 25 additions & 11 deletions

```diff
@@ -51,6 +51,7 @@ use bottlecap::{
             AggregatorHandle as LogsAggregatorHandle, AggregatorService as LogsAggregatorService,
         },
         flusher::LogsFlusher,
+        lambda::DurableContextUpdate,
     },
     otlp::{agent::Agent as OtlpAgent, should_enable_otlp_agent},
     proxy::{interceptor, should_start_proxy},
@@ -298,15 +299,20 @@ async fn extension_loop_active(
     // and shares the connection pool.
     let shared_client = bottlecap::http::get_client(config);
 
-    let (logs_agent_channel, logs_flusher, logs_agent_cancel_token, logs_aggregator_handle) =
-        start_logs_agent(
-            config,
-            Arc::clone(&api_key_factory),
-            &tags_provider,
-            event_bus_tx.clone(),
-            aws_config.is_managed_instance_mode(),
-            &shared_client,
-        );
+    let (
+        logs_agent_channel,
+        logs_flusher,
+        logs_agent_cancel_token,
+        logs_aggregator_handle,
+        durable_context_tx,
+    ) = start_logs_agent(
+        config,
+        Arc::clone(&api_key_factory),
+        &tags_provider,
+        event_bus_tx.clone(),
+        aws_config.is_managed_instance_mode(),
+        &shared_client,
+    );
 
     let (metrics_flushers, metrics_aggregator_handle, dogstatsd_cancel_token) = start_dogstatsd(
         tags_provider.clone(),
@@ -325,6 +331,7 @@
         Arc::clone(&aws_config),
         metrics_aggregator_handle.clone(),
         Arc::clone(&propagator),
+        durable_context_tx,
     );
     tokio::spawn(async move {
         invocation_processor_service.run().await;
@@ -1039,14 +1046,15 @@ fn start_logs_agent(
     LogsFlusher,
     CancellationToken,
     LogsAggregatorHandle,
+    Sender<DurableContextUpdate>,
 ) {
     let (aggregator_service, aggregator_handle) = LogsAggregatorService::default();
     // Start service in background
     tokio::spawn(async move {
         aggregator_service.run().await;
     });
 
-    let (mut agent, tx) = LogsAgent::new(
+    let (mut agent, tx, durable_context_tx) = LogsAgent::new(
         Arc::clone(tags_provider),
         Arc::clone(config),
         event_bus,
@@ -1068,7 +1076,13 @@
         config.clone(),
         client.clone(),
     );
-    (tx, flusher, cancel_token, aggregator_handle)
+    (
+        tx,
+        flusher,
+        cancel_token,
+        aggregator_handle,
+        durable_context_tx,
+    )
 }
 
 #[allow(clippy::type_complexity)]
```

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 51 additions & 3 deletions

```diff
@@ -16,7 +16,9 @@ use libdd_trace_protobuf::pb::Span;
 use libdd_trace_utils::tracer_header_tags;
 use serde_json::Value;
 use tokio::time::Instant;
-use tracing::{debug, trace, warn};
+use tracing::{debug, error, trace, warn};
+
+use tokio::sync::mpsc;
 
 use crate::{
     config::{self, aws::AwsConfig},
@@ -31,6 +33,7 @@ use crate::{
         span_inferrer::{self, SpanInferrer},
         triggers::get_default_service_name,
     },
+    logs::lambda::DurableContextUpdate,
     metrics::enhanced::lambda::{EnhancedMetricData, Lambda as EnhancedMetrics},
     proc::{
         self, CPUData, NetworkData,
@@ -89,6 +92,10 @@ pub struct Processor {
     /// Tracks whether if first invocation after init has been received in Managed Instance mode.
     /// Used to determine if we should search for the empty context on an invocation.
     awaiting_first_invocation: bool,
+    /// Sender used to forward durable execution context extracted from `aws.lambda` spans to the
+    /// logs agent. Decouples the trace agent from the logs agent: the trace agent sends spans
+    /// to the lifecycle processor, which extracts durable context and relays it here.
+    durable_context_tx: mpsc::Sender<DurableContextUpdate>,
 }
 
 impl Processor {
@@ -99,6 +106,7 @@
         aws_config: Arc<AwsConfig>,
         metrics_aggregator: dogstatsd::aggregator::AggregatorHandle,
         propagator: Arc<DatadogCompositePropagator>,
+        durable_context_tx: mpsc::Sender<DurableContextUpdate>,
     ) -> Self {
         let resource = tags_provider
             .get_canonical_resource_name()
@@ -128,6 +136,7 @@
             dynamic_tags: HashMap::new(),
             active_invocations: 0,
             awaiting_first_invocation: false,
+            durable_context_tx,
         }
     }
 
@@ -1358,6 +1367,27 @@
             .add_tracer_span(request_id, span, client_computed_stats);
         }
     }
+
+    /// Forwards durable execution context extracted from an `aws.lambda` span to the logs
+    /// pipeline so it can release held logs and tag them with durable execution metadata.
+    pub async fn forward_durable_context(
+        &mut self,
+        request_id: &str,
+        execution_id: &str,
+        execution_name: &str,
+    ) {
+        if let Err(e) = self
+            .durable_context_tx
+            .send(DurableContextUpdate {
+                request_id: request_id.to_owned(),
+                execution_id: execution_id.to_owned(),
+                execution_name: execution_name.to_owned(),
+            })
+            .await
+        {
+            error!("Invocation Processor | Failed to forward durable context to logs agent: {e}");
+        }
+    }
 }
 
 #[cfg(test)]
@@ -1403,7 +1433,15 @@ mod tests {
         tokio::spawn(service.run());
 
         let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
-        Processor::new(tags_provider, config, aws_config, handle, propagator)
+        let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
+        Processor::new(
+            tags_provider,
+            config,
+            aws_config,
+            handle,
+            propagator,
+            durable_context_tx,
+        )
     }
 
     #[test]
@@ -1940,7 +1978,15 @@
 
         let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
 
-        let processor = Processor::new(tags_provider, config, aws_config, handle, propagator);
+        let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
+        let processor = Processor::new(
+            tags_provider,
+            config,
+            aws_config,
+            handle,
+            propagator,
+            durable_context_tx,
+        );
 
         assert!(
             processor.is_managed_instance_mode(),
@@ -2160,12 +2206,14 @@
             AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service");
         tokio::spawn(aggregator_service.run());
         let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
+        let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
         let mut p = Processor::new(
             Arc::clone(&tags_provider),
             Arc::clone(&config),
             aws_config,
             aggregator_handle,
             propagator,
+            durable_context_tx,
         );
 
         let (trace_tx, mut trace_rx) = mpsc::channel(10);
```

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 32 additions & 0 deletions

```diff
@@ -18,6 +18,7 @@ use crate::{
         context::{Context, ReparentingInfo},
         processor::Processor,
     },
+    logs::lambda::DurableContextUpdate,
     tags::provider,
     traces::{propagation::DatadogCompositePropagator, trace_processor::SendingTraceProcessor},
 };
@@ -110,6 +111,11 @@
         span: Box<Span>,
         client_computed_stats: bool,
     },
+    ForwardDurableContext {
+        request_id: String,
+        execution_id: String,
+        execution_name: String,
+    },
     OnOutOfMemoryError {
         timestamp: i64,
     },
@@ -381,6 +387,21 @@
             .await
     }
 
+    pub async fn forward_durable_context(
+        &self,
+        request_id: String,
+        execution_id: String,
+        execution_name: String,
+    ) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
+        self.sender
+            .send(ProcessorCommand::ForwardDurableContext {
+                request_id,
+                execution_id,
+                execution_name,
+            })
+            .await
+    }
+
     pub async fn on_out_of_memory_error(
         &self,
         timestamp: i64,
@@ -431,6 +452,7 @@
         aws_config: Arc<AwsConfig>,
         metrics_aggregator_handle: AggregatorHandle,
         propagator: Arc<DatadogCompositePropagator>,
+        durable_context_tx: mpsc::Sender<DurableContextUpdate>,
     ) -> (InvocationProcessorHandle, Self) {
         let (sender, receiver) = mpsc::channel(1000);
 
@@ -440,6 +462,7 @@
             aws_config,
             metrics_aggregator_handle,
             propagator,
+            durable_context_tx,
         );
 
         let handle = InvocationProcessorHandle { sender };
@@ -592,6 +615,15 @@
             } => {
                 self.processor.add_tracer_span(&span, client_computed_stats);
             }
+            ProcessorCommand::ForwardDurableContext {
+                request_id,
+                execution_id,
+                execution_name,
+            } => {
+                self.processor
+                    .forward_durable_context(&request_id, &execution_id, &execution_name)
+                    .await;
+            }
             ProcessorCommand::OnOutOfMemoryError { timestamp } => {
                 self.processor.on_out_of_memory_error(timestamp);
             }
```

bottlecap/src/logs/agent.rs

Lines changed: 20 additions & 3 deletions

```diff
@@ -12,9 +12,12 @@ use crate::{LAMBDA_RUNTIME_SLUG, config};
 
 const DRAIN_LOG_INTERVAL: Duration = Duration::from_millis(100);
 
+use crate::logs::lambda::DurableContextUpdate;
+
 #[allow(clippy::module_name_repetitions)]
 pub struct LogsAgent {
     rx: mpsc::Receiver<TelemetryEvent>,
+    durable_context_rx: mpsc::Receiver<DurableContextUpdate>,
     processor: LogsProcessor,
     aggregator_handle: AggregatorHandle,
     cancel_token: CancellationToken,
@@ -28,7 +31,7 @@
         event_bus: Sender<Event>,
         aggregator_handle: AggregatorHandle,
         is_managed_instance_mode: bool,
-    ) -> (Self, Sender<TelemetryEvent>) {
+    ) -> (Self, Sender<TelemetryEvent>, Sender<DurableContextUpdate>) {
         let processor = LogsProcessor::new(
             Arc::clone(&datadog_config),
             tags_provider,
@@ -38,16 +41,18 @@
         );
 
         let (tx, rx) = mpsc::channel::<TelemetryEvent>(1000);
+        let (durable_context_tx, durable_context_rx) = mpsc::channel::<DurableContextUpdate>(500);
         let cancel_token = CancellationToken::new();
 
         let agent = Self {
             rx,
+            durable_context_rx,
             processor,
             aggregator_handle,
             cancel_token,
         };
 
-        (agent, tx)
+        (agent, tx, durable_context_tx)
     }
 
     pub async fn spin(&mut self) {
@@ -56,10 +61,13 @@
                 Some(event) = self.rx.recv() => {
                     self.processor.process(event, &self.aggregator_handle).await;
                 }
+                Some(update) = self.durable_context_rx.recv() => {
+                    self.processor.process_durable_context_update(update, &self.aggregator_handle);
+                }
                 () = self.cancel_token.cancelled() => {
                     debug!("LOGS_AGENT | Received shutdown signal, draining remaining events");
 
-                    // Drain remaining events
+                    // Drain remaining telemetry events
                     let mut last_drain_log_time = Instant::now().checked_sub(DRAIN_LOG_INTERVAL).expect("Failed to subtract interval from now");
                     'drain_logs_loop: loop {
                         match self.rx.try_recv() {
@@ -82,6 +90,15 @@
                         }
                     }
 
+                    // Drain any pending durable context updates before flushing held logs,
+                    // to maximise the chance of decorating logs with execution context.
+                    while let Ok(update) = self.durable_context_rx.try_recv() {
+                        self.processor.process_durable_context_update(update, &self.aggregator_handle);
+                    }
+
+                    // Drain remaining held logs without durable context tags so no logs are lost.
+                    self.processor.drain_held_logs(&self.aggregator_handle);
+
                     break;
                 }
             }
```

bottlecap/src/logs/aggregator.rs

Lines changed: 4 additions & 0 deletions

```diff
@@ -106,6 +106,7 @@ mod tests {
             lambda: Lambda {
                 arn: "arn".to_string(),
                 request_id: Some("request_id".to_string()),
+                ..Lambda::default()
             },
             timestamp: 0,
             status: "status".to_string(),
@@ -130,6 +131,7 @@ mod tests {
             lambda: Lambda {
                 arn: "arn".to_string(),
                 request_id: Some("request_id".to_string()),
+                ..Lambda::default()
             },
             timestamp: 0,
             status: "status".to_string(),
@@ -156,6 +158,7 @@ mod tests {
             lambda: Lambda {
                 arn: "arn".to_string(),
                 request_id: Some("request_id".to_string()),
+                ..Lambda::default()
             },
             timestamp: 0,
             status: "status".to_string(),
@@ -196,6 +199,7 @@ mod tests {
             lambda: Lambda {
                 arn: "arn".to_string(),
                 request_id: Some("request_id".to_string()),
+                ..Lambda::default()
             },
             timestamp: 0,
             status: "status".to_string(),
```

bottlecap/src/logs/aggregator_service.rs

Lines changed: 1 addition & 0 deletions

```diff
@@ -122,6 +122,7 @@ mod tests {
             lambda: Lambda {
                 arn: "arn".to_string(),
                 request_id: Some("request_id".to_string()),
+                ..Lambda::default()
             },
             timestamp: 0,
             status: "status".to_string(),
```
