Skip to content

Commit 35f27d3

Browse files
Michael Feromikefero
authored andcommitted
CPP-402 - test: Adding unit/integration tests for host listener callback API
* Updates to mockssandra * Added topology change events for new and removed nodes * Added integration and unit tests for external host listener * Moved `wait_for_logger` into the base integration class * Fixed TODO and broken test in control connection * Removing empty cluster manager test
1 parent cf31713 commit 35f27d3

10 files changed

Lines changed: 342 additions & 50 deletions

File tree

cpp-driver/gtests/src/integration/integration.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#define SIMPLE_KEYSPACE_FORMAT "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s"
2727
#define REPLICATION_STRATEGY "{ 'class': %s }"
2828
#define SELECT_SERVER_VERSION "SELECT release_version FROM system.local"
29+
#define LOGGER_MAXIMUM_WAIT_TIME_MS 10000u
30+
#define LOGGER_WAIT_FOR_NAP_MS 100
2931

3032
// Initialize static variables
3133
bool Integration::skipped_message_displayed_ = false;
@@ -434,3 +436,12 @@ void Integration::maybe_shrink_name(std::string& name)
434436
name = name.substr(0, ENTITY_MAXIMUM_LENGTH - id.size()) + id;
435437
}
436438
}
439+
440+
bool Integration::wait_for_logger(size_t expected_count) {
441+
start_timer();
442+
while (elapsed_time() < LOGGER_MAXIMUM_WAIT_TIME_MS
443+
&& logger_.count() < expected_count) {
444+
msleep(LOGGER_WAIT_FOR_NAP_MS);
445+
}
446+
return logger_.count() >= expected_count;
447+
}

cpp-driver/gtests/src/integration/integration.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,16 @@ class Integration : public testing::Test {
595595
*/
596596
void maybe_shrink_name(std::string& name);
597597

598+
/**
599+
* Wait for the logger count to reach expected count
600+
*
601+
* NOTE: This may wait up to LOGGER_MAXIMUM_WAIT_TIME_MS
602+
*
603+
* @param expected_count Expected logger count
604+
* @return True if expected count is equal to logger count; false otherwise
605+
*/
606+
bool wait_for_logger(size_t expected_count);
607+
598608
private:
599609
/**
600610
* Keyspace creation query (generated via SetUp)

cpp-driver/gtests/src/integration/objects/cluster.hpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,21 @@ class Cluster : public Object<CassCluster, cass_cluster_free> {
189189
return *this;
190190
}
191191

192+
/**
193+
* Sets a callback for handling host state changes in the cluster
194+
*
195+
* @param callback Callback to use for cluster host state changes
196+
* @param data User data supplied to the callback (default: NULL)
197+
* @return Cluster object
198+
*/
199+
Cluster& with_host_listener_callback(CassHostListenerCallback callback,
200+
void* data = NULL) {
201+
EXPECT_EQ(CASS_OK, cass_cluster_set_host_listener_callback(get(),
202+
callback,
203+
data));
204+
return *this;
205+
}
206+
192207
/**
193208
* Enable/Disable the use of hostname resolution
194209
*

cpp-driver/gtests/src/integration/tests/test_cluster_manager.cpp

Whitespace-only changes.

cpp-driver/gtests/src/integration/tests/test_control_connection.cpp

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
#include "integration.hpp"
1818

19-
#define LOGGER_WAIT_FOR_NAP 100
20-
2119
/**
2220
* Control connection integration tests; single node cluster
2321
*/
@@ -84,22 +82,6 @@ class ControlConnectionTests : public Integration {
8482
logger_.add_critera(prefix + node_ip_address.str() + suffix);
8583
}
8684
}
87-
88-
/**
89-
* Wait for the logger count to reach expected count
90-
*
91-
* NOTE: This may wait up to 10s
92-
*
93-
* @param expected_count Expected logger count
94-
* @return True if expected count is equal to logger count; false otherwise
95-
*/
96-
bool wait_for_logger(size_t expected_count) {
97-
start_timer();
98-
while (elapsed_time() < 10000u && logger_.count() < expected_count) {
99-
msleep(LOGGER_WAIT_FOR_NAP);
100-
}
101-
return logger_.count() >= expected_count;
102-
}
10385
};
10486

