Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 97 additions & 12 deletions datadog_lambda/dsm.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
import json
import base64

from ddtrace.internal.logger import get_logger
Comment thread
michael-zhao459 marked this conversation as resolved.
Outdated
from datadog_lambda import logger
from datadog_lambda.trigger import EventTypes

log = get_logger(__name__)

def set_dsm_context(event, event_source):

def set_dsm_context(event, event_source):
    """
    Set the Data Streams Monitoring context for a Lambda invocation.

    Only SQS-triggered events are handled; any other event source is a no-op.

    Args:
        event: The Lambda event payload.
        event_source: The parsed event source, compared against EventTypes.
    """
    # Guard clause: nothing to do unless the trigger is SQS.
    if not event_source.equals(EventTypes.SQS):
        return
    _dsm_set_sqs_context(event)


def _dsm_set_sqs_context(event):
def _dsm_set_context_helper(
event, service_type, arn_extractor, payload_size_calculator
):
"""
Common helper function for setting DSM context.

Args:
event: The Lambda event containing records
service_type: The service type string (example: 'sqs', 'sns')
arn_extractor: Function to extract the ARN from the record
payload_size_calculator: Function to calculate payload size
"""
from datadog_lambda.wrapper import format_err_with_traceback
from ddtrace.internal.datastreams import data_streams_processor
from ddtrace.internal.datastreams.processor import DsmPathwayCodec
from ddtrace.internal.datastreams.botocore import (
get_datastreams_context,
calculate_sqs_payload_size,
)

records = event.get("Records")
if records is None:
Expand All @@ -24,15 +36,88 @@ def _dsm_set_sqs_context(event):

for record in records:
try:
queue_arn = record.get("eventSourceARN", "")

contextjson = get_datastreams_context(record)
payload_size = calculate_sqs_payload_size(record)
arn = arn_extractor(record)
context_json = _get_dsm_context_from_lambda(record)
payload_size = payload_size_calculator(record, context_json)

ctx = DsmPathwayCodec.decode(contextjson, processor)
ctx = DsmPathwayCodec.decode(context_json, processor)
ctx.set_checkpoint(
["direction:in", f"topic:{queue_arn}", "type:sqs"],
["direction:in", f"topic:{arn}", f"type:{service_type}"],
payload_size=payload_size,
)
except Exception as e:
logger.error(format_err_with_traceback(e))


def _dsm_set_sqs_context(event):
    """
    Set DSM checkpoints for the records of an SQS-triggered Lambda event.

    Delegates to _dsm_set_context_helper with the "sqs" service type, the
    record's "eventSourceARN" (empty string when absent) as the ARN, and
    ddtrace's calculate_sqs_payload_size as the payload size.

    Args:
        event: The Lambda event containing the SQS records.
    """
    # Imported lazily, matching how the rest of this module pulls in ddtrace
    # internals inside the functions that use them.
    from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size

    _dsm_set_context_helper(
        event,
        "sqs",
        lambda sqs_record: sqs_record.get("eventSourceARN", ""),
        # The helper passes the decoded context as a second argument; the SQS
        # payload size does not depend on it, so it is ignored here.
        lambda sqs_record, _context: calculate_sqs_payload_size(sqs_record),
    )


def _get_dsm_context_from_lambda(message):
"""
Lambda-specific message formats:
- message.messageAttributes._datadog.stringValue (SQS -> lambda)
- message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)
- message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)
- message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)
- message.kinesis.data.decode()._datadog (Kinesis -> lambda)
"""
context_json = None
message_body = message

if "kinesis" in message:
Comment thread
michael-zhao459 marked this conversation as resolved.
Outdated
try:
kinesis_data = json.loads(
base64.b64decode(message["kinesis"]["data"]).decode()
)
return kinesis_data.get("_datadog")
except (ValueError, TypeError, KeyError):
log.debug("Unable to parse kinesis data for lambda message")
return None
elif "Sns" in message:
message_body = message["Sns"]
else:
try:
body = message.get("body")
if body:
message_body = json.loads(body)
except (ValueError, TypeError):
log.debug("Unable to parse lambda message body as JSON, treat as non-json")

message_attributes = message_body.get("MessageAttributes") or message_body.get(
"messageAttributes"
)
if not message_attributes:
log.debug("DataStreams skipped lambda message: %r", message)
return None

if "_datadog" not in message_attributes:
log.debug("DataStreams skipped lambda message: %r", message)
return None

datadog_attr = message_attributes["_datadog"]

if message_body.get("Type") == "Notification":
# SNS -> lambda notification
if datadog_attr.get("Type") == "Binary":
context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode())
elif "stringValue" in datadog_attr:
# SQS -> lambda
Copy link
Copy Markdown
Contributor

@purple4reina purple4reina Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the event_type to avoid doing unnecessary work. We should already mostly know the shape of the event. Without doing so, this method is gonna get insanely large.

I would recommend creating a separate _get_dsm_context for each event type.

context_json = json.loads(datadog_attr["stringValue"])
elif "binaryValue" in datadog_attr:
# SNS -> SQS -> lambda, raw message delivery
context_json = json.loads(
base64.b64decode(datadog_attr["binaryValue"]).decode()
)
else:
log.debug("DataStreams did not handle lambda message: %r", message)

return context_json
237 changes: 235 additions & 2 deletions tests/test_dsm.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import unittest
import json
import base64
from unittest.mock import patch, MagicMock

