-
Notifications
You must be signed in to change notification settings - Fork 51
fix(dsm): set dsm checkpoint for all records in event #643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
c0906a0
8230b07
e2cd3c5
5cf2059
90480bb
6936bba
268c1e5
2f8dfaa
63e41cb
e5a0903
103ffa1
6c2fbbd
2067363
f2b2dc3
1a1a1d9
6bcbd0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -248,91 +248,100 @@ def extract_context_from_sqs_or_sns_event_or_context( | |||||
| except Exception: | ||||||
| logger.debug("Failed extracting context as EventBridge to SQS.") | ||||||
|
|
||||||
| try: | ||||||
| first_record = event.get("Records")[0] | ||||||
| source_arn = first_record.get("eventSourceARN", "") | ||||||
|
|
||||||
| # logic to deal with SNS => SQS event | ||||||
| if "body" in first_record: | ||||||
| body_str = first_record.get("body") | ||||||
| try: | ||||||
| body = json.loads(body_str) | ||||||
| if body.get("Type", "") == "Notification" and "TopicArn" in body: | ||||||
| logger.debug("Found SNS message inside SQS event") | ||||||
| first_record = get_first_record(create_sns_event(body)) | ||||||
| except Exception: | ||||||
| pass | ||||||
|
|
||||||
| msg_attributes = first_record.get("messageAttributes") | ||||||
| if msg_attributes is None: | ||||||
| sns_record = first_record.get("Sns") or {} | ||||||
| # SNS->SQS event would extract SNS arn without this check | ||||||
| if event_source.equals(EventTypes.SNS): | ||||||
| source_arn = sns_record.get("TopicArn", "") | ||||||
| msg_attributes = sns_record.get("MessageAttributes") or {} | ||||||
| dd_payload = msg_attributes.get("_datadog") | ||||||
| if dd_payload: | ||||||
| # SQS uses dataType and binaryValue/stringValue | ||||||
| # SNS uses Type and Value | ||||||
| dd_json_data = None | ||||||
| dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") | ||||||
| if dd_json_data_type == "Binary": | ||||||
| import base64 | ||||||
| context = None | ||||||
| records = ( | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe let's have records always be: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| event.get("Records", []) | ||||||
| if config.data_streams_enabled | ||||||
| else [event.get("Records")[0]] | ||||||
| ) | ||||||
| for idx, record in enumerate(records): | ||||||
| try: | ||||||
| source_arn = record.get("eventSourceARN", "") | ||||||
| dsm_data = None | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same as bellow, dsm_data is not specific to data streams here. I would name it same as bellow, |
||||||
|
|
||||||
| # logic to deal with SNS => SQS event | ||||||
| if "body" in record: | ||||||
| body_str = record.get("body") | ||||||
| try: | ||||||
| body = json.loads(body_str) | ||||||
| if body.get("Type", "") == "Notification" and "TopicArn" in body: | ||||||
| logger.debug("Found SNS message inside SQS event") | ||||||
| record = get_first_record(create_sns_event(body)) | ||||||
| except Exception: | ||||||
| pass | ||||||
|
|
||||||
| msg_attributes = record.get("messageAttributes") | ||||||
| if msg_attributes is None: | ||||||
| sns_record = record.get("Sns") or {} | ||||||
| # SNS->SQS event would extract SNS arn without this check | ||||||
| if event_source.equals(EventTypes.SNS): | ||||||
| source_arn = sns_record.get("TopicArn", "") | ||||||
| msg_attributes = sns_record.get("MessageAttributes") or {} | ||||||
| dd_payload = msg_attributes.get("_datadog") | ||||||
| if dd_payload: | ||||||
| # SQS uses dataType and binaryValue/stringValue | ||||||
| # SNS uses Type and Value | ||||||
| # fmt: off | ||||||
| dd_json_data = None | ||||||
| dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") | ||||||
| if dd_json_data_type == "Binary": | ||||||
| import base64 | ||||||
|
|
||||||
| dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value") | ||||||
| if dd_json_data: | ||||||
| dd_json_data = base64.b64decode(dd_json_data) | ||||||
| elif dd_json_data_type == "String": | ||||||
| dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") | ||||||
| # fmt: on | ||||||
| else: | ||||||
| logger.debug( | ||||||
| "Datadog Lambda Python only supports extracting trace" | ||||||
| "context from String or Binary SQS/SNS message attributes" | ||||||
| ) | ||||||
|
|
||||||
| dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value") | ||||||
| if dd_json_data: | ||||||
| dd_json_data = base64.b64decode(dd_json_data) | ||||||
| elif dd_json_data_type == "String": | ||||||
| dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") | ||||||
| dd_data = json.loads(dd_json_data) | ||||||
|
|
||||||
| if is_step_function_event(dd_data): | ||||||
| try: | ||||||
| return extract_context_from_step_functions(dd_data, None) | ||||||
| except Exception: | ||||||
| logger.debug( | ||||||
| "Failed to extract Step Functions context from SQS/SNS event." | ||||||
| ) | ||||||
| if idx == 0: | ||||||
| context = propagator.extract(dd_data) | ||||||
| dsm_data = dd_data | ||||||
| else: | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this else, dsm_data is not set. Is that an issue?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not an issue. I checked in dd-trace-py and DSM never injects context into attributes.AWSTraceHeader, we can just set a checkpoint with None |
||||||
| logger.debug( | ||||||
| "Datadog Lambda Python only supports extracting trace" | ||||||
| "context from String or Binary SQS/SNS message attributes" | ||||||
| ) | ||||||
| # Handle case where trace context is injected into attributes.AWSTraceHeader | ||||||
| # example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 | ||||||
| attrs = event.get("Records")[0].get("attributes") | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here, it's accessing records[0] again. I would put this whole section in an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| if attrs: | ||||||
| x_ray_header = attrs.get("AWSTraceHeader") | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe put this logic in the (extract_context is probably not a great name, I let you find a better one)
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm. I might be misinterpreting but I'm not sure we should have one function return both a Context() object and the return of a json.loads(). I ended up splitting the x-ray extractor into another helper, let me know what you think |
||||||
| if x_ray_header: | ||||||
| x_ray_context = parse_xray_header(x_ray_header) | ||||||
| trace_id_parts = x_ray_context.get("trace_id", "").split("-") | ||||||
| if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( | ||||||
| DD_TRACE_JAVA_TRACE_ID_PADDING | ||||||
| ): | ||||||
| # If it starts with eight 0's padding, | ||||||
| # then this AWSTraceHeader contains Datadog injected trace context | ||||||
| logger.debug( | ||||||
| "Found dd-trace injected trace context from AWSTraceHeader" | ||||||
| ) | ||||||
| if idx == 0: | ||||||
| context = Context( | ||||||
| trace_id=int(trace_id_parts[2][8:], 16), | ||||||
| span_id=int(x_ray_context["parent_id"], 16), | ||||||
| sampling_priority=float(x_ray_context["sampled"]), | ||||||
| ) | ||||||
| except Exception as e: | ||||||
| logger.debug("The trace extractor returned with error %s", e) | ||||||
|
|
||||||
| if dd_json_data: | ||||||
| dd_data = json.loads(dd_json_data) | ||||||
| # Set DSM checkpoint once per record | ||||||
| _dsm_set_checkpoint(dsm_data, event_type, source_arn) | ||||||
|
|
||||||
| if is_step_function_event(dd_data): | ||||||
| try: | ||||||
| return extract_context_from_step_functions(dd_data, None) | ||||||
| except Exception: | ||||||
| logger.debug( | ||||||
| "Failed to extract Step Functions context from SQS/SNS event." | ||||||
| ) | ||||||
| context = propagator.extract(dd_data) | ||||||
| _dsm_set_checkpoint(dd_data, event_type, source_arn) | ||||||
| return context | ||||||
| else: | ||||||
| # Handle case where trace context is injected into attributes.AWSTraceHeader | ||||||
| # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 | ||||||
| attrs = event.get("Records")[0].get("attributes") | ||||||
| if attrs: | ||||||
| x_ray_header = attrs.get("AWSTraceHeader") | ||||||
| if x_ray_header: | ||||||
| x_ray_context = parse_xray_header(x_ray_header) | ||||||
| trace_id_parts = x_ray_context.get("trace_id", "").split("-") | ||||||
| if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( | ||||||
| DD_TRACE_JAVA_TRACE_ID_PADDING | ||||||
| ): | ||||||
| # If it starts with eight 0's padding, | ||||||
| # then this AWSTraceHeader contains Datadog injected trace context | ||||||
| logger.debug( | ||||||
| "Found dd-trace injected trace context from AWSTraceHeader" | ||||||
| ) | ||||||
| return Context( | ||||||
| trace_id=int(trace_id_parts[2][8:], 16), | ||||||
| span_id=int(x_ray_context["parent_id"], 16), | ||||||
| sampling_priority=float(x_ray_context["sampled"]), | ||||||
| ) | ||||||
| # Still want to set a DSM checkpoint even if DSM context not propagated | ||||||
| _dsm_set_checkpoint(None, event_type, source_arn) | ||||||
| return extract_context_from_lambda_context(lambda_context) | ||||||
| except Exception as e: | ||||||
| logger.debug("The trace extractor returned with error %s", e) | ||||||
| # Still want to set a DSM checkpoint even if DSM context not propagated | ||||||
| _dsm_set_checkpoint(None, event_type, source_arn) | ||||||
| return extract_context_from_lambda_context(lambda_context) | ||||||
| return context if context else extract_context_from_lambda_context(lambda_context) | ||||||
|
|
||||||
|
|
||||||
| def _extract_context_from_eventbridge_sqs_event(event): | ||||||
|
|
@@ -393,30 +402,42 @@ def extract_context_from_kinesis_event(event, lambda_context): | |||||
| Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported. | ||||||
| """ | ||||||
| source_arn = "" | ||||||
| try: | ||||||
| record = get_first_record(event) | ||||||
| source_arn = record.get("eventSourceARN", "") | ||||||
| kinesis = record.get("kinesis") | ||||||
| if not kinesis: | ||||||
| return extract_context_from_lambda_context(lambda_context) | ||||||
| data = kinesis.get("data") | ||||||
| if data: | ||||||
| import base64 | ||||||
|
|
||||||
| b64_bytes = data.encode("ascii") | ||||||
| str_bytes = base64.b64decode(b64_bytes) | ||||||
| data_str = str_bytes.decode("ascii") | ||||||
| data_obj = json.loads(data_str) | ||||||
| dd_ctx = data_obj.get("_datadog") | ||||||
| if dd_ctx: | ||||||
| context = propagator.extract(dd_ctx) | ||||||
| _dsm_set_checkpoint(dd_ctx, "kinesis", source_arn) | ||||||
| return context | ||||||
| except Exception as e: | ||||||
| logger.debug("The trace extractor returned with error %s", e) | ||||||
| # Still want to set a DSM checkpoint even if DSM context not propagated | ||||||
| _dsm_set_checkpoint(None, "kinesis", source_arn) | ||||||
| return extract_context_from_lambda_context(lambda_context) | ||||||
| records = ( | ||||||
| [get_first_record(event)] | ||||||
| if not config.data_streams_enabled | ||||||
| else event.get("Records") | ||||||
| ) | ||||||
| context = None | ||||||
| for idx, record in enumerate(records): | ||||||
| dsm_data = None | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not specific to dsm, it's |
||||||
| try: | ||||||
| source_arn = record.get("eventSourceARN", "") | ||||||
| kinesis = record.get("kinesis") | ||||||
| if not kinesis: | ||||||
| context = ( | ||||||
| extract_context_from_lambda_context(lambda_context) | ||||||
| if idx == 0 | ||||||
| else context | ||||||
| ) | ||||||
| _dsm_set_checkpoint(None, "kinesis", source_arn) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we are setting a checkpoint
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm. This is deep enough inside the extract function where we believe that the event source is from Kinesis from parsing beforehand. However, all AWS documentation says Kinesis lambda event should have this field. To my understanding, this check is for lambda synchronous invocations with records that match Kinesis, but doesn't actually come from a Kinesis stream. @DataDog/apm-serverless Can you help confirm why this check is here in the first place? |
||||||
| continue | ||||||
| data = kinesis.get("data") | ||||||
| if data: | ||||||
| import base64 | ||||||
|
|
||||||
| b64_bytes = data.encode("ascii") | ||||||
| str_bytes = base64.b64decode(b64_bytes) | ||||||
| data_str = str_bytes.decode("ascii") | ||||||
| data_obj = json.loads(data_str) | ||||||
| dd_ctx = data_obj.get("_datadog") | ||||||
| if dd_ctx: | ||||||
| if idx == 0: | ||||||
| context = propagator.extract(dd_ctx) | ||||||
| dsm_data = dd_ctx | ||||||
| except Exception as e: | ||||||
| logger.debug("The trace extractor returned with error %s", e) | ||||||
| _dsm_set_checkpoint(dsm_data, "kinesis", source_arn) | ||||||
| return context if context else extract_context_from_lambda_context(lambda_context) | ||||||
|
|
||||||
|
|
||||||
| def _deterministic_sha256_hash(s: str, part: str) -> int: | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if context is extracted from event bridge, we don't set a checkpoint. Is that expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The tracers never inject DSM context in the case of event bridge or step functions. I'm not sure this is the PR to be adding the functionality for these event types