diff --git a/datadog_lambda/patch.py b/datadog_lambda/patch.py
index 6d2af0dc..134e42d8 100644
--- a/datadog_lambda/patch.py
+++ b/datadog_lambda/patch.py
@@ -31,6 +31,15 @@ def patch_all():
 
     if config.trace_enabled:
         patch_all_dd()
+        # TODO: remove before merging - this is only a testing helper.
+        # Manually patch the durable execution integration since it may not
+        # be registered in the PyPI ddtrace's _monkey.py yet.
+        try:
+            from ddtrace import patch as _patch_dd
+
+            _patch_dd(aws_durable_execution_sdk_python=True)
+        except Exception as e:
+            logger.debug("Failed to patch durable execution integration: %s", e)
     else:
         _patch_http()
         _ensure_patch_requests()
diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py
index 3c7d9f11..29bc8064 100644
--- a/datadog_lambda/tracing.py
+++ b/datadog_lambda/tracing.py
@@ -621,6 +621,206 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
         logger.debug("Failed to check if invocated by an authorizer. error %s", e)
 
 
+def _get_durable_operations(event):
+    """
+    Return the InitialExecutionState.Operations list of a durable event.
+
+    Falls back to an empty list when either level is missing or is not of
+    the expected type (e.g. an explicit null), so callers can iterate or
+    take len() without further checks.
+    """
+    initial_state = event.get("InitialExecutionState")
+    if not isinstance(initial_state, dict):
+        return []
+
+    operations = initial_state.get("Operations")
+    return operations if isinstance(operations, list) else []
+
+
+def is_durable_execution_replay(event):
+    """
+    Check if this Lambda invocation is a durable execution replay.
+
+    A replay occurs when there are existing operations in InitialExecutionState,
+    meaning this invocation is resuming from a previous checkpoint rather than
+    starting fresh.
+
+    For replay invocations, we should skip creating inferred spans because:
+    - The trace context is being continued from the checkpoint
+    - Creating an inferred span would create a duplicate
+
+    Returns:
+        True if this is a replay invocation (should skip inferred span)
+        False if this is first invocation or not a durable execution
+    """
+    if not isinstance(event, dict) or "DurableExecutionArn" not in event:
+        return False
+
+    operations = _get_durable_operations(event)
+
+    # The SDK always includes the EXECUTION operation itself (1 operation on
+    # first invocation). A replay has >1 operations (the EXECUTION + previously
+    # completed operations). This aligns with the SDK's ReplayStatus logic in
+    # execution.py.
+    is_replay = len(operations) > 1
+    logger.debug(
+        "Durable execution %s invocation (%d operations)",
+        "replay" if is_replay else "first",
+        len(operations),
+    )
+    return is_replay
+
+
+def _extract_context_from_trace_checkpoint(operation):
+    """
+    Build a trace context from a single "_dd_trace_context" checkpoint.
+
+    Returns None when the checkpoint has no Result, the Result cannot be
+    parsed into a dict, the trace id / parent id headers are missing, or
+    the propagator yields no trace id.
+    """
+    operation_id = operation.get("Id")
+
+    # Trace context is in StepDetails.Result (standard STEP format)
+    step_details = operation.get("StepDetails")
+    if not isinstance(step_details, dict) or not step_details.get("Result"):
+        logger.debug("Trace checkpoint %s has no Result, skipping", operation_id)
+        return None
+
+    try:
+        payload = json.loads(step_details["Result"])
+        if not isinstance(payload, dict):
+            logger.debug("Trace checkpoint payload is not a dict: %s", type(payload))
+            return None
+
+        trace_id = payload.get("x-datadog-trace-id")
+        span_id = payload.get("x-datadog-parent-id")
+        if not (trace_id and span_id):
+            return None
+
+        # Use HTTPPropagator to restore full context including
+        # baggage, _dd.p.* tags, origin, and sampling priority
+        context = propagator.extract(payload)
+    except (TypeError, ValueError) as e:
+        # json.JSONDecodeError is a ValueError subclass
+        logger.debug("Failed to parse trace checkpoint payload: %s", e)
+        return None
+
+    if context and context.trace_id:
+        logger.debug(
+            "Extracted Datadog trace context from trace checkpoint %s: %s",
+            operation_id,
+            context,
+        )
+        return context
+    return None
+
+
+def extract_context_from_durable_execution(event, lambda_context):
+    """
+    Extract Datadog trace context from AWS Lambda Durable Execution event.
+
+    Looks for extra trace context checkpoints created by the dd-trace plugin.
+    These are STEP operations with Name="_dd_trace_context" that store trace
+    headers in their StepDetails.Result payload. Customer operation payloads
+    are never read or modified.
+
+    Scans operations in reverse to find the LAST trace checkpoint, which
+    corresponds to the most recently completed customer operation. This gives
+    proper parent chaining: each invocation's root span is parented to the
+    last operation span from the previous invocation.
+
+    lambda_context is currently unused.
+
+    Returns the extracted context, or None when the event is not a durable
+    execution or no usable trace checkpoint is found.
+    """
+    try:
+        if not isinstance(event, dict):
+            return None
+
+        if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
+            return None
+
+        operations = _get_durable_operations(event)
+        logger.debug(
+            "Durable execution event with %d operations in InitialExecutionState",
+            len(operations),
+        )
+
+        # Scan in reverse to find the LAST trace context checkpoint
+        # (corresponds to the most recently completed customer operation)
+        for operation in reversed(operations):
+            if not isinstance(operation, dict):
+                continue
+            if operation.get("Name") != "_dd_trace_context":
+                continue
+
+            context = _extract_context_from_trace_checkpoint(operation)
+            if context is not None:
+                return context
+
+        logger.debug("No usable trace context checkpoint found in operations")
+    except Exception as e:
+        logger.debug("Failed to extract trace context from durable execution: %s", e)
+
+    return None
+
+
+def create_durable_execution_root_span(event):
+    """
+    Create the durable execution root span on the FIRST invocation only.
+
+    Component 1 & 4 of extra-checkpoint trace propagation:
+    - First invocation (no checkpoint context): creates root span, returns it
+    - Subsequent invocations (checkpoint context found): returns None
+      (context already activated by extract_context_from_durable_execution,
+      no need to recreate root span)
+
+    Returns the root span (caller must call span.finish() when invocation ends),
+    or None if not a durable execution or if this is a replay.
+    """
+    try:
+        if not isinstance(event, dict):
+            return None
+
+        execution_arn = event.get("DurableExecutionArn")
+        if not execution_arn or "InitialExecutionState" not in event:
+            return None
+
+        # Component 4: On replay, context is already activated from checkpoint.
+        # Don't recreate root span - it was already emitted in a prior invocation.
+        if is_durable_execution_replay(event):
+            logger.debug("Durable execution replay, skipping root span creation")
+            return None
+
+        # Component 1: First invocation - create new root span
+        service_name = (
+            os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
+        )
+        # Last ":"-separated segment of the ARN, or the whole value if no ":"
+        resource = execution_arn.rsplit(":", 1)[-1]
+
+        span = tracer.trace(
+            "aws.durable-execution",
+            service=service_name,
+            resource=resource,
+            span_type="serverless",
+        )
+        if span:
+            span.set_tag("durable.execution_arn", execution_arn)
+            logger.debug(
+                "Created durable root span: trace_id=%s, span_id=%s, resource=%s",
+                span.trace_id,
+                span.span_id,
+                resource,
+            )
+        return span
+    except Exception as e:
+        logger.debug("Failed to create durable execution root span: %s", e)
+        return None
+
+
 def extract_dd_trace_context(
     event, lambda_context, extractor=None, decode_authorizer_context: bool = True
 ):
