Skip to content

Commit cf31713

Browse files
Michael Feromikefero
authored andcommitted
CPP-402 - Add callback API for host events
1 parent ec55266 commit cf31713

11 files changed

Lines changed: 293 additions & 33 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
host_listener
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
cmake_minimum_required(VERSION 2.6.4)
2+
3+
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY ".")
4+
set(PROJECT_EXAMPLE_NAME host_listener)
5+
6+
file(GLOB EXAMPLE_SRC_FILES ${CASS_ROOT_DIR}/examples/host_listener/*.c)
7+
include_directories(${INCLUDES})
8+
add_executable(${PROJECT_EXAMPLE_NAME} ${EXAMPLE_SRC_FILES})
9+
target_link_libraries(${PROJECT_EXAMPLE_NAME} ${PROJECT_LIB_NAME_TARGET} ${CASS_LIBS})
10+
add_dependencies(${PROJECT_EXAMPLE_NAME} ${PROJECT_LIB_NAME_TARGET})
11+
12+
set_property(
13+
TARGET ${PROJECT_EXAMPLE_NAME}
14+
APPEND PROPERTY COMPILE_FLAGS ${CASS_EXAMPLE_C_FLAGS})
15+
set_property(TARGET ${PROJECT_EXAMPLE_NAME} PROPERTY FOLDER "Examples")
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
This is free and unencumbered software released into the public domain.
3+
4+
Anyone is free to copy, modify, publish, use, compile, sell, or
5+
distribute this software, either in source code form or as a compiled
6+
binary, for any purpose, commercial or non-commercial, and by any
7+
means.
8+
9+
In jurisdictions that recognize copyright laws, the author or authors
10+
of this software dedicate any and all copyright interest in the
11+
software to the public domain. We make this dedication for the benefit
12+
of the public at large and to the detriment of our heirs and
13+
successors. We intend this dedication to be an overt act of
14+
relinquishment in perpetuity of all present and future rights to this
15+
software under copyright law.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18+
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19+
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
20+
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
21+
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
22+
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23+
OTHER DEALINGS IN THE SOFTWARE.
24+
25+
For more information, please refer to <http://unlicense.org/>
26+
*/
27+
28+
#include <cassandra.h>
29+
30+
31+
#include <stdio.h>
32+
#include <uv.h>
33+
34+
void on_signal(uv_signal_t* handle, int signum) {
35+
uv_signal_stop(handle);
36+
}
37+
38+
void on_host_listener(CassHostListenerEvent event, CassInet inet, void* data);
39+
40+
void print_error(CassFuture* future) {
41+
const char* message;
42+
size_t message_length;
43+
cass_future_error_message(future, &message, &message_length);
44+
fprintf(stderr, "Error: %.*s\n", (int)message_length, message);
45+
}
46+
47+
CassCluster* create_cluster(const char* hosts) {
48+
CassCluster* cluster = cass_cluster_new();
49+
cass_log_set_level(CASS_LOG_DISABLED);
50+
cass_cluster_set_contact_points(cluster, hosts);
51+
cass_cluster_set_host_listener_callback(cluster, on_host_listener, NULL);
52+
return cluster;
53+
}
54+
55+
CassError connect_session(CassSession* session, const CassCluster* cluster) {
56+
CassError rc = CASS_OK;
57+
CassFuture* future = cass_session_connect(session, cluster);
58+
59+
cass_future_wait(future);
60+
rc = cass_future_error_code(future);
61+
if (rc != CASS_OK) {
62+
print_error(future);
63+
}
64+
cass_future_free(future);
65+
66+
return rc;
67+
}
68+
69+
void on_host_listener(CassHostListenerEvent event, CassInet inet, void* data) {
70+
char address[CASS_INET_STRING_LENGTH];
71+
72+
cass_inet_string(inet, address);
73+
if (event == CASS_HOST_LISTENER_EVENT_ADD) {
74+
printf("Host %s has been ADDED\n", address);
75+
} else if (event == CASS_HOST_LISTENER_EVENT_REMOVE) {
76+
printf("Host %s has been REMOVED\n", address);
77+
} else if (event == CASS_HOST_LISTENER_EVENT_UP) {
78+
printf("Host %s is UP\n", address);
79+
} else if (event == CASS_HOST_LISTENER_EVENT_DOWN) {
80+
printf("Host %s is DOWN\n", address);
81+
}
82+
}
83+
84+
int main(int argc, char* argv[]) {
85+
CassCluster* cluster = NULL;
86+
CassSession* session = cass_session_new();
87+
char* hosts = "127.0.0.1";
88+
uv_loop_t loop;
89+
uv_signal_t signal;
90+
if (argc > 1) {
91+
hosts = argv[1];
92+
}
93+
94+
cluster = create_cluster(hosts);
95+
96+
if (connect_session(session, cluster) != CASS_OK) {
97+
cass_cluster_free(cluster);
98+
cass_session_free(session);
99+
return -1;
100+
}
101+
102+
uv_loop_init(&loop);
103+
uv_signal_init(&loop, &signal);
104+
uv_signal_start(&signal, on_signal, SIGINT);
105+
fprintf(stderr, "Press CTRL+C to exit ...\n");
106+
uv_run(&loop, UV_RUN_DEFAULT);
107+
uv_loop_close(&loop);
108+
109+
cass_cluster_free(cluster);
110+
cass_session_free(session);
111+
112+
return 0;
113+
}

cpp-driver/include/cassandra.h

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,6 @@ typedef void (*CassFreeFunction)(void* ptr);
809809
*/
810810
typedef struct CassAuthenticator_ CassAuthenticator;
811811

812-
813812
/**
814813
* A callback used to initiate an authentication exchange.
815814
*
@@ -869,7 +868,6 @@ typedef void (*CassAuthenticatorSuccessCallback)(CassAuthenticator* auth,
869868
typedef void (*CassAuthenticatorCleanupCallback)(CassAuthenticator* auth,
870869
void* data);
871870

872-
873871
/**
874872
* A callback used to cleanup resources.
875873
*
@@ -887,6 +885,25 @@ typedef struct CassAuthenticatorCallbacks_ {
887885
CassAuthenticatorCleanupCallback cleanup_callback;
888886
} CassAuthenticatorCallbacks;
889887

888+
typedef enum CassHostListenerEvent_ {
889+
CASS_HOST_LISTENER_EVENT_UP,
890+
CASS_HOST_LISTENER_EVENT_DOWN,
891+
CASS_HOST_LISTENER_EVENT_ADD,
892+
CASS_HOST_LISTENER_EVENT_REMOVE
893+
} CassHostListenerEvent;
894+
895+
/**
896+
* A callback used to indicate the host state for a node in the cluster.
897+
*
898+
* @param[in] event
899+
* @param[in] address
900+
* @param[in] data
901+
* @see cass_cluster_set_host_listener_callback()
902+
*/
903+
typedef void(*CassHostListenerCallback)(CassHostListenerEvent event,
904+
const CassInet address,
905+
void* data);
906+
890907
/***********************************************************************************
891908
*
892909
* Execution Profile
@@ -2655,6 +2672,24 @@ CASS_EXPORT CassError
26552672
cass_cluster_set_no_compact(CassCluster* cluster,
26562673
cass_bool_t enabled);
26572674

2675+
/**
2676+
* Sets a callback for handling host state changes in the cluster.
2677+
*
2678+
* <b>Note:</b> The callback is invoked only when state changes in the cluster
2679+
* are applicable to the configured load balancing policy(s).
2680+
*
2681+
* @public @memberor CassCluster
2682+
*
2683+
* @param[in] cluster
2684+
* @param[in] callback
2685+
* @param[in] data
2686+
* @return CASS_OK if successful, otherwise and error occurred
2687+
*/
2688+
CASS_EXPORT CassError
2689+
cass_cluster_set_host_listener_callback(CassCluster* cluster,
2690+
CassHostListenerCallback callback,
2691+
void* data);
2692+
26582693
/***********************************************************************************
26592694
*
26602695
* Session

cpp-driver/src/cluster.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ class Cluster : public RefCounted<Cluster>
210210
* Constructor. Don't use directly.
211211
*
212212
* @param connection The current control connection.
213-
* @param listener A listener to handle events.
213+
* @param listener A listener to handle cluster events.
214214
* @param event_loop The event loop.
215215
* @param connected_host The currently connected host.
216216
* @param hosts Available hosts for the cluster (based on load balancing

cpp-driver/src/cluster_config.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ void cass_cluster_set_use_schema(CassCluster* cluster,
434434
}
435435

436436
CassError cass_cluster_set_use_hostname_resolution(CassCluster* cluster,
437-
cass_bool_t enabled) {
437+
cass_bool_t enabled) {
438438
cluster->config().set_use_hostname_resolution(enabled == cass_true);
439439
return CASS_OK;
440440
}
@@ -479,9 +479,9 @@ CassError cass_cluster_set_execution_profile(CassCluster* cluster,
479479
}
480480

481481
CassError cass_cluster_set_execution_profile_n(CassCluster* cluster,
482-
const char* name,
483-
size_t name_length,
484-
CassExecProfile* profile) {
482+
const char* name,
483+
size_t name_length,
484+
CassExecProfile* profile) {
485485
if (name_length == 0 || !profile) {
486486
return CASS_ERROR_LIB_BAD_PARAMS;
487487
}
@@ -503,13 +503,13 @@ CassError cass_cluster_set_prepare_on_up_or_add_host(CassCluster* cluster,
503503
}
504504

505505
CassError cass_cluster_set_local_address(CassCluster* cluster,
506-
const char* name) {
506+
const char* name) {
507507
return cass_cluster_set_local_address_n(cluster, name, SAFE_STRLEN(name));
508508
}
509509

510510
CassError cass_cluster_set_local_address_n(CassCluster* cluster,
511-
const char* name,
512-
size_t name_length) {
511+
const char* name,
512+
size_t name_length) {
513513
cass::Address address; // default to AF_UNSPEC
514514
if (name_length == 0 ||
515515
name == NULL ||
@@ -527,6 +527,15 @@ CassError cass_cluster_set_no_compact(CassCluster* cluster,
527527
return CASS_OK;
528528
}
529529

530+
CassError cass_cluster_set_host_listener_callback(CassCluster* cluster,
531+
CassHostListenerCallback callback,
532+
void* data) {
533+
cluster->config().set_host_listener(
534+
cass::DefaultHostListener::Ptr(
535+
cass::Memory::allocate<cass::ExternalHostListener>(callback, data)));
536+
return CASS_OK;
537+
}
538+
530539
void cass_cluster_free(CassCluster* cluster) {
531540
cass::Memory::deallocate(cluster->from());
532541
}

cpp-driver/src/config.hpp

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ class Config {
6767
, max_reusable_write_objects_(CASS_DEFAULT_MAX_REUSABLE_WRITE_OBJECTS)
6868
, prepare_on_all_hosts_(CASS_DEFAULT_PREPARE_ON_ALL_HOSTS)
6969
, prepare_on_up_or_add_host_(CASS_DEFAULT_PREPARE_ON_UP_OR_ADD_HOST)
70-
, no_compact_(CASS_DEFAULT_NO_COMPACT) {
70+
, no_compact_(CASS_DEFAULT_NO_COMPACT)
71+
, host_listener_(Memory::allocate<DefaultHostListener>()) {
7172
profiles_.set_empty_key(String());
7273

7374
// Assign the defaults to the cluster profile
@@ -397,6 +398,16 @@ class Config {
397398
application_version_ = application_version;
398399
}
399400

401+
const DefaultHostListener::Ptr& host_listener() const { return host_listener_; }
402+
403+
void set_host_listener(const DefaultHostListener::Ptr& listener) {
404+
if (listener) {
405+
host_listener_ = listener;
406+
} else {
407+
host_listener_.reset(Memory::allocate<DefaultHostListener>());
408+
}
409+
}
410+
400411
private:
401412
void init_profiles();
402413

@@ -440,6 +451,7 @@ class Config {
440451
bool no_compact_;
441452
String application_name_;
442453
String application_version_;
454+
DefaultHostListener::Ptr host_listener_;
443455
};
444456

445457
} // namespace cass

cpp-driver/src/host.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,33 @@ void Host::set(const Row* row, bool use_tokens) {
133133
}
134134
}
135135

136+
ExternalHostListener::ExternalHostListener(const CassHostListenerCallback callback,
137+
void *data)
138+
: callback_(callback)
139+
, data_(data) { }
140+
141+
void ExternalHostListener::on_up(const Host::Ptr& host) {
142+
CassInet address;
143+
address.address_length = host->address().to_inet(address.address);
144+
callback_(CASS_HOST_LISTENER_EVENT_UP, address, data_);
145+
}
146+
147+
void ExternalHostListener::on_down(const Host::Ptr& host) {
148+
CassInet address;
149+
address.address_length = host->address().to_inet(address.address);
150+
callback_(CASS_HOST_LISTENER_EVENT_DOWN, address, data_);
151+
}
152+
153+
void ExternalHostListener::on_add(const Host::Ptr& host) {
154+
CassInet address;
155+
address.address_length = host->address().to_inet(address.address);
156+
callback_(CASS_HOST_LISTENER_EVENT_ADD, address, data_);
157+
}
158+
159+
void ExternalHostListener::on_remove(const Host::Ptr& host) {
160+
CassInet address;
161+
address.address_length = host->address().to_inet(address.address);
162+
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, address, data_);
163+
}
164+
136165
} // namespace cass

cpp-driver/src/host.hpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,34 @@ class HostListener {
263263
virtual void on_remove(const Host::Ptr& host) = 0;
264264
};
265265

266+
class DefaultHostListener
267+
: public HostListener
268+
, public RefCounted<DefaultHostListener> {
269+
public:
270+
typedef SharedRefPtr<DefaultHostListener> Ptr;
271+
272+
virtual void on_up(const Host::Ptr& host) { }
273+
virtual void on_down(const Host::Ptr& host) { }
274+
virtual void on_add(const Host::Ptr& host) { }
275+
virtual void on_remove(const Host::Ptr& host) { }
276+
};
277+
278+
class ExternalHostListener : public DefaultHostListener {
279+
public:
280+
typedef SharedRefPtr<ExternalHostListener> Ptr;
281+
ExternalHostListener(const CassHostListenerCallback callback,
282+
void *data);
283+
284+
virtual void on_up(const Host::Ptr& host);
285+
virtual void on_down(const Host::Ptr& host);
286+
virtual void on_add(const Host::Ptr& host);
287+
virtual void on_remove(const Host::Ptr& host);
288+
289+
private:
290+
const CassHostListenerCallback callback_;
291+
void* data_;
292+
};
293+
266294
typedef Map<Address, Host::Ptr> HostMap;
267295

268296
struct GetAddress {

0 commit comments

Comments
 (0)