10587
/**
@@ -353,19 +335,20 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionTests, TopologyChange) {
353335
Session session = cluster.connect();
354336

355337
// Bootstrap a second node and ensure all hosts are actively used
356-
unsigned int node_2 = ccm_->bootstrap_node(); // Triggers a `NEW_NODE` event
357-
msleep(3000); //TODO: Remove static sleep and check driver logs for reduced wait
338+
logger_.add_critera("New node " + (ccm_->get_ip_prefix() + "2") + " added");
339+
EXPECT_EQ(2, ccm_->bootstrap_node()); // Triggers a `NEW_NODE` event
340+
EXPECT_TRUE(wait_for_logger(1));
358341
std::set<unsigned short> expected_nodes;
359342
expected_nodes.insert(1);
360-
expected_nodes.insert(node_2);
343+
expected_nodes.insert(2);
361344
check_hosts(session, expected_nodes);
362345

363346
/*
364347
* Decommission the bootstrapped node and ensure only the first node is
365348
* actively used
366349
*/
367-
force_decommission_node(node_2); // Triggers a `REMOVE_NODE` event
368-
expected_nodes.erase(node_2);
350+
force_decommission_node(2); // Triggers a `REMOVE_NODE` event
351+
expected_nodes.erase(2);
369352
check_hosts(session, expected_nodes);
370353
}
371354

