Skip to content

Commit 65a554a

Browse files
committed
Enhance tracing for Lettuce reactive commands and add instrumentation for context propagation
1 parent c66cf73 commit 65a554a

10 files changed

Lines changed: 534 additions & 2 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v5;
20+
21+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
22+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
26+
import reactor.core.publisher.Flux;
27+
import reactor.core.publisher.Mono;
28+
import reactor.util.context.Context;
29+
30+
import java.lang.reflect.Method;
31+
import java.util.function.Function;
32+
33+
/**
34+
* Intercepts reactive publisher factory methods (createMono/createFlux)
35+
* to ensure the SkyWalking context snapshot is propagated via Reactor Context.
36+
*
37+
* <p>If the Reactor Context does not already contain a snapshot, this interceptor
38+
* captures the current active context and writes it into the subscriber context
39+
* as a fallback propagation mechanism.</p>
40+
*/
41+
public class RedisReactiveCreatePublisherMethodInterceptorV5 implements InstanceMethodsAroundInterceptorV2 {
42+
43+
private static final String SNAPSHOT_KEY = "SKYWALKING_CONTEXT_SNAPSHOT";
44+
45+
@Override
46+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
47+
MethodInvocationContext context) {
48+
}
49+
50+
@Override
51+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
52+
53+
if (!(ret instanceof Mono) && !(ret instanceof Flux)) {
54+
return ret;
55+
}
56+
57+
final ContextSnapshot snapshot;
58+
if (ContextManager.isActive()) {
59+
snapshot = ContextManager.capture();
60+
} else {
61+
return ret;
62+
}
63+
64+
Function<Context, Context> contextFunction = ctx -> {
65+
if (ctx.hasKey(SNAPSHOT_KEY)) {
66+
return ctx;
67+
}
68+
return ctx.put(SNAPSHOT_KEY, snapshot);
69+
};
70+
71+
if (ret instanceof Mono) {
72+
Mono<?> original = (Mono<?>) ret;
73+
return original.subscriberContext(contextFunction);
74+
} else {
75+
Flux<?> original = (Flux<?>) ret;
76+
return original.subscriberContext(contextFunction);
77+
}
78+
}
79+
80+
@Override
81+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
82+
}
83+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v5.define;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import org.apache.skywalking.apm.agent.core.plugin.WitnessMethod;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
27+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
28+
29+
import java.util.Collections;
30+
import java.util.List;
31+
32+
import static net.bytebuddy.matcher.ElementMatchers.named;
33+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
34+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
35+
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
36+
37+
/**
38+
*
39+
*/
40+
public class RedisReactiveCommandsInstrumentationV5 extends ClassInstanceMethodsEnhancePluginDefineV2 {
41+
42+
private static final String ENHANCE_CLASS = "io.lettuce.core.AbstractRedisReactiveCommands";
43+
44+
private static final String REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v5.RedisReactiveCreatePublisherMethodInterceptorV5";
45+
46+
@Override
47+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
48+
return new InstanceMethodsInterceptV2Point[]{
49+
new InstanceMethodsInterceptV2Point() {
50+
@Override
51+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
52+
return namedOneOf(
53+
"createMono",
54+
"createFlux",
55+
"createDissolvingFlux"
56+
).and(takesArguments(1));
57+
}
58+
59+
@Override
60+
public String getMethodsInterceptorV2() {
61+
return REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR;
62+
}
63+
64+
@Override
65+
public boolean isOverrideArgs() {
66+
return false;
67+
}
68+
}
69+
};
70+
}
71+
72+
@Override
73+
public ClassMatch enhanceClass() {
74+
return byHierarchyMatch(ENHANCE_CLASS);
75+
}
76+
77+
@Override
78+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
79+
return new ConstructorInterceptPoint[0];
80+
}
81+
82+
@Override
83+
protected List<WitnessMethod> witnessMethods() {
84+
return Collections.singletonList(new WitnessMethod(
85+
"reactor.core.publisher.Mono",
86+
named("subscriberContext")
87+
));
88+
}
89+
}

