Skip to content

Commit 11be33e

Browse files
author
Michael Fero
committed
Fixing local_dc_ assignment and addressing review comments
The local DC is now only being applied to the DCAware policy if the current `local_dc_` is not set. New unit tests were added to ensure that the local DC is not overridden when set in the default or execution profiles and also set when the local DC is not initially set.
1 parent fb33796 commit 11be33e

11 files changed

Lines changed: 169 additions & 59 deletions

cpp-driver/gtests/src/unit/tests/test_session.cpp

Lines changed: 154 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -643,17 +643,13 @@ TEST_F(SessionUnitTest, HostListenerNodeDown) {
643643
}
644644

645645
TEST_F(SessionUnitTest, LocalDcUpdatedOnPolicy) {
646-
mockssandra::SimpleCluster cluster(simple(), 3, 2);
646+
mockssandra::SimpleCluster cluster(simple(), 3, 1);
647647
ASSERT_EQ(cluster.start_all(), 0);
648648

649649
TestHostListener::Ptr listener(new TestHostListener());
650650

651651
Config config;
652-
config.contact_points().push_back(Address("127.0.0.1", 9042));
653-
config.set_use_randomized_contact_points(
654-
false); // Ensure round robin order over DC for query execution
655-
config.set_token_aware_routing(false);
656-
config.set_load_balancing_policy(new DCAwarePolicy("dc1"));
652+
config.contact_points().push_back(Address("127.0.0.4", 9042));
657653
config.set_cluster_metadata_resolver_factory(
658654
ClusterMetadataResolverFactory::Ptr(new LocalDcClusterMetadataResolverFactory("dc2")));
659655
config.set_host_listener(listener);
@@ -666,22 +662,168 @@ TEST_F(SessionUnitTest, LocalDcUpdatedOnPolicy) {
666662
listener->wait_for_event(WAIT_FOR_TIME));
667663
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.4", 9042)),
668664
listener->wait_for_event(WAIT_FOR_TIME));
669-
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.5", 9042)),
665+
}
666+
667+
for (int i = 0; i < 20; ++i) { // Validate the request processors are using DC2 only
668+
QueryRequest::Ptr request(new QueryRequest("blah", 0));
669+
670+
ResponseFuture::Ptr future = session.execute(request, NULL);
671+
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
672+
EXPECT_FALSE(future->error());
673+
EXPECT_EQ("127.0.0.4", future->address().to_string());
674+
}
675+
676+
close(&session);
677+
678+
ASSERT_EQ(0u, listener->event_count());
679+
}
680+
681+
TEST_F(SessionUnitTest, LocalDcNotOverriddenOnPolicy) {
682+
mockssandra::SimpleCluster cluster(simple(), 1, 3);
683+
ASSERT_EQ(cluster.start_all(), 0);
684+
685+
TestHostListener::Ptr listener(new TestHostListener());
686+
687+
Config config;
688+
config.contact_points().push_back(Address("127.0.0.1", 9042));
689+
config.set_load_balancing_policy(new DCAwarePolicy("dc1"));
690+
config.set_cluster_metadata_resolver_factory(
691+
ClusterMetadataResolverFactory::Ptr(new LocalDcClusterMetadataResolverFactory("dc2")));
692+
config.set_host_listener(listener);
693+
694+
Session session;
695+
connect(config, &session);
696+
697+
{ // Initial nodes available from peers table (should be DC1)
698+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.1", 9042)),
670699
listener->wait_for_event(WAIT_FOR_TIME));
671-
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.5", 9042)),
700+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.1", 9042)),
672701
listener->wait_for_event(WAIT_FOR_TIME));
673702
}
674703

