Skip to content

Commit 2e7e57f

Browse files
authored
Merge pull request #275 from riptano/CPP-803
CPP-803 - Propagate `local_dc` from `CloudClusterMetadataResolver` to load balancing policies
2 parents 1796159 + 7fc5478 commit 2e7e57f

33 files changed

Lines changed: 399 additions & 88 deletions

cpp-driver/gtests/src/unit/tests/test_cluster.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,37 @@ class ClusterUnitTest : public EventLoopTest {
288288
};
289289
};
290290

291+
class LocalDcClusterMetadataResolver : public ClusterMetadataResolver {
292+
public:
293+
LocalDcClusterMetadataResolver(const String& local_dc)
294+
: desired_local_dc_(local_dc) {}
295+
296+
private:
297+
virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) {
298+
resolved_contact_points_ = contact_points;
299+
local_dc_ = desired_local_dc_;
300+
callback_(this);
301+
}
302+
303+
virtual void internal_cancel() {}
304+
305+
private:
306+
String desired_local_dc_;
307+
};
308+
309+
class LocalDcClusterMetadataResolverFactory : public ClusterMetadataResolverFactory {
310+
public:
311+
LocalDcClusterMetadataResolverFactory(const String& local_dc)
312+
: local_dc_(local_dc) {}
313+
314+
virtual ClusterMetadataResolver::Ptr new_instance(const ClusterSettings& settings) const {
315+
return ClusterMetadataResolver::Ptr(new LocalDcClusterMetadataResolver(local_dc_));
316+
}
317+
318+
private:
319+
String local_dc_;
320+
};
321+
291322
static void on_connection_connected(ClusterConnector* connector, Future* future) {
292323
if (connector->is_ok()) {
293324
future->set();
@@ -958,6 +989,28 @@ TEST_F(ClusterUnitTest, ReconnectionPolicy) {
958989
EXPECT_EQ(3u, mock_cluster.connection_attempts(1)); // Includes initial connection attempt
959990
}
960991

992+
TEST_F(ClusterUnitTest, LocalDcFromResolver) {
993+
mockssandra::SimpleCluster cluster(simple(), 1);
994+
ASSERT_EQ(cluster.start_all(), 0);
995+
996+
AddressVec contact_points;
997+
contact_points.push_back(Address("127.0.0.1", 9042));
998+
999+
Future::Ptr connect_future(new Future());
1000+
ClusterConnector::Ptr connector(
1001+
new ClusterConnector(contact_points, PROTOCOL_VERSION,
1002+
bind_callback(on_connection_reconnect, connect_future.get())));
1003+
1004+
ClusterSettings settings;
1005+
settings.cluster_metadata_resolver_factory = ClusterMetadataResolverFactory::Ptr(
1006+
new LocalDcClusterMetadataResolverFactory("this_local_dc"));
1007+
connector->with_settings(settings)->connect(event_loop());
1008+
1009+
ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
1010+
EXPECT_FALSE(connect_future->error());
1011+
ASSERT_EQ("this_local_dc", connect_future->cluster()->local_dc());
1012+
}
1013+
9611014
TEST_F(ClusterUnitTest, NoContactPoints) {
9621015
// No cluster needed
9631016

cpp-driver/gtests/src/unit/tests/test_load_balancing.cpp

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ void test_dc_aware_policy(size_t local_count, size_t remote_count) {
169169
populate_hosts(local_count, "rack", LOCAL_DC, &hosts);
170170
populate_hosts(remote_count, "rack", REMOTE_DC, &hosts);
171171
DCAwarePolicy policy(LOCAL_DC, remote_count, false);
172-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
172+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
173173

174174
const size_t total_hosts = local_count + remote_count;
175175

@@ -185,7 +185,7 @@ TEST(RoundRobinLoadBalancingUnitTest, Simple) {
185185
populate_hosts(2, "rack", "dc", &hosts);
186186

187187
RoundRobinPolicy policy;
188-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
188+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
189189

190190
// start on first elem
191191
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
@@ -207,7 +207,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnAdd) {
207207
populate_hosts(2, "rack", "dc", &hosts);
208208

209209
RoundRobinPolicy policy;
210-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
210+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
211211

212212
// baseline
213213
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
@@ -230,7 +230,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnRemove) {
230230
populate_hosts(3, "rack", "dc", &hosts);
231231

232232
RoundRobinPolicy policy;
233-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
233+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
234234

235235
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
236236
SharedRefPtr<Host> host = hosts.begin()->second;
@@ -251,7 +251,7 @@ TEST(RoundRobinLoadBalancingUnitTest, OnUpAndDown) {
251251
populate_hosts(3, "rack", "dc", &hosts);
252252

253253
RoundRobinPolicy policy;
254-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
254+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
255255

256256
ScopedPtr<QueryPlan> qp_before1(policy.new_query_plan("ks", NULL, NULL));
257257
ScopedPtr<QueryPlan> qp_before2(policy.new_query_plan("ks", NULL, NULL));
@@ -297,7 +297,7 @@ TEST(RoundRobinLoadBalancingUnitTest, VerifyEqualDistribution) {
297297
populate_hosts(3, "rack", "dc", &hosts);
298298

299299
RoundRobinPolicy policy;
300-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
300+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
301301

302302
{ // All nodes
303303
QueryCounts counts(run_policy(policy, 12));
@@ -338,7 +338,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SomeDatacenterLocalUnspecified) {
338338
h->set_rack_and_dc("", "");
339339

340340
DCAwarePolicy policy(LOCAL_DC, 1, false);
341-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
341+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
342342

343343
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
344344

@@ -353,7 +353,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, SingleLocalDown) {
353353
populate_hosts(1, "rack", REMOTE_DC, &hosts);
354354

355355
DCAwarePolicy policy(LOCAL_DC, 1, false);
356-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
356+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
357357

358358
ScopedPtr<QueryPlan> qp_before(
359359
policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan
@@ -380,7 +380,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllLocalRemovedReturned) {
380380
populate_hosts(1, "rack", REMOTE_DC, &hosts);
381381

382382
DCAwarePolicy policy(LOCAL_DC, 1, false);
383-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
383+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
384384

385385
ScopedPtr<QueryPlan> qp_before(
386386
policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan
@@ -412,7 +412,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, RemoteRemovedReturned) {
412412
SharedRefPtr<Host> target_host = hosts[target_addr];
413413

414414
DCAwarePolicy policy(LOCAL_DC, 1, false);
415-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
415+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
416416

417417
ScopedPtr<QueryPlan> qp_before(
418418
policy.new_query_plan("ks", NULL, NULL)); // has down host ptr in plan
@@ -443,7 +443,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, UsedHostsPerDatacenter) {
443443

444444
for (size_t used_hosts = 0; used_hosts < 4; ++used_hosts) {
445445
DCAwarePolicy policy(LOCAL_DC, used_hosts, false);
446-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
446+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
447447

448448
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
449449
Vector<size_t> seq;
@@ -476,7 +476,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllowRemoteDatacentersForLocalConsist
476476
// Not allowing remote DCs for local CLs
477477
bool allow_remote_dcs_for_local_cl = false;
478478
DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl);
479-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
479+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
480480

481481
// Set local CL
482482
QueryRequest::Ptr request(new QueryRequest("", 0));
@@ -494,7 +494,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, AllowRemoteDatacentersForLocalConsist
494494
// Allowing remote DCs for local CLs
495495
bool allow_remote_dcs_for_local_cl = true;
496496
DCAwarePolicy policy(LOCAL_DC, 3, !allow_remote_dcs_for_local_cl);
497-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
497+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
498498

499499
// Set local CL
500500
QueryRequest::Ptr request(new QueryRequest("", 0));
@@ -517,7 +517,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) {
517517
// Set local DC using connected host
518518
{
519519
DCAwarePolicy policy("", 0, false);
520-
policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL);
520+
policy.init(hosts[Address("2.0.0.0", 9042)], hosts, NULL, "");
521521

522522
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
523523
const size_t seq[] = { 2, 3, 4 };
@@ -527,7 +527,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, StartWithEmptyLocalDatacenter) {
527527
// Set local DC using first host with non-empty DC
528528
{
529529
DCAwarePolicy policy("", 0, false);
530-
policy.init(SharedRefPtr<Host>(new Host(Address("0.0.0.0", 9042))), hosts, NULL);
530+
policy.init(SharedRefPtr<Host>(new Host(Address("0.0.0.0", 9042))), hosts, NULL, "");
531531

532532
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
533533
const size_t seq[] = { 1 };
@@ -547,7 +547,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionLocalDc) {
547547
populate_hosts(3, "rack", REMOTE_DC, &hosts);
548548

549549
DCAwarePolicy policy("", 0, false);
550-
policy.init(hosts.begin()->second, hosts, NULL);
550+
policy.init(hosts.begin()->second, hosts, NULL, "");
551551

552552
{ // All local nodes
553553
QueryCounts counts(run_policy(policy, 12));
@@ -590,7 +590,7 @@ TEST(DatacenterAwareLoadBalancingUnitTest, VerifyEqualDistributionRemoteDc) {
590590
populate_hosts(3, "rack", REMOTE_DC, &hosts);
591591

592592
DCAwarePolicy policy("", 3, false); // Allow all remote DC nodes
593-
policy.init(hosts.begin()->second, hosts, NULL);
593+
policy.init(hosts.begin()->second, hosts, NULL, "");
594594

595595
Host::Ptr remote_dc_node1;
596596
{ // Mark down all local nodes
@@ -664,7 +664,7 @@ TEST(TokenAwareLoadBalancingUnitTest, Simple) {
664664
token_map->build();
665665

666666
TokenAwarePolicy policy(new RoundRobinPolicy(), false);
667-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
667+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
668668

669669
QueryRequest::Ptr request(new QueryRequest("", 1));
670670
const char* value = "kjdfjkldsdjkl"; // hash: 9024137376112061887
@@ -737,7 +737,7 @@ TEST(TokenAwareLoadBalancingUnitTest, NetworkTopology) {
737737
token_map->build();
738738

739739
TokenAwarePolicy policy(new DCAwarePolicy(LOCAL_DC, num_hosts / 2, false), false);
740-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
740+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
741741

742742
QueryRequest::Ptr request(new QueryRequest("", 1));
743743
const char* value = "abc"; // hash: -5434086359492102041
@@ -811,7 +811,7 @@ TEST(TokenAwareLoadBalancingUnitTest, ShuffleReplicas) {
811811
HostVec not_shuffled;
812812
{
813813
TokenAwarePolicy policy(new RoundRobinPolicy(), false); // Not shuffled
814-
policy.init(SharedRefPtr<Host>(), hosts, &random);
814+
policy.init(SharedRefPtr<Host>(), hosts, &random, "");
815815
ScopedPtr<QueryPlan> qp1(policy.new_query_plan("test", request_handler.get(), token_map.get()));
816816
for (int i = 0; i < num_hosts; ++i) {
817817
not_shuffled.push_back(qp1->compute_next());
@@ -829,7 +829,7 @@ TEST(TokenAwareLoadBalancingUnitTest, ShuffleReplicas) {
829829
// Verify that the shuffle setting does indeed shuffle the replicas
830830
{
831831
TokenAwarePolicy shuffle_policy(new RoundRobinPolicy(), true); // Shuffled
832-
shuffle_policy.init(SharedRefPtr<Host>(), hosts, &random);
832+
shuffle_policy.init(SharedRefPtr<Host>(), hosts, &random, "");
833833

834834
HostVec shuffled_previous;
835835
ScopedPtr<QueryPlan> qp(
@@ -927,7 +927,7 @@ TEST(LatencyAwareLoadBalancingUnitTest, Simple) {
927927
HostMap hosts;
928928
populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts);
929929
LatencyAwarePolicy policy(new RoundRobinPolicy(), settings);
930-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
930+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
931931

932932
// Record some latencies with 100 ns being the minimum
933933
for (HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) {
@@ -989,7 +989,7 @@ TEST(LatencyAwareLoadBalancingUnitTest, MinAverageUnderMinMeasured) {
989989
HostMap hosts;
990990
populate_hosts(num_hosts, "rack1", LOCAL_DC, &hosts);
991991
LatencyAwarePolicy policy(new RoundRobinPolicy(), settings);
992-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
992+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
993993

994994
int count = 1;
995995
for (HostMap::iterator i = hosts.begin(); i != hosts.end(); ++i) {
@@ -1023,7 +1023,7 @@ TEST(WhitelistLoadBalancingUnitTest, Hosts) {
10231023
whitelist_hosts.push_back("37.0.0.0");
10241024
whitelist_hosts.push_back("83.0.0.0");
10251025
WhitelistPolicy policy(new RoundRobinPolicy(), whitelist_hosts);
1026-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
1026+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
10271027

10281028
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
10291029

@@ -1044,7 +1044,7 @@ TEST(WhitelistLoadBalancingUnitTest, Datacenters) {
10441044
whitelist_dcs.push_back(LOCAL_DC);
10451045
whitelist_dcs.push_back(REMOTE_DC);
10461046
WhitelistDCPolicy policy(new RoundRobinPolicy(), whitelist_dcs);
1047-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
1047+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
10481048

10491049
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
10501050

@@ -1064,7 +1064,7 @@ TEST(BlacklistLoadBalancingUnitTest, Hosts) {
10641064
blacklist_hosts.push_back("2.0.0.0");
10651065
blacklist_hosts.push_back("3.0.0.0");
10661066
BlacklistPolicy policy(new RoundRobinPolicy(), blacklist_hosts);
1067-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
1067+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
10681068

10691069
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
10701070

@@ -1085,7 +1085,7 @@ TEST(BlacklistLoadBalancingUnitTest, Datacenters) {
10851085
blacklist_dcs.push_back(LOCAL_DC);
10861086
blacklist_dcs.push_back(REMOTE_DC);
10871087
BlacklistDCPolicy policy(new RoundRobinPolicy(), blacklist_dcs);
1088-
policy.init(SharedRefPtr<Host>(), hosts, NULL);
1088+
policy.init(SharedRefPtr<Host>(), hosts, NULL, "");
10891089

10901090
ScopedPtr<QueryPlan> qp(policy.new_query_plan("ks", NULL, NULL));
10911091

0 commit comments

Comments
 (0)