-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathdsm.py
More file actions
121 lines (91 loc) · 3.5 KB
/
dsm.py
File metadata and controls
121 lines (91 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import logging
import json
import base64
from datadog_lambda.trigger import EventTypes
logger = logging.getLogger(__name__)
def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)
elif event_source.equals(EventTypes.SNS):
_dsm_set_sns_context(event)
elif event_source.equals(EventTypes.KINESIS):
_dsm_set_kinesis_context(event)
def _dsm_set_sqs_context(event):
records = event.get("Records")
if records is None:
return
for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "sqs", arn)
def _dsm_set_sns_context(event):
records = event.get("Records")
if records is None:
return
for record in records:
sns_data = record.get("Sns")
if not sns_data:
return
arn = sns_data.get("TopicArn", "")
_set_dsm_context_for_record(sns_data, "sns", arn)
def _dsm_set_kinesis_context(event):
records = event.get("Records")
if records is None:
return
for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "kinesis", arn)
def _set_dsm_context_for_record(record, type, arn):
from ddtrace.data_streams import set_consume_checkpoint
try:
context_json = _get_dsm_context_from_lambda(record)
if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return
carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")
def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
"""
context_json = None
message_body = message
if "kinesis" in message:
try:
kinesis_data = json.loads(
base64.b64decode(message["kinesis"]["data"]).decode()
)
return kinesis_data.get("_datadog")
except (ValueError, TypeError, KeyError):
logger.debug("Unable to parse kinesis data for lambda message")
return None
if "Sns" in message:
message_body = message["Sns"]
message_attributes = message_body.get("MessageAttributes") or message_body.get(
"messageAttributes"
)
if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
datadog_attr = message_attributes["_datadog"]
if message_body.get("Type") == "Notification":
# SNS -> lambda notification
if datadog_attr.get("Type") == "Binary":
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
elif "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
else:
logger.debug("DataStreams did not handle lambda message: %r", message)
return context_json
def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)
return carrier_get