Skip to content

Commit 74f2b84

Browse files
authored
CPP-701 - Fix UP/DOWN status (#204)
* Remove status from `Host` type and move it to LBPs * Remove refresh on UP from `ControlConnection` * Remove `Host` type from UP/DOWN events * LBP's `on_remove()` now marks a host as DOWN * Remove unnecessary `HostMap` from `RequestProcessor` * Fix `Cluster`, `RequestProcessor` and LBP unit tests * Fixed naming consistency
1 parent 487f2a1 commit 74f2b84

48 files changed

Lines changed: 736 additions & 550 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cpp-driver/gtests/src/integration/tests/test_session.cpp

Lines changed: 23 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,16 @@
1919

2020
class SessionTest : public Integration {
2121
public:
22-
SessionTest()
23-
: added_(0)
24-
, removed_(0)
25-
, up_(0)
26-
, down_(0) { }
22+
typedef std::pair<CassHostListenerEvent, std::string> Event;
23+
typedef std::vector<Event> Events;
2724

2825
void SetUp() {
2926
is_session_requested_ = false;
3027
Integration::SetUp();
3128
}
3229

33-
const std::string& address() const { return address_; }
34-
int added() const { return added_; }
35-
int removed() const { return removed_; }
36-
int up() const { return up_; }
37-
int down() const { return down_; }
30+
size_t event_count() { return events_.size(); }
31+
const Events& events() const { return events_; }
3832

3933
protected:
4034
static void on_host_listener(CassHostListenerEvent event,
@@ -46,29 +40,11 @@ class SessionTest : public Integration {
4640
uv_inet_ntop(inet.address_length == CASS_INET_V4_LENGTH ? AF_INET : AF_INET6,
4741
inet.address,
4842
address, CASS_INET_STRING_LENGTH);
49-
instance->address_ = address;
50-
51-
if (event == CASS_HOST_LISTENER_EVENT_ADD) {
52-
TEST_LOG("Added node " << address);
53-
++instance->added_;
54-
} else if (event == CASS_HOST_LISTENER_EVENT_REMOVE) {
55-
TEST_LOG("Removed node " << address);
56-
++instance->removed_;
57-
} else if (event == CASS_HOST_LISTENER_EVENT_UP) {
58-
TEST_LOG("Up node " << address);
59-
++instance->up_;
60-
} else if (event == CASS_HOST_LISTENER_EVENT_DOWN) {
61-
TEST_LOG("Down node " << address);
62-
++instance->down_;
63-
}
43+
instance->events_.push_back(Event(event, address));
6444
}
6545

6646
private:
67-
std::string address_;
68-
int added_;
69-
int removed_;
70-
int up_;
71-
int down_;
47+
Events events_;
7248
};
7349

7450
CASSANDRA_INTEGRATION_TEST_F(SessionTest, MetricsWithoutConnecting) {
@@ -102,33 +78,26 @@ CASSANDRA_INTEGRATION_TEST_F(SessionTest, ExternalHostListener) {
10278
.with_host_listener_callback(on_host_listener, this);
10379
Session session = cluster.connect();
10480

105-
// Sanity check
106-
EXPECT_EQ(0, added());
107-
EXPECT_EQ(0, removed());
108-
EXPECT_EQ(0, up());
109-
EXPECT_EQ(0, down());
81+
ASSERT_EQ(0u, event_count());
11082

111-
logger_.add_critera("New node " + (ccm_->get_ip_prefix() + "2") + " added");
11283
EXPECT_EQ(2, ccm_->bootstrap_node());
113-
EXPECT_TRUE(wait_for_logger(1));
114-
EXPECT_EQ(1, added());
115-
EXPECT_STREQ((ccm_->get_ip_prefix() + "2").c_str(), address().c_str());
116-
11784
stop_node(1);
118-
EXPECT_EQ(1, down());
119-
EXPECT_STREQ((ccm_->get_ip_prefix() + "1").c_str(), address().c_str());
120-
12185
ccm_->start_node(1);
122-
EXPECT_EQ(1, up());
123-
EXPECT_STREQ((ccm_->get_ip_prefix() + "1").c_str(), address().c_str());
124-
12586
force_decommission_node(1);
126-
EXPECT_EQ(1, removed());
127-
EXPECT_STREQ((ccm_->get_ip_prefix() + "1").c_str(), address().c_str());
128-
129-
// Sanity check
130-
EXPECT_EQ(1, added());
131-
EXPECT_EQ(1, removed());
132-
EXPECT_EQ(1, up());
133-
EXPECT_EQ(1, down());
87+
88+
session.close();
89+
90+
ASSERT_EQ(6u, event_count());
91+
EXPECT_EQ(CASS_HOST_LISTENER_EVENT_ADD, events()[0].first);
92+
EXPECT_EQ(ccm_->get_ip_prefix() + "2", events()[0].second);
93+
EXPECT_EQ(CASS_HOST_LISTENER_EVENT_UP, events()[1].first);
94+
EXPECT_EQ(ccm_->get_ip_prefix() + "2", events()[1].second);
95+
EXPECT_EQ(CASS_HOST_LISTENER_EVENT_DOWN, events()[2].first);
96+
EXPECT_EQ(ccm_->get_ip_prefix() + "1", events()[2].second);
97+
EXPECT_EQ(CASS_HOST_LISTENER_EVENT_UP, events()[3].first);
98+
EXPECT_EQ(ccm_->get_ip_prefix() + "1", events()[3].second);
99+
EXPECT_EQ(CASS_HOST_LISTENER_EVENT_DOWN, events()[4].first);
100+
EXPECT_EQ(ccm_->get_ip_prefix() + "1", events()[4].second);
101+
EXPECT_EQ(CASS_HOST_LISTENER_EVENT_REMOVE, events()[5].first);
102+
EXPECT_EQ(ccm_->get_ip_prefix() + "1", events()[5].second);
134103
}

cpp-driver/gtests/src/unit/test_token_map_utils.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,6 @@ inline cass::Host::Ptr create_host(const cass::Address& address,
335335
builder.append_local_peers_row_v3(tokens, partitioner, dc, rack, release_version);
336336

337337
host->set(&builder.finish()->first_row(), true);
338-
host->set_up();
339338

340339
return host;
341340
}

cpp-driver/gtests/src/unit/tests/test_cluster.cpp

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ class ClusterUnitTest : public EventLoopTest {
6565

6666
virtual ~Listener() { }
6767

68-
virtual void on_up(const Host::Ptr& host) { }
69-
virtual void on_down(const Host::Ptr& host) { }
68+
virtual void on_host_up(const Host::Ptr& host) { }
69+
virtual void on_host_down(const Host::Ptr& host) { }
7070

71-
virtual void on_add(const Host::Ptr& host) { }
72-
virtual void on_remove(const Host::Ptr& host) { }
71+
virtual void on_host_added(const Host::Ptr& host) { }
72+
virtual void on_host_removed(const Host::Ptr& host) { }
7373

74-
virtual void on_update_token_map(const TokenMap::Ptr& token_map) { }
74+
virtual void on_token_map_updated(const TokenMap::Ptr& token_map) { }
7575

7676
virtual void on_close(Cluster* cluster) {
7777
if (close_future_) {
@@ -106,15 +106,15 @@ class ClusterUnitTest : public EventLoopTest {
106106
return address_;
107107
}
108108

109-
virtual void on_up(const Host::Ptr& host) {
109+
virtual void on_host_up(const Host::Ptr& host) {
110110
if (up_future_) {
111111
ScopedMutex l(&mutex_);
112112
address_ = host->address();
113113
up_future_->set();
114114
}
115115
}
116116

117-
virtual void on_down(const Host::Ptr& host) {
117+
virtual void on_host_down(const Host::Ptr& host) {
118118
if (down_future_) {
119119
ScopedMutex l(&mutex_);
120120
address_ = host->address();
@@ -148,7 +148,8 @@ class ClusterUnitTest : public EventLoopTest {
148148

149149
ReconnectClusterListener(const Future::Ptr& close_future, OutagePlan* outage_plan)
150150
: Listener(close_future)
151-
, outage_plan_(outage_plan) { }
151+
, outage_plan_(outage_plan)
152+
, cluster_(NULL) { }
152153

153154
const HostVec& connected_hosts() const {
154155
return connected_hosts_;
@@ -159,6 +160,7 @@ class ClusterUnitTest : public EventLoopTest {
159160
}
160161

161162
virtual void on_reconnect(Cluster* cluster) {
163+
cluster_ = cluster;
162164
connected_hosts_.push_back(cluster->connected_host());
163165
if (connected_hosts_.size() == 1) { // First host
164166
outage_plan_->run();
@@ -170,23 +172,29 @@ class ClusterUnitTest : public EventLoopTest {
170172
}
171173
}
172174

173-
virtual void on_up(const Host::Ptr& host) { }
174-
virtual void on_down(const Host::Ptr& host) { }
175+
virtual void on_host_up(const Host::Ptr& host) { }
176+
virtual void on_host_down(const Host::Ptr& host) { }
175177

176-
virtual void on_add(const Host::Ptr& host) {
178+
virtual void on_host_added(const Host::Ptr& host) {
177179
events_.push_back(Event(Event::NODE_ADD, host->address()));
180+
// In the absence of RequestProcessor objects the cluster must notify
181+
// itself that a host is UP.
182+
if (cluster_) {
183+
cluster_->notify_host_up(host->address());
184+
}
178185
}
179186

180-
virtual void on_remove(const Host::Ptr& host) {
187+
virtual void on_host_removed(const Host::Ptr& host) {
181188
events_.push_back(Event(Event::NODE_REMOVE, host->address()));
182189
}
183190

184-
virtual void on_update_token_map(const TokenMap::Ptr& token_map) { }
191+
virtual void on_token_map_updated(const TokenMap::Ptr& token_map) { }
185192

186193
private:
187194
HostVec connected_hosts_;
188195
Events events_;
189196
OutagePlan* outage_plan_;
197+
Cluster* cluster_;
190198
};
191199

192200
class RecoverClusterListener : public UpDownListener {
@@ -232,11 +240,11 @@ class ClusterUnitTest : public EventLoopTest {
232240
mockssandra::TopologyChangeEvent::new_node(cass::Address("127.0.0.2", 9042)));
233241
}
234242

235-
virtual void on_up(const Host::Ptr& host) { event_future_->set(); }
236-
virtual void on_down(const Host::Ptr& host) { event_future_->set(); }
237-
virtual void on_add(const Host::Ptr& host) { event_future_->set(); }
238-
virtual void on_remove(const Host::Ptr& host) { event_future_->set(); }
239-
virtual void on_update_token_map(const TokenMap::Ptr& token_map) { event_future_->set(); }
243+
virtual void on_host_up(const Host::Ptr& host) { event_future_->set(); }
244+
virtual void on_host_down(const Host::Ptr& host) { event_future_->set(); }
245+
virtual void on_host_added(const Host::Ptr& host) { event_future_->set(); }
246+
virtual void on_host_removed(const Host::Ptr& host) { event_future_->set(); }
247+
virtual void on_token_map_updated(const TokenMap::Ptr& token_map) { event_future_->set(); }
240248

241249
public:
242250
Future::Ptr event_future_;
@@ -619,11 +627,11 @@ TEST_F(ClusterUnitTest, NotifyDownUp) {
619627

620628
// We need to mark the host as DOWN first otherwise an UP event won't be
621629
// triggered.
622-
cluster->notify_down(address);
630+
cluster->notify_host_down(address);
623631
ASSERT_TRUE(down_future->wait_for(WAIT_FOR_TIME));
624632
EXPECT_EQ(address, listener->address());
625633

626-
cluster->notify_up(address);
634+
cluster->notify_host_up(address);
627635
ASSERT_TRUE(up_future->wait_for(WAIT_FOR_TIME));
628636
EXPECT_EQ(address, listener->address());
629637

@@ -809,11 +817,11 @@ TEST_F(ClusterUnitTest, DCAwareRecoverOnRemoteHost) {
809817
EXPECT_FALSE(connect_future->error());
810818

811819
// Notify every host as down
812-
connect_future->cluster()->notify_down(local_address);
813-
connect_future->cluster()->notify_down(remote_address);
820+
connect_future->cluster()->notify_host_down(local_address);
821+
connect_future->cluster()->notify_host_down(remote_address);
814822

815823
// Notify the remote host as up
816-
connect_future->cluster()->notify_up(remote_address);
824+
connect_future->cluster()->notify_host_up(remote_address);
817825

818826
// Verify that the remote host was marked as up
819827
ASSERT_TRUE(up_future->wait_for(WAIT_FOR_TIME));

cpp-driver/gtests/src/unit/tests/test_control_connection.cpp

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,9 @@ class RecordingControlConnectionListener
123123
events_.push_back(event);
124124
}
125125

126-
virtual void on_up(const Address& address, const Host::Ptr& refreshed) {
126+
virtual void on_up(const Address& address) {
127127
RecordedEvent event(RecordedEvent::NODE_UP);
128-
if (refreshed) {
129-
event.host = refreshed;
130-
} else {
131-
event.host.reset(Memory::allocate<Host>(address));
132-
}
128+
event.host.reset(Memory::allocate<Host>(address));
133129
events_.push_back(event);
134130
}
135131

@@ -214,8 +210,8 @@ class ControlConnectionUnitTest : public LoopTest {
214210
if (--remaining_ <= 0) connection_->close();
215211
}
216212

217-
virtual void on_up(const Address& address, const Host::Ptr& refreshed) {
218-
RecordingControlConnectionListener::on_up(address, refreshed);
213+
virtual void on_up(const Address& address) {
214+
RecordingControlConnectionListener::on_up(address);
219215
if (--remaining_ <= 0) connection_->close();
220216
}
221217

0 commit comments

Comments
 (0)