2222#define SSL_HANDSHAKE_MAX_BUFFER_SIZE (16 * 1024 + 5 )
2323
2424using namespace datastax ;
25+ using namespace datastax ::internal;
2526using namespace datastax ::internal::core;
2627
2728namespace datastax { namespace internal { namespace core {
2829
30+ namespace {
31+
32+ // Used for debugging resolved addresses.
33+ String to_string (const AddressVec& addresses) {
34+ String result;
35+ for (AddressVec::const_iterator it = addresses.begin (), end = addresses.end (); it != end; ++it) {
36+ if (!result.empty ()) result.append (" , " );
37+ result.append (it->to_string ());
38+ }
39+ return result;
40+ }
41+
42+ } // namespace
43+
2944/* *
3045 * A socket handler that handles the SSL handshake process.
3146 */
@@ -85,6 +100,8 @@ SocketSettings::SocketSettings(const Config& config)
85100 , max_reusable_write_objects(config.max_reusable_write_objects())
86101 , local_address(config.local_address()) {}
87102
103+ Atomic<size_t > SocketConnector::resolved_address_offset_ (0 );
104+
88105SocketConnector::SocketConnector (const Address& address, const Callback& callback)
89106 : address_(address)
90107 , callback_(callback)
@@ -99,23 +116,34 @@ SocketConnector* SocketConnector::with_settings(const SocketSettings& settings)
99116void SocketConnector::connect (uv_loop_t * loop) {
100117 inc_ref (); // For the event loop
101118
102- if (settings_.hostname_resolution_enabled ) {
103- // Run hostname resolution then connect.
104- resolver_.reset (new NameResolver (address_, bind_callback (&SocketConnector::on_resolve, this )));
119+ if (!address_.is_resolved ()) { // Address not resolved
120+ hostname_ = address_.to_string ();
121+
122+ resolver_.reset (new Resolver (hostname_, address_.port (),
123+ bind_callback (&SocketConnector::on_resolve, this )));
105124 resolver_->resolve (loop, settings_.resolve_timeout_ms );
106125 } else {
107- // Postpone the connection process until after this method ends because it
108- // can call the callback (via on_error() when when the socket fails to
109- // init/bind) and destroy its parent.
110- no_resolve_timer_.start (loop,
111- 0 , // Run connect immediately after.
112- bind_callback (&SocketConnector::on_no_resolve, this ));
126+ resolved_address_ = address_;
127+
128+ if (settings_.hostname_resolution_enabled ) { // Run hostname resolution then connect.
129+ name_resolver_.reset (
130+ new NameResolver (address_, bind_callback (&SocketConnector::on_name_resolve, this )));
131+ name_resolver_->resolve (loop, settings_.resolve_timeout_ms );
132+ } else {
133+ // Postpone the connection process until after this method ends because it
134+ // can call the callback (via on_error() when when the socket fails to
135+ // init/bind) and destroy its parent.
136+ no_resolve_timer_.start (loop,
137+ 0 , // Run connect immediately after.
138+ bind_callback (&SocketConnector::on_no_resolve, this ));
139+ }
113140 }
114141}
115142
116143void SocketConnector::cancel () {
117144 error_code_ = SOCKET_CANCELED;
118145 if (resolver_) resolver_->cancel ();
146+ if (name_resolver_) name_resolver_->cancel ();
119147 if (connector_) connector_->cancel ();
120148 if (socket_) socket_->close ();
121149}
@@ -127,7 +155,7 @@ Socket::Ptr SocketConnector::release_socket() {
127155}
128156
129157void SocketConnector::internal_connect (uv_loop_t * loop) {
130- Socket::Ptr socket (new Socket (address_ , settings_.max_reusable_write_objects ));
158+ Socket::Ptr socket (new Socket (resolved_address_ , settings_.max_reusable_write_objects ));
131159
132160 if (uv_tcp_init (loop, socket->handle ()) != 0 ) {
133161 on_error (SOCKET_ERROR_INIT, " Unable to initialize TCP object" );
@@ -159,11 +187,11 @@ void SocketConnector::internal_connect(uv_loop_t* loop) {
159187 }
160188
161189 if (settings_.ssl_context ) {
162- ssl_session_.reset (
163- settings_. ssl_context -> create_session (address_, hostname_, address_.server_name ()));
190+ ssl_session_.reset (settings_. ssl_context -> create_session (resolved_address_, hostname_,
191+ address_.server_name ()));
164192 }
165193
166- connector_.reset (new TcpConnector (address_ ));
194+ connector_.reset (new TcpConnector (resolved_address_ ));
167195 connector_->connect (socket_->handle (), bind_callback (&SocketConnector::on_connect, this ));
168196}
169197
@@ -253,7 +281,25 @@ void SocketConnector::on_connect(TcpConnector* tcp_connector) {
253281 }
254282}
255283
256- void SocketConnector::on_resolve (NameResolver* resolver) {
284+ void SocketConnector::on_resolve (Resolver* resolver) {
285+ if (resolver->is_success ()) {
286+ const AddressVec& addresses (resolver->addresses ());
287+ LOG_DEBUG (" Resolved the addresses %s for hostname %s" , to_string (addresses).c_str (),
288+ hostname_.c_str ());
289+ resolved_address_ =
290+ addresses[resolved_address_offset_.fetch_add (MEMORY_ORDER_RELAXED) % addresses.size ()];
291+ internal_connect (resolver->loop ());
292+ } else if (is_canceled () || resolver->is_canceled ()) {
293+ finish ();
294+ } else if (resolver->is_timed_out ()) {
295+ on_error (SOCKET_ERROR_RESOLVE_TIMEOUT, " Timed out attempting to resolve hostname" );
296+ } else {
297+ on_error (SOCKET_ERROR_RESOLVE,
298+ " Unable to resolve hostname '" + String (uv_strerror (resolver->uv_status ())) + " '" );
299+ }
300+ }
301+
302+ void SocketConnector::on_name_resolve (NameResolver* resolver) {
257303 if (resolver->is_success ()) {
258304 LOG_DEBUG (" Resolved the hostname %s for address %s" , resolver->hostname ().c_str (),
259305 resolver->address ().to_string ().c_str ());
0 commit comments