apm-sniffer/apm-sdk-plugin/lettuce-plugins/lettuce-5.x-6.4.x-plugin/src/main/resources/skywalking-plugin.def

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
lettuce-5.x-6.4.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisChannelWriterInstrumentationV5
17+
lettuce-5.x-6.4.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisChannelWriterInstrumentationV5
18+
lettuce-5.x-6.4.x=org.apache.skywalking.apm.plugin.lettuce.v5.define.RedisReactiveCommandsInstrumentationV5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v65;
20+
21+
import org.apache.skywalking.apm.agent.core.context.ContextManager;
22+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
26+
import org.reactivestreams.Publisher;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.Mono;
29+
30+
import java.lang.reflect.Method;
31+
32+
/**
33+
* Intercepts reactive publisher factory methods (createMono/createFlux)
34+
* to ensure the SkyWalking context snapshot is propagated via Reactor Context.
35+
*
36+
* <p>If the Reactor Context does not already contain a snapshot, this interceptor
37+
* captures the current active context and writes it into the subscriber context
38+
* as a fallback propagation mechanism.</p>
39+
*/
40+
public class RedisReactiveCreatePublisherMethodInterceptorV65 implements InstanceMethodsAroundInterceptorV2 {
41+
42+
private static final String SNAPSHOT_KEY = "SKYWALKING_CONTEXT_SNAPSHOT";
43+
44+
@Override
45+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
46+
MethodInvocationContext context) {
47+
}
48+
49+
@Override
50+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
51+
52+
if (!(ret instanceof Mono) && !(ret instanceof Flux)) {
53+
return ret;
54+
}
55+
56+
final ContextSnapshot snapshot;
57+
if (ContextManager.isActive()) {
58+
snapshot = ContextManager.capture();
59+
} else {
60+
return ret;
61+
}
62+
63+
return wrapPublisher((Publisher<?>) ret, snapshot);
64+
}
65+
66+
private <T> Publisher<T> wrapPublisher(Publisher<T> original, ContextSnapshot snapshot) {
67+
if (original instanceof Mono) {
68+
return Mono.deferContextual(ctxView -> {
69+
if (ctxView.hasKey(SNAPSHOT_KEY)) {
70+
return (Mono<T>) original;
71+
}
72+
return ((Mono<T>) original).contextWrite(c -> c.put(SNAPSHOT_KEY, snapshot));
73+
});
74+
} else {
75+
return Flux.deferContextual(ctxView -> {
76+
if (ctxView.hasKey(SNAPSHOT_KEY)) {
77+
return original;
78+
}
79+
return ((Flux<T>) original).contextWrite(c -> c.put(SNAPSHOT_KEY, snapshot));
80+
});
81+
}
82+
}
83+
84+
@Override
85+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
86+
}
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.v65.define;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import net.bytebuddy.matcher.ElementMatchers;
24+
import org.apache.skywalking.apm.agent.core.plugin.WitnessMethod;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
26+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
27+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
28+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
29+
30+
import java.util.Collections;
31+
import java.util.List;
32+
33+
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
34+
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
35+
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
36+
37+
/**
38+
*
39+
*/
40+
public class RedisReactiveCommandsInstrumentationV65 extends ClassInstanceMethodsEnhancePluginDefineV2 {
41+
42+
private static final String ENHANCE_CLASS = "io.lettuce.core.AbstractRedisReactiveCommands";
43+
44+
private static final String REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.v65.RedisReactiveCreatePublisherMethodInterceptorV65";
45+
46+
@Override
47+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
48+
return new InstanceMethodsInterceptV2Point[]{
49+
new InstanceMethodsInterceptV2Point() {
50+
@Override
51+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
52+
return namedOneOf(
53+
"createMono",
54+
"createFlux",
55+
"createDissolvingFlux"
56+
).and(takesArguments(1));
57+
}
58+
59+
@Override
60+
public String getMethodsInterceptorV2() {
61+
return REDIS_REACTIVE_CREATEMONO_METHOD_INTERCEPTOR;
62+
}
63+
64+
@Override
65+
public boolean isOverrideArgs() {
66+
return false;
67+
}
68+
}
69+
};
70+
}
71+
72+
@Override
73+
public ClassMatch enhanceClass() {
74+
return byHierarchyMatch(ENHANCE_CLASS);
75+
}
76+
77+
@Override
78+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
79+
return new ConstructorInterceptPoint[0];
80+
}
81+
82+
@Override
83+
protected List<WitnessMethod> witnessMethods() {
84+
return Collections.singletonList(new WitnessMethod(
85+
"reactor.core.publisher.Mono",
86+
ElementMatchers.named("deferContextual")
87+
));
88+
}
89+
}

apm-sniffer/apm-sdk-plugin/lettuce-plugins/lettuce-6.5.x-plugin/src/main/resources/skywalking-plugin.def

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
lettuce-6.5.x=org.apache.skywalking.apm.plugin.lettuce.v65.define.RedisChannelWriterInstrumentationV65
17+
lettuce-6.5.x=org.apache.skywalking.apm.plugin.lettuce.v65.define.RedisChannelWriterInstrumentationV65
18+
lettuce-6.5.x=org.apache.skywalking.apm.plugin.lettuce.v65.define.RedisReactiveCommandsInstrumentationV65

0 commit comments

Comments
 (0)