4444import org .apache .rocketmq .common .message .Message ;
4545import org .apache .rocketmq .common .message .MessageQueue ;
4646import org .apache .rocketmq .common .topic .TopicValidator ;
47+ import org .apache .rocketmq .common .utils .ThreadUtils ;
4748import org .apache .rocketmq .logging .org .slf4j .Logger ;
4849import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
4950import org .apache .rocketmq .remoting .RPCHook ;
@@ -54,6 +55,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
5455 private static final Logger log = LoggerFactory .getLogger (AsyncTraceDispatcher .class );
5556 private static final AtomicInteger COUNTER = new AtomicInteger ();
5657 private static final AtomicInteger INSTANCE_NUM = new AtomicInteger (0 );
58+ private static final long WAIT_FOR_SHUTDOWN = 5000L ;
5759 private volatile boolean stopped = false ;
5860 private final int traceInstanceId = INSTANCE_NUM .getAndIncrement ();
5961 private final int batchNum ;
@@ -190,23 +192,19 @@ public boolean append(final Object ctx) {
190192
191193 @ Override
192194 public void flush () {
193- long end = System .currentTimeMillis () + 500 ;
194- while (traceContextQueue .size () > 0 || appenderQueue .size () > 0 && System .currentTimeMillis () <= end ) {
195+ while (traceContextQueue .size () > 0 ) {
195196 try {
196197 flushTraceContext (true );
197198 } catch (Throwable throwable ) {
198199 log .error ("flushTraceContext error" , throwable );
199200 }
200201 }
201- if (appenderQueue .size () > 0 ) {
202- log .error ("There are still some traces that haven't been sent " + traceContextQueue .size () + " " + appenderQueue .size ());
203- }
204202 }
205203
206204 @ Override
207205 public void shutdown () {
208206 flush ();
209- this .traceExecutor . shutdown ( );
207+ ThreadUtils . shutdownGracefully ( this .traceExecutor , WAIT_FOR_SHUTDOWN , TimeUnit . MILLISECONDS );
210208 if (isStarted .get ()) {
211209 traceProducer .shutdown ();
212210 }
0 commit comments