675-
for (int i = 0; i < 20; ++i) { // Validate the request processors are using DC2 only
704+
for (int i = 0; i < 20; ++i) { // Validate the request processors are using DC1 only
676705
QueryRequest::Ptr request(new QueryRequest("blah", 0));
677706

678707
ResponseFuture::Ptr future = session.execute(request, NULL);
679708
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
680709
EXPECT_FALSE(future->error());
710+
EXPECT_EQ("127.0.0.1", future->address().to_string());
711+
}
712+
713+
close(&session);
714+
715+
ASSERT_EQ(0u, listener->event_count());
716+
}
717+
718+
TEST_F(SessionUnitTest, LocalDcOverriddenOnPolicyUsingExecutionProfiles) {
719+
mockssandra::SimpleCluster cluster(simple(), 3, 1);
720+
ASSERT_EQ(cluster.start_all(), 0);
721+
722+
TestHostListener::Ptr listener(new TestHostListener());
723+
724+
Config config;
725+
config.contact_points().push_back(Address("127.0.0.4", 9042));
726+
config.set_use_randomized_contact_points(
727+
false); // Ensure round robin order over DC for query execution
728+
config.set_cluster_metadata_resolver_factory(
729+
ClusterMetadataResolverFactory::Ptr(new LocalDcClusterMetadataResolverFactory("dc2")));
730+
config.set_host_listener(listener);
731+
732+
ExecutionProfile profile;
733+
profile.set_load_balancing_policy(new DCAwarePolicy());
734+
config.set_execution_profile("use_propagated_local_dc", &profile);
735+
736+
Session session;
737+
connect(config, &session);
738+
739+
{ // Initial nodes available from peers table (should be DC2)
740+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.4", 9042)),
741+
listener->wait_for_event(WAIT_FOR_TIME));
742+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.4", 9042)),
743+
listener->wait_for_event(WAIT_FOR_TIME));
744+
}
745+
746+
for (int i = 0; i < 20; ++i) { // Validate the default profile is using DC2 only
747+
QueryRequest::Ptr request(new QueryRequest("blah", 0));
681748

682-
String expected_address = "127.0.0.";
683-
expected_address.append((i % 2 == 0) ? "4" : "5");
684-
EXPECT_EQ(expected_address, future->address().to_string());
749+
ResponseFuture::Ptr future = session.execute(request, NULL);
750+
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
751+
EXPECT_FALSE(future->error());
752+
EXPECT_EQ("127.0.0.4", future->address().to_string());
753+
}
754+
755+
for (int i = 0; i < 20; ++i) { // Validate the default profile is using DC2 only
756+
QueryRequest::Ptr request(new QueryRequest("blah", 0));
757+
request->set_execution_profile_name("use_propagated_local_dc");
758+
759+
ResponseFuture::Ptr future = session.execute(request, NULL);
760+
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
761+
EXPECT_FALSE(future->error());
762+
EXPECT_EQ("127.0.0.4", future->address().to_string());
763+
}
764+
765+
close(&session);
766+
767+
ASSERT_EQ(0u, listener->event_count());
768+
}
769+
770+
TEST_F(SessionUnitTest, LocalDcNotOverriddenOnPolicyUsingExecutionProfiles) {
771+
mockssandra::SimpleCluster cluster(simple(), 3, 1);
772+
ASSERT_EQ(cluster.start_all(), 0);
773+
774+
TestHostListener::Ptr listener(new TestHostListener());
775+
776+
Config config;
777+
config.contact_points().push_back(Address("127.0.0.4", 9042));
778+
config.set_use_randomized_contact_points(
779+
false); // Ensure round robin order over DC for query execution
780+
config.set_cluster_metadata_resolver_factory(
781+
ClusterMetadataResolverFactory::Ptr(new LocalDcClusterMetadataResolverFactory("dc2")));
782+
config.set_host_listener(listener);
783+
784+
ExecutionProfile profile;
785+
profile.set_load_balancing_policy(new DCAwarePolicy("dc1"));
786+
config.set_execution_profile("use_dc1", &profile);
787+
788+
Session session;
789+
connect(config, &session);
790+
791+
{ // Initial nodes available from peers table (should be DC1 and DC2)
792+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.1", 9042)),
793+
listener->wait_for_event(WAIT_FOR_TIME));
794+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.1", 9042)),
795+
listener->wait_for_event(WAIT_FOR_TIME));
796+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.2", 9042)),
797+
listener->wait_for_event(WAIT_FOR_TIME));
798+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.2", 9042)),
799+
listener->wait_for_event(WAIT_FOR_TIME));
800+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.3", 9042)),
801+
listener->wait_for_event(WAIT_FOR_TIME));
802+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.3", 9042)),
803+
listener->wait_for_event(WAIT_FOR_TIME));
804+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE, Address("127.0.0.4", 9042)),
805+
listener->wait_for_event(WAIT_FOR_TIME));
806+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE, Address("127.0.0.4", 9042)),
807+
listener->wait_for_event(WAIT_FOR_TIME));
808+
}
809+
810+
for (int i = 0; i < 20; ++i) { // Validate the default profile is using DC2 only
811+
QueryRequest::Ptr request(new QueryRequest("blah", 0));
812+
813+
ResponseFuture::Ptr future = session.execute(request, NULL);
814+
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
815+
EXPECT_FALSE(future->error());
816+
EXPECT_EQ("127.0.0.4", future->address().to_string());
817+
}
818+
819+
for (int i = 0; i < 20; ++i) { // Validate the default profile is using DC1 only
820+
QueryRequest::Ptr request(new QueryRequest("blah", 0));
821+
request->set_execution_profile_name("use_dc1");
822+
823+
ResponseFuture::Ptr future = session.execute(request, NULL);
824+
EXPECT_TRUE(future->wait_for(WAIT_FOR_TIME));
825+
EXPECT_FALSE(future->error());
826+
EXPECT_NE("127.0.0.4", future->address().to_string());
685827
}
686828

