Skip to content

Commit 3870627

Browse files
authored
CPP-745 - Exponential reconnection policy with jitter (#254)
* CPP-745 - Exponential reconnection policy
1 parent 85a9294 commit 3870627

28 files changed

Lines changed: 818 additions & 79 deletions

cpp-driver/gtests/src/integration/objects/cluster.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -302,13 +302,13 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
302302
}
303303

304304
/**
305-
* Sets the amount of time to wait before attempting to reconnect.
305+
* Sets the constant reconnection policy.
306306
*
307-
* @param wait_time_ms Wait time in milliseconds (default: 2000)
307+
* @param delay_ms Delay in milliseconds (default: 2000)
308308
* @return Cluster object
309309
*/
310-
Cluster& with_reconnect_wait_time(unsigned int wait_time_ms) {
311-
cass_cluster_set_reconnect_wait_time(get(), wait_time_ms);
310+
Cluster& with_constant_reconnect(unsigned int delay_ms) {
311+
cass_cluster_set_constant_reconnect(get(), delay_ms);
312312
return *this;
313313
}
314314

cpp-driver/gtests/src/integration/tests/test_cluster.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,21 @@ TEST(ClusterTest, SetLoadBalanceDcAwareNullLocalDc) {
2828
EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS,
2929
cass_cluster_set_load_balance_dc_aware(cluster.get(), NULL, 99, cass_false));
3030
}
31+
32+
/**
33+
* Set invalid parameters for exponential reconnection policy.
34+
*
35+
* @jira_ticket CPP-745
36+
* @test_category configuration
37+
* @expected_result CASS_ERROR_LIB_BAD_PARAMS.
38+
*/
39+
TEST(ClusterTest, ExponentialReconnectionPolicyBadParameters) {
  test::driver::Cluster cluster;

  // A base delay of zero must be rejected
  EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS,
            cass_cluster_set_exponential_reconnect(cluster.get(), 0, 1));

  // A max delay of zero must be rejected
  EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS,
            cass_cluster_set_exponential_reconnect(cluster.get(), 1, 0));

  // A base delay that exceeds the max delay must be rejected
  EXPECT_EQ(CASS_ERROR_LIB_BAD_PARAMS,
            cass_cluster_set_exponential_reconnect(cluster.get(), 2, 1));
}