@@ -474,7 +457,9 @@ CASSANDRA_INTEGRATION_TEST_F(ControlConnectionThreeNodeClusterTests,
474457
* policy, initial invalid IP addresses, and ensure only the first node is
475458
* used as the valid contact point for automatic node discovery
476459
*/
477-
logger_.add_critera("Unable to establish a control connection to host 192.0.2.");
460+
logger_.add_critera("to host 192.0.2.1 closed");
461+
logger_.add_critera("to host 192.0.2.2 closed");
462+
logger_.add_critera("to host 192.0.2.3 closed");
478463
Cluster cluster = default_cluster(false) // Do not add the default contact points
479464
.with_load_balance_round_robin()
480465
.with_contact_points(generate_contact_points("192.0.2.", 3)) // Invalid IPs

cpp-driver/gtests/src/integration/tests/test_session.cpp

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,116 @@
1919

2020
class SessionTest : public Integration {
2121
public:
22+
SessionTest()
23+
: added_(0)
24+
, removed_(0)
25+
, up_(0)
26+
, down_(0) { }
27+
2228
void SetUp() {
23-
logger_.add_critera("Attempted to get metrics before connecting session object");
24-
logger_.add_critera("Attempted to get speculative execution metrics before connecting session object");
29+
is_session_requested_ = false;
30+
Integration::SetUp();
2531
}
2632

27-
void check_metrics_log_count() {
28-
EXPECT_EQ(logger_.count(), 2u);
33+
const std::string& address() const { return address_; }
34+
int added() const { return added_; }
35+
int removed() const { return removed_; }
36+
int up() const { return up_; }
37+
int down() const { return down_; }
38+
39+
protected:
40+
static void on_host_listener(CassHostListenerEvent event,
41+
CassInet inet,
42+
void* data) {
43+
SessionTest* instance = static_cast<SessionTest*>(data);
44+
45+
char address[CASS_INET_STRING_LENGTH];
46+
uv_inet_ntop(inet.address_length == CASS_INET_V4_LENGTH ? AF_INET : AF_INET6,
47+
inet.address,
48+
address, CASS_INET_STRING_LENGTH);
49+
instance->address_ = address;
50+
51+
if (event == CASS_HOST_LISTENER_EVENT_ADD) {
52+
TEST_LOG("Added node " << address);
53+
++instance->added_;
54+
} else if (event == CASS_HOST_LISTENER_EVENT_REMOVE) {
55+
TEST_LOG("Removed node " << address);
56+
++instance->removed_;
57+
} else if (event == CASS_HOST_LISTENER_EVENT_UP) {
58+
TEST_LOG("Up node " << address);
59+
++instance->up_;
60+
} else if (event == CASS_HOST_LISTENER_EVENT_DOWN) {
61+
TEST_LOG("Down node " << address);
62+
++instance->down_;
63+
}
2964
}
65+
66+
private:
67+
std::string address_;
68+
int added_;
69+
int removed_;
70+
int up_;
71+
int down_;
3072
};
3173

3274
CASSANDRA_INTEGRATION_TEST_F(SessionTest, MetricsWithoutConnecting) {
33-
Session session(cass_session_new());
75+
CHECK_FAILURE;
76+
77+
Session session;
3478

3579
CassMetrics metrics;
80+
logger_.add_critera("Attempted to get metrics before connecting session object");
3681
cass_session_get_metrics(session.get(), &metrics);
3782

3883
EXPECT_EQ(metrics.requests.min, 0u);
3984
EXPECT_EQ(metrics.requests.one_minute_rate, 0.0);
85+
EXPECT_EQ(logger_.count(), 1u);
4086

4187
CassSpeculativeExecutionMetrics spec_ex_metrics;
88+
logger_.reset();
89+
logger_.add_critera("Attempted to get speculative execution metrics before connecting session object");
4290
cass_session_get_speculative_execution_metrics(session.get(), &spec_ex_metrics);
4391
EXPECT_EQ(spec_ex_metrics.min, 0u);
4492
EXPECT_EQ(spec_ex_metrics.percentage, 0.0);
93+
EXPECT_EQ(logger_.count(), 1u);
94+
}
95+
96+
CASSANDRA_INTEGRATION_TEST_F(SessionTest, ExternalHostListener) {
97+
CHECK_FAILURE;
98+
is_test_chaotic_ = true; // Destroy the cluster after the test completes
99+
100+
Cluster cluster = default_cluster()
101+
.with_load_balance_round_robin()
102+
.with_host_listener_callback(on_host_listener, this);
103+
Session session = cluster.connect();
104+
105+
// Sanity check
106+
EXPECT_EQ(0, added());
107+
EXPECT_EQ(0, removed());
108+
EXPECT_EQ(0, up());
109+
EXPECT_EQ(0, down());
110+
111+
logger_.add_critera("New node " + (ccm_->get_ip_prefix() + "2") + " added");
112+
EXPECT_EQ(2, ccm_->bootstrap_node());
113+
EXPECT_TRUE(wait_for_logger(1));
114+
EXPECT_EQ(1, added());
115+
EXPECT_STREQ((ccm_->get_ip_prefix() + "2").c_str(), address().c_str());
116+
117+
stop_node(1);
118+
EXPECT_EQ(1, down());
119+
EXPECT_STREQ((ccm_->get_ip_prefix() + "1").c_str(), address().c_str());
120+
121+
ccm_->start_node(1);
122+
EXPECT_EQ(1, up());
123+
EXPECT_STREQ((ccm_->get_ip_prefix() + "1").c_str(), address().c_str());
124+
125+
force_decommission_node(1);
126+
EXPECT_EQ(1, removed());
127+
EXPECT_STREQ((ccm_->get_ip_prefix() + "1").c_str(), address().c_str());
45128

46-
check_metrics_log_count();
129+
// Sanity check
130+
EXPECT_EQ(1, added());
131+
EXPECT_EQ(1, removed());
132+
EXPECT_EQ(1, up());
133+
EXPECT_EQ(1, down());
47134
}

cpp-driver/gtests/src/unit/mockssandra.cpp

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2014,20 +2014,11 @@ void Cluster::init(AddressGenerator& generator,
20142014
ClientConnectionFactory& factory,
20152015
size_t num_nodes_dc1,
20162016
size_t num_nodes_dc2) {
2017-
MT19937_64 token_rng;
20182017
for (size_t i = 0; i < num_nodes_dc1; ++i) {
2019-
Address address(generator.next());
2020-
Server server(Host(address, "dc1", "rack1", token_rng),
2021-
internal::ServerConnection::Ptr(
2022-
Memory::allocate<internal::ServerConnection>(address, factory)));
2023-
servers_.push_back(server);
2018+
create_and_add_server(generator, factory, "dc1");
20242019
}
20252020
for (size_t i = 0; i < num_nodes_dc2; ++i) {
2026-
Address address(generator.next());
2027-
Server server(Host(address, "dc2", "rack1", token_rng),
2028-
internal::ServerConnection::Ptr(
2029-
Memory::allocate<internal::ServerConnection>(address, factory)));
2030-
servers_.push_back(server);
2021+
create_and_add_server(generator, factory, "dc2");
20312022
}
20322023
}
20332024

@@ -2118,17 +2109,30 @@ int Cluster::add(cass::EventLoopGroup* event_loop_group, size_t node) {
21182109
return -1;
21192110
}
21202111
Server& server = servers_[node - 1];
2121-
server.is_removed.store(false);
2112+
bool is_removed = server.is_removed.exchange(false);
21222113
server.connection->listen(event_loop_group);
2123-
return server.connection->wait_listen();
2114+
int rc = server.connection->wait_listen();
2115+
2116+
// Send the added node event after starting the socket
2117+
if (is_removed) { // Only send topology change event if node was previously removed
2118+
event(TopologyChangeEvent::new_node(server.connection->address()));
2119+
}
2120+
2121+
return rc;
21242122
}
21252123

21262124
void Cluster::remove(size_t node) {
21272125
if (node < 1 || node > servers_.size()) {
21282126
return;
21292127
}
21302128
Server& server = servers_[node - 1];
2131-
server.is_removed.store(true);
2129+
bool is_removed = server.is_removed.exchange(true);
2130+
2131+
// Send the remove node event before closing the socket
2132+
if (!is_removed) { // Only send the topology change event if node was previously active
2133+
event(TopologyChangeEvent::removed_node(server.connection->address()));
2134+
}
2135+
21322136
server.connection->close();
21332137
server.connection->wait_close();
21342138
}
@@ -2140,6 +2144,7 @@ const Host& Cluster::host(const Address& address) const {
21402144
return it->host;
21412145
}
21422146
}
2147+
21432148
throw Exception(ERROR_PROTOCOL_ERROR,
21442149
"Unable to find host " + address.to_string());
21452150
}
@@ -2156,6 +2161,18 @@ Hosts Cluster::hosts() const {
21562161
return hosts;
21572162
}
21582163

2164+
int Cluster::create_and_add_server(AddressGenerator& generator,
2165+
ClientConnectionFactory& factory,
2166+
const String& dc) {
2167+
Address address(generator.next());
2168+
Server server(Host(address, dc, "rack1", token_rng_),
2169+
internal::ServerConnection::Ptr(
2170+
Memory::allocate<internal::ServerConnection>(address, factory)));
2171+
2172+
servers_.push_back(server);
2173+
return static_cast<int>(servers_.size());
2174+
}
2175+
21592176
void Cluster::event(const Event::Ptr& event) {
21602177
for (Servers::const_iterator it = servers_.begin(),
21612178
end = servers_.end(); it != end; ++it) {

cpp-driver/gtests/src/unit/mockssandra.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1275,8 +1275,13 @@ class Cluster {
12751275

12761276
typedef Vector<Server> Servers;
12771277

1278+
int create_and_add_server(AddressGenerator& generator,
1279+
ClientConnectionFactory& factory,
1280+
const String& dc);
1281+
12781282
private:
12791283
Servers servers_;
1284+
MT19937_64 token_rng_;
12801285
};
12811286

12821287
class SimpleEventLoopGroup : public cass::RoundRobinEventLoopGroup {

0 commit comments

Comments
 (0)