687829
close(&session);

cpp-driver/src/cluster_connector.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,14 +116,12 @@ void ClusterConnector::internal_resolve_and_connect() {
116116
bind_callback(&ClusterConnector::on_resolve, this));
117117
}
118118

119-
void ClusterConnector::internal_connect(const Address& address, ProtocolVersion version,
120-
const String& local_dc) {
119+
void ClusterConnector::internal_connect(const Address& address, ProtocolVersion version) {
121120
Host::Ptr host(new Host(address));
122121
ControlConnector::Ptr connector(
123122
new ControlConnector(host, version, bind_callback(&ClusterConnector::on_connect, this)));
124123
connectors_[address] = connector; // Keep track of the connectors so they can be canceled.
125124
connector->with_metrics(metrics_)
126-
->with_local_dc(local_dc)
127125
->with_settings(settings_.control_connection_settings)
128126
->connect(event_loop_->loop());
129127
}
@@ -180,11 +178,12 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
180178
return;
181179
}
182180

181+
local_dc_ = resolver->local_dc();
183182
remaining_connector_count_ = resolved_contact_points.size();
184183
for (AddressVec::const_iterator it = resolved_contact_points.begin(),
185184
end = resolved_contact_points.end();
186185
it != end; ++it) {
187-
internal_connect(*it, protocol_version_, resolver->local_dc());
186+
internal_connect(*it, protocol_version_);
188187
}
189188
}
190189

@@ -260,7 +259,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
260259

261260
cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
262261
connected_host, hosts, connector->schema(), default_policy, policies,
263-
connector->local_dc(), settings_));
262+
local_dc_, settings_));
264263

