Skip to content

Commit 1a5a4c6

Browse files
author
Michael Fero
committed
Adding tests to validate propagating local datacenter
1 parent 1d2a7a7 commit 1a5a4c6

2 files changed

Lines changed: 131 additions & 0 deletions

File tree

cpp-driver/gtests/src/unit/tests/test_cluster.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,37 @@ class ClusterUnitTest : public EventLoopTest {
288288
};
289289
};
290290

291+
class LocalDcClusterMetadataResolver : public ClusterMetadataResolver {
292+
public:
293+
LocalDcClusterMetadataResolver(const String& local_dc)
294+
: desired_local_dc_(local_dc) {}
295+
296+
private:
297+
virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) {
298+
resolved_contact_points_ = contact_points;
299+
local_dc_ = desired_local_dc_;
300+
callback_(this);
301+
}
302+
303+
virtual void internal_cancel() {}
304+
305+
private:
306+
String desired_local_dc_;
307+
};
308+
309+
class LocalDcClusterMetadataResolverFactory : public ClusterMetadataResolverFactory {
310+
public:
311+
LocalDcClusterMetadataResolverFactory(const String& local_dc)
312+
: local_dc_(local_dc) {}
313+
314+
virtual ClusterMetadataResolver::Ptr new_instance(const ClusterSettings& settings) const {
315+
return ClusterMetadataResolver::Ptr(new LocalDcClusterMetadataResolver(local_dc_));
316+
}
317+
318+
private:
319+
String local_dc_;
320+
};
321+
291322
static void on_connection_connected(ClusterConnector* connector, Future* future) {
292323
if (connector->is_ok()) {
293324
future->set();
@@ -957,3 +988,25 @@ TEST_F(ClusterUnitTest, ReconnectionPolicy) {
957988
EXPECT_GE(policy->scheduled_delay_count(), 2u);
958989
EXPECT_EQ(3u, mock_cluster.connection_attempts(1)); // Includes initial connection attempt
959990
}
991+
992+
TEST_F(ClusterUnitTest, LocalDcFromResolver) {
993+
mockssandra::SimpleCluster cluster(simple(), 1);
994+
ASSERT_EQ(cluster.start_all(), 0);
995+
996+
AddressVec contact_points;
997+
contact_points.push_back(Address("127.0.0.1", 9042));
998+
999+
Future::Ptr connect_future(new Future());
1000+
ClusterConnector::Ptr connector(
1001+
new ClusterConnector(contact_points, PROTOCOL_VERSION,
1002+
bind_callback(on_connection_reconnect, connect_future.get())));
1003+
1004+
ClusterSettings settings;
1005+
settings.cluster_metadata_resolver_factory = ClusterMetadataResolverFactory::Ptr(
1006+
new LocalDcClusterMetadataResolverFactory("this_local_dc"));
1007+
connector->with_settings(settings)->connect(event_loop());
1008+
1009+
ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
1010+
EXPECT_FALSE(connect_future->error());
1011+
ASSERT_EQ("this_local_dc", connect_future->cluster()->local_dc());
1012+
}

cpp-driver/gtests/src/unit/tests/test_session.cpp

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,37 @@ class SessionUnitTest : public EventLoopTest {
195195
uv_mutex_t mutex_;
196196
EventQueue events_;
197197
};
198+
199+
class LocalDcClusterMetadataResolver : public ClusterMetadataResolver {
200+
public:
201+
LocalDcClusterMetadataResolver(const String& local_dc)
202+
: desired_local_dc_(local_dc) {}
203+
204+
private:
205+
virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) {
206+
resolved_contact_points_ = contact_points;
207+
local_dc_ = desired_local_dc_;
208+
callback_(this);
209+
}
210+
211+
virtual void internal_cancel() {}
212+
213+
private:
214+
String desired_local_dc_;
215+
};
216+
217+
class LocalDcClusterMetadataResolverFactory : public ClusterMetadataResolverFactory {
218+
public:
219+
LocalDcClusterMetadataResolverFactory(const String& local_dc)
220+
: local_dc_(local_dc) {}
221+
222+
virtual ClusterMetadataResolver::Ptr new_instance(const ClusterSettings& settings) const {
223+
return ClusterMetadataResolver::Ptr(new LocalDcClusterMetadataResolver(local_dc_));
224+
}
225+
226+
private:
227+
String local_dc_;
228+
};
198229
};
199230

200231
TEST_F(SessionUnitTest, ExecuteQueryNotConnected) {
@@ -610,3 +641,50 @@ TEST_F(SessionUnitTest, HostListenerNodeDown) {
610641

611642
ASSERT_EQ(0u, listener->event_count());
612643
}
644+
645+
TEST_F(SessionUnitTest, LocalDcUpdatedOnPolicy) {
646+
mockssandra::SimpleCluster cluster(simple(), 3, 2);
647+
ASSERT_EQ(cluster.start_all(), 0);
648+
649+
TestHostListener::Ptr listener(new TestHostListener());
650+
651+
Config config;
652+
config.contact_points().push_back(Address("127.0.0.1", 9042));
653+
config.set_use_randomized_contact_points(
654+
false); // Ensure round robin order over DC for query execution
655+
config.set_token_aware_routing(false);
656+
config.set_load_balancing_policy(new DCAwarePolicy("dc1"));
657+
config.set_cluster_metadata_resolver_factory(
658+
ClusterMetadataResolverFactory::Ptr(new LocalDcClusterMetadataResolverFactory("dc2")));
659+
config.set_host_listener(listener);
660+
661+
Session session;
662+
connect(config, &session);
663+
664+
{ // Initial nodes available from peers table (should skip DC1)
665+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.4", 9042)),
666+
listener->wait_for_event(WAIT_FOR_TIME));
667+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.4", 9042)),
668+
listener->wait_for_event(WAIT_FOR_TIME));
669+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.5", 9042)),
670+
listener->wait_for_event(WAIT_FOR_TIME));
671+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.5", 9042)),
672+
listener->wait_for_event(WAIT_FOR_TIME));
673+
}
674+
675+
for (int i = 0; i < 20; ++i) { // Validate the request processors are using DC2 only
676+
QueryRequest::Ptr request(new QueryRequest("blah", 0));
677+
678+
ResponseFuture::Ptr future = session.execute(request, NULL);
679+
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
680+
EXPECT_FALSE(future->error());
681+
682+
String expected_address = "127.0.0.";
683+
expected_address.append((i % 2 == 0) ? "4" : "5");
684+
EXPECT_EQ(expected_address, future->address().to_string());
685+
}
686+
687+
close(&session);
688+
689+
ASSERT_EQ(0u, listener->event_count());
690+
}

0 commit comments

Comments
 (0)