3131import org .apache .rocketmq .common .message .Message ;
3232import org .apache .rocketmq .common .message .MessageQueue ;
3333import org .apache .rocketmq .common .topic .TopicValidator ;
34+ import org .apache .rocketmq .common .utils .ThreadUtils ;
3435import org .apache .rocketmq .logging .org .slf4j .Logger ;
3536import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
3637import org .apache .rocketmq .remoting .RPCHook ;
@@ -55,6 +56,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
5556 private static final Logger log = LoggerFactory .getLogger (AsyncTraceDispatcher .class );
5657 private static final AtomicInteger COUNTER = new AtomicInteger ();
5758 private static final AtomicInteger INSTANCE_NUM = new AtomicInteger (0 );
59+ private static final long WAIT_FOR_SHUTDOWN = 5000L ;
5860 private volatile boolean stopped = false ;
5961 private final int traceInstanceId = INSTANCE_NUM .getAndIncrement ();
6062 private final int batchNum ;
@@ -191,23 +193,19 @@ public boolean append(final Object ctx) {
191193
192194 @ Override
193195 public void flush () {
194- long end = System .currentTimeMillis () + 500 ;
195- while (traceContextQueue .size () > 0 || appenderQueue .size () > 0 && System .currentTimeMillis () <= end ) {
196+ while (traceContextQueue .size () > 0 ) {
196197 try {
197198 flushTraceContext (true );
198199 } catch (Throwable throwable ) {
199200 log .error ("flushTraceContext error" , throwable );
200201 }
201202 }
202- if (appenderQueue .size () > 0 ) {
203- log .error ("There are still some traces that haven't been sent " + traceContextQueue .size () + " " + appenderQueue .size ());
204- }
205203 }
206204
207205 @ Override
208206 public void shutdown () {
209207 flush ();
210- this .traceExecutor . shutdown ( );
208+ ThreadUtils . shutdownGracefully ( this .traceExecutor , WAIT_FOR_SHUTDOWN , TimeUnit . MILLISECONDS );
211209 if (isStarted .get ()) {
212210 traceProducer .shutdown ();
213211 }
0 commit comments