Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions datadog_lambda/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ def patch_all():

if config.trace_enabled:
patch_all_dd()
# TODO: remove before merging this PR; this is only a testing helper.
# Manually patch the durable execution integration since it may not
# be registered in the PyPI ddtrace's _monkey.py yet.
try:
from ddtrace import patch as _patch_dd
_patch_dd(aws_durable_execution_sdk_python=True)
except Exception:
pass
else:
_patch_http()
_ensure_patch_requests()
Expand Down
184 changes: 184 additions & 0 deletions datadog_lambda/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,180 @@ def get_injected_authorizer_data(event, is_http_api) -> dict:
logger.debug("Failed to check if invocated by an authorizer. error %s", e)


def is_durable_execution_replay(event):
    """
    Report whether this Lambda invocation is a durable execution replay.

    An event counts as a replay when it carries a ``DurableExecutionArn`` and
    ``InitialExecutionState.Operations`` holds more than one entry, meaning
    the invocation resumes from a previous checkpoint rather than starting
    fresh.

    For replay invocations the caller should skip creating inferred spans:
    the trace context is continued from the checkpoint, so an inferred span
    would be a duplicate.

    Malformed input (a non-dict event, a missing/null/non-dict
    ``InitialExecutionState``, or an ``Operations`` value that is not a list)
    is treated as "not a replay" so this check never raises.

    Args:
        event: The raw Lambda event.

    Returns:
        True if this is a replay invocation (should skip inferred span).
        False if this is a first invocation, not a durable execution, or the
        event is malformed.
    """
    if not isinstance(event, dict) or "DurableExecutionArn" not in event:
        return False

    # ``dict.get(key, {})`` still returns None when the key is present with a
    # null value, so validate the types explicitly instead of using defaults.
    initial_state = event.get("InitialExecutionState")
    if not isinstance(initial_state, dict):
        return False

    operations = initial_state.get("Operations")
    if not isinstance(operations, (list, tuple)):
        return False

    # The SDK always includes the EXECUTION operation itself (1 operation on
    # first invocation). A replay has >1 operations (the EXECUTION plus
    # previously completed operations). This aligns with the SDK's
    # ReplayStatus logic in execution.py.
    is_replay = len(operations) > 1

    logger.debug(
        "Durable execution: detected %s invocation (%d operations)",
        "replay" if is_replay else "first",
        len(operations),
    )
    return is_replay


def extract_context_from_durable_execution(event, lambda_context):
    """
    Extract Datadog trace context from an AWS Lambda Durable Execution event.

    Looks for the extra trace context checkpoints created by the dd-trace
    plugin. These are operations with ``Name == "_dd_trace_context"`` whose
    ``StepDetails.Result`` is a JSON object of trace headers. Operations with
    any other name are skipped without inspecting their payload.

    Operations are scanned in reverse so the LAST trace checkpoint wins; it
    corresponds to the most recently completed customer operation, which
    parents this invocation's root span to the last operation span of the
    previous invocation. A checkpoint that is malformed or yields no usable
    context is skipped and the next older one is tried.

    Args:
        event: The raw Lambda event.
        lambda_context: The Lambda context object. Currently unused; kept for
            signature parity with the other extractors.

    Returns:
        The context returned by ``propagator.extract`` for the newest usable
        checkpoint, or None if the event is not a durable execution event, no
        usable checkpoint exists, or any unexpected error occurs.
    """
    try:
        if not isinstance(event, dict):
            return None

        if "DurableExecutionArn" not in event or "InitialExecutionState" not in event:
            return None

        # ``or`` guards against keys that are present but null.
        initial_state = event.get("InitialExecutionState") or {}
        operations = initial_state.get("Operations") or []

        logger.debug(
            "Durable execution event detected with %d operations in "
            "InitialExecutionState",
            len(operations),
        )

        # Newest first: the last trace checkpoint is the one to continue from.
        for operation in reversed(operations):
            # Skip malformed entries instead of aborting the whole scan.
            if not isinstance(operation, dict):
                continue

            if operation.get("Name") != "_dd_trace_context":
                continue

            operation_id = operation.get("Id")

            # Trace context is in StepDetails.Result (standard STEP format).
            step_details = operation.get("StepDetails") or {}
            payload_str = step_details.get("Result")
            if not payload_str:
                logger.debug(
                    "Trace checkpoint %s has no Result, skipping", operation_id
                )
                continue

            try:
                payload = json.loads(payload_str)
                if not isinstance(payload, dict):
                    logger.debug(
                        "Trace checkpoint %s payload is not a dict: %s",
                        operation_id,
                        type(payload),
                    )
                    continue

                trace_id = payload.get("x-datadog-trace-id")
                span_id = payload.get("x-datadog-parent-id")
                if not (trace_id and span_id):
                    continue

                # Use the propagator to restore the full context including
                # baggage, _dd.p.* tags, origin, and sampling priority.
                context = propagator.extract(payload)
                if context and context.trace_id:
                    logger.debug(
                        "Extracted Datadog trace context from trace "
                        "checkpoint %s: %s",
                        operation_id,
                        context,
                    )
                    return context
            except (TypeError, ValueError) as e:
                # json.JSONDecodeError is a ValueError subclass.
                logger.debug("Failed to parse trace checkpoint payload: %s", e)
                continue

        logger.debug("No usable trace context checkpoint found in operations")
    except Exception as e:
        logger.debug("Failed to extract trace context from durable execution: %s", e)

    return None


