Skip to content

Commit a4292a0

Browse files
authored
Merge pull request #291 from riptano/CPP-815
CPP-815 - Correcting schema agreement with SNI/DBaaS clusters
2 parents 744310e + 551abd4 commit a4292a0

11 files changed

Lines changed: 298 additions & 114 deletions

cpp-driver/gtests/src/integration/tests/test_dbaas.cpp

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,140 @@ CASSANDRA_INTEGRATION_TEST_F(DbaasTests, QueryEachNode) {
302302
EXPECT_EQ(3u, server_names.size()); // Ensure all three nodes were queried
303303
}
304304

305+
/**
306+
* Create function and aggregate definitions and ensure the schema metadata is reflected when
307+
* execute against the DBaaS SNI single endpoint docker image.
308+
*
309+
* This test will perform a connection and execute create function/aggregate queries to ensure
310+
* schema metadata using a DBaaS SNI single endpoint is handled properly.
311+
*
312+
* @jira_ticket CPP-815
313+
* @test_category dbaas
314+
* @test_category queries:schema_metadata:udf
315+
* @since 2.14.0
316+
* @expected_result Function/Aggregate definitions schema metadata are validated.
317+
*/
318+
CASSANDRA_INTEGRATION_TEST_F(DbaasTests, SchemaMetadata) {
319+
CHECK_FAILURE;
320+
321+
Cluster cluster = default_cluster(false);
322+
cass_cluster_set_cloud_secure_connection_bundle_no_ssl_lib_init(cluster.get(),
323+
creds_v1().c_str());
324+
connect(cluster);
325+
326+
// clang-format off
327+
session_.execute("CREATE OR REPLACE FUNCTION avg_state(state tuple<int, bigint>, val int) "
328+
"CALLED ON NULL INPUT RETURNS tuple<int, bigint> "
329+
"LANGUAGE java AS "
330+
"'if (val != null) {"
331+
"state.setInt(0, state.getInt(0) + 1);"
332+
"state.setLong(1, state.getLong(1) + val.intValue());"
333+
"};"
334+
"return state;'"
335+
";");
336+
session_.execute("CREATE OR REPLACE FUNCTION avg_final (state tuple<int, bigint>) "
337+
"CALLED ON NULL INPUT RETURNS double "
338+
"LANGUAGE java AS "
339+
"'double r = 0;"
340+
"if (state.getInt(0) == 0) return null;"
341+
"r = state.getLong(1);"
342+
"r /= state.getInt(0);"
343+
"return Double.valueOf(r);'"
344+
";");
345+
session_.execute("CREATE OR REPLACE AGGREGATE average(int) "
346+
"SFUNC avg_state STYPE tuple<int, bigint> FINALFUNC avg_final "
347+
"INITCOND(0, 0);");
348+
// clang-format on
349+
350+
const CassSchemaMeta* schema_meta = cass_session_get_schema_meta(session_.get());
351+
ASSERT_TRUE(schema_meta != NULL);
352+
const CassKeyspaceMeta* keyspace_meta =
353+
cass_schema_meta_keyspace_by_name(schema_meta, default_keyspace().c_str());
354+
ASSERT_TRUE(keyspace_meta != NULL);
355+
356+
{ // Function `avg_state`
357+
const char* data = NULL;
358+
size_t length = 0;
359+
const CassDataType* datatype = NULL;
360+
361+
const CassFunctionMeta* function_meta =
362+
cass_keyspace_meta_function_by_name(keyspace_meta, "avg_state", "tuple<int,bigint>,int");
363+
ASSERT_TRUE(function_meta != NULL);
364+
cass_function_meta_name(function_meta, &data, &length);
365+
EXPECT_EQ("avg_state", std::string(data, length));
366+
cass_function_meta_full_name(function_meta, &data, &length);
367+
EXPECT_EQ("avg_state(tuple<int,bigint>,int)", std::string(data, length));
368+
cass_function_meta_body(function_meta, &data, &length);
369+
EXPECT_EQ("if (val != null) {state.setInt(0, state.getInt(0) + 1);state.setLong(1, "
370+
"state.getLong(1) + val.intValue());};return state;",
371+
std::string(data, length));
372+
cass_function_meta_language(function_meta, &data, &length);
373+
EXPECT_EQ("java", std::string(data, length));
374+
EXPECT_TRUE(cass_function_meta_called_on_null_input(function_meta));
375+
ASSERT_EQ(2u, cass_function_meta_argument_count(function_meta));
376+
cass_function_meta_argument(function_meta, 0, &data, &length, &datatype);
377+
EXPECT_EQ("state", std::string(data, length));
378+
EXPECT_EQ(CASS_VALUE_TYPE_TUPLE, cass_data_type_type(datatype));
379+
ASSERT_EQ(2u, cass_data_type_sub_type_count(datatype));
380+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(cass_data_type_sub_data_type(datatype, 0)));
381+
EXPECT_EQ(CASS_VALUE_TYPE_BIGINT,
382+
cass_data_type_type(cass_data_type_sub_data_type(datatype, 1)));
383+
cass_function_meta_argument(function_meta, 1, &data, &length, &datatype);
384+
EXPECT_EQ("val", std::string(data, length));
385+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(datatype));
386+
datatype = cass_function_meta_argument_type_by_name(function_meta, "state");
387+
EXPECT_EQ(CASS_VALUE_TYPE_TUPLE, cass_data_type_type(datatype));
388+
ASSERT_EQ(2u, cass_data_type_sub_type_count(datatype));
389+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(cass_data_type_sub_data_type(datatype, 0)));
390+
EXPECT_EQ(CASS_VALUE_TYPE_BIGINT,
391+
cass_data_type_type(cass_data_type_sub_data_type(datatype, 1)));
392+
datatype = cass_function_meta_argument_type_by_name(function_meta, "val");
393+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(datatype));
394+
datatype = cass_function_meta_return_type(function_meta);
395+
EXPECT_EQ(CASS_VALUE_TYPE_TUPLE, cass_data_type_type(datatype));
396+
ASSERT_EQ(2u, cass_data_type_sub_type_count(datatype));
397+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(cass_data_type_sub_data_type(datatype, 0)));
398+
EXPECT_EQ(CASS_VALUE_TYPE_BIGINT,
399+
cass_data_type_type(cass_data_type_sub_data_type(datatype, 1)));
400+
}
401+
402+
{ // Aggregate `average`
403+
const char* data = NULL;
404+
size_t length = 0;
405+
const CassDataType* datatype = NULL;
406+
407+
const CassAggregateMeta* aggregate_meta =
408+
cass_keyspace_meta_aggregate_by_name(keyspace_meta, "average", "int");
409+
ASSERT_TRUE(aggregate_meta != NULL);
410+
cass_aggregate_meta_name(aggregate_meta, &data, &length);
411+
EXPECT_EQ("average", std::string(data, length));
412+
cass_aggregate_meta_full_name(aggregate_meta, &data, &length);
413+
EXPECT_EQ("average(int)", std::string(data, length));
414+
size_t count = cass_aggregate_meta_argument_count(aggregate_meta);
415+
ASSERT_EQ(1u, cass_aggregate_meta_argument_count(aggregate_meta));
416+
datatype = cass_aggregate_meta_argument_type(aggregate_meta, 0);
417+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(datatype));
418+
datatype = cass_aggregate_meta_return_type(aggregate_meta);
419+
EXPECT_EQ(CASS_VALUE_TYPE_DOUBLE, cass_data_type_type(datatype));
420+
datatype = cass_aggregate_meta_state_type(aggregate_meta);
421+
EXPECT_EQ(CASS_VALUE_TYPE_TUPLE, cass_data_type_type(datatype));
422+
ASSERT_EQ(2u, cass_data_type_sub_type_count(datatype));
423+
EXPECT_EQ(CASS_VALUE_TYPE_INT, cass_data_type_type(cass_data_type_sub_data_type(datatype, 0)));
424+
EXPECT_EQ(CASS_VALUE_TYPE_BIGINT,
425+
cass_data_type_type(cass_data_type_sub_data_type(datatype, 1)));
426+
const CassFunctionMeta* function_meta = cass_aggregate_meta_state_func(aggregate_meta);
427+
cass_function_meta_name(function_meta, &data, &length);
428+
EXPECT_EQ("avg_state", std::string(data, length));
429+
function_meta = cass_aggregate_meta_final_func(aggregate_meta);
430+
cass_function_meta_name(function_meta, &data, &length);
431+
EXPECT_EQ("avg_final", std::string(data, length));
432+
const CassValue* initcond = cass_aggregate_meta_init_cond(aggregate_meta);
433+
EXPECT_EQ(CASS_VALUE_TYPE_VARCHAR, cass_value_type(initcond));
434+
EXPECT_EQ(Text("(0, 0)"), Text(initcond));
435+
ASSERT_TRUE(true);
436+
}
437+
}
438+
305439
/**
306440
* Ensure guardrails are enabled when performing a query against the DBaaS SNI single endpoint
307441
* docker image.

cpp-driver/src/address.cpp

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -176,43 +176,6 @@ String Address::to_string(bool with_port) const {
176176

177177
namespace datastax { namespace internal { namespace core {
178178

179-
bool determine_address_for_peer_host(const Address& connected_address, const Value* peer_value,
180-
const Value* rpc_value, Address* output) {
181-
Address peer_address;
182-
if (!peer_value ||
183-
!peer_value->decoder().as_inet(peer_value->size(), connected_address.port(), &peer_address)) {
184-
LOG_WARN("Invalid address format for peer address");
185-
return false;
186-
}
187-
if (rpc_value && !rpc_value->is_null()) {
188-
if (!rpc_value->decoder().as_inet(rpc_value->size(), connected_address.port(), output)) {
189-
LOG_WARN("Invalid address format for rpc address");
190-
return false;
191-
}
192-
if (connected_address == *output || connected_address == peer_address) {
193-
LOG_DEBUG("system.peers on %s contains a line with rpc_address for itself. "
194-
"This is not normal, but is a known problem for some versions of DSE. "
195-
"Ignoring this entry.",
196-
connected_address.to_string(false).c_str());
197-
return false;
198-
}
199-
if (Address("0.0.0.0", 0).equals(*output, false) || Address("::", 0).equals(*output, false)) {
200-
LOG_WARN("Found host with 'bind any' for rpc_address; using listen_address (%s) to contact "
201-
"instead. "
202-
"If this is incorrect you should configure a specific interface for rpc_address on "
203-
"the server.",
204-
peer_address.to_string(false).c_str());
205-
*output = peer_address;
206-
}
207-
} else {
208-
LOG_WARN("No rpc_address for host %s in system.peers on %s. "
209-
"Ignoring this entry.",
210-
peer_address.to_string(false).c_str(), connected_address.to_string(false).c_str());
211-
return false;
212-
}
213-
return true;
214-
}
215-
216179
String determine_listen_address(const Address& address, const Row* row) {
217180
const Value* v = row->get_by_name("peer");
218181
if (v != NULL) {

cpp-driver/src/address.hpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
namespace datastax { namespace internal { namespace core {
2929

3030
class Row;
31-
class Value;
3231

3332
class Address : public Allocated {
3433
public:
@@ -104,9 +103,6 @@ class Address : public Allocated {
104103
int port_;
105104
};
106105

107-
bool determine_address_for_peer_host(const Address& connected_address, const Value* peer_value,
108-
const Value* rpc_value, Address* output);
109-
110106
String determine_listen_address(const Address& address, const Row* row);
111107

112108
}}} // namespace datastax::internal::core

cpp-driver/src/address_factory.cpp

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
Copyright (c) DataStax, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#include "address_factory.hpp"
18+
19+
#include "row.hpp"
20+
21+
using namespace datastax::internal::core;
22+
23+
bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
24+
Address* output) {
25+
Address connected_address = connected_host->address();
26+
const Value* peer_value = peers_row->get_by_name("peer");
27+
const Value* rpc_value = peers_row->get_by_name("rpc_address");
28+
29+
Address peer_address;
30+
if (!peer_value ||
31+
!peer_value->decoder().as_inet(peer_value->size(), connected_address.port(), &peer_address)) {
32+
LOG_WARN("Invalid address format for peer address");
33+
return false;
34+
}
35+
if (rpc_value && !rpc_value->is_null()) {
36+
if (!rpc_value->decoder().as_inet(rpc_value->size(), connected_address.port(), output)) {
37+
LOG_WARN("Invalid address format for rpc address");
38+
return false;
39+
}
40+
if (connected_address == *output || connected_address == peer_address) {
41+
LOG_DEBUG("system.peers on %s contains a line with rpc_address for itself. "
42+
"This is not normal, but is a known problem for some versions of DSE. "
43+
"Ignoring this entry.",
44+
connected_address.to_string(false).c_str());
45+
return false;
46+
}
47+
if (Address("0.0.0.0", 0).equals(*output, false) || Address("::", 0).equals(*output, false)) {
48+
LOG_WARN("Found host with 'bind any' for rpc_address; using listen_address (%s) to contact "
49+
"instead. If this is incorrect you should configure a specific interface for "
50+
"rpc_address on the server.",
51+
peer_address.to_string(false).c_str());
52+
*output = peer_address;
53+
}
54+
} else {
55+
LOG_WARN("No rpc_address for host %s in system.peers on %s. Ignoring this entry.",
56+
peer_address.to_string(false).c_str(), connected_address.to_string(false).c_str());
57+
return false;
58+
}
59+
return true;
60+
}
61+
62+
bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
63+
Address* output) {
64+
CassUuid host_id;
65+
if (!peers_row->get_uuid_by_name("host_id", &host_id)) {
66+
// Attempt to get an peer address for the error log.
67+
Address peer_address;
68+
const Value* peer_value = peers_row->get_by_name("peer");
69+
if (!peer_value || !peer_value->decoder().as_inet(
70+
peer_value->size(), connected_host->address().port(), &peer_address)) {
71+
LOG_WARN("Invalid address format for peer address");
72+
}
73+
LOG_ERROR("Invalid `host_id` for host. %s will be ignored.",
74+
peer_address.is_valid() ? peer_address.to_string().c_str() : "<unknown>");
75+
return false;
76+
}
77+
*output = Address(connected_host->address().hostname_or_address(),
78+
connected_host->address().port(), to_string(host_id));
79+
return true;
80+
}

cpp-driver/src/address_factory.hpp

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
Copyright (c) DataStax, Inc.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
#ifndef DATASTAX_INTERNAL_ADDRESS_FACTORY_HPP
18+
#define DATASTAX_INTERNAL_ADDRESS_FACTORY_HPP
19+
20+
#include "config.hpp"
21+
#include "host.hpp"
22+
#include "ref_counted.hpp"
23+
24+
namespace datastax { namespace internal { namespace core {
25+
26+
class Row;
27+
28+
/**
29+
* An interface for constructing `Address` from `system.local`/`system.peers` row data.
30+
*/
31+
class AddressFactory : public RefCounted<AddressFactory> {
32+
public:
33+
typedef SharedRefPtr<AddressFactory> Ptr;
34+
virtual ~AddressFactory() {}
35+
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output) = 0;
36+
};
37+
38+
/**
39+
* An address factory that creates `Address` using the `rpc_address` column.
40+
*/
41+
class DefaultAddressFactory : public AddressFactory {
42+
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
43+
};
44+
45+
/**
46+
* An address factory that creates `Address` using the connected host's address and the `host_id`
47+
* (for the SNI servername) column.
48+
*/
49+
class SniAddressFactory : public AddressFactory {
50+
virtual bool create(const Row* peers_row, const Host::Ptr& connected_host, Address* output);
51+
};
52+
53+
inline AddressFactory* create_address_factory_from_config(const Config& config) {
54+
if (config.cloud_secure_connection_config().is_loaded()) {
55+
return new SniAddressFactory();
56+
} else {
57+
return new DefaultAddressFactory();
58+
}
59+
}
60+
61+
}}} // namespace datastax::internal::core
62+
63+
#endif

