Skip to content

Commit 36d054e

Browse files
authored
CPP-811 Requests won't complete if they exceed the number of streams (#293)
* Fix null pointer dereferences in `RequestHandler` * Fix connection write error handling for user requests
1 parent 115a9e5 commit 36d054e

14 files changed

Lines changed: 107 additions & 75 deletions

cpp-driver/gtests/src/unit/tests/test_pool.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ class PoolUnitTest : public LoopTest {
324324
if (connection) {
325325
RequestStatus status(manager->loop(), 1);
326326
RequestCallback::Ptr callback(new RequestCallback(&status));
327-
EXPECT_TRUE(connection->write(callback.get()))
327+
EXPECT_TRUE(connection->write(callback.get()) > 0)
328328
<< "Unable to write request to connection " << address.to_string();
329329
connection->flush(); // Flush requests to avoid unnecessary timeouts
330330
uv_run(loop(), UV_RUN_DEFAULT);
@@ -344,7 +344,7 @@ class PoolUnitTest : public LoopTest {
344344
PooledConnection::Ptr connection = manager->find_least_busy(generator.next());
345345
if (connection) {
346346
RequestCallback::Ptr callback(new RequestCallback(status));
347-
if (!connection->write(callback.get())) {
347+
if (connection->write(callback.get()) < 0) {
348348
status->error_failed_write();
349349
}
350350
} else {

cpp-driver/src/connection.cpp

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -130,34 +130,12 @@ int32_t Connection::write(const RequestCallback::Ptr& callback) {
130130

131131
int32_t request_size = socket_->write(callback.get());
132132

133-
if (request_size < 0) {
133+
if (request_size <= 0) {
134134
stream_manager_.release(stream);
135-
136-
switch (request_size) {
137-
case SocketRequest::SOCKET_REQUEST_ERROR_CLOSED:
138-
callback->on_error(CASS_ERROR_LIB_WRITE_ERROR, "Unable to write to close socket");
139-
break;
140-
141-
case SocketRequest::SOCKET_REQUEST_ERROR_NO_HANDLER:
142-
callback->on_error(CASS_ERROR_LIB_WRITE_ERROR,
143-
"Socket is not properly configured with a handler");
144-
break;
145-
146-
case Request::REQUEST_ERROR_BATCH_WITH_NAMED_VALUES:
147-
case Request::REQUEST_ERROR_PARAMETER_UNSET:
148-
// Already handled with a specific error.
149-
break;
150-
151-
case Request::REQUEST_ERROR_UNSUPPORTED_PROTOCOL:
152-
callback->on_error(CASS_ERROR_LIB_MESSAGE_ENCODE,
153-
"Operation unsupported by this protocol version");
154-
break;
155-
156-
default:
157-
callback->on_error(CASS_ERROR_LIB_WRITE_ERROR, "Unspecified write error occurred");
158-
break;
135+
if (request_size == 0) {
136+
callback->on_error(CASS_ERROR_LIB_MESSAGE_ENCODE, "The encoded request had no data to write");
137+
return Request::REQUEST_ERROR_NO_DATA_WRITTEN;
159138
}
160-
161139
return request_size;
162140
}
163141

@@ -340,7 +318,8 @@ void Connection::restart_heartbeat_timer() {
340318

341319
void Connection::on_heartbeat(Timer* timer) {
342320
if (!heartbeat_outstanding_ && !socket_->is_closing()) {
343-
if (!write_and_flush(RequestCallback::Ptr(new HeartbeatCallback(this)))) {
321+
RequestCallback::Ptr callback(new HeartbeatCallback(this));
322+
if (write_and_flush(callback) < 0) {
344323
// Recycling only this connection with a timeout error. This is unlikely and
345324
// it means the connection ran out of stream IDs as a result of requests
346325
// that never returned and as a result timed out.

cpp-driver/src/connection_pool.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ using namespace datastax;
2727
using namespace datastax::internal::core;
2828

2929
static inline bool least_busy_comp(const PooledConnection::Ptr& a, const PooledConnection::Ptr& b) {
30+
// Don't consider closed connections to be the least busy.
31+
if (a->is_closing()) { // "a" is closed so it can't be the least busy.
32+
return false;
33+
} else if (b->is_closing()) { // "a" is not close, but "b" is closed so "a" is less busy.
34+
return true;
35+
}
36+
// Both "a" and "b" are not closed so compare their inflight request counts.
3037
return a->inflight_request_count() < b->inflight_request_count();
3138
}
3239

@@ -89,10 +96,12 @@ ConnectionPool::ConnectionPool(const Connection::Vec& connections, ConnectionPoo
8996
}
9097

9198
PooledConnection::Ptr ConnectionPool::find_least_busy() const {
92-
if (connections_.empty()) {
99+
PooledConnection::Vec::const_iterator it =
100+
std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
101+
if (it == connections_.end() || (*it)->is_closing()) {
93102
return PooledConnection::Ptr();
94103
}
95-
return *std::min_element(connections_.begin(), connections_.end(), least_busy_comp);
104+
return *it;
96105
}
97106

98107
bool ConnectionPool::has_connections() const { return !connections_.empty(); }

cpp-driver/src/connection_pool.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class ConnectionPool : public RefCounted<ConnectionPool> {
131131

132132
/**
133133
* Find the least busy connection for the pool. The least busy connection has
134-
* the lowest number of outstanding requests.
134+
* the lowest number of outstanding requests and is not closed.
135135
*
136136
* @return The least busy connection or null if no connection is available.
137137
*/

cpp-driver/src/control_connection.cpp

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,8 @@ void ControlConnection::refresh_node(RefreshNodeType type, const Address& addres
378378

379379
LOG_DEBUG("Refresh node: %s", query.c_str());
380380

381-
if (write_and_flush(RequestCallback::Ptr(
382-
new RefreshNodeCallback(address, type, is_all_peers, query, this))) < 0) {
381+
RequestCallback::Ptr callback(new RefreshNodeCallback(address, type, is_all_peers, query, this));
382+
if (write_and_flush(callback) < 0) {
383383
LOG_ERROR("No more stream available while attempting to refresh node info");
384384
defunct();
385385
}
@@ -448,8 +448,9 @@ void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
448448

449449
LOG_DEBUG("Refreshing keyspace %s", query.c_str());
450450

451-
if (write_and_flush(RequestCallback::Ptr(
452-
new RefreshKeyspaceCallback(keyspace_name.to_string(), query, this))) < 0) {
451+
RequestCallback::Ptr callback(
452+
new RefreshKeyspaceCallback(keyspace_name.to_string(), query, this));
453+
if (write_and_flush(callback) < 0) {
453454
LOG_ERROR("No more stream available while attempting to refresh keyspace info");
454455
defunct();
455456
}
@@ -594,8 +595,9 @@ void ControlConnection::refresh_type(const StringRef& keyspace_name, const Strin
594595

595596
LOG_DEBUG("Refreshing type %s", query.c_str());
596597

597-
if (!write_and_flush(RequestCallback::Ptr(new RefreshTypeCallback(
598-
keyspace_name.to_string(), type_name.to_string(), query, this)))) {
598+
RequestCallback::Ptr callback(
599+
new RefreshTypeCallback(keyspace_name.to_string(), type_name.to_string(), query, this));
600+
if (write_and_flush(callback) < 0) {
599601
LOG_ERROR("No more stream available while attempting to refresh type info");
600602
defunct();
601603
}
@@ -654,9 +656,10 @@ void ControlConnection::refresh_function(const StringRef& keyspace_name,
654656
request->set(1, CassString(function_name.data(), function_name.size()));
655657
request->set(2, signature.get());
656658

657-
if (!write_and_flush(RequestCallback::Ptr(
658-
new RefreshFunctionCallback(keyspace_name.to_string(), function_name.to_string(),
659-
to_strings(arg_types), is_aggregate, request, this)))) {
659+
RequestCallback::Ptr callback(
660+
new RefreshFunctionCallback(keyspace_name.to_string(), function_name.to_string(),
661+
to_strings(arg_types), is_aggregate, request, this));
662+
if (write_and_flush(callback) < 0) {
660663
LOG_ERROR("No more stream available while attempting to refresh function info");
661664
defunct();
662665
}

cpp-driver/src/pooled_connection.cpp

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void ChainedSetKeyspaceCallback::on_internal_timeout() { chained_callback_->on_r
8888
void ChainedSetKeyspaceCallback::on_result_response(ResponseMessage* response) {
8989
ResultResponse* result = static_cast<ResultResponse*>(response->response_body().get());
9090
if (result->kind() == CASS_RESULT_KIND_SET_KEYSPACE) {
91-
if (!connection_->write_and_flush(chained_callback_)) {
91+
if (connection_->write_and_flush(chained_callback_) < 0) {
9292
// Try on the same host but a different connection
9393
chained_callback_->on_retry_current_host();
9494
}
@@ -107,8 +107,8 @@ PooledConnection::PooledConnection(ConnectionPool* pool, const Connection::Ptr&
107107
connection_->set_listener(this);
108108
}
109109

110-
bool PooledConnection::write(RequestCallback* callback) {
111-
bool result = false;
110+
int32_t PooledConnection::write(RequestCallback* callback) {
111+
int32_t result;
112112
const String& keyspace(pool_->keyspace());
113113
if (keyspace != connection_->keyspace()) {
114114
LOG_DEBUG("Setting keyspace %s on connection(%p) pool(%p)", keyspace.c_str(),
@@ -119,9 +119,10 @@ bool PooledConnection::write(RequestCallback* callback) {
119119
result = connection_->write(RequestCallback::Ptr(callback));
120120
}
121121

122-
if (result) {
122+
if (result > 0) {
123123
pool_->requires_flush(this, ConnectionPool::Protected());
124124
}
125+
125126
return result;
126127
}
127128

@@ -142,6 +143,8 @@ int PooledConnection::inflight_request_count() const {
142143
return connection_->inflight_request_count();
143144
}
144145

146+
bool PooledConnection::is_closing() const { return connection_->is_closing(); }
147+
145148
void PooledConnection::on_read() {
146149
if (event_loop_) {
147150
event_loop_->maybe_start_io_time();

cpp-driver/src/pooled_connection.hpp

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,9 @@ class PooledConnection
5151
* the connection pool manager flushes the request.
5252
*
5353
* @param callback A request callback that handles the request.
54-
* @return Returns true if the request was written, otherwise, an error
55-
* occurred.
54+
* @return The number of bytes written, or negative if an error occurred.
5655
*/
57-
bool write(RequestCallback* callback);
56+
int32_t write(RequestCallback* callback);
5857

5958
/**
6059
* Flush pending writes.
@@ -73,6 +72,13 @@ class PooledConnection
7372
*/
7473
int inflight_request_count() const;
7574

75+
/**
76+
* Determine if the connection is closing.
77+
*
78+
* @return Returns true if closing.
79+
*/
80+
bool is_closing() const;
81+
7682
public:
7783
const String& keyspace() const { return connection_->keyspace(); } // Test only
7884

cpp-driver/src/prepare_host_handler.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ void PrepareHostHandler::prepare_next() {
111111
// Set the keyspace in case per request keyspaces are supported
112112
prepare_request->set_keyspace(current_keyspace_);
113113

114-
if (!connection_->write(
115-
PrepareCallback::Ptr(new PrepareCallback(prepare_request, Ptr(this))))) {
114+
PrepareCallback::Ptr callback(new PrepareCallback(prepare_request, Ptr(this)));
115+
if (connection_->write(callback) < 0) {
116116
LOG_WARN("Failed to write prepare request while preparing all queries on host %s",
117117
host_->address_string().c_str());
118118
close();
@@ -134,8 +134,8 @@ bool PrepareHostHandler::check_and_set_keyspace() {
134134
const String& keyspace((*current_entry_it_)->keyspace());
135135

136136
if (keyspace != current_keyspace_) {
137-
if (!connection_->write_and_flush(
138-
PrepareCallback::Ptr(new SetKeyspaceCallback(keyspace, Ptr(this))))) {
137+
PrepareCallback::Ptr callback(new SetKeyspaceCallback(keyspace, Ptr(this)));
138+
if (connection_->write_and_flush(callback) < 0) {
139139
LOG_WARN("Failed to write \"USE\" keyspace request while preparing all queries on host %s",
140140
host_->address_string().c_str());
141141
close();

cpp-driver/src/request.hpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ class Request : public RefCounted<Request> {
8181
REQUEST_ERROR_UNSUPPORTED_PROTOCOL = SocketRequest::SOCKET_REQUEST_ERROR_LAST_ENTRY,
8282
REQUEST_ERROR_BATCH_WITH_NAMED_VALUES,
8383
REQUEST_ERROR_PARAMETER_UNSET,
84-
REQUEST_ERROR_NO_AVAILABLE_STREAM_IDS
84+
REQUEST_ERROR_NO_AVAILABLE_STREAM_IDS,
85+
REQUEST_ERROR_NO_DATA_WRITTEN
8586
};
8687

8788
Request(uint8_t opcode)

cpp-driver/src/request_callback.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ bool RequestCallback::skip_metadata() const {
5858
int32_t RequestCallback::encode(BufferVec* bufs) {
5959
const ProtocolVersion version = protocol_version_;
6060
if (version < CASS_LOWEST_SUPPORTED_PROTOCOL_VERSION) {
61+
on_error(CASS_ERROR_LIB_MESSAGE_ENCODE, "Operation unsupported by this protocol version");
6162
return Request::REQUEST_ERROR_UNSUPPORTED_PROTOCOL;
6263
}
6364

@@ -245,8 +246,7 @@ ResultResponse::Ptr ChainedRequestCallback::result(const String& key) const {
245246

246247
void ChainedRequestCallback::on_internal_write(Connection* connection) {
247248
if (chain_) {
248-
int32_t result = connection->write_and_flush(chain_);
249-
if (result == Request::REQUEST_ERROR_NO_AVAILABLE_STREAM_IDS) {
249+
if (connection->write_and_flush(chain_) < 0) {
250250
on_error(CASS_ERROR_LIB_NO_STREAMS,
251251
"No streams available when attempting to write chained request");
252252
}

0 commit comments

Comments
 (0)