Skip to content

Commit 1b2078b

Browse files
committed
Fix rocketmq-5-grpc-scenario: avoid curl timeout by not joining consumer threads
The testcase() method was joining consumer threads that could block for 10-30 seconds, exceeding curl's 3-second timeout. This caused the main HTTP entry span to never complete, so no producer segments were reported. Fix: move PushConsumer startup to healthCheck() for early rebalance, and start SimpleConsumer threads without joining so testcase() returns fast. Background consumers complete within the 5-second post-curl sleep window. Verified locally: both 5.1.1 and 5.1.4 pass with all expected segments.
1 parent 067f167 commit 1b2078b

1 file changed

Lines changed: 14 additions & 18 deletions

File tree

  • test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller

test/plugin/scenarios/rocketmq-5-grpc-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/client/java/controller/CaseController.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,30 +53,20 @@ public class CaseController {
5353
public String testcase() {
5454
try {
5555
messageService.sendNormalMessage(NORMAL_TOPIC, TAG_NOMARL, GROUP);
56-
Thread t1 = new Thread(() -> messageService.pushConsumes(
57-
Collections.singletonList(NORMAL_TOPIC),
58-
Collections.singletonList(TAG_NOMARL),
59-
GROUP
60-
));
61-
t1.start();
62-
t1.join();
6356

6457
messageService.sendNormalMessageAsync(ASYNC_PRODUCER_TOPIC, TAG_ASYNC_PRODUCER, GROUP);
6558
messageService.sendNormalMessageAsync(ASYNC_PRODUCER_TOPIC, TAG_ASYNC_PRODUCER, GROUP);
66-
Thread t2 = new Thread(() -> messageService.simpleConsumes(Collections.singletonList(ASYNC_PRODUCER_TOPIC),
67-
Collections.singletonList(TAG_ASYNC_PRODUCER), GROUP,
68-
10, 10
69-
));
70-
t2.start();
71-
t2.join();
59+
new Thread(() -> messageService.simpleConsumes(
60+
Collections.singletonList(ASYNC_PRODUCER_TOPIC),
61+
Collections.singletonList(TAG_ASYNC_PRODUCER), GROUP,
62+
10, 10
63+
)).start();
7264

7365
messageService.sendNormalMessage(ASYNC_CONSUMER_TOPIC, TAG_ASYNC_CONSUMER, GROUP);
7466
messageService.sendNormalMessage(ASYNC_CONSUMER_TOPIC, TAG_ASYNC_CONSUMER, GROUP);
75-
Thread t3 = new Thread(() -> messageService.simpleConsumeAsync(ASYNC_CONSUMER_TOPIC, TAG_ASYNC_CONSUMER, GROUP, 10,
76-
10
77-
));
78-
t3.start();
79-
t3.join();
67+
new Thread(() -> messageService.simpleConsumeAsync(
68+
ASYNC_CONSUMER_TOPIC, TAG_ASYNC_CONSUMER, GROUP, 10, 10
69+
)).start();
8070
} catch (Exception e) {
8171
log.error("testcase error", e);
8272
}
@@ -90,6 +80,12 @@ public String healthCheck() throws Exception {
9080
messageService.updateNormalTopic(NORMAL_TOPIC);
9181
messageService.updateNormalTopic(ASYNC_PRODUCER_TOPIC);
9282
messageService.updateNormalTopic(ASYNC_CONSUMER_TOPIC);
83+
// Start push consumer early so it has time to complete rebalance
84+
messageService.pushConsumes(
85+
Collections.singletonList(NORMAL_TOPIC),
86+
Collections.singletonList(TAG_NOMARL),
87+
GROUP
88+
);
9389
final Producer producer = ProducerSingleton.getInstance(endpoints, NORMAL_TOPIC);
9490
return SUCCESS;
9591
}

0 commit comments

Comments
 (0)