-
Notifications
You must be signed in to change notification settings - Fork 51
refactor: remove tracer dependencies to support dsm sqs -> lambda #612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d7ce695
623f49e
96e3d88
b5a711d
7810ced
e94eb22
54bedbf
823a07f
5356cc6
45ed35f
f729649
24f6ed9
5bc4b8f
3fa3014
c066b8f
235659b
34b9f45
ff6e3c4
1a82f68
8bb2053
5f006de
e28bac4
2509fcc
ab8c871
2089fa9
24851f3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,21 +1,33 @@ | ||
| import json | ||
| import base64 | ||
|
|
||
| from ddtrace.internal.logger import get_logger | ||
| from datadog_lambda import logger | ||
| from datadog_lambda.trigger import EventTypes | ||
|
|
||
| log = get_logger(__name__) | ||
|
|
||
| def set_dsm_context(event, event_source): | ||
|
|
||
| def set_dsm_context(event, event_source): | ||
| if event_source.equals(EventTypes.SQS): | ||
| _dsm_set_sqs_context(event) | ||
|
|
||
|
|
||
| def _dsm_set_sqs_context(event): | ||
| def _dsm_set_context_helper( | ||
| event, service_type, arn_extractor, payload_size_calculator | ||
| ): | ||
| """ | ||
| Common helper function for setting DSM context. | ||
|
|
||
| Args: | ||
| event: The Lambda event containing records | ||
| service_type: The service type string (example: 'sqs', 'sns') | ||
| arn_extractor: Function to extract the ARN from the record | ||
| payload_size_calculator: Function to calculate payload size | ||
| """ | ||
| from datadog_lambda.wrapper import format_err_with_traceback | ||
| from ddtrace.internal.datastreams import data_streams_processor | ||
| from ddtrace.internal.datastreams.processor import DsmPathwayCodec | ||
| from ddtrace.internal.datastreams.botocore import ( | ||
| get_datastreams_context, | ||
| calculate_sqs_payload_size, | ||
| ) | ||
|
|
||
| records = event.get("Records") | ||
| if records is None: | ||
|
|
@@ -24,15 +36,88 @@ def _dsm_set_sqs_context(event): | |
|
|
||
| for record in records: | ||
| try: | ||
| queue_arn = record.get("eventSourceARN", "") | ||
|
|
||
| contextjson = get_datastreams_context(record) | ||
| payload_size = calculate_sqs_payload_size(record) | ||
| arn = arn_extractor(record) | ||
| context_json = _get_dsm_context_from_lambda(record) | ||
| payload_size = payload_size_calculator(record, context_json) | ||
|
|
||
| ctx = DsmPathwayCodec.decode(contextjson, processor) | ||
| ctx = DsmPathwayCodec.decode(context_json, processor) | ||
| ctx.set_checkpoint( | ||
| ["direction:in", f"topic:{queue_arn}", "type:sqs"], | ||
| ["direction:in", f"topic:{arn}", f"type:{service_type}"], | ||
| payload_size=payload_size, | ||
| ) | ||
| except Exception as e: | ||
| logger.error(format_err_with_traceback(e)) | ||
|
|
||
|
|
||
| def _dsm_set_sqs_context(event): | ||
| from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size | ||
|
|
||
| def sqs_payload_calculator(record, context_json): | ||
| return calculate_sqs_payload_size(record) | ||
|
|
||
| def sqs_arn_extractor(record): | ||
| return record.get("eventSourceARN", "") | ||
|
michael-zhao459 marked this conversation as resolved.
Outdated
|
||
|
|
||
| _dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator) | ||
|
|
||
|
|
||
| def _get_dsm_context_from_lambda(message): | ||
| """ | ||
| Lambda-specific message formats: | ||
| - message.messageAttributes._datadog.stringValue (SQS -> lambda) | ||
| - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda) | ||
| - message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw) | ||
| - message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda) | ||
| - message.kinesis.data.decode()._datadog (Kinesis -> lambda) | ||
| """ | ||
| context_json = None | ||
| message_body = message | ||
|
|
||
| if "kinesis" in message: | ||
|
michael-zhao459 marked this conversation as resolved.
Outdated
|
||
| try: | ||
| kinesis_data = json.loads( | ||
| base64.b64decode(message["kinesis"]["data"]).decode() | ||
| ) | ||
| return kinesis_data.get("_datadog") | ||
| except (ValueError, TypeError, KeyError): | ||
| log.debug("Unable to parse kinesis data for lambda message") | ||
| return None | ||
| elif "Sns" in message: | ||
| message_body = message["Sns"] | ||
| else: | ||
| try: | ||
| body = message.get("body") | ||
| if body: | ||
| message_body = json.loads(body) | ||
| except (ValueError, TypeError): | ||
| log.debug("Unable to parse lambda message body as JSON, treat as non-json") | ||
|
|
||
| message_attributes = message_body.get("MessageAttributes") or message_body.get( | ||
| "messageAttributes" | ||
| ) | ||
| if not message_attributes: | ||
| log.debug("DataStreams skipped lambda message: %r", message) | ||
| return None | ||
|
|
||
| if "_datadog" not in message_attributes: | ||
| log.debug("DataStreams skipped lambda message: %r", message) | ||
| return None | ||
|
|
||
| datadog_attr = message_attributes["_datadog"] | ||
|
|
||
| if message_body.get("Type") == "Notification": | ||
| # SNS -> lambda notification | ||
| if datadog_attr.get("Type") == "Binary": | ||
| context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode()) | ||
| elif "stringValue" in datadog_attr: | ||
| # SQS -> lambda | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We can use the […] I would recommend creating a separate […] |
||
| context_json = json.loads(datadog_attr["stringValue"]) | ||
| elif "binaryValue" in datadog_attr: | ||
| # SNS -> SQS -> lambda, raw message delivery | ||
| context_json = json.loads( | ||
| base64.b64decode(datadog_attr["binaryValue"]).decode() | ||
| ) | ||
| else: | ||
| log.debug("DataStreams did not handle lambda message: %r", message) | ||
|
|
||
| return context_json | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,17 @@ | ||
| import unittest | ||
| import json | ||
| import base64 | ||
| from unittest.mock import patch, MagicMock | ||
|
|
||
| from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context | ||
| from datadog_lambda.dsm import ( | ||
| set_dsm_context, | ||
| _dsm_set_sqs_context, | ||
| _get_dsm_context_from_lambda, | ||
| ) | ||
| from datadog_lambda.trigger import EventTypes, _EventSource | ||
|
|
||
|
|
||
| class TestDsmSQSContext(unittest.TestCase): | ||
| class TestDSMContext(unittest.TestCase): | ||
| def setUp(self): | ||
| patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context") | ||
| self.mock_dsm_set_sqs_context = patcher.start() | ||
|
|
@@ -110,3 +116,230 @@ def test_sqs_multiple_records_process_each_record(self): | |
| self.assertIn(f"topic:{expected_arns[i]}", tags) | ||
| self.assertIn("type:sqs", tags) | ||
| self.assertEqual(kwargs["payload_size"], 100) | ||
|
|
||
|
|
||
| class TestGetDSMContext(unittest.TestCase): | ||
| def test_sqs_to_lambda_string_value_format(self): | ||
| """Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)""" | ||
| trace_context = { | ||
| "x-datadog-trace-id": "789123456", | ||
| "x-datadog-parent-id": "321987654", | ||
| "dd-pathway-ctx": "test-pathway-ctx", | ||
| } | ||
|
|
||
| lambda_record = { | ||
| "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", | ||
| "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", | ||
| "body": "Test message.", | ||
| "attributes": { | ||
| "ApproximateReceiveCount": "1", | ||
| "SentTimestamp": "1545082649183", | ||
| "SenderId": "AIDAIENQZJOLO23YVJ4VO", | ||
| "ApproximateFirstReceiveTimestamp": "1545082649185", | ||
| }, | ||
| "messageAttributes": { | ||
| "_datadog": { | ||
| "stringValue": json.dumps(trace_context), | ||
| "stringListValues": [], | ||
| "binaryListValues": [], | ||
| "dataType": "String", | ||
| }, | ||
| "myAttribute": { | ||
| "stringValue": "myValue", | ||
| "stringListValues": [], | ||
| "binaryListValues": [], | ||
| "dataType": "String", | ||
| }, | ||
| }, | ||
| "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", | ||
| "eventSource": "aws:sqs", | ||
| "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", | ||
| "awsRegion": "us-east-2", | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(lambda_record) | ||
|
|
||
| assert result is not None | ||
| assert result == trace_context | ||
| assert result["x-datadog-trace-id"] == "789123456" | ||
| assert result["x-datadog-parent-id"] == "321987654" | ||
| assert result["dd-pathway-ctx"] == "test-pathway-ctx" | ||
|
|
||
| def test_sns_to_lambda_format(self): | ||
| """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)""" | ||
| trace_context = { | ||
| "x-datadog-trace-id": "111111111", | ||
| "x-datadog-parent-id": "222222222", | ||
| "dd-pathway-ctx": "test-pathway-ctx", | ||
| } | ||
| binary_data = base64.b64encode( | ||
| json.dumps(trace_context).encode("utf-8") | ||
| ).decode("utf-8") | ||
|
|
||
| sns_lambda_record = { | ||
| "EventSource": "aws:sns", | ||
| "EventSubscriptionArn": ( | ||
| "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012" | ||
| ), | ||
| "Sns": { | ||
| "Type": "Notification", | ||
| "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", | ||
| "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic", | ||
| "Subject": "Test Subject", | ||
| "Message": "Hello from SNS!", | ||
| "Timestamp": "2023-01-01T12:00:00.000Z", | ||
| "MessageAttributes": { | ||
| "_datadog": {"Type": "Binary", "Value": binary_data} | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(sns_lambda_record) | ||
|
|
||
| assert result is not None | ||
| assert result == trace_context | ||
| assert result["x-datadog-trace-id"] == "111111111" | ||
| assert result["x-datadog-parent-id"] == "222222222" | ||
| assert result["dd-pathway-ctx"] == "test-pathway-ctx" | ||
|
|
||
| def test_sns_to_sqs_to_lambda_binary_value_format(self): | ||
| """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)""" | ||
| trace_context = { | ||
| "x-datadog-trace-id": "777666555", | ||
| "x-datadog-parent-id": "444333222", | ||
| "dd-pathway-ctx": "test-pathway-ctx", | ||
| } | ||
| binary_data = base64.b64encode( | ||
| json.dumps(trace_context).encode("utf-8") | ||
| ).decode("utf-8") | ||
|
|
||
| lambda_record = { | ||
| "messageId": "test-message-id", | ||
| "receiptHandle": "test-receipt-handle", | ||
| "body": "Test message body", | ||
| "messageAttributes": { | ||
| "_datadog": {"binaryValue": binary_data, "dataType": "Binary"} | ||
| }, | ||
| "eventSource": "aws:sqs", | ||
| "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue", | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(lambda_record) | ||
|
|
||
| assert result is not None | ||
| assert result == trace_context | ||
| assert result["x-datadog-trace-id"] == "777666555" | ||
| assert result["x-datadog-parent-id"] == "444333222" | ||
| assert result["dd-pathway-ctx"] == "test-pathway-ctx" | ||
|
|
||
| def test_sns_to_sqs_to_lambda_body_format(self): | ||
| """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)""" | ||
| trace_context = { | ||
| "x-datadog-trace-id": "123987456", | ||
| "x-datadog-parent-id": "654321987", | ||
| "x-datadog-sampling-priority": "1", | ||
| "dd-pathway-ctx": "test-pathway-ctx", | ||
| } | ||
|
|
||
| message_body = { | ||
| "Type": "Notification", | ||
| "MessageId": "test-message-id", | ||
| "Message": "Test message from SNS", | ||
| "MessageAttributes": { | ||
| "_datadog": { | ||
| "Type": "Binary", | ||
| "Value": base64.b64encode( | ||
| json.dumps(trace_context).encode("utf-8") | ||
| ).decode("utf-8"), | ||
| } | ||
| }, | ||
| } | ||
|
|
||
| lambda_record = { | ||
| "messageId": "lambda-message-id", | ||
| "body": json.dumps(message_body), | ||
| "eventSource": "aws:sqs", | ||
| "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue", | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(lambda_record) | ||
|
|
||
| assert result is not None | ||
| assert result == trace_context | ||
| assert result["x-datadog-trace-id"] == "123987456" | ||
| assert result["x-datadog-parent-id"] == "654321987" | ||
| assert result["dd-pathway-ctx"] == "test-pathway-ctx" | ||
|
|
||
| def test_kinesis_to_lambda_format(self): | ||
| """Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)""" | ||
| trace_context = { | ||
| "x-datadog-trace-id": "555444333", | ||
| "x-datadog-parent-id": "888777666", | ||
| "dd-pathway-ctx": "test-pathway-ctx", | ||
| } | ||
|
|
||
| # Create the kinesis data payload | ||
| kinesis_payload = { | ||
| "_datadog": trace_context, | ||
| "actualData": "some business data", | ||
| } | ||
| encoded_kinesis_data = base64.b64encode( | ||
| json.dumps(kinesis_payload).encode("utf-8") | ||
| ).decode("utf-8") | ||
|
|
||
| kinesis_lambda_record = { | ||
| "eventSource": "aws:kinesis", | ||
| "eventSourceARN": ( | ||
| "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" | ||
| ), | ||
| "kinesis": { | ||
| "data": encoded_kinesis_data, | ||
| "partitionKey": "partition-key-1", | ||
| "sequenceNumber": ( | ||
| "49590338271490256608559692538361571095921575989136588898" | ||
| ), | ||
| }, | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(kinesis_lambda_record) | ||
|
|
||
| assert result is not None | ||
| assert result == trace_context | ||
| assert result["x-datadog-trace-id"] == "555444333" | ||
| assert result["x-datadog-parent-id"] == "888777666" | ||
| assert result["dd-pathway-ctx"] == "test-pathway-ctx" | ||
|
|
||
| def test_no_message_attributes(self): | ||
| """Test message without MessageAttributes returns None.""" | ||
| message = { | ||
| "messageId": "test-message-id", | ||
| "body": "Test message without attributes", | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(message) | ||
|
|
||
| assert result is None | ||
|
|
||
| def test_no_datadog_attribute(self): | ||
| """Test message with MessageAttributes but no _datadog attribute returns None.""" | ||
| message = { | ||
| "messageId": "test-message-id", | ||
| "body": "Test message", | ||
| "messageAttributes": { | ||
| "customAttribute": {"stringValue": "custom-value", "dataType": "String"} | ||
| }, | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(message) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [nit] newline here |
||
| assert result is None | ||
|
|
||
| def test_empty_datadog_attribute(self): | ||
| """Test message with empty _datadog attribute returns None.""" | ||
| message = { | ||
| "messageId": "test-message-id", | ||
| "messageAttributes": {"_datadog": {}}, | ||
| } | ||
|
|
||
| result = _get_dsm_context_from_lambda(message) | ||
|
|
||
| assert result is None | ||
Uh oh!
There was an error while loading. Please reload this page.