cpp-driver/src/control_connection.cpp

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -304,40 +304,6 @@ class NopControlConnectionListener : public ControlConnectionListener {
304304

305305
static NopControlConnectionListener nop_listener__;
306306

307-
bool DefaultAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
308-
Address* address) {
309-
return determine_address_for_peer_host(connected_host->address(), peers_row->get_by_name("peer"),
310-
peers_row->get_by_name("rpc_address"), address);
311-
}
312-
313-
bool SniAddressFactory::create(const Row* peers_row, const Host::Ptr& connected_host,
314-
Address* address) {
315-
CassUuid host_id;
316-
if (!peers_row->get_uuid_by_name("host_id", &host_id)) {
317-
// Attempt to get an peer address for the error log.
318-
Address peer_address;
319-
const Value* peer_value = peers_row->get_by_name("peer");
320-
if (!peer_value || !peer_value->decoder().as_inet(
321-
peer_value->size(), connected_host->address().port(), &peer_address)) {
322-
LOG_WARN("Invalid address format for peer address");
323-
}
324-
LOG_ERROR("Invalid `host_id` for host. %s will be ignored.",
325-
peer_address.is_valid() ? peer_address.to_string().c_str() : "<unknown>");
326-
return false;
327-
}
328-
*address = Address(connected_host->address().hostname_or_address(),
329-
connected_host->address().port(), to_string(host_id));
330-
return true;
331-
}
332-
333-
static AddressFactory* create_address_factory_from_config(const Config& config) {
334-
if (config.cloud_secure_connection_config().is_loaded()) {
335-
return new SniAddressFactory();
336-
} else {
337-
return new DefaultAddressFactory();
338-
}
339-
}
340-
341307
ControlConnectionSettings::ControlConnectionSettings()
342308
: use_schema(CASS_DEFAULT_USE_SCHEMA)
343309
, use_token_aware_routing(CASS_DEFAULT_USE_TOKEN_AWARE_ROUTING)
@@ -433,9 +399,7 @@ void ControlConnection::handle_refresh_node(RefreshNodeCallback* callback) {
433399
row = rows.row();
434400
if (callback->is_all_peers) {
435401
Address address;
436-
bool is_valid_address = determine_address_for_peer_host(
437-
connection_->host()->rpc_address(), row->get_by_name("peer"),
438-
row->get_by_name("rpc_address"), &address);
402+
bool is_valid_address = settings_.address_factory->create(row, connection_->host(), &address);
439403
if (is_valid_address && callback->address == address) {
440404
found_host = true;
441405
}

0 commit comments

Comments
 (0)