forked from DataDog/datadog-lambda-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdsm.py
More file actions
38 lines (29 loc) · 1.19 KB
/
dsm.py
File metadata and controls
38 lines (29 loc) · 1.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from datadog_lambda import logger
from datadog_lambda.trigger import EventTypes
def set_dsm_context(event, event_source):
    """Set Data Streams Monitoring context for the incoming event.

    Only invocations whose ``event_source`` reports itself as
    ``EventTypes.SQS`` are handled; for any other source this is a no-op.

    Args:
        event: The Lambda invocation payload, forwarded unchanged to the
            SQS handler.
        event_source: Object exposing ``equals(event_type)``, used to decide
            whether the event came from SQS.
    """
    is_sqs_trigger = event_source.equals(EventTypes.SQS)
    if not is_sqs_trigger:
        return
    _dsm_set_sqs_context(event)
def _dsm_set_sqs_context(event):
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)
records = event.get("Records")
if records is None:
return
processor = data_streams_processor()
for record in records:
try:
queue_arn = record.get("eventSourceARN", "")
contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)
ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
payload_size=payload_size,
)
except Exception as e:
logger.error(format_err_with_traceback(e))