Skip to content

Commit cf9dddb

Browse files
author
Michael Fero
committed
CPP-802 - Handle prepared id mismatch when repreparing on the fly
1 parent 36d054e commit cf9dddb

5 files changed

Lines changed: 185 additions & 12 deletions

File tree

cpp-driver/gtests/src/integration/integration.cpp

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ Integration::Integration()
5656
, is_ccm_start_requested_(true)
5757
, is_ccm_start_node_individually_(false)
5858
, is_session_requested_(true)
59+
, is_keyspace_change_requested_(true)
5960
, is_test_chaotic_(false)
6061
, is_beta_protocol_(Options::is_beta_protocol())
6162
, protocol_version_(CASS_HIGHEST_SUPPORTED_PROTOCOL_VERSION)
@@ -300,6 +301,16 @@ void Integration::drop_type(const std::string& type_name) {
300301
session_.execute(drop_type_query.str(), CASS_CONSISTENCY_ANY, false, false);
301302
}
302303

304+
bool Integration::use_keyspace(const std::string& keyspace_name) {
305+
std::stringstream use_keyspace_query;
306+
use_keyspace_query << "USE " << keyspace_name;
307+
session_.execute(use_keyspace_query.str());
308+
if (this->HasFailure()) {
309+
return false;
310+
}
311+
return true;
312+
}
313+
303314
void Integration::connect(Cluster cluster) {
304315
// Establish the session connection
305316
cluster_ = cluster;
@@ -322,10 +333,9 @@ void Integration::connect(Cluster cluster) {
322333
CHECK_FAILURE;
323334

324335
// Update the session to use the new keyspace by default
325-
std::stringstream use_keyspace_query;
326-
use_keyspace_query << "USE " << keyspace_name_;
327-
session_.execute(use_keyspace_query.str());
328-
CHECK_FAILURE;
336+
if (is_keyspace_change_requested_) {
337+
use_keyspace(keyspace_name_);
338+
}
329339
}
330340

331341
void Integration::connect() {

cpp-driver/gtests/src/integration/integration.hpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@
109109

110110
#define CASSANDRA_KEY_VALUE_TABLE_FORMAT \
111111
"CREATE TABLE IF NOT EXISTS %s (key %s PRIMARY KEY, value %s)"
112+
#define CASSANDRA_KEY_VALUE_QUALIFIED_TABLE_FORMAT \
113+
"CREATE TABLE IF NOT EXISTS %s.%s (key %s PRIMARY KEY, value %s)"
112114
#define CASSANDRA_KEY_VALUE_INSERT_FORMAT "INSERT INTO %s (key, value) VALUES(%s, %s)"
115+
#define CASSANDRA_KEY_VALUE_QUALIFIED_INSERT_FORMAT "INSERT INTO %s.%s (key, value) VALUES(%s, %s)"
113116
#define CASSANDRA_SELECT_VALUE_FORMAT "SELECT value FROM %s WHERE key=%s"
114117
#define CASSANDRA_DELETE_ROW_FORMAT "DELETE FROM %s WHERE key=%s"
115118
#define CASSANDRA_UPDATE_VALUE_FORMAT "UPDATE %s SET value=%s WHERE key=%s"
@@ -279,6 +282,11 @@ class Integration : public testing::Test {
279282
* (DEFAULT: true)
280283
*/
281284
bool is_session_requested_;
285+
/**
286+
* Flag to indicate if the newly created keyspace should be set for the session connection.
287+
* (DEFAULT: true)
288+
*/
289+
bool is_keyspace_change_requested_;
282290
/**
283291
* Flag to indicate if a test is chaotic and should have its CCM cluster
284292
* destroyed
@@ -376,6 +384,14 @@ class Integration : public testing::Test {
376384
*/
377385
virtual void drop_type(const std::string& type_name);
378386

387+
/**
388+
* Update the current keyspace used by the session
389+
*
390+
* @param keyspace_name Keyspace to use
391+
* @return True if keyspace was changed; false otherwise
392+
*/
393+
virtual bool use_keyspace(const std::string& keyspace_name);
394+
379395
/**
380396
* Establish the session connection using provided cluster object.
381397
*
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
Copyright (c) DataStax, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#include "integration.hpp"
18+
19+
/**
20+
* Prepared integration tests; common operations
21+
*/
22+
class PreparedTests : public Integration {
23+
void SetUp() {
24+
is_keyspace_change_requested_ = false;
25+
Integration::SetUp();
26+
}
27+
};
28+
29+
/**
30+
* Execute a statement that forces a re-prepare resulting in a new prepared ID that fails fast and
31+
* returns an error.
32+
*
33+
* This test will create a new table, prepare a statement using a fully qualified query, update the
34+
* default keyspace, then drop and re-create the table to force the server to invalidate the
35+
* prepared ID. After the table is dropped the prepared statement will be used to execute an insert
36+
* query that will result in an error being returned when re-using the original prepared statement.
37+
*
38+
* @see: https://issues.apache.org/jira/browse/CASSANDRA-15252 (Server version restriction may need
39+
* to be added if/when Apache Cassandra issue is addressed.
40+
*
41+
* @test_category error
42+
* @test_category queries:prepared
43+
* @since core:2.14.0
44+
* @expected_result Re-prepare will fail fast and return error.
45+
*/
46+
CASSANDRA_INTEGRATION_TEST_F(PreparedTests, FailFastWhenPreparedIDChangesDuringReprepare) {
47+
CHECK_FAILURE;
48+
49+
// Create the table and initial prepared statement
50+
session_.execute(format_string(CASSANDRA_KEY_VALUE_QUALIFIED_TABLE_FORMAT, keyspace_name_.c_str(),
51+
table_name_.c_str(), "int", "int"));
52+
Prepared insert_prepared =
53+
session_.prepare(format_string(CASSANDRA_KEY_VALUE_QUALIFIED_INSERT_FORMAT,
54+
keyspace_name_.c_str(), table_name_.c_str(), "?", "?"));
55+
56+
// Update the current keyspace for the session
57+
ASSERT_TRUE(use_keyspace(keyspace_name_));
58+
59+
// Drop and re-create the table to invalidate the prepared statement on the server
60+
drop_table(table_name_);
61+
session_.execute(format_string(CASSANDRA_KEY_VALUE_QUALIFIED_TABLE_FORMAT, keyspace_name_.c_str(),
62+
table_name_.c_str(), "int", "int"));
63+
64+
// Execute the insert statement and validate the error code
65+
logger_.add_critera("ID mismatch while trying to prepare query");
66+
Statement insert_statement = insert_prepared.bind();
67+
insert_statement.bind<Integer>(0, Integer(0));
68+
insert_statement.bind<Integer>(1, Integer(1));
69+
Result result = session_.execute(insert_statement, false);
70+
EXPECT_TRUE(contains(result.error_message(), "ID mismatch while trying to prepare query"));
71+
}
72+
73+
/**
74+
* Execute a statement that forces a re-prepare resulting in a same prepared ID.
75+
*
76+
* This test will connect to a cluster and use a keyspace, prepare a statement using a unqualified
77+
* query, then drop and re-create the table to force the server to invalidate the
78+
* prepared ID. After the table is dropped the prepared statement will be used to execute an insert
79+
* query that will result the statement being re-prepared and the insert statement succeeding.
80+
*
81+
* @test_category queries:prepared
82+
* @since core:1.0.0
83+
* @expected_result Re-prepare will correctly execute the insert statement.
84+
*/
85+
CASSANDRA_INTEGRATION_TEST_F(PreparedTests, PreparedIDUnchangedDuringReprepare) {
86+
CHECK_FAILURE;
87+
88+
// Allow for unqualified queries
89+
use_keyspace(keyspace_name_);
90+
91+
// Create the table and initial prepared statement
92+
session_.execute(
93+
format_string(CASSANDRA_KEY_VALUE_TABLE_FORMAT, table_name_.c_str(), "int", "int"));
94+
Prepared insert_prepared = session_.prepare(
95+
format_string(CASSANDRA_KEY_VALUE_INSERT_FORMAT, table_name_.c_str(), "?", "?"));
96+
97+
// Drop and re-create the table to invalidate the prepared statement on the server
98+
drop_table(table_name_);
99+
session_.execute(
100+
format_string(CASSANDRA_KEY_VALUE_TABLE_FORMAT, table_name_.c_str(), "int", "int"));
101+
102+
// Execute the insert statement and validate success
103+
logger_.add_critera("Prepared query with ID");
104+
Statement insert_statement = insert_prepared.bind();
105+
insert_statement.bind<Integer>(0, Integer(0));
106+
insert_statement.bind<Integer>(1, Integer(1));
107+
Result result = session_.execute(insert_statement, false);
108+
EXPECT_EQ(CASS_OK, result.error_code());
109+
EXPECT_EQ(1u, logger_.count());
110+
}

cpp-driver/src/request_handler.cpp

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,20 @@ using namespace datastax;
3636
using namespace datastax::internal;
3737
using namespace datastax::internal::core;
3838

39+
static String to_hex(const String& byte_id) {
40+
static const char half_byte_to_hex[] = { '0', '1', '2', '3', '4', '5', '6', '7',
41+
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
42+
OStringStream ss;
43+
44+
const char* data = byte_id.data();
45+
for (size_t i = 0; i < byte_id.length(); ++i) {
46+
uint8_t byte = static_cast<uint8_t>(data[i]);
47+
ss << half_byte_to_hex[(byte >> 4) & 0x0F];
48+
ss << half_byte_to_hex[byte & 0x0F];
49+
}
50+
return ss.str();
51+
}
52+
3953
class SingleHostQueryPlan : public QueryPlan {
4054
public:
4155
SingleHostQueryPlan(const Address& address)
@@ -53,7 +67,7 @@ class SingleHostQueryPlan : public QueryPlan {
5367

5468
class PrepareCallback : public SimpleRequestCallback {
5569
public:
56-
PrepareCallback(const String& query, RequestExecution* request_execution);
70+
PrepareCallback(const String& query, const String& id, RequestExecution* request_execution);
5771

5872
private:
5973
class PrepareRequest : public core::PrepareRequest {
@@ -72,21 +86,29 @@ class PrepareCallback : public SimpleRequestCallback {
7286

7387
private:
7488
RequestExecution::Ptr request_execution_;
89+
String id_;
7590
};
7691

77-
PrepareCallback::PrepareCallback(const String& query, RequestExecution* request_execution)
92+
PrepareCallback::PrepareCallback(const String& query, const String& id,
93+
RequestExecution* request_execution)
7894
: SimpleRequestCallback(
7995
Request::ConstPtr(new PrepareRequest(query, request_execution->request()->keyspace(),
8096
request_execution->request_timeout_ms())))
81-
, request_execution_(request_execution) {}
97+
, request_execution_(request_execution)
98+
, id_(id) {}
8299

83100
void PrepareCallback::on_internal_set(ResponseMessage* response) {
84101
switch (response->opcode()) {
85102
case CQL_OPCODE_RESULT: {
86103
ResultResponse* result = static_cast<ResultResponse*>(response->response_body().get());
87104
if (result->kind() == CASS_RESULT_KIND_PREPARED) {
88-
request_execution_->notify_result_metadata_changed(request(), result);
89-
request_execution_->on_retry_current_host();
105+
String result_id = result->prepared_id().to_string();
106+
if (id_ != result_id) {
107+
request_execution_->notify_prepared_id_mismatch(id_, result_id);
108+
} else {
109+
request_execution_->notify_result_metadata_changed(request(), result);
110+
request_execution_->on_retry_current_host();
111+
}
90112
} else {
91113
request_execution_->on_retry_next_host();
92114
}
@@ -461,6 +483,17 @@ void RequestExecution::notify_result_metadata_changed(const Request* request,
461483
}
462484
}
463485

486+
void RequestExecution::notify_prepared_id_mismatch(const String& expected_id,
487+
const String& received_id) {
488+
OStringStream ss;
489+
ss << "ID mismatch while trying to prepare query (expected ID " << to_hex(expected_id)
490+
<< ", received ID " << to_hex(received_id)
491+
<< "). This prepared statement won't work anymore. This usually happens when you run a "
492+
"'USE...' query after the statement was prepared.";
493+
String message = ss.str();
494+
request_handler_->set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE, message);
495+
}
496+
464497
void RequestExecution::on_result_response(Connection* connection, ResponseMessage* response) {
465498
ResultResponse* result = static_cast<ResultResponse*>(response->response_body().get());
466499

@@ -602,14 +635,17 @@ void RequestExecution::on_error_response(Connection* connection, ResponseMessage
602635
}
603636

604637
void RequestExecution::on_error_unprepared(Connection* connection, ErrorResponse* error) {
605-
String query;
638+
LOG_DEBUG("Unprepared error response returned for request: %s",
639+
error->message().to_string().c_str());
606640

641+
String query;
642+
String id = error->prepared_id().to_string();
607643
if (request()->opcode() == CQL_OPCODE_EXECUTE) {
608644
const ExecuteRequest* execute = static_cast<const ExecuteRequest*>(request());
609645
query = execute->prepared()->query();
610646
} else if (request()->opcode() == CQL_OPCODE_BATCH) {
611647
const BatchRequest* batch = static_cast<const BatchRequest*>(request());
612-
if (!batch->find_prepared_query(error->prepared_id().to_string(), &query)) {
648+
if (!batch->find_prepared_query(id, &query)) {
613649
set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE,
614650
"Unable to find prepared statement in batch statement");
615651
return;
@@ -621,7 +657,7 @@ void RequestExecution::on_error_unprepared(Connection* connection, ErrorResponse
621657
return;
622658
}
623659

624-
RequestCallback::Ptr callback(new PrepareCallback(query, this));
660+
RequestCallback::Ptr callback(new PrepareCallback(query, id, this));
625661
if (connection->write_and_flush(callback) < 0) {
626662
// Try to prepare on the same host but on a different connection
627663
retry_current_host();

cpp-driver/src/request_handler.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ class RequestExecution : public RequestCallback {
275275
void next_host() { current_host_ = request_handler_->next_host(RequestHandler::Protected()); }
276276

277277
void notify_result_metadata_changed(const Request* request, ResultResponse* result_response);
278+
void notify_prepared_id_mismatch(const String& expected_id, const String& received_id);
278279

279280
virtual void on_retry_current_host();
280281
virtual void on_retry_next_host();

0 commit comments

Comments
 (0)