Skip to content

Commit df5c04c

Browse files
committed
Added support for Lettuce reactive Redis commands
1 parent 2a61027 commit df5c04c

30 files changed

Lines changed: 1166 additions & 2 deletions

File tree

.github/workflows/plugins-jdk17-test.1.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ jobs:
6565
- c3p0-0.9.2.x-0.10.x-scenario
6666
- spring-scheduled-6.x-scenario
6767
- caffeine-3.x-scenario
68+
- lettuce-webflux-6x-scenario
6869
steps:
6970
- uses: actions/checkout@v2
7071
with:

.github/workflows/plugins-test.1.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ jobs:
7373
- kotlin-coroutine-scenario
7474
- lettuce-scenario
7575
- lettuce-6.5.x-scenario
76+
- lettuce-webflux-5x-scenario
7677
- mongodb-3.x-scenario
7778
- mongodb-4.x-scenario
7879
- netty-socketio-scenario

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ Release Notes.
2929
* Add `eclipse-temurin:25-jre` as another base image.
3030
* Add JDK25 plugin tests for Spring 6.
3131
* Ignore classes starting with "sun.nio.cs" in bytebuddy due to potential class loading deadlock.
32+
* Added support for Lettuce reactive Redis commands.
3233

3334
All issues and pull requests are [here](https://github.com/apache/skywalking/milestone/242?closed=1)
3435

apm-sniffer/apm-sdk-plugin/lettuce-plugins/lettuce-common/src/main/java/org/apache/skywalking/apm/plugin/lettuce/common/RedisChannelWriterInterceptor.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.lettuce.core.protocol.RedisCommand;
2626
import org.apache.skywalking.apm.agent.core.conf.Constants;
2727
import org.apache.skywalking.apm.agent.core.context.ContextManager;
28+
import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
2829
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
2930
import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
3031
import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
@@ -57,8 +58,10 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
5758
}
5859
EnhancedInstance enhancedCommand = (EnhancedInstance) spanCarrierCommand;
5960

