-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathdurable.py
More file actions
70 lines (59 loc) · 2.42 KB
/
durable.py
File metadata and controls
70 lines (59 loc) · 2.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# Unless explicitly stated otherwise all files in this repository are licensed
# under the Apache License Version 2.0.
# This product includes software developed at Datadog (https://www.datadoghq.com/).
# Copyright 2019 Datadog, Inc.
import json
import logging
import re
import sys
logger = logging.getLogger(__name__)
# When changing the schema of the durable invocation log, bump this version so
# the extension can handle compatibility between old and new schemas.
DURABLE_INVOCATION_LOG_SCHEMA_VERSION = "1.0.0"
def _parse_durable_execution_arn(arn):
"""
Parses a DurableExecutionArn to extract execution name and ID.
ARN format:
arn:aws:lambda:{region}:{account}:function:{func}:{version}/durable-execution/{name}/{id}
Returns (execution_name, execution_id) or None if parsing fails.
"""
match = re.search(r"/durable-execution/([^/]+)/([^/]+)$", arn)
if not match:
return None
execution_name, execution_id = match.group(1), match.group(2)
if not execution_name or not execution_id:
return None
return execution_name, execution_id
def extract_durable_function_tags(event):
"""
Extracts durable function tags from the Lambda event payload.
Returns a dict with durable function tags, or an empty dict if the event
is not a durable function invocation.
"""
if not isinstance(event, dict):
return {}
durable_execution_arn = event.get("DurableExecutionArn")
if not isinstance(durable_execution_arn, str):
return {}
parsed = _parse_durable_execution_arn(durable_execution_arn)
if not parsed:
logger.error("Failed to parse DurableExecutionArn: %s", durable_execution_arn)
return {}
execution_name, execution_id = parsed
return {
"durable_function_execution_name": execution_name,
"durable_function_execution_id": execution_id,
}
def emit_durable_execution_log(request_id, execution_name, execution_id):
"""
Emits a structured JSON log to stdout mapping the Lambda request_id to the
durable execution name and ID. This is consumed by the Lambda extension layer
to correlate request IDs with durable executions.
"""
log = {
"request_id": request_id,
"durable_execution_name": execution_name,
"durable_execution_id": execution_id,
"schema_version": DURABLE_INVOCATION_LOG_SCHEMA_VERSION,
}
print(json.dumps(log), file=sys.stdout, flush=True)