Skip to content

Commit e074db5

Browse files
authored
CPP-751 - Call host listener for the initial set of hosts
1 parent f595b02 commit e074db5

2 files changed

Lines changed: 170 additions & 2 deletions

File tree

cpp-driver/gtests/src/unit/tests/test_session.cpp

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ class SessionUnitTest : public EventLoopTest {
177177

178178
HostEventFuture::Event wait_for_event(uint64_t timeout_us) {
179179
HostEventFuture::Event event(front()->wait_for_event(timeout_us));
180-
pop_front();
180+
if (event.first != HostEventFuture::INVALID) pop_front();
181181
return event;
182182
}
183183

@@ -466,7 +466,20 @@ TEST_F(SessionUnitTest, HostListener) {
466466
cass::Session session;
467467
connect(config, &session);
468468

469-
EXPECT_EQ(0u, listener->event_count());
469+
{ // Initial nodes available from peers table
470+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
471+
Address("127.0.0.1", 9042)),
472+
listener->wait_for_event(WAIT_FOR_TIME));
473+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
474+
Address("127.0.0.1", 9042)),
475+
listener->wait_for_event(WAIT_FOR_TIME));
476+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
477+
Address("127.0.0.2", 9042)),
478+
listener->wait_for_event(WAIT_FOR_TIME));
479+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
480+
Address("127.0.0.2", 9042)),
481+
listener->wait_for_event(WAIT_FOR_TIME));
482+
}
470483