@@ -634,6 +834,17 @@ def extract_dd_trace_context(
     trace_context_source = None
     event_source = parse_event_source(event)
 
+    # Check for AWS Lambda Durable Execution events first (before other checks)
+    # This ensures trace context is properly continued across durable invocations
+    durable_context = extract_context_from_durable_execution(event, lambda_context)
+    if _is_context_complete(durable_context):
+        dd_trace_context = durable_context
+        trace_context_source = TraceContextSource.EVENT
+        logger.debug(
+            "extracted dd trace context from durable execution: %s", dd_trace_context
+        )
+        return dd_trace_context, trace_context_source, event_source
+
     if extractor is not None:
         context = extract_context_custom_extractor(extractor, event, lambda_context)
     elif isinstance(event, (set, dict)) and "request" in event:
diff --git a/datadog_lambda/wrapper.py b/datadog_lambda/wrapper.py
index 767816a5..292fa25c 100644
--- a/datadog_lambda/wrapper.py
+++ b/datadog_lambda/wrapper.py
@@ -39,6 +39,8 @@
     create_inferred_span,
     InferredSpanInfo,
     is_authorizer_response,
+    is_durable_execution_replay,
+    create_durable_execution_root_span,
     tracer,
     propagator,
 )
@@ -149,6 +151,8 @@ def __init__(self, func):
             self.inferred_span = None
             self.response = None
             self.blocking_response = None
+            self.durable_root_span = None
+            self.is_durable = False
 
             if config.profiling_enabled and profiler:
                 self.prof = profiler.Profiler(env=config.env, service=config.service)
