Skip to content

Commit cd4bf58

Browse files
committed
Add case
1 parent 4551e27 commit cd4bf58

7 files changed

Lines changed: 501 additions & 0 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.testcase.grpc.consumr;
20+
21+
import io.grpc.CallOptions;
22+
import io.grpc.Channel;
23+
import io.grpc.ClientCall;
24+
import io.grpc.ClientInterceptor;
25+
import io.grpc.ForwardingClientCall;
26+
import io.grpc.ForwardingClientCallListener;
27+
import io.grpc.Metadata;
28+
import io.grpc.MethodDescriptor;
29+
import io.grpc.Status;
30+
import javax.annotation.Nullable;
31+
import org.apache.logging.log4j.LogManager;
32+
import org.apache.logging.log4j.Logger;
33+
34+
public class ConsumerInterceptor implements ClientInterceptor {
35+
36+
private static final Logger LOGGER = LogManager.getLogger(ConsumerInterceptor.class);
37+
38+
@Override
39+
public <REQ_T, RESP_T> ClientCall<REQ_T, RESP_T> interceptCall(MethodDescriptor<REQ_T, RESP_T> descriptor,
40+
CallOptions options, Channel channel) {
41+
LOGGER.info("start interceptor!");
42+
LOGGER.info("method type: {}", descriptor.getType());
43+
return new ForwardingClientCall.SimpleForwardingClientCall<REQ_T, RESP_T>(channel.newCall(descriptor, options)) {
44+
@Override
45+
public void start(Listener<RESP_T> responseListener, Metadata headers) {
46+
LOGGER.info("Peer: {}", channel.authority());
47+
LOGGER.info("Operation Name : {}", descriptor.getFullMethodName());
48+
Interceptor<RESP_T> tracingResponseListener = new Interceptor(responseListener);
49+
tracingResponseListener.contextSnapshot = "contextSnapshot";
50+
delegate().start(tracingResponseListener, headers);
51+
}
52+
53+
@Override
54+
public void cancel(@Nullable String message, @Nullable Throwable cause) {
55+
LOGGER.info("cancel");
56+
super.cancel(message, cause);
57+
}
58+
59+
@Override
60+
public void halfClose() {
61+
LOGGER.info("halfClose");
62+
super.halfClose();
63+
}
64+
65+
@Override
66+
public void sendMessage(REQ_T message) {
67+
LOGGER.info("sendMessage ....");
68+
super.sendMessage(message);
69+
}
70+
};
71+
}
72+
73+
private static class Interceptor<RESP_T> extends ForwardingClientCallListener.SimpleForwardingClientCallListener<RESP_T> {
74+
private static final Logger LOGGER = LogManager.getLogger(Interceptor.class);
75+
76+
private Object contextSnapshot;
77+
78+
protected Interceptor(ClientCall.Listener<RESP_T> delegate) {
79+
super(delegate);
80+
}
81+
82+
@Override
83+
public void onHeaders(Metadata headers) {
84+
LOGGER.info("on Headers");
85+
for (String key : headers.keys()) {
86+
LOGGER.info("Receive key: {}", key);
87+
}
88+
delegate().onHeaders(headers);
89+
}
90+
91+
@Override
92+
public void onMessage(RESP_T message) {
93+
LOGGER.info("contextSnapshot: {}", contextSnapshot);
94+
delegate().onMessage(message);
95+
}
96+
97+
@Override
98+
public void onClose(Status status, Metadata trailers) {
99+
LOGGER.info("on close");
100+
delegate().onClose(status, trailers);
101+
}
102+
103+
@Override
104+
public void onReady() {
105+
LOGGER.info("on Ready");
106+
super.onReady();
107+
}
108+
}
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.testcase.grpc.controller;
20+
21+
import io.grpc.ClientInterceptors;
22+
import io.grpc.ManagedChannel;
23+
import io.grpc.ManagedChannelBuilder;
24+
import io.grpc.stub.ClientCallStreamObserver;
25+
import io.grpc.stub.ClientResponseObserver;
26+
import java.util.Arrays;
27+
import java.util.Iterator;
28+
import java.util.List;
29+
import javax.annotation.PostConstruct;
30+
import org.apache.logging.log4j.LogManager;
31+
import org.apache.logging.log4j.Logger;
32+
import org.apache.skywalking.apm.testcase.grpc.consumr.ConsumerInterceptor;
33+
import org.apache.skywalking.apm.testcase.grpc.proto.GreeterBlockingErrorGrpc;
34+
import org.apache.skywalking.apm.testcase.grpc.proto.GreeterBlockingGrpc;
35+
import org.apache.skywalking.apm.testcase.grpc.proto.GreeterGrpc;
36+
import org.apache.skywalking.apm.testcase.grpc.proto.HelloRequest;
37+
import org.apache.skywalking.apm.testcase.grpc.proto.HelloReply;
38+
import org.springframework.web.bind.annotation.RequestMapping;
39+
import org.springframework.web.bind.annotation.ResponseBody;
40+
import org.springframework.web.bind.annotation.RestController;
41+
42+
@RestController
43+
@RequestMapping("/case")
44+
public class CaseController {
45+
46+
private static final Logger LOGGER = LogManager.getLogger(CaseController.class);
47+
48+
private static final String SUCCESS = "Success";
49+
50+
private final String grpcProviderHost = "127.0.0.1";
51+
private final int grpcProviderPort = 18080;
52+
private ManagedChannel channel;
53+
private GreeterGrpc.GreeterStub greeterStub;
54+
private GreeterBlockingGrpc.GreeterBlockingBlockingStub greeterBlockingStub;
55+
private GreeterBlockingErrorGrpc.GreeterBlockingErrorBlockingStub greeterBlockingErrorStub;
56+
57+
@PostConstruct
58+
public void up() {
59+
channel = ManagedChannelBuilder.forAddress(grpcProviderHost, grpcProviderPort).usePlaintext().build();
60+
greeterStub = GreeterGrpc.newStub(ClientInterceptors.intercept(channel, new ConsumerInterceptor()));
61+
greeterBlockingStub = GreeterBlockingGrpc.newBlockingStub(ClientInterceptors.intercept(channel, new ConsumerInterceptor()));
62+
greeterBlockingErrorStub = GreeterBlockingErrorGrpc.newBlockingStub(ClientInterceptors.intercept(channel, new ConsumerInterceptor()));
63+
}
64+
65+
@RequestMapping("/grpc-scenario")
66+
@ResponseBody
67+
public String testcase() {
68+
greetService();
69+
greetBlockingService();
70+
greetBlockingErrorService();
71+
return SUCCESS;
72+
}
73+
74+
@RequestMapping("/healthCheck")
75+
@ResponseBody
76+
public String healthCheck() {
77+
// your codes
78+
return SUCCESS;
79+
}
80+
81+
private static List<String> names() {
82+
return Arrays.asList("Sophia", "Jackson");
83+
}
84+
85+
private void greetService() {
86+
ClientResponseObserver<HelloRequest, HelloReply> helloReplyStreamObserver = new ClientResponseObserver<HelloRequest, HelloReply>() {
87+
private ClientCallStreamObserver<HelloRequest> requestStream;
88+
89+
@Override
90+
public void beforeStart(ClientCallStreamObserver observer) {
91+
this.requestStream = observer;
92+
this.requestStream.setOnReadyHandler(new Runnable() {
93+
Iterator<String> iterator = names().iterator();
94+
95+
@Override
96+
public void run() {
97+
while (requestStream.isReady()) {
98+
if (iterator.hasNext()) {
99+
String name = iterator.next();
100+
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
101+
requestStream.onNext(request);
102+
} else {
103+
requestStream.onCompleted();
104+
}
105+
}
106+
}
107+
});
108+
}
109+
110+
@Override
111+
public void onNext(HelloReply reply) {
112+
LOGGER.info("Receive an message from provider. message: {}", reply.getMessage());
113+
requestStream.request(1);
114+
}
115+
116+
public void onError(Throwable throwable) {
117+
LOGGER.error("Failed to send data", throwable);
118+
}
119+
120+
public void onCompleted() {
121+
LOGGER.info("All Done");
122+
}
123+
};
124+
125+
greeterStub.sayHello(helloReplyStreamObserver);
126+
}
127+
128+
private void greetBlockingService() {
129+
HelloRequest request = HelloRequest.newBuilder().setName("Sophia").build();
130+
greeterBlockingStub.sayHello(request);
131+
}
132+
133+
private void greetBlockingErrorService() {
134+
HelloRequest request = HelloRequest.newBuilder().setName("Tony").build();
135+
greeterBlockingErrorStub.sayHello(request);
136+
}
137+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.testcase.grpc.provider;
20+
21+
import io.grpc.Server;
22+
import io.grpc.ServerBuilder;
23+
import io.grpc.ServerInterceptors;
24+
import org.apache.skywalking.apm.testcase.grpc.provider.interceptor.ProviderInterceptor;
25+
import org.apache.skywalking.apm.testcase.grpc.provider.service.GreeterBlockingErrorServiceImpl;
26+
import org.apache.skywalking.apm.testcase.grpc.provider.service.GreeterBlockingServiceImpl;
27+
import org.apache.skywalking.apm.testcase.grpc.provider.service.GreeterServiceImpl;
28+
import org.springframework.beans.factory.annotation.Configurable;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.stereotype.Component;
31+
32+
@Configurable
33+
@Component
34+
public class ProviderConfiguration {
35+
36+
@Bean(initMethod = "start", destroyMethod = "shutdown")
37+
public Server server() {
38+
return ServerBuilder.forPort(18080)
39+
.addService(ServerInterceptors.intercept(new GreeterServiceImpl(), new ProviderInterceptor()))
40+
.addService(ServerInterceptors.intercept(new GreeterBlockingServiceImpl(), new ProviderInterceptor()))
41+
.addService(ServerInterceptors.intercept(new GreeterBlockingErrorServiceImpl(), new ProviderInterceptor()))
42+
.build();
43+
}
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.apm.testcase.grpc.provider.interceptor;
20+
21+
import io.grpc.ForwardingServerCall;
22+
import io.grpc.ForwardingServerCallListener;
23+
import io.grpc.Metadata;
24+
import io.grpc.ServerCall;
25+
import io.grpc.ServerCallHandler;
26+
import io.grpc.ServerInterceptor;
27+
28+
import java.util.HashMap;
29+
import java.util.Map;
30+
31+
import org.apache.logging.log4j.LogManager;
32+
import org.apache.logging.log4j.Logger;
33+
34+
public class ProviderInterceptor implements ServerInterceptor {
35+
private static final Logger LOGGER = LogManager.getLogger(ProviderInterceptor.class);
36+
37+
@Override
38+
public <REQ_T, RESQ_T> ServerCall.Listener<REQ_T> interceptCall(ServerCall<REQ_T, RESQ_T> call, Metadata metadata,
39+
ServerCallHandler<REQ_T, RESQ_T> handler) {
40+
Map<String, String> headerMap = new HashMap<String, String>();
41+
for (String key : metadata.keys()) {
42+
LOGGER.info("Receive key: {}", key);
43+
if (!key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
44+
String value = metadata.get(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER));
45+
46+
headerMap.put(key, value);
47+
}
48+
}
49+
LOGGER.info("authority : {}", call.getAuthority());
50+
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<REQ_T>(handler.startCall(new ForwardingServerCall.SimpleForwardingServerCall<REQ_T, RESQ_T>(call) {
51+
@Override
52+
public void sendHeaders(Metadata responseHeaders) {
53+
LOGGER.info("sendHeaders....");
54+
Metadata.Key<String> headerKey = Metadata.Key.of("test-server", Metadata.ASCII_STRING_MARSHALLER);
55+
responseHeaders.put(headerKey, "test-server");
56+
delegate().sendHeaders(responseHeaders);
57+
}
58+
59+
@Override
60+
public void sendMessage(RESQ_T message) {
61+
delegate().sendMessage(message);
62+
}
63+
64+
}, metadata)) {
65+
@Override
66+
public void onReady() {
67+
LOGGER.info("onReady....");
68+
delegate().onReady();
69+
}
70+
71+
@Override
72+
public void onCancel() {
73+
LOGGER.info("onCancel....");
74+
delegate().onCancel();
75+
}
76+
77+
@Override
78+
public void onComplete() {
79+
LOGGER.info("onComplete....");
80+
delegate().onComplete();
81+
}
82+
83+
@Override
84+
public void onHalfClose() {
85+
LOGGER.info("onHalfClose....");
86+
delegate().onHalfClose();
87+
}
88+
89+
@Override
90+
public void onMessage(REQ_T message) {
91+
LOGGER.info("onMessage....");
92+
delegate().onMessage(message);
93+
}
94+
};
95+
}
96+
}

0 commit comments

Comments
 (0)