@@ -34,7 +34,9 @@ using cass::ScopedMutex;
3434using cass::OStringStream;
3535
3636#define SSL_BUF_SIZE 8192
37- #define SERVER_VERSION " 3.11.2"
37+ #define CASSANDRA_VERSION " 3.11.4"
38+ #define DSE_VERSION " 6.7.1"
39+ #define DSE_CASSANDRA_VERSION " 4.0.0.671"
3840
3941namespace mockssandra {
4042
@@ -1265,10 +1267,18 @@ Action::Builder& Action::Builder::system_local() {
12651267 return execute (new SystemLocal ());
12661268}
12671269
1270+ Action::Builder& Action::Builder::system_local_dse () {
1271+ return execute (new SystemLocalDse ());
1272+ }
1273+
12681274Action::Builder& Action::Builder::system_peers () {
12691275 return execute (new SystemPeers ());
12701276}
12711277
1278+ Action::Builder& Action::Builder::system_peers_dse () {
1279+ return execute (new SystemPeersDse ());
1280+ }
1281+
12721282Action::Builder& Action::Builder::system_traces () {
12731283 return execute (new SystemTraces ());
12741284}
@@ -1345,14 +1355,9 @@ Request::Request(int8_t version, int8_t flags, int16_t stream, int8_t opcode,
13451355 , body_(body)
13461356 , client_(client)
13471357 , timer_action_(NULL ) {
1348- client->add (this );
13491358 (void )flags_; // TODO: Implement custom payload etc.
13501359}
13511360
1352- Request::~Request () {
1353- client_->remove (this );
1354- }
1355-
13561361void Request::write (int8_t opcode, const String& body) {
13571362 write (stream_, opcode, body);
13581363}
@@ -1369,6 +1374,7 @@ void Request::error(int32_t code, const String& message) {
13691374}
13701375
13711376void Request::wait (uint64_t timeout, const Action* action) {
1377+ inc_ref ();
13721378 timer_action_ = action;
13731379 timer_.start (client_->server ()->loop (), timeout,
13741380 cass::bind_callback (&Request::on_timeout, this ));
@@ -1420,6 +1426,7 @@ Hosts Request::hosts() const {
14201426
14211427void Request::on_timeout (Timer* timer) {
14221428 timer_action_->run_next (this );
1429+ dec_ref ();
14231430}
14241431
14251432void SendError::on_run (Request* request) const {
@@ -1505,21 +1512,20 @@ void ClientOptions::on_run(Request* request) const {
15051512 QueryParameters params;
15061513 if (!request->decode_query (&query, ¶ms)) {
15071514 request->error (ERROR_PROTOCOL_ERROR, " Invalid query message" );
1508- } else {
1509- if (query == CLIENT_OPTIONS_QUERY) {
1510- ResultSet::Builder builder (" client" , " options" );
1511- Row::Builder row_builder;
1512- for (Options::const_iterator it = request->client ()->options ().begin (),
1513- end = request->client ()->options ().end (); it != end; ++it) {
1514- builder.column ((*it).first , Type::text ());
1515- row_builder.text ((*it).second );
1516- }
1517- ResultSet client_options = builder.row (row_builder.build ()).build ();
1518-
1519- request->write (OPCODE_RESULT, client_options.encode (request->version ()));
1515+ } else if (query == CLIENT_OPTIONS_QUERY) {
1516+ ResultSet::Builder builder (" client" , " options" );
1517+ Row::Builder row_builder;
1518+ for (Options::const_iterator it = request->client ()->options ().begin (),
1519+ end = request->client ()->options ().end (); it != end; ++it) {
1520+ builder.column ((*it).first , Type::text ());
1521+ row_builder.text ((*it).second );
15201522 }
1523+ ResultSet client_options = builder.row (row_builder.build ()).build ();
1524+
1525+ request->write (OPCODE_RESULT, client_options.encode (request->version ()));
1526+ } else {
1527+ run_next (request);
15211528 }
1522- run_next (request);
15231529}
15241530
15251531void SystemLocal::on_run (Request* request) const {
@@ -1543,7 +1549,43 @@ void SystemLocal::on_run(Request* request) const {
15431549 .text (request->client ()->server ()->address ().to_string ())
15441550 .text (host.dc )
15451551 .text (host.rack )
1546- .text (SERVER_VERSION)
1552+ .text (CASSANDRA_VERSION)
1553+ .inet (request->client ()->server ()->address ())
1554+ .text (host.partitioner )
1555+ .collection (Collection::text (host.tokens ))
1556+ .build ())
1557+ .build ();
1558+
1559+ request->write (OPCODE_RESULT, local_rs.encode (request->version ()));
1560+ } else {
1561+ run_next (request);
1562+ }
1563+ }
1564+
1565+ void SystemLocalDse::on_run (Request* request) const {
1566+ String query;
1567+ QueryParameters params;
1568+ if (!request->decode_query (&query, ¶ms)) {
1569+ request->error (ERROR_PROTOCOL_ERROR, " Invalid query message" );
1570+ } else if (query.find (SELECT_LOCAL) != String::npos) {
1571+ const Host& host (request->host (request->address ()));
1572+
1573+ ResultSet local_rs
1574+ = ResultSet::Builder (" system" , " local" )
1575+ .column (" key" , Type::text ())
1576+ .column (" data_center" , Type::text ())
1577+ .column (" rack" , Type::text ())
1578+ .column (" dse_version" , Type::text ())
1579+ .column (" release_version" , Type::text ())
1580+ .column (" rpc_address" , Type::inet ())
1581+ .column (" partitioner" , Type::text ())
1582+ .column (" tokens" , Type::list (Type::text ()))
1583+ .row (Row::Builder ()
1584+ .text (request->client ()->server ()->address ().to_string ())
1585+ .text (host.dc )
1586+ .text (host.rack )
1587+ .text (DSE_VERSION)
1588+ .text (DSE_CASSANDRA_VERSION)
15471589 .inet (request->client ()->server ()->address ())
15481590 .text (host.partitioner )
15491591 .collection (Collection::text (host.tokens ))
@@ -1586,7 +1628,80 @@ void SystemPeers::on_run(Request* request) const {
15861628 .inet (host.address )
15871629 .text (host.dc )
15881630 .text (host.rack )
1589- .text (SERVER_VERSION)
1631+ .text (CASSANDRA_VERSION)
1632+ .inet (host.address )
1633+ .collection (Collection::text (host.tokens ))
1634+ .build ());
1635+ }
1636+ ResultSet peers_rs = peers_builder.build ();
1637+ request->write (OPCODE_RESULT, peers_rs.encode (request->version ()));
1638+ } else {
1639+ pos += where_clause.size ();
1640+ size_t end_pos = query.find (" '" , pos);
1641+ if (end_pos == String::npos) {
1642+ request->error (ERROR_INVALID_QUERY, " Invalid WHERE clause" );
1643+ return ;
1644+ }
1645+
1646+ String ip = query.substr (pos, end_pos - pos);
1647+ Address address;
1648+ if (!Address::from_string (ip, request->address ().port (), &address)) {
1649+ request->error (ERROR_INVALID_QUERY, " Invalid inet address in WHERE clause" );
1650+ return ;
1651+ }
1652+
1653+ const Host& host (request->host (address));
1654+ ResultSet peers_rs
1655+ = peers_builder
1656+ .row (Row::Builder ()
1657+ .inet (host.address )
1658+ .text (host.dc )
1659+ .text (host.rack )
1660+ .text (CASSANDRA_VERSION)
1661+ .inet (host.address )
1662+ .collection (Collection::text (host.tokens ))
1663+ .build ())
1664+ .build ();
1665+ request->write (OPCODE_RESULT, peers_rs.encode (request->version ()));
1666+ }
1667+ } else {
1668+ run_next (request);
1669+ }
1670+ }
1671+
1672+ void SystemPeersDse::on_run (Request* request) const {
1673+ String query;
1674+ QueryParameters params;
1675+ if (!request->decode_query (&query, ¶ms)) {
1676+ request->error (ERROR_PROTOCOL_ERROR, " Invalid query message" );
1677+ } else if (query.find (SELECT_PEERS) != String::npos) {
1678+ const String where_clause (" WHERE peer = '" );
1679+ ResultSet::Builder peers_builder
1680+ = ResultSet::Builder (" system" , " peers" )
1681+ .column (" peer" , Type::inet ())
1682+ .column (" data_center" , Type::text ())
1683+ .column (" rack" , Type::text ())
1684+ .column (" dse_version" , Type::text ())
1685+ .column (" release_version" , Type::text ())
1686+ .column (" rpc_address" , Type::inet ())
1687+ .column (" tokens" , Type::list (Type::text ()));
1688+
1689+ size_t pos = query.find (where_clause);
1690+ if (pos == String::npos) {
1691+ Hosts hosts (request->hosts ());
1692+ for (Hosts::const_iterator it = hosts.begin (),
1693+ end = hosts.end (); it != end; ++it) {
1694+ const Host& host (*it);
1695+ if (host.address == request->address ()) {
1696+ continue ;
1697+ }
1698+ peers_builder
1699+ .row (Row::Builder ()
1700+ .inet (host.address )
1701+ .text (host.dc )
1702+ .text (host.rack )
1703+ .text (DSE_VERSION)
1704+ .text (DSE_CASSANDRA_VERSION)
15901705 .inet (host.address )
15911706 .collection (Collection::text (host.tokens ))
15921707 .build ());
@@ -1615,7 +1730,7 @@ void SystemPeers::on_run(Request* request) const {
16151730 .inet (host.address )
16161731 .text (host.dc )
16171732 .text (host.rack )
1618- .text (SERVER_VERSION )
1733+ .text (CASSANDRA_VERSION )
16191734 .inet (host.address )
16201735 .collection (Collection::text (host.tokens ))
16211736 .build ())
@@ -1830,8 +1945,9 @@ int32_t ProtocolHandler::decode_frame(ClientConnection* client, const char* fram
18301945 version_ > request_handler_->highest_supported_protocol_version ()) {
18311946 // Respond using the highest supported protocol that the server
18321947 // supports (don't use the request's version because it's not supported)
1833- request_handler_->invalid_protocol (new Request (request_handler_->highest_supported_protocol_version (),
1834- flags_, stream_, opcode_, String (), client));
1948+ Request::Ptr request (new Request (request_handler_->highest_supported_protocol_version (),
1949+ flags_, stream_, opcode_, String (), client));
1950+ request_handler_->invalid_protocol (request.get ());
18351951 return len - remaining;
18361952 }
18371953 state_ = HEADER;
@@ -1875,13 +1991,8 @@ int32_t ProtocolHandler::decode_frame(ClientConnection* client, const char* fram
18751991}
18761992
18771993void ProtocolHandler::decode_body (ClientConnection* client, const char * body, int32_t len) {
1878- request_handler_->run (new Request (version_, flags_, stream_, opcode_, String (body, len), client));
1879- }
1880-
1881- ClientConnection::~ClientConnection () {
1882- while (!requests_.is_empty ()) {
1883- delete requests_.front (); // Removes itself from the list
1884- }
1994+ Request::Ptr request (new Request (version_, flags_, stream_, opcode_, String (body, len), client));
1995+ request_handler_->run (request.get ());
18851996}
18861997
18871998void ClientConnection::on_read (const char * data, size_t len) {
0 commit comments