-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathdsm.py
More file actions
61 lines (44 loc) · 1.67 KB
/
dsm.py
File metadata and controls
61 lines (44 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import logging
import json
from datadog_lambda.trigger import EventTypes
logger = logging.getLogger(__name__)
def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)
def _dsm_set_sqs_context(event):
from ddtrace.data_streams import set_consume_checkpoint
records = event.get("Records")
if records is None:
return
for record in records:
arn = record.get("eventSourceARN", "")
context_json = _get_dsm_context_from_lambda(record)
if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return None
carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint("sqs", arn, carrier_get)
def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
"""
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
datadog_attr = message_attributes["_datadog"]
if "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
else:
logger.debug("DataStreams did not handle lambda message: %r", message)
return context_json
def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)
return carrier_get