@@ -269,7 +273,9 @@ def _before(self, event, context):
 
             if config.trace_enabled:
                 set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
-                if config.make_inferred_span:
+                # Skip inferred span for durable execution replays to avoid duplicates:
+                # their trace context comes from the checkpoint, not the event trigger.
+                if config.make_inferred_span and not is_durable_execution_replay(event):
                     self.inferred_span = create_inferred_span(
                         event, context, event_source, config.decode_authorizer_context
                     )
@@ -277,6 +283,51 @@ def _before(self, event, context):
                 if config.appsec_enabled:
                     asm_set_context(event_source)
 
+                # For durable executions: create root span BEFORE aws.lambda span
+                # so aws.lambda becomes a child of the root durable execution span.
+                self.is_durable = (
+                    isinstance(event, dict) and "DurableExecutionArn" in event
+                )
+                if self.is_durable:
+                    # Set _reactivate on the active context so it persists after
+                    # all spans close. This prevents ddtrace from purging the
+                    # context when the last span (aws.lambda or root) finishes.
+                    active_ctx = tracer.context_provider.active()
+                    if active_ctx and hasattr(active_ctx, "_reactivate"):
+                        active_ctx._reactivate = True
+                        logger.debug("Set _reactivate=True on active context")
+
+                    # For replay: copy _meta from the extracted context so _dd.p.*
+                    # tags propagate. set_dd_trace_py_root only copies
+                    # trace_id/span_id/sampling_priority, so propagation tags from
+                    # the checkpoint would be lost without this.
+                    if (
+                        dd_context
+                        and hasattr(dd_context, "_meta")
+                        and active_ctx
+                        and hasattr(active_ctx, "_meta")
+                    ):
+                        for k, v in dd_context._meta.items():
+                            if k not in active_ctx._meta:
+                                active_ctx._meta[k] = v
+
+                    # Component 1: Create root span (first invocation only)
+                    # Component 4: On replays, returns None (context from checkpoint)
+                    self.durable_root_span = create_durable_execution_root_span(event)
+
+                    # Store root span reference for Component 2 (checkpoint save)
+                    if self.durable_root_span:
+                        try:
+                            from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (  # noqa: E501
+                                set_durable_root_span,
+                            )
+
+                            set_durable_root_span(self.durable_root_span)
+                        except Exception as e:
+                            logger.debug("Failed to register durable root span: %s", e)
+
+                # Create aws.lambda span - child of root durable span (first invocation)
+                # or child of checkpoint context (replay), or normal parent otherwise
                 self.span = create_function_execution_span(
                     context=context,
                     function_name=config.function_name,
@@ -352,6 +403,53 @@ def _after(self, event, context):
 
                 self.span.finish()
 
+            # Component 3: After aws.lambda closes but BEFORE root span closes,
+            # check if trace context was enriched during this invocation.
+            # The context is still alive because _reactivate=True on the parent context.
+            # This is best-effort for the end-of-invocation case; the piggyback in
+            # _patched_create_checkpoint handles saves during handler execution.
+            if self.is_durable:
+                try:
+                    from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (  # noqa: E501
+                        _get_current_execution_state,
+                        _has_context_changed,
+                        _inject_current_context,
+                        save_updated_trace_checkpoint,
+                    )
+
+                    state = _get_current_execution_state()
+                    if state:
+                        prev_headers = getattr(state, "_dd_before_trace_headers", None)
+                        if prev_headers is not None and _has_context_changed(
+                            prev_headers
+                        ):
+                            logger.debug(
+                                "Trace context changed at end of invocation, "
+                                "saving updated checkpoint"
+                            )
+                            state._dd_saving_trace_checkpoint = True
+                            try:
+                                save_updated_trace_checkpoint(state)
+                            finally:
+                                state._dd_saving_trace_checkpoint = False
+                            current = _inject_current_context()
+                            if current:
+                                state._dd_before_trace_headers = current
+                except Exception as e:
+                    logger.debug("Best-effort context change check in _after: %s", e)
+
+            # Finish durable execution root span LAST (Component 1)
+            # Component 2 (root checkpoint) is handled in _patched_create_checkpoint
+            # piggybacked on the first operation's checkpoint call.
+            if self.durable_root_span:
+                self.durable_root_span.finish()
+                logger.debug(
+                    "Finished durable root span: trace_id=%s, span_id=%s",
+                    self.durable_root_span.trace_id,
+                    self.durable_root_span.span_id,
+                )
+                self.durable_root_span = None
+
             if status_code:
                 self.trigger_tags["http.status_code"] = status_code
                 mark_trace_as_error_for_5xx_responses(context, status_code, self.span)