3030import org .apache .activemq .network .DiscoveryNetworkConnector ;
3131import org .apache .activemq .network .NetworkConnector ;
3232import org .apache .activemq .network .NetworkTestSupport ;
33+ import org .apache .activemq .util .Wait ;
3334import org .slf4j .Logger ;
3435import org .slf4j .LoggerFactory ;
3536import org .junit .experimental .categories .Category ;
@@ -59,6 +60,9 @@ protected void setUp() throws Exception {
5960 nc .setDuplex (true );
6061 remoteBroker .addNetworkConnector (nc );
6162 nc .start ();
63+
64+ assertTrue ("Network bridge did not establish in time" ,
65+ Wait .waitFor (() -> !nc .activeBridges ().isEmpty (), 15000 , 100 ));
6266 }
6367
6468 protected void tearDown () throws Exception {
@@ -92,28 +96,25 @@ public void testDurableSubNetwork() throws Exception {
9296 Destination dest = session .createTopic (topicName );
9397 TopicSubscriber sub = session .createDurableSubscriber ((Topic )dest , subName );
9498 LOG .info ("Durable subscription of name " + subName + "created." );
95- Thread .sleep (100 );
9699
97100 // query durable sub on local and remote broker
98101 // raise an error if not found
99-
100- assertTrue (foundSubInLocalBroker (subName ));
101-
102-
103- assertTrue (foundSubInRemoteBrokerByTopicName (topicName ));
102+ assertTrue ("Durable subscription not found in local broker" ,
103+ Wait .waitFor (() -> foundSubInLocalBroker (subName ), 15000 , 100 ));
104+ assertTrue ("Durable subscription not propagated to remote broker" ,
105+ Wait .waitFor (() -> foundSubInRemoteBrokerByTopicName (topicName ), 15000 , 100 ));
104106
105107 // unsubscribe from durable sub
106108 sub .close ();
107109 session .unsubscribe (subName );
108110 LOG .info ("Unsubscribed from durable subscription." );
109- Thread .sleep (100 );
110111
111112 // query durable sub on local and remote broker
112113 // raise an error if its not removed from both brokers
113- assertFalse ( foundSubInLocalBroker ( subName ));
114-
115- assertFalse ("Durable subscription not unregistered on remote broker" ,
116- foundSubInRemoteBrokerByTopicName (topicName ));
114+ assertTrue ( "Durable subscription not removed from local broker" ,
115+ Wait . waitFor (() -> ! foundSubInLocalBroker ( subName ), 15000 , 100 ));
116+ assertTrue ("Durable subscription not unregistered on remote broker" ,
117+ Wait . waitFor (() -> ! foundSubInRemoteBrokerByTopicName (topicName ), 15000 , 100 ));
117118
118119
119120 }
@@ -131,39 +132,35 @@ public void testTwoDurableSubsInNetworkWithUnsubscribe() throws Exception{
131132 TopicSubscriber sub2 = session .createDurableSubscriber ((Topic ) dest , subName2 );
132133 LOG .info ("Durable subscription of name " + subName2 + "created." );
133134
134- Thread .sleep (100 );
135-
136135 // query durable sub on local and remote broker
137136 // raise an error if not found
138-
139- assertTrue ( foundSubInLocalBroker (subName ));
140- assertTrue (foundSubInLocalBroker ( subName2 ));
141-
142-
143- assertTrue ( foundSubInRemoteBrokerByTopicName (topicName ));
137+ assertTrue ( "Subscription1 not found in local broker" ,
138+ Wait . waitFor (() -> foundSubInLocalBroker (subName ), 15000 , 100 ));
139+ assertTrue ("Subscription2 not found in local broker" ,
140+ Wait . waitFor (() -> foundSubInLocalBroker ( subName2 ), 15000 , 100 ));
141+ assertTrue ( "Durable subscription not propagated to remote broker" ,
142+ Wait . waitFor (() -> foundSubInRemoteBrokerByTopicName (topicName ), 15000 , 100 ));
144143
145144 // unsubscribe from durable sub
146145 sub .close ();
147146 session .unsubscribe (subName );
148147 LOG .info ("Unsubscribed from durable subscription." );
149- Thread .sleep (100 );
150148
151149 // query durable sub on local and remote broker
152- assertFalse (foundSubInLocalBroker (subName ));
153- assertTrue (foundSubInLocalBroker (subName2 ));
154-
150+ assertTrue ("Subscription1 not removed from local broker" ,
151+ Wait .waitFor (() -> !foundSubInLocalBroker (subName ), 15000 , 100 ));
152+ assertTrue ("Subscription2 should still be in local broker" ,
153+ Wait .waitFor (() -> foundSubInLocalBroker (subName2 ), 15000 , 100 ));
155154 assertTrue ("Durable subscription should still be on remote broker" ,
156- foundSubInRemoteBrokerByTopicName (topicName ));
155+ Wait . waitFor (() -> foundSubInRemoteBrokerByTopicName (topicName ), 15000 , 100 ));
157156
158157 sub2 .close ();
159158 session .unsubscribe (subName2 );
160159
161- Thread .sleep (100 );
162-
163- assertFalse (foundSubInLocalBroker (subName2 ));
164-
165- assertFalse ("Durable subscription not unregistered on remote broker" ,
166- foundSubInRemoteBrokerByTopicName (topicName ));
160+ assertTrue ("Subscription2 not removed from local broker" ,
161+ Wait .waitFor (() -> !foundSubInLocalBroker (subName2 ), 15000 , 100 ));
162+ assertTrue ("Durable subscription not unregistered on remote broker" ,
163+ Wait .waitFor (() -> !foundSubInRemoteBrokerByTopicName (topicName ), 15000 , 100 ));
167164
168165 }
169166
0 commit comments