Skip to content

Commit dfb7495

Browse files
authored
CPP-687 - Fix recovery of remote DC host (#427)
The DC-aware load balancing policy depends on an internal host map to determine host distance. The problem was the host distance was being checked to determine whether a node should be added or marked as up. This fix always adds and marks hosts up even if they're ignored. The distance is now only used to determine if a host should have a connection pool.
1 parent cbd6de6 commit dfb7495

8 files changed

Lines changed: 179 additions & 37 deletions

File tree

gtests/src/unit/mockssandra.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1807,15 +1807,23 @@ cass::String SchemaChangeEvent::encode(SchemaChangeEvent::Target target, SchemaC
18071807

18081808
void Cluster::init(AddressGenerator& generator,
18091809
ClientConnectionFactory& factory,
1810-
size_t num_nodes) {
1810+
size_t num_nodes_dc1,
1811+
size_t num_nodes_dc2) {
18111812
MT19937_64 token_rng;
1812-
for (size_t i = 0; i < num_nodes; ++i) {
1813+
for (size_t i = 0; i < num_nodes_dc1; ++i) {
18131814
Address address(generator.next());
18141815
Server server(Host(address, "dc1", "rack1", token_rng),
18151816
internal::ServerConnection::Ptr(
18161817
Memory::allocate<internal::ServerConnection>(address, factory)));
18171818
servers_.push_back(server);
18181819
}
1820+
for (size_t i = 0; i < num_nodes_dc2; ++i) {
1821+
Address address(generator.next());
1822+
Server server(Host(address, "dc2", "rack1", token_rng),
1823+
internal::ServerConnection::Ptr(
1824+
Memory::allocate<internal::ServerConnection>(address, factory)));
1825+
servers_.push_back(server);
1826+
}
18191827
}
18201828

18211829
Cluster::~Cluster() {

gtests/src/unit/mockssandra.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,7 +1119,8 @@ class Cluster {
11191119
protected:
11201120
void init(AddressGenerator& generator,
11211121
ClientConnectionFactory& factory,
1122-
size_t num_nodes);
1122+
size_t num_nodes_dc1,
1123+
size_t num_nodes_dc2);
11231124

11241125
public:
11251126
~Cluster();
@@ -1187,10 +1188,11 @@ class AuthRequestHandlerBuilder : public SimpleRequestHandlerBuilder {
11871188
class SimpleCluster : public Cluster {
11881189
public:
11891190
SimpleCluster(const RequestHandler* request_handler,
1190-
size_t num_nodes = 1)
1191+
size_t num_nodes_dc1 = 1,
1192+
size_t num_nodes_dc2 = 0)
11911193
: factory_(request_handler, this)
11921194
, event_loop_group_(1) {
1193-
init(generator_, factory_, num_nodes);
1195+
init(generator_, factory_, num_nodes_dc1, num_nodes_dc2);
11941196
}
11951197

11961198
~SimpleCluster() {

gtests/src/unit/tests/test_cluster.cpp

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,32 @@ class ClusterUnitTest : public EventLoopTest {
189189
OutagePlan* outage_plan_;
190190
};
191191

192+
class RecoverClusterListener : public UpDownListener {
193+
public:
194+
typedef SharedRefPtr<RecoverClusterListener> Ptr;
195+
196+
RecoverClusterListener(const Future::Ptr& close_future,
197+
const Future::Ptr& up_future,
198+
const Future::Ptr& recover_future)
199+
: UpDownListener(close_future, up_future, Future::Ptr())
200+
, recover_future_(recover_future) { }
201+
202+
const HostVec& connected_hosts() const {
203+
return connected_hosts_;
204+
}
205+
206+
virtual void on_reconnect(Cluster* cluster) {
207+
connected_hosts_.push_back(cluster->connected_host());
208+
if (connected_hosts_.size() > 1 && recover_future_) {
209+
recover_future_->set();
210+
}
211+
}
212+
213+
private:
214+
HostVec connected_hosts_;
215+
Future::Ptr recover_future_;
216+
};
217+
192218
static void on_connection_connected(ClusterConnector* connector, Future* future) {
193219
if (connector->is_ok()) {
194220
future->set();
@@ -208,7 +234,7 @@ class ClusterUnitTest : public EventLoopTest {
208234
break;
209235
case ClusterConnector::CLUSTER_ERROR_NO_HOSTS_AVAILABLE:
210236
future->set_error(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE,
211-
"Unable to connect to any contact points");
237+
connector->error_message());
212238
break;
213239
case ClusterConnector::CLUSTER_CANCELED:
214240
future->set_error(CASS_ERROR_LIB_UNABLE_TO_CONNECT,
@@ -668,3 +694,88 @@ TEST_F(ClusterUnitTest, InvalidSsl) {
668694
ASSERT_TRUE(connect_future->error());
669695
EXPECT_EQ(CASS_ERROR_SSL_INVALID_PEER_CERT, connect_future->error()->code);
670696
}
697+
698+
TEST_F(ClusterUnitTest, DCAwareRecoverOnRemoteHost) {
699+
mockssandra::SimpleCluster cluster(simple(), 1, 1); // 2 DCs with a single node each
700+
ASSERT_EQ(cluster.start_all(), 0);
701+
702+
Address local_address("127.0.0.1", 9042);
703+
Address remote_address("127.0.0.2", 9042);
704+
705+
ContactPointList contact_points;
706+
contact_points.push_back(local_address.to_string());
707+
708+
Future::Ptr close_future(Memory::allocate<Future>());
709+
Future::Ptr connect_future(Memory::allocate<Future>());
710+
ClusterConnector::Ptr connector(Memory::allocate<ClusterConnector>(contact_points,
711+
PROTOCOL_VERSION,
712+
bind_callback(on_connection_reconnect, connect_future.get())));
713+
714+
Future::Ptr up_future(Memory::allocate<Future>());
715+
Future::Ptr recover_future(Memory::allocate<Future>());
716+
RecoverClusterListener::Ptr listener(
717+
Memory::allocate<RecoverClusterListener>(close_future, up_future, recover_future));
718+
719+
ClusterSettings settings;
720+
settings.load_balancing_policy.reset(Memory::allocate<DCAwarePolicy>("dc1", 1, false)); // Allow connection to a single remote host
721+
settings.load_balancing_policies.clear();
722+
settings.load_balancing_policies.push_back(settings.load_balancing_policy);
723+
settings.reconnect_timeout_ms = 1; // Reconnect immediately
724+
settings.control_connection_settings.connection_settings.connect_timeout_ms = 200; // Give enough time for the connection to complete
725+
726+
connector
727+
->with_settings(settings)
728+
->with_listener(listener.get())
729+
->connect(event_loop());
730+
731+
ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
732+
EXPECT_FALSE(connect_future->error());
733+
734+
// Notify every host as down
735+
connect_future->cluster()->notify_down(local_address);
736+
connect_future->cluster()->notify_down(remote_address);
737+
738+
// Notify the remote host as up
739+
connect_future->cluster()->notify_up(remote_address);
740+
741+
// Verify that the remote host was marked as up
742+
ASSERT_TRUE(up_future->wait_for(WAIT_FOR_TIME));
743+
EXPECT_EQ(remote_address, listener->address());
744+
745+
cluster.stop(1); // Stop local node to verify that remote host is tried for reconnection.
746+
ASSERT_TRUE(recover_future->wait_for(WAIT_FOR_TIME));
747+
748+
connect_future->cluster()->close();
749+
ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));
750+
751+
ASSERT_EQ(listener->connected_hosts().size(), 2u);
752+
EXPECT_EQ(listener->connected_hosts()[0]->address(), Address("127.0.0.1", PORT));
753+
EXPECT_EQ(listener->connected_hosts()[1]->address(), Address("127.0.0.2", PORT)); // Connected to remote host.
754+
}
755+
756+
TEST_F(ClusterUnitTest, InvalidDC) {
757+
mockssandra::SimpleCluster cluster(simple());
758+
ASSERT_EQ(cluster.start_all(), 0);
759+
760+
ContactPointList contact_points;
761+
contact_points.push_back("127.0.0.1");
762+
763+
Future::Ptr connect_future(Memory::allocate<Future>());
764+
ClusterConnector::Ptr connector(Memory::allocate<ClusterConnector>(contact_points,
765+
PROTOCOL_VERSION,
766+
bind_callback(on_connection_connected, connect_future.get())));
767+
768+
ClusterSettings settings;
769+
settings.load_balancing_policy.reset(Memory::allocate<DCAwarePolicy>("invalid_dc", 0, false)); // Invalid DC and not using remote hosts
770+
settings.load_balancing_policies.clear();
771+
settings.load_balancing_policies.push_back(settings.load_balancing_policy);
772+
773+
connector
774+
->with_settings(settings)
775+
->connect(event_loop());
776+
777+
ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
778+
ASSERT_TRUE(connect_future->error());
779+
EXPECT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, connect_future->error()->code);
780+
EXPECT_TRUE(connect_future->error()->message.find("Check to see if the configured local datacenter is valid") != String::npos);
781+
}

src/cluster.cpp

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -213,19 +213,28 @@ void Cluster::prepared(const String& id,
213213
prepared_metadata_.set(id, entry);
214214
}
215215

216+
HostMap Cluster::available_hosts() const {
217+
HostMap available;
218+
for (HostMap::const_iterator it = hosts_.begin(),
219+
end = hosts_.end(); it != end; ++it) {
220+
if (!is_host_ignored(it->second)) {
221+
available[it->first] = it->second;
222+
}
223+
}
224+
return available;
225+
}
226+
216227
void Cluster::update_hosts(const HostMap& hosts) {
217228
// Update the hosts and properly notify the listener
218229
HostMap existing(hosts_);
219230

220231
for (HostMap::const_iterator it = hosts.begin(),
221232
end = hosts.end(); it != end; ++it) {
222-
if (!is_host_ignored(it->second)) {
223-
HostMap::iterator find_it = existing.find(it->first);
224-
if (find_it != existing.end()) {
225-
existing.erase(find_it); // Already exists mark as visited
226-
} else {
227-
notify_add(it->second); // A new host has been added
228-
}
233+
HostMap::iterator find_it = existing.find(it->first);
234+
if (find_it != existing.end()) {
235+
existing.erase(find_it); // Already exists mark as visited
236+
} else {
237+
notify_add(it->second); // A new host has been added
229238
}
230239
}
231240

@@ -305,6 +314,10 @@ void Cluster::update_token_map(const HostMap& hosts,
305314
}
306315
}
307316

317+
// All hosts from the cluster are included in the host map and in the load
318+
// balancing policies (LBP) so that LBPs return the correct host distance (esp.
319+
// important for DC-aware). This method prevents connection pools from being
320+
// created to ignored hosts.
308321
bool Cluster::is_host_ignored(const Host::Ptr& host) const {
309322
return cass::is_host_ignored(load_balancing_policies_, host);
310323
}
@@ -413,10 +426,6 @@ void Cluster::internal_notify_up(const Address& address, const Host::Ptr& refres
413426

414427
Host::Ptr host(it->second);
415428

416-
if (is_host_ignored(host)) {
417-
return; // Ignore host
418-
}
419-
420429
if (refreshed){
421430
if (token_map_) {
422431
token_map_ = token_map_->copy();
@@ -437,6 +446,10 @@ void Cluster::internal_notify_up(const Address& address, const Host::Ptr& refres
437446
(*it)->on_up(host);
438447
}
439448

449+
if (is_host_ignored(host)) {
450+
return; // Ignore host
451+
}
452+
440453
if (!prepare_host(host,
441454
bind_callback(&Cluster::on_prepare_host_up, this))) {
442455
notify_up_after_prepare(host);
@@ -487,16 +500,16 @@ void Cluster::notify_add(const Host::Ptr& host) {
487500
listener_->on_remove(host_it->second);
488501
}
489502

490-
if (is_host_ignored(host)) {
491-
return; // Ignore host
492-
}
493-
494503
hosts_[host->address()] = host;
495504
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
496505
end = load_balancing_policies_.end(); it != end; ++it) {
497506
(*it)->on_add(host);
498507
}
499508

509+
if (is_host_ignored(host)) {
510+
return; // Ignore host
511+
}
512+
500513
if (!prepare_host(host,
501514
bind_callback(&Cluster::on_prepare_host_add, this))) {
502515
notify_add_after_prepare(host);

src/cluster.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,10 +255,17 @@ class Cluster : public RefCounted<Cluster>
255255
void prepared(const String& id,
256256
const PreparedMetadata::Entry::Ptr& entry);
257257

258+
/**
259+
* Get available hosts (determined by host distance). This filters out ignored
260+
* hosts (*NOT* thread-safe).
261+
*
262+
* @return A mapping of available hosts.
263+
*/
264+
HostMap available_hosts() const;
265+
258266
public:
259267
int protocol_version() const { return connection_->protocol_version(); }
260268
const Host::Ptr& connected_host() const { return connected_host_; }
261-
const HostMap& hosts() const { return hosts_; }
262269
const TokenMap::Ptr& token_map() const { return token_map_; }
263270

264271
private:

src/cluster_connector.cpp

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include "cluster_connector.hpp"
18+
#include "dc_aware_policy.hpp"
1819
#include "random.hpp"
1920
#include "round_robin_policy.hpp"
2021

@@ -230,28 +231,26 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
230231
policy->register_handles(event_loop_->loop());
231232
}
232233

233-
// Gather the available hosts for the load balancing policies
234-
HostMap available_hosts;
235-
for (HostMap::const_iterator it = hosts.begin(), end = hosts.end();
236-
it != end; ++it) {
237-
if (!is_host_ignored(policies, it->second)) {
238-
available_hosts[it->first] = it->second;
234+
ScopedPtr<QueryPlan> query_plan(default_policy->new_query_plan("", NULL, NULL));
235+
if (!query_plan->compute_next()) { // No hosts in the query plan
236+
const char* message;
237+
if (dynamic_cast<DCAwarePolicy::DCAwareQueryPlan*>(query_plan.get()) != NULL) { // Check if DC-aware
238+
message = "No hosts available for the control connection using the " \
239+
"DC-aware load balancing policy. " \
240+
"Check to see if the configured local datacenter is valid";
241+
} else {
242+
message = "No hosts available for the control connection using the " \
243+
"configured load balancing policy";
239244
}
240-
}
241-
242-
if (available_hosts.empty()) {
243-
//TODO(fero): Check for DC aware policy to give more informative message (e.g. invalid DC)
244-
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE,
245-
"No hosts available for connection using the current load " \
246-
"balancing policy(s)");
245+
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE, message);
247246
return;
248247
}
249248

250249
cluster_.reset(Memory::allocate<Cluster>(connector->release_connection(),
251250
listener_,
252251
event_loop_,
253252
connected_host,
254-
available_hosts,
253+
hosts,
255254
connector->schema(),
256255
default_policy,
257256
policies,

src/dc_aware_policy.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
9393
const CopyOnWriteHostVec& get_local_dc_hosts() const;
9494
void get_remote_dcs(PerDCHostMap::KeySet* remote_dcs) const;
9595

96+
public:
9697
class DCAwareQueryPlan : public QueryPlan {
9798
public:
9899
DCAwareQueryPlan(const DCAwarePolicy* policy,
@@ -111,6 +112,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
111112
size_t index_;
112113
};
113114

115+
private:
114116
String local_dc_;
115117
size_t used_hosts_per_remote_dc_;
116118
bool skip_remote_dcs_for_local_cl_;

src/session_base.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ void SessionBase::on_initialize(ClusterConnector* connector) {
165165
cluster_ = connector->release_cluster();
166166
on_connect(cluster_->connected_host(),
167167
cluster_->protocol_version(),
168-
cluster_->hosts(),
168+
cluster_->available_hosts(),
169169
cluster_->token_map());
170170
} else {
171171
assert(!connector->is_canceled() && "Cluster connection process canceled");

0 commit comments

Comments
 (0)