Skip to content

Commit 56b7caa

Browse files
authored
CPP-811 Add unit tests for request handler changes (#302)
1 parent 8dd752c commit 56b7caa

5 files changed

Lines changed: 259 additions & 16 deletions

File tree

cpp-driver/gtests/src/unit/tests/test_pool.cpp

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ class PoolUnitTest : public LoopTest {
7070
: public RequestState
7171
, public Status<RequestState::Enum> {
7272
public:
73-
RequestStatus(uv_loop_t* loop, int num_nodes = NUM_NODES)
73+
RequestStatus(uv_loop_t* loop, int num_requests = NUM_NODES)
7474
: loop_(loop)
75-
, remaining_(num_nodes) {}
75+
, remaining_(num_requests) {}
7676

7777
virtual void set(RequestState::Enum state) {
7878
Status<RequestStatus::Enum>::set(state);
@@ -88,13 +88,13 @@ class PoolUnitTest : public LoopTest {
8888

8989
protected:
9090
uv_loop_t* loop_;
91-
size_t remaining_;
91+
int remaining_;
9292
};
9393

9494
class RequestStatusWithManager : public RequestStatus {
9595
public:
96-
RequestStatusWithManager(uv_loop_t* loop, int num_nodes = NUM_NODES)
97-
: RequestStatus(loop, num_nodes) {}
96+
RequestStatusWithManager(uv_loop_t* loop, int num_requests = NUM_NODES)
97+
: RequestStatus(loop, num_requests) {}
9898

9999
~RequestStatusWithManager() {
100100
ConnectionPoolManager::Ptr temp(manager());
@@ -354,6 +354,33 @@ class PoolUnitTest : public LoopTest {
354354
}
355355
}
356356

357+
static void on_pool_connected_exhaust_streams(ConnectionPoolManagerInitializer* initializer,
358+
RequestStatusWithManager* status) {
359+
const Address address("127.0.0.1", 9042);
360+
ConnectionPoolManager::Ptr manager = initializer->release_manager();
361+
status->set_manager(manager);
362+
363+
for (size_t i = 0; i < CASS_MAX_STREAMS; ++i) {
364+
PooledConnection::Ptr connection = manager->find_least_busy(address);
365+
366+
if (connection) {
367+
RequestCallback::Ptr callback(new RequestCallback(status));
368+
if (connection->write(callback.get()) < 0) {
369+
status->error_failed_write();
370+
}
371+
} else {
372+
status->error_no_connection();
373+
}
374+
}
375+
376+
PooledConnection::Ptr connection = manager->find_least_busy(address);
377+
ASSERT_TRUE(connection);
378+
RequestCallback::Ptr callback(new RequestCallback(status));
379+
EXPECT_EQ(connection->write(callback.get()), Request::REQUEST_ERROR_NO_AVAILABLE_STREAM_IDS);
380+
381+
manager->flush();
382+
}
383+
357384
static void on_pool_nop(ConnectionPoolManagerInitializer* initializer,
358385
RequestStatusWithManager* status) {
359386
ConnectionPoolManager::Ptr manager = initializer->release_manager();
@@ -798,6 +825,17 @@ TEST_F(PoolUnitTest, PartialReconnect) {
798825
// TODO:
799826
}
800827

801-
TEST_F(PoolUnitTest, LowNumberOfStreams) {
802-
// TODO:
828+
TEST_F(PoolUnitTest, NoAvailableStreams) {
829+
mockssandra::SimpleCluster cluster(simple(), 1);
830+
ASSERT_EQ(cluster.start_all(), 0);
831+
832+
RequestStatusWithManager status(loop(), CASS_MAX_STREAMS);
833+
834+
ConnectionPoolManagerInitializer::Ptr initializer(new ConnectionPoolManagerInitializer(
835+
PROTOCOL_VERSION, bind_callback(on_pool_connected_exhaust_streams, &status)));
836+
837+
initializer->initialize(loop(), hosts());
838+
uv_run(loop(), UV_RUN_DEFAULT);
839+
840+
EXPECT_EQ(status.count(RequestStatus::SUCCESS), CASS_MAX_STREAMS) << status.results();
803841
}

cpp-driver/gtests/src/unit/tests/test_request_processor.cpp

Lines changed: 149 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,91 @@
2727
using namespace datastax::internal;
2828
using namespace datastax::internal::core;
2929

30+
class InorderLoadBalancingPolicy : public LoadBalancingPolicy {
31+
public:
32+
typedef SharedRefPtr<LoadBalancingPolicy> Ptr;
33+
typedef Vector<Ptr> Vec;
34+
35+
InorderLoadBalancingPolicy()
36+
: LoadBalancingPolicy()
37+
, hosts_(new HostVec()) {}
38+
39+
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
40+
const String& local_dc) {
41+
hosts_->reserve(hosts.size());
42+
std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost());
43+
}
44+
45+
virtual CassHostDistance distance(const Host::Ptr& host) const {
46+
return CASS_HOST_DISTANCE_LOCAL;
47+
}
48+
49+
virtual bool is_host_up(const Address& address) const {
50+
return std::find_if(hosts_->begin(), hosts_->end(), FindAddress(address)) != hosts_->end();
51+
}
52+
53+
virtual void on_host_added(const Host::Ptr& host) { add_host(hosts_, host); }
54+
55+
virtual void on_host_removed(const Host::Ptr& host) { remove_host(hosts_, host); }
56+
57+
virtual void on_host_up(const Host::Ptr& host) { add_host(hosts_, host); }
58+
59+
virtual void on_host_down(const Address& address) { remove_host(hosts_, address); }
60+
61+
virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler,
62+
const TokenMap* token_map) {
63+
return new InternalQueryPlan(hosts_);
64+
}
65+
66+
virtual LoadBalancingPolicy* new_instance() { return new InorderLoadBalancingPolicy(); }
67+
68+
private:
69+
struct FindAddress {
70+
71+
FindAddress(const Address& address)
72+
: address(address) {}
73+
74+
bool operator()(const Host::Ptr& host) const { return host->address() == address; }
75+
76+
Address address;
77+
};
78+
79+
class InternalQueryPlan : public datastax::internal::core::QueryPlan {
80+
public:
81+
InternalQueryPlan(const CopyOnWriteHostVec& hosts)
82+
: index_(0)
83+
, hosts_(hosts) {}
84+
85+
virtual Host::Ptr compute_next() {
86+
if (index_ < hosts_->size()) {
87+
return (*hosts_)[index_++];
88+
}
89+
return Host::Ptr();
90+
}
91+
92+
private:
93+
size_t index_;
94+
CopyOnWriteHostVec hosts_;
95+
};
96+
97+
private:
98+
CopyOnWriteHostVec hosts_;
99+
};
100+
30101
class RequestProcessorUnitTest : public EventLoopTest {
31102
public:
32103
RequestProcessorUnitTest()
33104
: EventLoopTest("RequestProcessorUnitTest") {}
34105

35-
HostMap generate_hosts() {
106+
HostMap generate_hosts(size_t num_hosts = 3) {
36107
HostMap hosts;
37-
Host::Ptr host1(new Host(Address("127.0.0.1", PORT)));
38-
Host::Ptr host2(new Host(Address("127.0.0.2", PORT)));
39-
Host::Ptr host3(new Host(Address("127.0.0.3", PORT)));
40-
hosts[host1->address()] = host1;
41-
hosts[host2->address()] = host2;
42-
hosts[host3->address()] = host3;
108+
num_hosts = std::min(num_hosts, static_cast<size_t>(255));
109+
for (size_t i = 1; i <= num_hosts; ++i) {
110+
char buf[64];
111+
sprintf(buf, "127.0.0.%d", static_cast<int>(i));
112+
Host::Ptr host(new Host(Address(buf, PORT)));
113+
hosts[host->address()] = host;
114+
}
43115
return hosts;
44116
}
45117

@@ -619,3 +691,73 @@ TEST_F(RequestProcessorUnitTest, RequestTimeout) {
619691
processor->close();
620692
ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));
621693
}
694+
695+
TEST_F(RequestProcessorUnitTest, LowNumberOfStreams) {
696+
mockssandra::SimpleRequestHandlerBuilder builder;
697+
builder.on(mockssandra::OPCODE_QUERY)
698+
.wait(1000) // Give time for the streams to run out
699+
.system_local()
700+
.system_peers()
701+
.empty_rows_result(1);
702+
mockssandra::SimpleCluster cluster(builder.build(), 2); // Two node cluster
703+
ASSERT_EQ(cluster.start_all(), 0);
704+
705+
Future::Ptr close_future(new Future());
706+
CloseListener::Ptr listener(new CloseListener(close_future));
707+
708+
HostMap hosts(generate_hosts(2));
709+
Future::Ptr connect_future(new Future());
710+
711+
ExecutionProfile profile;
712+
profile.set_load_balancing_policy(new InorderLoadBalancingPolicy());
713+
profile.set_speculative_execution_policy(new NoSpeculativeExecutionPolicy());
714+
profile.set_retry_policy(new DefaultRetryPolicy());
715+
716+
RequestProcessorSettings settings;
717+
settings.default_profile = profile;
718+
settings.request_queue_size = 2 * CASS_MAX_STREAMS + 1; // Create a request queue with enough room
719+
720+
RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer(
721+
hosts.begin()->second, PROTOCOL_VERSION, hosts, TokenMap::Ptr(), "",
722+
bind_callback(on_connected, connect_future.get())));
723+
initializer->with_settings(settings)->with_listener(listener.get())->initialize(event_loop());
724+
725+
ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
726+
EXPECT_FALSE(connect_future->error());
727+
RequestProcessor::Ptr processor(connect_future->processor());
728+
729+
// Saturate the hosts connections, but leave one stream.
730+
for (int i = 0; i < 2 * CASS_MAX_STREAMS - 1; ++i) {
731+
ResponseFuture::Ptr response_future(new ResponseFuture());
732+
Statement::Ptr request(new QueryRequest("SELECT * FROM table"));
733+
RequestHandler::Ptr request_handler(new RequestHandler(request, response_future));
734+
processor->process_request(request_handler);
735+
}
736+
737+
{ // Try two more requests. One should succeed on "127.0.0.2" and the other should fail (out of
738+
// streams).
739+
ResponseFuture::Ptr response_future(new ResponseFuture());
740+
741+
Statement::Ptr request(new QueryRequest("SELECT * FROM table"));
742+
request->set_record_attempted_addresses(true);
743+
RequestHandler::Ptr request_handler(new RequestHandler(request, response_future));
744+
processor->process_request(request_handler);
745+
746+
ResponseFuture::Ptr response_future_fail(new ResponseFuture());
747+
RequestHandler::Ptr request_handler_fail(new RequestHandler(
748+
Statement::Ptr(new QueryRequest("SELECT * FROM table")), response_future_fail));
749+
processor->process_request(request_handler_fail);
750+
ASSERT_TRUE(response_future_fail->wait_for(WAIT_FOR_TIME));
751+
ASSERT_TRUE(response_future_fail->error());
752+
EXPECT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, response_future_fail->error()->code);
753+
754+
ASSERT_TRUE(response_future->wait_for(WAIT_FOR_TIME));
755+
EXPECT_FALSE(response_future->error());
756+
AddressVec attempted = response_future->attempted_addresses();
757+
ASSERT_GE(attempted.size(), 1u);
758+
EXPECT_EQ(attempted[0], Address("127.0.0.2", PORT));
759+
}
760+
761+
processor->close();
762+
ASSERT_TRUE(close_future->wait_for(WAIT_FOR_TIME));
763+
}

cpp-driver/gtests/src/unit/tests/test_session_base.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,25 @@ TEST_F(SessionBaseUnitTest, InvalidProtocol) {
218218
EXPECT_EQ(0, session_base.closed());
219219
}
220220

221+
TEST_F(SessionBaseUnitTest, UnsupportedProtocol) {
222+
mockssandra::SimpleCluster cluster(simple());
223+
ASSERT_EQ(cluster.start_all(), 0);
224+
225+
Config config;
226+
config.set_protocol_version(ProtocolVersion(2)); // Unsupported protocol version
227+
config.contact_points().push_back(Address("127.0.0.1", 9042));
228+
TestSessionBase session_base;
229+
230+
Future::Ptr connect_future(session_base.connect(config, KEYSPACE));
231+
ASSERT_TRUE(connect_future->wait_for(WAIT_FOR_TIME));
232+
EXPECT_EQ(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, connect_future->error()->code);
233+
EXPECT_TRUE(connect_future->error()->message.find(
234+
"Operation unsupported by this protocol version") != String::npos);
235+
EXPECT_EQ(0, session_base.connected());
236+
EXPECT_EQ(1, session_base.failed());
237+
EXPECT_EQ(0, session_base.closed());
238+
}
239+
221240
TEST_F(SessionBaseUnitTest, SslError) {
222241
mockssandra::SimpleCluster cluster(simple());
223242
use_ssl(&cluster);

cpp-driver/gtests/src/unit/tests/test_statement.cpp

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "unit.hpp"
1818

19+
#include "batch_request.hpp"
1920
#include "constants.hpp"
2021
#include "control_connection.hpp"
2122
#include "query_request.hpp"
@@ -123,3 +124,45 @@ TEST_F(StatementUnitTest, SetHostWhereHostIsDown) {
123124
ASSERT_TRUE(future->error());
124125
EXPECT_EQ(future->error()->code, CASS_ERROR_LIB_NO_HOSTS_AVAILABLE);
125126
}
127+
128+
TEST_F(StatementUnitTest, ErrorBatchWithNamedParameters) {
129+
mockssandra::SimpleCluster cluster(simple(), 1);
130+
ASSERT_EQ(cluster.start_all(), 0);
131+
132+
connect();
133+
134+
BatchRequest::Ptr batch(new BatchRequest(CASS_BATCH_TYPE_UNLOGGED));
135+
136+
Statement::Ptr request(new QueryRequest("SELECT * FROM does_not_matter WHERE key = ?",
137+
1)); // Space for a named parameter
138+
139+
request->set("key", 42); // Use named parameters
140+
141+
batch->add_statement(request.get());
142+
143+
ResponseFuture::Ptr future(session.execute(Request::ConstPtr(batch)));
144+
future->wait();
145+
146+
ASSERT_TRUE(future->error());
147+
EXPECT_EQ(future->error()->code, CASS_ERROR_LIB_BAD_PARAMS);
148+
EXPECT_EQ(future->error()->message, "Batches cannot contain queries with named values");
149+
}
150+
151+
TEST_F(StatementUnitTest, ErrorParametersUnset) {
152+
mockssandra::SimpleCluster cluster(simple(), 1);
153+
ASSERT_EQ(cluster.start_all(), 0);
154+
155+
Config config;
156+
config.set_protocol_version(ProtocolVersion(3));
157+
158+
connect(config);
159+
160+
Statement::Ptr request(new QueryRequest("SELECT * FROM does_not_matter WHERE key = ?",
161+
1)); // Parameters start as unset
162+
163+
ResponseFuture::Ptr future(session.execute(Request::ConstPtr(request)));
164+
future->wait();
165+
166+
ASSERT_TRUE(future->error());
167+
EXPECT_EQ(future->error()->code, CASS_ERROR_LIB_PARAMETER_UNSET);
168+
}

cpp-driver/src/batch_request.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ class ExecuteRequest;
3333

3434
class BatchRequest : public RoutableRequest {
3535
public:
36+
typedef SharedRefPtr<BatchRequest> Ptr;
3637
typedef Vector<Statement::Ptr> StatementVec;
3738

38-
BatchRequest(uint8_t type_)
39+
BatchRequest(uint8_t type)
3940
: RoutableRequest(CQL_OPCODE_BATCH)
40-
, type_(type_) {}
41+
, type_(type) {}
4142

4243
uint8_t type() const { return type_; }
4344

0 commit comments

Comments
 (0)