|
1 | | -import { SNSMessage, SQSEvent } from "aws-lambda"; |
2 | | -import { EventTraceExtractor } from "../extractor"; |
| 1 | +import { SQSEvent } from "aws-lambda"; |
3 | 2 | import { TracerWrapper } from "../../tracer-wrapper"; |
4 | 3 | import { logDebug } from "../../../utils"; |
| 4 | +import { EventTraceExtractor } from "../extractor"; |
5 | 5 | import { SpanContextWrapper } from "../../span-context-wrapper"; |
6 | | -import { XrayService } from "../../xray-service"; |
7 | | -import { StepFunctionContextService } from "../../step-function-service"; |
| 6 | +import { extractTraceContext, extractFromAWSTraceHeader, handleExtractionError } from "../extractor-utils"; |
8 | 7 |
|
9 | 8 | export class SNSSQSEventTraceExtractor implements EventTraceExtractor { |
10 | 9 | constructor(private tracerWrapper: TracerWrapper) {} |
11 | 10 |
|
12 | 11 | extract(event: SQSEvent): SpanContextWrapper | null { |
13 | 12 | try { |
14 | | - // First try to extract trace context from message attributes |
15 | | - if (event?.Records?.[0]?.body) { |
16 | | - const parsedBody = JSON.parse(event?.Records?.[0]?.body) as SNSMessage; |
17 | | - const messageAttribute = parsedBody?.MessageAttributes?._datadog; |
18 | | - if (messageAttribute?.Value) { |
| 13 | + // Try to extract trace context from SNS wrapped in SQS |
| 14 | + const body = event?.Records?.[0]?.body; |
| 15 | + if (body) { |
| 16 | + const parsedBody = JSON.parse(body); |
| 17 | + const snsMessageAttribute = parsedBody?.MessageAttributes?._datadog; |
| 18 | + if (snsMessageAttribute?.Value) { |
19 | 19 | let headers; |
20 | | - if (messageAttribute.Type === "String") { |
21 | | - headers = JSON.parse(messageAttribute.Value); |
| 20 | + if (snsMessageAttribute.Type === "String") { |
| 21 | + headers = JSON.parse(snsMessageAttribute.Value); |
22 | 22 | } else { |
23 | | - const decodedValue = Buffer.from(messageAttribute.Value, "base64").toString("ascii"); |
| 23 | + // Try decoding base64 values |
| 24 | + const decodedValue = Buffer.from(snsMessageAttribute.Value, "base64").toString("ascii"); |
24 | 25 | headers = JSON.parse(decodedValue); |
25 | 26 | } |
26 | 27 |
|
27 | | - // First try to extract as regular trace headers |
28 | | - const traceContext = this.tracerWrapper.extract(headers); |
| 28 | + const traceContext = extractTraceContext(headers, this.tracerWrapper); |
29 | 29 | if (traceContext) { |
30 | | - logDebug("Extracted trace context from SNS-SQS event"); |
31 | 30 | return traceContext; |
32 | 31 | } |
33 | | - |
34 | | - // If that fails, check if this is a Step Function context |
35 | | - const stepFunctionInstance = StepFunctionContextService.instance(headers); |
36 | | - const stepFunctionContext = stepFunctionInstance.context; |
37 | | - |
38 | | - if (stepFunctionContext !== undefined) { |
39 | | - const spanContext = stepFunctionInstance.spanContext; |
40 | | - if (spanContext !== null) { |
41 | | - logDebug("Extracted Step Function trace context from SNS-SQS event", { spanContext, event }); |
42 | | - return spanContext; |
43 | | - } |
44 | | - } |
45 | | - |
46 | 32 | logDebug("Failed to extract trace context from SNS-SQS event"); |
47 | 33 | } |
48 | 34 | } |
49 | 35 |
|
50 | | - // Check SQS message attributes for Step Function context |
51 | | - const sqsMessageAttribute = event?.Records?.[0]?.messageAttributes?._datadog?.stringValue; |
52 | | - if (sqsMessageAttribute) { |
53 | | - const parsedHeaders = JSON.parse(sqsMessageAttribute); |
54 | | - |
55 | | - // First try to extract as regular trace headers |
56 | | - const traceContext = this.tracerWrapper.extract(parsedHeaders); |
| 36 | + // Check SQS message attributes as a fallback |
| 37 | + const sqsMessageAttribute = event?.Records?.[0]?.messageAttributes?._datadog; |
| 38 | + if (sqsMessageAttribute?.stringValue) { |
| 39 | + const headers = JSON.parse(sqsMessageAttribute.stringValue); |
| 40 | + const traceContext = extractTraceContext(headers, this.tracerWrapper); |
57 | 41 | if (traceContext) { |
58 | | - logDebug("Extracted trace context from SQS messageAttributes"); |
59 | 42 | return traceContext; |
60 | 43 | } |
61 | | - |
62 | | - // If that fails, check if this is a Step Function context |
63 | | - const stepFunctionInstance = StepFunctionContextService.instance(parsedHeaders); |
64 | | - const stepFunctionContext = stepFunctionInstance.context; |
65 | | - |
66 | | - if (stepFunctionContext !== undefined) { |
67 | | - const spanContext = stepFunctionInstance.spanContext; |
68 | | - if (spanContext !== null) { |
69 | | - logDebug("Extracted Step Function trace context from SQS messageAttributes", { spanContext, event }); |
70 | | - return spanContext; |
71 | | - } |
72 | | - } |
73 | 44 | } |
74 | 45 |
|
75 | | - // Then try to extract trace context from attributes.AWSTraceHeader. (Upstream Java apps can |
76 | | - // pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case) |
77 | | - if (event?.Records?.[0]?.attributes?.AWSTraceHeader !== undefined) { |
78 | | - const traceContext = XrayService.extraceDDContextFromAWSTraceHeader(event.Records[0].attributes.AWSTraceHeader); |
79 | | - if (traceContext) { |
80 | | - logDebug("Extracted trace context from SNS-SQS event attributes.AWSTraceHeader"); |
81 | | - return traceContext; |
82 | | - } else { |
83 | | - logDebug("No Datadog trace context found from SNS-SQS event attributes.AWSTraceHeader"); |
84 | | - } |
| 46 | + // Else try to extract trace context from attributes.AWSTraceHeader |
| 47 | + // (Upstream Java apps can pass down Datadog trace context in the attributes.AWSTraceHeader in SQS case) |
| 48 | + const awsTraceHeader = event?.Records?.[0]?.attributes?.AWSTraceHeader; |
| 49 | + if (awsTraceHeader !== undefined) { |
| 50 | + return extractFromAWSTraceHeader(awsTraceHeader, "SNS-SQS"); |
85 | 51 | } |
86 | 52 | } catch (error) { |
87 | | - if (error instanceof Error) { |
88 | | - logDebug("Unable to extract trace context from SNS-SQS event", error); |
89 | | - } |
| 53 | + handleExtractionError(error, "SQS"); |
90 | 54 | } |
91 | 55 |
|
92 | 56 | return null; |
|
0 commit comments