265264
// Clear any connection errors and set the final negotiated protocol version.
266265
error_code_ = CLUSTER_OK;
@@ -282,7 +281,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
282281
on_error(CLUSTER_ERROR_INVALID_PROTOCOL, "Unable to find supported protocol version");
283282
return;
284283
}
285-
internal_connect(connector->address(), lower_version, connector->local_dc());
284+
internal_connect(connector->address(), lower_version);
286285
} else if (connector->is_ssl_error()) {
287286
ssl_error_code_ = connector->ssl_error_code();
288287
on_error(CLUSTER_ERROR_SSL_ERROR, connector->error_message());

cpp-driver/src/cluster_connector.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
138138

139139
private:
140140
void internal_resolve_and_connect();
141-
void internal_connect(const Address& address, ProtocolVersion version, const String& local_dc);
141+
void internal_connect(const Address& address, ProtocolVersion version);
142142
void internal_cancel();
143143

144144
void finish();
@@ -168,6 +168,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
168168
EventLoop* event_loop_;
169169
Random* random_;
170170
Metrics* metrics_;
171+
String local_dc_;
171172
ClusterSettings settings_;
172173

173174
Callback callback_;

cpp-driver/src/connector.cpp

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,6 @@ Connector* Connector::with_metrics(Metrics* metrics) {
201201
return this;
202202
}
203203

204-
Connector* Connector::with_local_dc(const String& local_dc) {
205-
local_dc_ = local_dc;
206-
return this;
207-
}
208-
209204
Connector* Connector::with_settings(const ConnectionSettings& settings) {
210205
settings_ = settings;
211206
// Only use hostname resolution if actually required for SSL or

cpp-driver/src/connector.hpp

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -132,16 +132,6 @@ class Connector
132132
*/
133133
Connector* with_metrics(Metrics* metrics);
134134

135-
/**
136-
* Set the local datacenter to use for the initializing the load balancing policies after the
137-
* connection is connected.
138-
*
139-
* @param local_dc The local datacenter determined by the metadata service for initializing the
140-
* load balancing policies.
141-
* @return The connector to chain calls.
142-
*/
143-
Connector* with_local_dc(const String& local_dc);
144-
145135
/**
146136
* Set the connection and socket settings.
147137
*
@@ -232,7 +222,6 @@ class Connector
232222
int event_types_;
233223
ConnectionListener* listener_;
234224
Metrics* metrics_;
235-
String local_dc_;
236225
ConnectionSettings settings_;
237226
};
238227

cpp-driver/src/control_connector.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,6 @@ ControlConnector* ControlConnector::with_metrics(Metrics* metrics) {
9090
return this;
9191
}
9292

93-
ControlConnector* ControlConnector::with_local_dc(const String& local_dc) {
94-
local_dc_ = local_dc;
95-
return this;
96-
}
97-
9893
ControlConnector* ControlConnector::with_settings(const ControlConnectionSettings& settings) {
9994
settings_ = settings;
10095
return this;
@@ -109,7 +104,6 @@ void ControlConnector::connect(uv_loop_t* loop) {
109104
event_types = CASS_EVENT_TOPOLOGY_CHANGE | CASS_EVENT_STATUS_CHANGE;
110105
}
111106
connector_->with_metrics(metrics_)
112-
->with_local_dc(local_dc_)
113107
->with_settings(settings_.connection_settings)
114108
->with_event_types(event_types)
115109
->connect(loop);

cpp-driver/src/control_connector.hpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,6 @@ class ControlConnector
9696
*/
9797
ControlConnector* with_metrics(Metrics* metrics);
9898

99-
/**
100-
* Set the local datacenter to use for the connection.
101-
*
102-
* @param local_dc The local datacenter determined by the metadata service for initializing the
103-
* load balancing policies.
104-
* @return The connector to chain calls.
105-
*/
106-
ControlConnector* with_local_dc(const String& local_dc);
107-
10899
/**
109100
* Sets the control connection settings as well as the underlying settings
110101
* for the connection and socket.

cpp-driver/src/dc_aware_policy.cpp

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,12 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot
4343
DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
4444

4545
void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
46-
const String& local_dc /*= ""*/) {
47-
if (!local_dc.empty()) {
48-
if (!local_dc_.empty()) {
49-
LOG_INFO("Overriding local data center %s with %s", local_dc_.c_str(), local_dc.c_str());
50-
}
46+
const String& local_dc) {
47+
if (local_dc_.empty()) { // Do not override
5148
local_dc_ = local_dc;
52-
} else if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) {
49+
}
50+
51+
if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) {
5352
LOG_INFO("Using '%s' for the local data center "
5453
"(if this is incorrect, please provide the correct data center)",
5554
connected_host->dc().c_str());

cpp-driver/src/list_policy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ using namespace datastax::internal;
2323
using namespace datastax::internal::core;
2424

2525
void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
26-
const String& local_dc /*= ""*/) {
26+
const String& local_dc) {
2727
HostMap valid_hosts;
2828
for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
2929
const Host::Ptr& host = i->second;

cpp-driver/src/round_robin_policy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ RoundRobinPolicy::RoundRobinPolicy()
3333
RoundRobinPolicy::~RoundRobinPolicy() { uv_rwlock_destroy(&available_rwlock_); }
3434

3535
void RoundRobinPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
36-
const String& local_dc /*= ""*/) {
36+
const String& local_dc) {
3737
available_.resize(hosts.size());
3838
std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()),
3939
GetAddress());

0 commit comments

Comments
 (0)