61+
Object skyWalkingDynamicField = enhancedCommand.getSkyWalkingDynamicField();
62+
6063
// command has been handle by another channel writer (cluster or sentinel case)
61-
if (enhancedCommand.getSkyWalkingDynamicField() != null) {
64+
if (skyWalkingDynamicField instanceof AbstractSpan) {
6265
//set peer in last channel writer (delegate)
6366
if (peer != null) {
6467
AbstractSpan span = (AbstractSpan) enhancedCommand.getSkyWalkingDynamicField();
@@ -82,6 +85,16 @@ public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allAr
8285
command = "BATCH_WRITE";
8386
}
8487
AbstractSpan span = ContextManager.createExitSpan(operationName, peer);
88+
89+
if (skyWalkingDynamicField instanceof ContextSnapshot) {
90+
ContextSnapshot snapshot = (ContextSnapshot) skyWalkingDynamicField;
91+
if (!ContextManager.isActive()) {
92+
AbstractSpan localSpan = ContextManager.createLocalSpan("RedisReactive/local");
93+
localSpan.setComponent(ComponentsDefine.LETTUCE);
94+
}
95+
ContextManager.continued(snapshot);
96+
}
97+
8598
span.setComponent(ComponentsDefine.LETTUCE);
8699
Tags.CACHE_TYPE.set(span, "Redis");
87100
if (StringUtil.isNotEmpty(key)) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.common;
20+
21+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
22+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
23+
24+
/**
25+
* Interceptor for RedisSubscription constructor.
26+
* <p>
27+
* This interceptor captures the {@link io.lettuce.core.protocol.RedisCommand} instance
28+
* at subscription construction time and stores it into SkyWalking dynamic field.
29+
* </p>
30+
*/
31+
public class RedisSubscriptionConstructorInterceptor implements InstanceConstructorInterceptor {
32+
33+
@Override
34+
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
35+
// allArguments[1] is the RedisCommand passed to the RedisSubscription constructor
36+
objInst.setSkyWalkingDynamicField(allArguments[1]);
37+
}
38+
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.common;
20+
21+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
22+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.InstanceMethodsAroundInterceptorV2;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.MethodInvocationContext;
24+
import reactor.core.CoreSubscriber;
25+
26+
import java.lang.reflect.Method;
27+
28+
/**
29+
* Interceptor for {@code RedisPublisher.RedisSubscription#subscribe(Subscriber)} method.
30+
*
31+
* <p>
32+
* This interceptor works together with the constructor interceptor of
33+
* {@code RedisSubscription}:
34+
* </p>
35+
*/
36+
public class RedisSubscriptionSubscribeMethodInterceptor implements InstanceMethodsAroundInterceptorV2 {
37+
38+
@Override
39+
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) {
40+
if (allArguments[0] instanceof CoreSubscriber) {
41+
CoreSubscriber<?> subscriber = (CoreSubscriber<?>) allArguments[0];
42+
// get ContextSnapshot from reactor context, the snapshot is set to reactor context by any other plugin
43+
// such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
44+
Object skywalkingContextSnapshot = subscriber.currentContext().getOrDefault("SKYWALKING_CONTEXT_SNAPSHOT", null);
45+
if (skywalkingContextSnapshot != null) {
46+
((EnhancedInstance) objInst.getSkyWalkingDynamicField()).setSkyWalkingDynamicField(skywalkingContextSnapshot);
47+
}
48+
}
49+
}
50+
51+
@Override
52+
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) {
53+
return ret;
54+
}
55+
56+
@Override
57+
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t, MethodInvocationContext context) {
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.plugin.lettuce.common.define;
20+
21+
import net.bytebuddy.description.method.MethodDescription;
22+
import net.bytebuddy.matcher.ElementMatcher;
23+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterceptPoint;
24+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.v2.ClassInstanceMethodsEnhancePluginDefineV2;
25+
import org.apache.skywalking.apm.agent.core.plugin.interceptor.v2.InstanceMethodsInterceptV2Point;
26+
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
27+
28+
import static net.bytebuddy.matcher.ElementMatchers.any;
29+
import static net.bytebuddy.matcher.ElementMatchers.named;
30+
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
31+
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
32+
33+
/**
34+
*
35+
*/
36+
public class RedisSubscriptionInstrumentation extends ClassInstanceMethodsEnhancePluginDefineV2 {
37+
38+
private static final String ENHANCE_CLASS = "io.lettuce.core.RedisPublisher$RedisSubscription";
39+
40+
private static final String REDIS_SUBSCRIPTION_SUBSCRIBE_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.common.RedisSubscriptionSubscribeMethodInterceptor";
41+
private static final String REDIS_SUBSCRIPTION_CONST_METHOD_INTERCEPTOR = "org.apache.skywalking.apm.plugin.lettuce.common.RedisSubscriptionConstructorInterceptor";
42+
43+
@Override
44+
public InstanceMethodsInterceptV2Point[] getInstanceMethodsInterceptV2Points() {
45+
return new InstanceMethodsInterceptV2Point[]{
46+
new InstanceMethodsInterceptV2Point() {
47+
@Override
48+
public ElementMatcher<MethodDescription> getMethodsMatcher() {
49+
return named("subscribe");
50+
}
51+
52+
@Override
53+
public String getMethodsInterceptorV2() {
54+
return REDIS_SUBSCRIPTION_SUBSCRIBE_METHOD_INTERCEPTOR;
55+
}
56+
57+
@Override
58+
public boolean isOverrideArgs() {
59+
return false;
60+
}
61+
}
62+
};
63+
}
64+
65+
@Override
66+
public ClassMatch enhanceClass() {
67+
return byName(ENHANCE_CLASS);
68+
}
69+
70+
@Override
71+
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
72+
return new ConstructorInterceptPoint[] {
73+
new ConstructorInterceptPoint() {
74+
@Override
75+
public ElementMatcher<MethodDescription> getConstructorMatcher() {
76+
return any().and(takesArgument(1, named("io.lettuce.core.protocol.RedisCommand")));
77+
}
78+
79+
@Override
80+
public String getConstructorInterceptor() {
81+
return REDIS_SUBSCRIPTION_CONST_METHOD_INTERCEPTOR;
82+
}
83+
}
84+
};
85+
}
86+
}

apm-sniffer/apm-sdk-plugin/lettuce-plugins/lettuce-common/src/main/resources/skywalking-plugin.def

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,5 @@
1515
# limitations under the License.
1616

