-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathdsm.py
More file actions
69 lines (50 loc) · 1.98 KB
/
dsm.py
File metadata and controls
69 lines (50 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
import json
from datadog_lambda.trigger import EventTypes
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def set_dsm_context(event, event_source):
    """Set data-streams (DSM) context for an incoming event.

    Only events whose source matches ``EventTypes.SQS`` are handled; any
    other source is ignored.
    """
    is_sqs = event_source.equals(EventTypes.SQS)
    if not is_sqs:
        return

    _dsm_set_sqs_context(event)
def _dsm_set_sqs_context(event):
records = event.get("Records")
if records is None:
return
for record in records:
arn = record.get("eventSourceARN", "")
try:
context_json = _get_dsm_context_from_sqs_lambda(record)
if not context_json:
return
_set_dsm_context_for_record(context_json, "sqs", arn)
except Exception as e:
logger.error(f"Unable to set dsm context: {e}")
def _set_dsm_context_for_record(context_json, type, arn):
    """Set a DSM consume checkpoint for a single record.

    ``context_json`` is wrapped in a key-lookup callable, which is passed to
    ddtrace's ``set_consume_checkpoint`` together with ``type`` and ``arn``.
    """
    # Imported at call time rather than at module import.
    from ddtrace.data_streams import set_consume_checkpoint

    set_consume_checkpoint(
        type,
        arn,
        _create_carrier_get(context_json),
        manual_checkpoint=False,
    )
def _get_dsm_context_from_sqs_lambda(message):
"""
Lambda-specific message shape for SQS -> Lambda:
- message.messageAttributes._datadog.stringValue
"""
context_json = None
message_attributes = message.get("messageAttributes")
if not message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
if "_datadog" not in message_attributes:
logger.debug("DataStreams skipped lambda message: %r", message)
return None
datadog_attr = message_attributes["_datadog"]
if "stringValue" in datadog_attr:
context_json = json.loads(datadog_attr["stringValue"])
if not isinstance(context_json, dict):
logger.debug("DataStreams did not handle lambda message: %r", message)
return None
else:
logger.debug("DataStreams did not handle lambda message: %r", message)
return context_json
def _create_carrier_get(context_json):
def carrier_get(key):
return context_json.get(key)
return carrier_get