from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context
from datadog_lambda.dsm import (
set_dsm_context,
_dsm_set_sqs_context,
_get_dsm_context_from_lambda,
)
from datadog_lambda.trigger import EventTypes, _EventSource


class TestDsmSQSContext(unittest.TestCase):
class TestDSMContext(unittest.TestCase):
def setUp(self):
patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context")
self.mock_dsm_set_sqs_context = patcher.start()
Expand Down Expand Up @@ -110,3 +116,230 @@ def test_sqs_multiple_records_process_each_record(self):
self.assertIn(f"topic:{expected_arns[i]}", tags)
self.assertIn("type:sqs", tags)
self.assertEqual(kwargs["payload_size"], 100)


class TestGetDSMContext(unittest.TestCase):
    """Tests for _get_dsm_context_from_lambda across the supported record formats."""

    @staticmethod
    def _b64_json(payload):
        """Return *payload* serialized to JSON and base64-encoded, as text."""
        return base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")

    def test_sqs_to_lambda_string_value_format(self):
        """Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "789123456",
            "x-datadog-parent-id": "321987654",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        lambda_record = {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "Test message.",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185",
            },
            "messageAttributes": {
                "_datadog": {
                    "stringValue": json.dumps(trace_context),
                    "stringListValues": [],
                    "binaryListValues": [],
                    "dataType": "String",
                },
                "myAttribute": {
                    "stringValue": "myValue",
                    "stringListValues": [],
                    "binaryListValues": [],
                    "dataType": "String",
                },
            },
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        # Dict equality covers every individual key/value of the context.
        self.assertEqual(result, trace_context)

    def test_sns_to_lambda_format(self):
        """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "111111111",
            "x-datadog-parent-id": "222222222",
            "dd-pathway-ctx": "test-pathway-ctx",
        }
        binary_data = self._b64_json(trace_context)

        sns_lambda_record = {
            "EventSource": "aws:sns",
            "EventSubscriptionArn": (
                "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012"
            ),
            "Sns": {
                "Type": "Notification",
                "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
                "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic",
                "Subject": "Test Subject",
                "Message": "Hello from SNS!",
                "Timestamp": "2023-01-01T12:00:00.000Z",
                "MessageAttributes": {
                    "_datadog": {"Type": "Binary", "Value": binary_data}
                },
            },
        }

        result = _get_dsm_context_from_lambda(sns_lambda_record)

        self.assertEqual(result, trace_context)

    def test_sns_to_sqs_to_lambda_binary_value_format(self):
        """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)"""
        trace_context = {
            "x-datadog-trace-id": "777666555",
            "x-datadog-parent-id": "444333222",
            "dd-pathway-ctx": "test-pathway-ctx",
        }
        binary_data = self._b64_json(trace_context)

        lambda_record = {
            "messageId": "test-message-id",
            "receiptHandle": "test-receipt-handle",
            "body": "Test message body",
            "messageAttributes": {
                "_datadog": {"binaryValue": binary_data, "dataType": "Binary"}
            },
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertEqual(result, trace_context)

    def test_sns_to_sqs_to_lambda_body_format(self):
        """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "123987456",
            "x-datadog-parent-id": "654321987",
            "x-datadog-sampling-priority": "1",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        message_body = {
            "Type": "Notification",
            "MessageId": "test-message-id",
            "Message": "Test message from SNS",
            "MessageAttributes": {
                "_datadog": {
                    "Type": "Binary",
                    "Value": self._b64_json(trace_context),
                }
            },
        }

        lambda_record = {
            "messageId": "lambda-message-id",
            "body": json.dumps(message_body),
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue",
        }

        result = _get_dsm_context_from_lambda(lambda_record)

        self.assertEqual(result, trace_context)

    def test_kinesis_to_lambda_format(self):
        """Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)"""
        trace_context = {
            "x-datadog-trace-id": "555444333",
            "x-datadog-parent-id": "888777666",
            "dd-pathway-ctx": "test-pathway-ctx",
        }

        # Create the kinesis data payload
        kinesis_payload = {
            "_datadog": trace_context,
            "actualData": "some business data",
        }
        encoded_kinesis_data = self._b64_json(kinesis_payload)

        kinesis_lambda_record = {
            "eventSource": "aws:kinesis",
            "eventSourceARN": (
                "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            ),
            "kinesis": {
                "data": encoded_kinesis_data,
                "partitionKey": "partition-key-1",
                "sequenceNumber": (
                    "49590338271490256608559692538361571095921575989136588898"
                ),
            },
        }

        result = _get_dsm_context_from_lambda(kinesis_lambda_record)

        self.assertEqual(result, trace_context)

    def test_no_message_attributes(self):
        """Test message without MessageAttributes returns None."""
        message = {
            "messageId": "test-message-id",
            "body": "Test message without attributes",
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

    def test_no_datadog_attribute(self):
        """Test message with MessageAttributes but no _datadog attribute returns None."""
        message = {
            "messageId": "test-message-id",
            "body": "Test message",
            "messageAttributes": {
                "customAttribute": {"stringValue": "custom-value", "dataType": "String"}
            },
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)

    def test_empty_datadog_attribute(self):
        """Test message with empty _datadog attribute returns None."""
        message = {
            "messageId": "test-message-id",
            "messageAttributes": {"_datadog": {}},
        }

        result = _get_dsm_context_from_lambda(message)

        self.assertIsNone(result)
Loading