def create_durable_execution_root_span(event):
    """
    Create the durable execution root span on the FIRST invocation only.

    Components 1 and 4 of the extra-checkpoint trace propagation:
    - First invocation (not a replay): starts an ``aws.durable-execution``
      span via ``tracer.trace`` and returns it.
    - Replay invocation: returns None. The root span was emitted by an
      earlier invocation and the context is continued from the checkpoint,
      so it must not be recreated.

    The span's service comes from the ``DD_DURABLE_EXECUTION_SERVICE``
    environment variable, falling back to ``"aws.durable-execution"``. Its
    resource is the last ``:``-separated segment of the execution ARN.

    Args:
        event: The raw Lambda event.

    Returns:
        The started root span (the caller must call ``span.finish()`` when the
        invocation ends), or None if the event is not a durable execution
        event, the invocation is a replay, or span creation fails.
    """
    try:
        if not isinstance(event, dict):
            return None

        execution_arn = event.get("DurableExecutionArn")
        if not execution_arn or "InitialExecutionState" not in event:
            return None

        # Component 4: on replay, don't recreate the root span.
        if is_durable_execution_replay(event):
            logger.debug(
                "Durable execution replay: skipping root span creation "
                "(context comes from checkpoint)"
            )
            return None

        # Component 1: first invocation, so create a new root span.
        service_name = (
            os.environ.get("DD_DURABLE_EXECUTION_SERVICE") or "aws.durable-execution"
        )
        # rsplit returns the whole string when no ":" is present, so no
        # separate membership check is needed.
        resource = execution_arn.rsplit(":", 1)[-1]

        span = tracer.trace(
            "aws.durable-execution",
            service=service_name,
            resource=resource,
            span_type="serverless",
        )

        if span:
            span.set_tag("durable.execution_arn", execution_arn)
            logger.debug(
                "Created durable execution root span: trace_id=%s, "
                "span_id=%s, resource=%s",
                span.trace_id,
                span.span_id,
                resource,
            )
        else:
            logger.debug("tracer.trace() returned no durable execution root span")

        return span

    except Exception as e:
        logger.debug("Failed to create durable execution root span: %s", e)
        return None


def extract_dd_trace_context(
event, lambda_context, extractor=None, decode_authorizer_context: bool = True
):
Expand All @@ -634,6 +808,16 @@ def extract_dd_trace_context(
trace_context_source = None
event_source = parse_event_source(event)

# Check for AWS Lambda Durable Execution events first (before other checks)
# This ensures trace context is properly continued across durable invocations
durable_context = extract_context_from_durable_execution(event, lambda_context)
if _is_context_complete(durable_context):
logger.debug("Extracted Datadog trace context from durable execution")
dd_trace_context = durable_context
trace_context_source = TraceContextSource.EVENT
logger.debug("extracted dd trace context from durable execution: %s", dd_trace_context)
return dd_trace_context, trace_context_source, event_source

if extractor is not None:
context = extract_context_custom_extractor(extractor, event, lambda_context)
elif isinstance(event, (set, dict)) and "request" in event:
Expand Down
78 changes: 77 additions & 1 deletion datadog_lambda/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
create_inferred_span,
InferredSpanInfo,
is_authorizer_response,
is_durable_execution_replay,
create_durable_execution_root_span,
tracer,
propagator,
)
Expand Down Expand Up @@ -149,6 +151,8 @@ def __init__(self, func):
self.inferred_span = None
self.response = None
self.blocking_response = None
self.durable_root_span = None
self.is_durable = False

