Skip to content

Commit f299c50

Browse files
wu-shengclaude
andcommitted
Fix rocketmq-scenario: start consumer before producer, no blocking
The previous latch.await(30s) approach blocked the HTTP handler far longer than curl's --max-time 3 timeout, causing curl to disconnect and the entry span to never complete. Only the healthCheck segment was collected. Now the consumer starts first so it is already subscribed when the message is sent. The endpoint returns immediately (within curl's timeout), and the 5-second sleep in run.sh gives the agent time to flush both producer and consumer segments to the collector. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent f396353 commit f299c50

1 file changed

Lines changed: 15 additions & 33 deletions

File tree

  • test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller

test/plugin/scenarios/rocketmq-scenario/src/main/java/test/apache/skywalking/apm/testcase/rocketmq/controller/CaseController.java

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
import java.nio.charset.StandardCharsets;
3737
import java.util.Date;
3838
import java.util.List;
39-
import java.util.concurrent.CountDownLatch;
40-
import java.util.concurrent.TimeUnit;
4139

4240
@RestController
4341
@RequestMapping("/case")
@@ -53,49 +51,33 @@ public class CaseController {
5351
@ResponseBody
5452
public String testcase() {
5553
try {
56-
// start producer
54+
// start consumer first so it is ready to receive
55+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
56+
consumer.setNamesrvAddr(namerServer);
57+
consumer.subscribe("TopicTest", "*");
58+
consumer.registerMessageListener(new MessageListenerConcurrently() {
59+
@Override
60+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
61+
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
62+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
63+
}
64+
});
65+
consumer.start();
66+
System.out.printf("Consumer Started.%n");
67+
68+
// start producer and send msg
5769
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
5870
producer.setNamesrvAddr(namerServer);
5971
producer.start();
6072
System.out.printf("Provider Started.%n");
6173

62-
// send msg
6374
Message msg = new Message("TopicTest",
6475
("Hello RocketMQ sendMsg " + new Date()).getBytes(RemotingHelper.DEFAULT_CHARSET)
6576
);
6677
msg.setTags("TagA");
6778
msg.setKeys("KeyA");
6879
SendResult sendResult = producer.send(msg);
6980
System.out.printf("%s send msg: %s%n", new Date(), sendResult);
70-
71-
// start consumer and wait for message to be consumed
72-
CountDownLatch latch = new CountDownLatch(1);
73-
Thread thread = new Thread(new Runnable() {
74-
@Override
75-
public void run() {
76-
try {
77-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
78-
consumer.setNamesrvAddr(namerServer);
79-
consumer.subscribe("TopicTest", "*");
80-
consumer.registerMessageListener(new MessageListenerConcurrently() {
81-
@Override
82-
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
83-
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody(), StandardCharsets.UTF_8));
84-
latch.countDown();
85-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
86-
}
87-
});
88-
consumer.start();
89-
System.out.printf("Consumer Started.%n");
90-
} catch (Exception e) {
91-
log.error("consumer start error", e);
92-
}
93-
}
94-
});
95-
thread.start();
96-
latch.await(30, TimeUnit.SECONDS);
97-
// Wait for the agent to flush the consumer segment to the collector
98-
Thread.sleep(2000);
9981
} catch (Exception e) {
10082
log.error("testcase error", e);
10183
}

0 commit comments

Comments
 (0)