-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathdsm.py
More file actions
68 lines (49 loc) · 1.89 KB
/
dsm.py
File metadata and controls
68 lines (49 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import logging
import json
from datadog_lambda.trigger import EventTypes
logger = logging.getLogger(__name__)
def set_dsm_context(event, event_source):
if event_source.equals(EventTypes.SQS):
_dsm_set_sqs_context(event)
def _dsm_set_sqs_context(event):
records = event.get("Records")
if records is None:
return
for record in records:
arn = record.get("eventSourceARN", "")
_set_dsm_context_for_record(record, "sqs", arn)
def _set_dsm_context_for_record(record, type, arn):
from ddtrace.data_streams import set_consume_checkpoint
try:
context_json = _get_dsm_context_from_lambda(record)
if not context_json:
logger.debug("DataStreams skipped lambda message: %r", record)
return
carrier_get = _create_carrier_get(context_json)
set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")
def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
"""
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
datadog_attr = message_attributes["_datadog"]
if "stringValue" in datadog_attr:
# SQS -> lambda
context_json = json.loads(datadog_attr["stringValue"])
else:
logger.debug("DataStreams did not handle lambda message: %r", message)
return context_json
def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)
return carrier_get