if config.profiling_enabled and profiler:
self.prof = profiler.Profiler(env=config.env, service=config.service)
Expand Down Expand Up @@ -269,14 +273,50 @@ def _before(self, event, context):

if config.trace_enabled:
set_dd_trace_py_root(trace_context_source, config.merge_xray_traces)
if config.make_inferred_span:
# Skip inferred span for durable execution replays to avoid duplicates
# For replays, trace context comes from checkpoint, not from event trigger
if config.make_inferred_span and not is_durable_execution_replay(event):
self.inferred_span = create_inferred_span(
event, context, event_source, config.decode_authorizer_context
)

if config.appsec_enabled:
asm_set_context(event_source)

# For durable executions: create root span BEFORE aws.lambda span
# so aws.lambda becomes a child of the root durable execution span.
self.is_durable = isinstance(event, dict) and "DurableExecutionArn" in event
if self.is_durable:
# Set _reactivate on the active context so it persists after
# all spans close. This prevents ddtrace from purging the
# context when the last span (aws.lambda or root) finishes.
active_ctx = tracer.context_provider.active()
if active_ctx and hasattr(active_ctx, '_reactivate'):
active_ctx._reactivate = True
print(f"[DD-DURABLE] Set _reactivate=True on active context")

# For replay: copy _meta from extracted context for _dd.p.* tag propagation
# set_dd_trace_py_root only copies trace_id/span_id/sampling_priority,
# so propagation tags from the checkpoint would be lost without this.
if dd_context and hasattr(dd_context, '_meta') and active_ctx and hasattr(active_ctx, '_meta'):
for k, v in dd_context._meta.items():
if k not in active_ctx._meta:
active_ctx._meta[k] = v

# Component 1: Create root span (first invocation only)
# Component 4: On replays, returns None (context from checkpoint)
self.durable_root_span = create_durable_execution_root_span(event)

# Store root span reference for Component 2 (checkpoint save)
if self.durable_root_span:
try:
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import set_durable_root_span
set_durable_root_span(self.durable_root_span)
except Exception:
pass

# Create aws.lambda span — child of root durable span (first invocation)
# or child of checkpoint context (replay), or normal parent otherwise
self.span = create_function_execution_span(
context=context,
function_name=config.function_name,
Expand All @@ -289,6 +329,7 @@ def _before(self, event, context):
parent_span=self.inferred_span,
span_pointers=calculate_span_pointers(event_source, event),
)

if config.appsec_enabled:
asm_start_request(self.span, event, event_source, self.trigger_tags)
self.blocking_response = get_asm_blocked_response(self.event_source)
Expand Down Expand Up @@ -352,6 +393,41 @@ def _after(self, event, context):

self.span.finish()

# Component 3: After aws.lambda closes but BEFORE root span closes,
# check if trace context was enriched during this invocation.
# The context is still alive because _reactivate=True on the parent context.
# This is best-effort for the end-of-invocation case; the piggyback in
# _patched_create_checkpoint handles saves during handler execution.
if self.is_durable:
try:
from ddtrace.contrib.internal.aws_durable_execution_sdk_python.patch import (
_has_context_changed, save_updated_trace_checkpoint,
_get_current_execution_state, _inject_current_context,
)
state = _get_current_execution_state()
if state:
before_headers = getattr(state, "_dd_before_trace_headers", None)
if before_headers is not None and _has_context_changed(before_headers):
print("[DD-DURABLE] Context changed at end of invocation, saving updated checkpoint")
state._dd_saving_trace_checkpoint = True
try:
save_updated_trace_checkpoint(state)
finally:
state._dd_saving_trace_checkpoint = False
current = _inject_current_context()
if current:
state._dd_before_trace_headers = current
except Exception as e:
print(f"[DD-DURABLE] Best-effort context change check in _after: {e}")

# Finish durable execution root span LAST (Component 1)
# Component 2 (root checkpoint) is handled in _patched_create_checkpoint
# piggybacked on the first operation's checkpoint call.
if self.durable_root_span:
self.durable_root_span.finish()
print(f"[DD-DURABLE] Finished root span: trace_id={self.durable_root_span.trace_id}, span_id={self.durable_root_span.span_id}")
self.durable_root_span = None

if status_code:
self.trigger_tags["http.status_code"] = status_code
mark_trace_as_error_for_5xx_responses(context, status_code, self.span)
Expand Down
Loading