471484
{
472485
cluster.remove(1);
@@ -506,3 +519,150 @@ TEST_F(SessionUnitTest, HostListener) {
506519

507520
ASSERT_EQ(0u, listener->event_count());
508521
}
522+
523+
TEST_F(SessionUnitTest, HostListenerDCAwareLocal) {
524+
mockssandra::SimpleCluster cluster(simple(), 2, 1);
525+
ASSERT_EQ(cluster.start_all(), 0);
526+
527+
TestHostListener::Ptr listener(new TestHostListener());
528+
529+
cass::Config config;
530+
config.set_reconnect_wait_time(100); // Reconnect immediately
531+
config.contact_points().push_back("127.0.0.1");
532+
config.set_host_listener(listener);
533+
534+
cass::Session session;
535+
connect(config, &session);
536+
537+
{ // Initial nodes available from peers table
538+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
539+
Address("127.0.0.1", 9042)),
540+
listener->wait_for_event(WAIT_FOR_TIME));
541+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
542+
Address("127.0.0.1", 9042)),
543+
listener->wait_for_event(WAIT_FOR_TIME));
544+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
545+
Address("127.0.0.2", 9042)),
546+
listener->wait_for_event(WAIT_FOR_TIME));
547+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
548+
Address("127.0.0.2", 9042)),
549+
listener->wait_for_event(WAIT_FOR_TIME));
550+
}
551+
552+
{ // Node 3 is DC2 should be ignored
553+
cluster.stop(3);
554+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::INVALID,
555+
Address()),
556+
listener->wait_for_event(WAIT_FOR_TIME));
557+
}
558+
559+
close(&session);
560+
561+
ASSERT_EQ(0u, listener->event_count());
562+
}
563+
564+
// TODO: Remove HostListenerDCAwareRemote after remote DC settings are removed from API
565+
TEST_F(SessionUnitTest, HostListenerDCAwareRemote) {
566+
mockssandra::SimpleCluster cluster(simple(), 2, 1);
567+
ASSERT_EQ(cluster.start_all(), 0);
568+
569+
TestHostListener::Ptr listener(new TestHostListener());
570+
571+
cass::Config config;
572+
config.set_reconnect_wait_time(100); // Reconnect immediately
573+
config.contact_points().push_back("127.0.0.1");
574+
config.set_load_balancing_policy(new cass::DCAwarePolicy(
575+
"dc1",
576+
1,
577+
false));
578+
config.set_host_listener(listener);
579+
580+
cass::Session session;
581+
connect(config, &session);
582+
583+
{ // Initial nodes available from peers table
584+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
585+
Address("127.0.0.1", 9042)),
586+
listener->wait_for_event(WAIT_FOR_TIME));
587+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
588+
Address("127.0.0.1", 9042)),
589+
listener->wait_for_event(WAIT_FOR_TIME));
590+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
591+
Address("127.0.0.2", 9042)),
592+
listener->wait_for_event(WAIT_FOR_TIME));
593+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
594+
Address("127.0.0.2", 9042)),
595+
listener->wait_for_event(WAIT_FOR_TIME));
596+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
597+
Address("127.0.0.3", 9042)),
598+
listener->wait_for_event(WAIT_FOR_TIME));
599+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
600+
Address("127.0.0.3", 9042)),
601+
listener->wait_for_event(WAIT_FOR_TIME));
602+
}
603+
604+
{
605+
cluster.stop(3);
606+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::STOP_NODE,
607+
Address("127.0.0.3", 9042)),
608+
listener->wait_for_event(WAIT_FOR_TIME));
609+
}
610+
611+
close(&session);
612+
613+
ASSERT_EQ(0u, listener->event_count());
614+
}
615+
616+
TEST_F(SessionUnitTest, HostListenerNodeDown) {
617+
mockssandra::SimpleCluster cluster(simple(), 3);
618+
ASSERT_EQ(cluster.start(1), 0);
619+
ASSERT_EQ(cluster.start(3), 0);
620+
621+
TestHostListener::Ptr listener(new TestHostListener());
622+
623+
cass::Config config;
624+
config.set_reconnect_wait_time(100); // Reconnect immediately
625+
config.contact_points().push_back("127.0.0.1");
626+
config.set_host_listener(listener);
627+
628+
cass::Session session;
629+
connect(config, &session);
630+
631+
{ // Initial nodes available from peers table
632+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
633+
Address("127.0.0.1", 9042)),
634+
listener->wait_for_event(WAIT_FOR_TIME));
635+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
636+
Address("127.0.0.1", 9042)),
637+
listener->wait_for_event(WAIT_FOR_TIME));
638+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
639+
Address("127.0.0.2", 9042)),
640+
listener->wait_for_event(WAIT_FOR_TIME));
641+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
642+
Address("127.0.0.2", 9042)),
643+
listener->wait_for_event(WAIT_FOR_TIME));
644+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::ADD_NODE,
645+
Address("127.0.0.3", 9042)),
646+
listener->wait_for_event(WAIT_FOR_TIME));
647+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
648+
Address("127.0.0.3", 9042)),
649+
listener->wait_for_event(WAIT_FOR_TIME));
650+
}
651+
652+
{ // Node 2 connection should not be established (node down event)
653+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::STOP_NODE,
654+
Address("127.0.0.2", 9042)),
655+
listener->wait_for_event(WAIT_FOR_TIME));
656+
}
657+
658+
{
659+
cluster.start(2);
660+
EXPECT_EQ(HostEventFuture::Event(HostEventFuture::START_NODE,
661+
Address("127.0.0.2", 9042)),
662+
listener->wait_for_event(WAIT_FOR_TIME));
663+
}
664+
665+
close(&session);
666+
667+
ASSERT_EQ(0u, listener->event_count());
668+
}

cpp-driver/src/session.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ class SessionInitializer : public RefCounted<SessionInitializer> {
213213
const HostMap& hosts,
214214
const TokenMap::Ptr& token_map) {
215215
inc_ref();
216+
216217
const size_t thread_count_io = remaining_ = session_->config().thread_count_io();
217218
for (size_t i = 0; i < thread_count_io; ++i) {
218219
RequestProcessorInitializer::Ptr initializer(
@@ -409,6 +410,13 @@ void Session::on_connect(const Host::Ptr& connected_host,
409410
return;
410411
}
411412

413+
for (HostMap::const_iterator it = hosts.begin(),
414+
end = hosts.end(); it != end; ++it) {
415+
const Host::Ptr& host = it->second;
416+
config().host_listener()->on_host_added(host);
417+
config().host_listener()->on_host_up(host); // If host is down it will be marked down later in the connection process
418+
}
419+
412420
request_processors_.clear();
413421
request_processor_count_ = 0;
414422
is_closing_ = false;

0 commit comments

Comments
 (0)