cpp-driver/gtests/src/integration/tests/test_control_connection.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTwoNodeClusterTests, StatusChange)
355355
* Create a new session connection using the round robin load balancing policy
356356
* to ensure all nodes can be accessed during request execution
357357
*/
358-
Cluster cluster = default_cluster().with_load_balance_round_robin().with_reconnect_wait_time(
358+
Cluster cluster = default_cluster().with_load_balance_round_robin().with_constant_reconnect(
359359
10); // Ensure reconnect timeout is quick
360360
Session session = cluster.connect();
361361

cpp-driver/gtests/src/unit/mockssandra.cpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,8 @@ ServerConnection::ServerConnection(const Address& address, const ClientConnectio
350350
, rc_(0)
351351
, address_(address)
352352
, factory_(factory)
353-
, ssl_context_(NULL) {
353+
, ssl_context_(NULL)
354+
, connection_attempts_(0) {
354355
uv_mutex_init(&mutex_);
355356
uv_cond_init(&cond_);
356357
}
@@ -493,6 +494,7 @@ void ServerConnection::wait_close() {
493494
}
494495
}
495496

497+
/**
 * Get the current value of the connection attempt counter.
 *
 * @return Value loaded from the atomic `connection_attempts_` counter
 */
unsigned ServerConnection::connection_attempts() const {
  const unsigned attempts = connection_attempts_.load();
  return attempts;
}
496498
void ServerConnection::run(const ServerConnectionTask::Ptr& task) {
497499
ScopedMutex l(&mutex_);
498500
if (state_ != STATE_LISTENING) return;
@@ -575,6 +577,8 @@ void ServerConnection::on_connection(uv_stream_t* server, int status) {
575577
}
576578

577579
void ServerConnection::handle_connection(int status) {
580+
connection_attempts_.fetch_add(1);
581+
578582
if (status != 0) {
579583
fprintf(stderr, "Listen failure: %s\n", uv_strerror(status));
580584
return;
@@ -2184,6 +2188,14 @@ Hosts Cluster::hosts() const {
21842188
return hosts;
21852189
}
21862190

2191+
/**
 * Get the number of connection attempts recorded by a node's server connection.
 *
 * @param node One-based index of the node (node 1 maps to the first server)
 * @return Connection attempts for the node; 0 if the index is out of range
 */
unsigned Cluster::connection_attempts(size_t node) const {
  const bool is_valid_node = node >= 1 && node <= servers_.size();
  if (!is_valid_node) return 0;

  // Nodes are numbered from one; the server list is indexed from zero
  return servers_[node - 1].connection->connection_attempts();
}
2198+
21872199
int Cluster::create_and_add_server(AddressGenerator& generator, ClientConnectionFactory& factory,
21882200
const String& dc) {
21892201
Address address(generator.next());

cpp-driver/gtests/src/unit/mockssandra.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ class ServerConnection : public RefCounted<ServerConnection> {
176176
void close();
177177
void wait_close();
178178

179+
unsigned connection_attempts() const;
179180
void run(const ServerConnectionTask::Ptr& task);
180181

181182
private:
@@ -215,6 +216,7 @@ class ServerConnection : public RefCounted<ServerConnection> {
215216
const Address address_;
216217
const ClientConnectionFactory& factory_;
217218
SSL_CTX* ssl_context_;
219+
Atomic<unsigned> connection_attempts_;
218220
};
219221

220222
} // namespace internal
@@ -1162,6 +1164,8 @@ class Cluster {
11621164
const Host& host(const Address& address) const;
11631165
Hosts hosts() const;
11641166

1167+
unsigned connection_attempts(size_t node) const;
1168+
11651169
void event(const Event::Ptr& event);
11661170

11671171
private:

cpp-driver/gtests/src/unit/tests/test_cluster.cpp

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
using namespace datastax::internal;
2424
using namespace datastax::internal::core;
2525

26+
// Scales `value` to 115% of itself (a fifteen percent margin on top of the original) and
// converts the result to double. When `value` has an integer type the division is performed
// before the conversion, so the fractional part is truncated. The argument is parenthesized
// so that compound expressions such as `a + b` are scaled as a whole.
#define FIFTEEN_PERCENT(value) static_cast<double>(((value) * 115) / 100)
27+
2628
class ClusterUnitTest : public EventLoopTest {
2729
public:
2830
ClusterUnitTest()
@@ -239,6 +241,53 @@ class ClusterUnitTest : public EventLoopTest {
239241
mockssandra::Cluster& simple_cluster_;
240242
};
241243

244+
class ClusterUnitTestReconnectionPolicy : public ReconnectionPolicy {
245+
public:
246+
typedef SharedRefPtr<ClusterUnitTestReconnectionPolicy> Ptr;
247+
248+
ClusterUnitTestReconnectionPolicy()
249+
: ReconnectionPolicy(ReconnectionPolicy::CONSTANT)
250+
, reconnection_schedule_count_(0)
251+
, destroyed_reconnection_schedule_count_(0)
252+
, scheduled_delay_count_(0) {}
253+
254+
virtual const char* name() const { return "blah"; }
255+
virtual ReconnectionSchedule* new_reconnection_schedule() {
256+
++reconnection_schedule_count_;
257+
return new ClusterUnitTestReconnectionSchedule(&scheduled_delay_count_,
258+
&destroyed_reconnection_schedule_count_);
259+
}
260+
261+
unsigned reconnection_schedule_count() const { return reconnection_schedule_count_; }
262+
unsigned destroyed_reconnection_schedule_count() const {
263+
return destroyed_reconnection_schedule_count_;
264+
}
265+
unsigned scheduled_delay_count() const { return scheduled_delay_count_; }
266+
267+
private:
268+
unsigned reconnection_schedule_count_;
269+
unsigned destroyed_reconnection_schedule_count_;
270+
unsigned scheduled_delay_count_;
271+
272+
class ClusterUnitTestReconnectionSchedule : public ReconnectionSchedule {
273+
public:
274+
ClusterUnitTestReconnectionSchedule(unsigned* delay_count, unsigned* destroyed_count)
275+
: delay_count_(delay_count)
276+
, destroyed_count_(destroyed_count) {}
277+
278+
~ClusterUnitTestReconnectionSchedule() { ++*destroyed_count_; }
279+
280+
virtual uint64_t next_delay_ms() {
281+
++*delay_count_;
282+
return 1;
283+
}
284+
285+
private:
286+
unsigned* delay_count_;
287+
unsigned* destroyed_count_;
288+
};
289+
};
290+
242291
static void on_connection_connected(ClusterConnector* connector, Future* future) {
243292
if (connector->is_ok()) {
244293
future->set();
@@ -469,7 +518,7 @@ TEST_F(ClusterUnitTest, ReconnectToDiscoveredHosts) {
469518
ReconnectClusterListener::Ptr listener(new ReconnectClusterListener(close_future, &outage_plan));
470519

471520
ClusterSettings settings;
472-
settings.reconnect_timeout_ms = 1; // Reconnect immediately
521+
settings.reconnection_policy.reset(new ConstantReconnectionPolicy(1)); // Reconnect immediately
473522
settings.control_connection_settings.connection_settings.connect_timeout_ms =
474523
200; // Give enough time for the connection to complete
475524

@@ -512,7 +561,7 @@ TEST_F(ClusterUnitTest, ReconnectUpdateHosts) {
512561
ReconnectClusterListener::Ptr listener(new ReconnectClusterListener(close_future, &outage_plan));
513562

514563
ClusterSettings settings;
515-
settings.reconnect_timeout_ms = 1; // Reconnect immediately
564+
settings.reconnection_policy.reset(new ConstantReconnectionPolicy(1)); // Reconnect immediately
516565
settings.control_connection_settings.connection_settings.connect_timeout_ms =
517566
200; // Give enough time for the connection to complete
518567

@@ -553,7 +602,8 @@ TEST_F(ClusterUnitTest, CloseDuringReconnect) {
553602
Listener::Ptr listener(new Listener(close_future));
554603

555604
ClusterSettings settings;
556-
settings.reconnect_timeout_ms = 100000; // Make sure we're reconnecting when we close.
605+
settings.reconnection_policy.reset(
606+
new ConstantReconnectionPolicy(100000)); // Make sure we're reconnecting when we close.
557607

558608
connector->with_settings(settings)->with_listener(listener.get())->connect(event_loop());
559609

@@ -772,7 +822,7 @@ TEST_F(ClusterUnitTest, DCAwareRecoverOnRemoteHost) {
772822
new DCAwarePolicy("dc1", 1, false)); // Allow connection to a single remote host
773823
settings.load_balancing_policies.clear();
774824
settings.load_balancing_policies.push_back(settings.load_balancing_policy);
775-
settings.reconnect_timeout_ms = 1; // Reconnect immediately
825+
settings.reconnection_policy.reset(new ConstantReconnectionPolicy(1)); // Reconnect immediately
776826
settings.control_connection_settings.connection_settings.connect_timeout_ms =
777827
200; // Give enough time for the connection to complete
778828

@@ -868,3 +918,42 @@ TEST_F(ClusterUnitTest, DisableEventsOnStartup) {
868918
connect_future->cluster()->close();
869919
ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));
870920
}
921+
922+
/**
 * Verify that the reconnection policy supplied in the cluster settings is the one used
 * when reconnecting: node 1 is stopped and restarted twice, and the counting policy is
 * then checked for the schedules it created, the schedules destroyed and the delays
 * requested.
 */
TEST_F(ClusterUnitTest, ReconnectionPolicy) {
  mockssandra::SimpleCluster mock_cluster(simple());
  ASSERT_EQ(mock_cluster.start_all(), 0);

  // Two outages of node 1 (stop followed by start, twice)
  OutagePlan outage_plan(loop(), &mock_cluster);
  outage_plan.stop_node(1);
  outage_plan.start_node(1);
  outage_plan.stop_node(1);
  outage_plan.start_node(1);

  ContactPointList contact_points;
  contact_points.push_back("127.0.0.1");

  Future::Ptr close_future(new Future());
  Future::Ptr connect_future(new Future());
  ClusterConnector::Ptr connector(
      new ClusterConnector(contact_points, PROTOCOL_VERSION,
                           bind_callback(on_connection_reconnect, connect_future.get())));
  ReconnectClusterListener::Ptr listener(new ReconnectClusterListener(close_future, &outage_plan));

  ClusterSettings settings;
  settings.reconnection_policy.reset(new ClusterUnitTestReconnectionPolicy());
  settings.control_connection_settings.connection_settings.connect_timeout_ms =
      200; // Give enough time for the connection to complete
  connector->with_settings(settings)->with_listener(listener.get())->connect(event_loop());

  ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
  EXPECT_FALSE(connect_future->error());

  ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));

  ClusterUnitTestReconnectionPolicy::Ptr policy(
      static_cast<ClusterUnitTestReconnectionPolicy::Ptr>(settings.reconnection_policy));

  // The counters are unsigned; unsigned literals avoid signed/unsigned comparisons
  EXPECT_EQ(2u, policy->reconnection_schedule_count());
  EXPECT_EQ(2u, policy->destroyed_reconnection_schedule_count());
  EXPECT_GE(policy->scheduled_delay_count(), 2u);
  EXPECT_EQ(3u, mock_cluster.connection_attempts(1)); // Includes initial connection attempt
}

0 commit comments

Comments
 (0)