Skip to content

Commit 50160bf

Browse files
qianye1001RongtongJin
authored andcommitted
[ISSUE #9375] Make client trace thread can be closed correctly (#9376)
1 parent be73bf7 commit 50160bf

File tree

2 files changed

+22
-24
lines changed

2 files changed

+22
-24
lines changed

client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,19 @@
1616
*/
1717
package org.apache.rocketmq.client.trace;
1818

19+
import java.nio.charset.StandardCharsets;
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.HashSet;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Set;
26+
import java.util.concurrent.ArrayBlockingQueue;
27+
import java.util.concurrent.ThreadPoolExecutor;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.concurrent.atomic.AtomicLong;
1932
import org.apache.rocketmq.client.AccessChannel;
2033
import org.apache.rocketmq.client.common.ThreadLocalIndex;
2134
import org.apache.rocketmq.client.exception.MQClientException;
@@ -35,20 +48,6 @@
3548
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3649
import org.apache.rocketmq.remoting.RPCHook;
3750

38-
import java.nio.charset.StandardCharsets;
39-
import java.util.ArrayList;
40-
import java.util.HashMap;
41-
import java.util.HashSet;
42-
import java.util.List;
43-
import java.util.Map;
44-
import java.util.Set;
45-
import java.util.concurrent.ArrayBlockingQueue;
46-
import java.util.concurrent.ThreadPoolExecutor;
47-
import java.util.concurrent.TimeUnit;
48-
import java.util.concurrent.atomic.AtomicBoolean;
49-
import java.util.concurrent.atomic.AtomicInteger;
50-
import java.util.concurrent.atomic.AtomicLong;
51-
5251
import static org.apache.rocketmq.client.trace.TraceConstants.TRACE_INSTANCE_NAME;
5352

5453
public class AsyncTraceDispatcher implements TraceDispatcher {
@@ -254,9 +253,10 @@ public void run() {
254253
} catch (Throwable e) {
255254
log.error("flushTraceContext error", e);
256255
}
257-
}
258-
if (AsyncTraceDispatcher.this.stopped) {
259-
this.stopped = true;
256+
257+
if (AsyncTraceDispatcher.this.stopped) {
258+
this.stopped = true;
259+
}
260260
}
261261
}
262262
}

client/src/test/java/org/apache/rocketmq/client/impl/consumer/ProcessQueueTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616
*/
1717
package org.apache.rocketmq.client.impl.consumer;
1818

19+
import java.lang.reflect.Field;
20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import java.util.TreeMap;
1924
import org.apache.commons.lang3.reflect.FieldUtils;
2025
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
2126
import org.apache.rocketmq.client.exception.MQBrokerException;
@@ -29,12 +34,6 @@
2934
import org.junit.runner.RunWith;
3035
import org.mockito.junit.MockitoJUnitRunner;
3136

32-
import java.lang.reflect.Field;
33-
import java.util.ArrayList;
34-
import java.util.Collections;
35-
import java.util.List;
36-
import java.util.TreeMap;
37-
3837
import static org.assertj.core.api.Assertions.assertThat;
3938
import static org.junit.Assert.assertEquals;
4039
import static org.junit.Assert.assertFalse;
@@ -158,7 +157,6 @@ public void testProcessQueue() {
158157
ProcessQueue processQueue2 = createProcessQueue();
159158
assertEquals(processQueue1.getMsgAccCnt(), processQueue2.getMsgAccCnt());
160159
assertEquals(processQueue1.getTryUnlockTimes(), processQueue2.getTryUnlockTimes());
161-
assertEquals(processQueue1.getLastLockTimestamp(), processQueue2.getLastLockTimestamp());
162160
assertEquals(processQueue1.getLastPullTimestamp(), processQueue2.getLastPullTimestamp());
163161
}
164162

0 commit comments

Comments
 (0)