1717
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.DefaultEndpointInstrumentation
18-
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisCommandInstrumentation
18+
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisCommandInstrumentation
19+
lettuce-common=org.apache.skywalking.apm.plugin.lettuce.common.define.RedisSubscriptionInstrumentation
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
19+
home="$(cd "$(dirname $0)"; pwd)"
20+
21+
java -Dredis.host=${REDIS_SERVERS} -jar -Dskywalking.plugin.lettuce.trace_redis_parameters=true ${agent_opts} ${home}/../libs/lettuce-webflux-5x-scenario.jar &
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
segmentItems:
17+
- serviceName: lettuce-webflux-5x-scenario
18+
segmentSize: nq 0
19+
segments:
20+
- segmentId: not null
21+
spans:
22+
- operationName: /case/healthCheck
23+
parentSpanId: -1
24+
spanId: 0
25+
spanLayer: Http
26+
startTime: not null
27+
endTime: not null
28+
componentId: 67
29+
isError: false
30+
spanType: Entry
31+
peer: ''
32+
skipAnalysis: false
33+
tags:
34+
- {key: url, value: 'http://localhost:8080/case/healthCheck'}
35+
- {key: http.method, value: HEAD}
36+
- {key: http.status_code, value: '200'}
37+
- segmentId: not null
38+
spans:
39+
- operationName: Lettuce/GET
40+
parentSpanId: -1
41+
spanId: 0
42+
spanLayer: Cache
43+
startTime: not null
44+
endTime: not null
45+
componentId: 57
46+
isError: false
47+
spanType: Exit
48+
peer: not null
49+
skipAnalysis: false
50+
tags:
51+
- {key: cache.type, value: Redis}
52+
- {key: cache.key, value: key}
53+
- {key: cache.cmd, value: GET}
54+
- {key: cache.op, value: read}
55+
refs:
56+
- { parentEndpoint: /case/lettuce-case, networkAddress: '', refType: CrossThread,
57+
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
58+
null, parentService: not null, traceId: not null }
59+
- segmentId: not null
60+
spans:
61+
- operationName: Lettuce/SET
62+
parentSpanId: -1
63+
spanId: 0
64+
spanLayer: Cache
65+
startTime: not null
66+
endTime: not null
67+
componentId: 57
68+
isError: false
69+
spanType: Exit
70+
peer: not null
71+
skipAnalysis: false
72+
tags:
73+
- { key: cache.type, value: Redis }
74+
- { key: cache.key, value: key0 }
75+
- { key: cache.cmd, value: SET }
76+
- { key: cache.op, value: write }
77+
refs:
78+
- { parentEndpoint: /case/lettuce-case, networkAddress: '', refType: CrossThread,
79+
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
80+
null, parentService: not null, traceId: not null }
81+
- segmentId: not null
82+
spans:
83+
- operationName: Lettuce/SET
84+
parentSpanId: -1
85+
spanId: 0
86+
spanLayer: Cache
87+
startTime: not null
88+
endTime: not null
89+
componentId: 57
90+
isError: false
91+
spanType: Exit
92+
peer: not null
93+
skipAnalysis: false
94+
tags:
95+
- { key: cache.type, value: Redis }
96+
- { key: cache.key, value: key1 }
97+
- { key: cache.cmd, value: SET }
98+
- { key: cache.op, value: write }
99+
refs:
100+
- { parentEndpoint: /case/lettuce-case, networkAddress: '', refType: CrossThread,
101+
parentSpanId: 0, parentTraceSegmentId: not null, parentServiceInstance: not
102+
null, parentService: not null, traceId: not null }
103+
- segmentId: not null
104+
spans:
105+
- operationName: /case/lettuce-case
106+
parentSpanId: -1
107+
spanId: 0
108+
spanLayer: Http
109+
startTime: not null
110+
endTime: not null
111+
componentId: 67
112+
isError: false
113+
spanType: Entry
114+
peer: ''
115+
skipAnalysis: false
116+
tags:
117+
- {key: url, value: 'http://localhost:8080/case/lettuce-case'}
118+
- {key: http.method, value: GET}
119+
- {key: http.status_code, value: '200'}

0 commit comments

Comments
 (0)