Skip to content

Commit 5758770

Browse files
committed
Added keep_alive messages and on_connect, on_subscribe, etc.
optional server setup funcs
1 parent 1b8905b commit 5758770

2 files changed

Lines changed: 49 additions & 7 deletions

File tree

subscriptions/subscription_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def wait_and_get_message(self):
5454
message = self.pubsub.get_message(ignore_subscribe_messages=True)
5555
if message:
5656
self.handle_message(message)
57-
gevent.sleep(.001) # may not need this sleep call - test
57+
gevent.sleep(.001)
5858

5959
def handle_message(self, message):
6060
for sub_id, trigger_map in self.subscriptions.iteritems():
@@ -142,7 +142,7 @@ def subscribe(self, query, operation_name, callback, variables, context,
142142
self.subscriptions[external_subscription_id] = []
143143
subscription_promises = []
144144

145-
for trigger_name in trigger_map.keys():
145+
for trigger_name in trigger_map.viewkeys():
146146
channel_options = trigger_map[trigger_name].get(
147147
'channel_options',
148148
{}

subscriptions/subscription_transport_ws.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
import gevent
23
from geventwebsocket import WebSocketApplication
34
from promise import Promise
45

@@ -13,32 +14,60 @@
1314
INIT_FAIL = 'init_fail'
1415
GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions'
1516

16-
# TODO: Implement 'keep_alive' message sent to client that is in
17-
# apollo subscription-transport constructor
18-
1917

2018
class ApolloSubscriptionServer(WebSocketApplication):
2119

22-
def __init__(self, subscription_manager, websocket):
20+
def __init__(self, subscription_manager, websocket, keep_alive=None,
21+
on_subscribe=None, on_unsubscribe=None, on_connect=None,
22+
on_disconnect=None):
23+
2324
assert subscription_manager, "Must provide\
2425
'subscription_manager' to websocket app constructor"
26+
2527
self.subscription_manager = subscription_manager
28+
self.on_subscribe = on_subscribe
29+
self.on_unsubscribe = on_unsubscribe
30+
self.on_connect = on_connect
31+
self.on_disconnect = on_disconnect
32+
self.keep_alive = keep_alive
2633
self.connection_subscriptions = {}
2734
self.connection_context = {}
35+
2836
super(ApolloSubscriptionServer, self).__init__(websocket)
2937

38+
def timer(self, callback, period):
39+
while True:
40+
callback()
41+
gevent.sleep(period)
42+
3043
def unsubscribe(self, graphql_sub_id):
3144
self.subscription_manager.unsubscribe(graphql_sub_id)
3245

46+
if self.on_unsubscribe:
47+
self.on_unsubscribe(self.ws)
48+
3349
def on_open(self):
3450
if self.ws.protocol is None or (GRAPHQL_SUBSCRIPTIONS not in self.ws.protocol):
3551
self.ws.close(1002)
3652

53+
def keep_alive_callback():
54+
if not self.ws.closed:
55+
self.send_keep_alive()
56+
else:
57+
gevent.kill(keep_alive_timer)
58+
59+
if self.keep_alive:
60+
keep_alive_timer = gevent.spawn(self.timer, keep_alive_callback,
61+
self.keep_alive)
62+
3763
def on_close(self, reason):
38-
for sub_id in self.connection_subscriptions.keys():
64+
for sub_id in self.connection_subscriptions.viewkeys():
3965
self.unsubscribe(self.connection_subscriptions[sub_id])
4066
del self.connection_subscriptions[sub_id]
4167

68+
if self.on_disconnect:
69+
self.on_disconnect(self.ws)
70+
4271
def on_message(self, msg):
4372
if msg is None:
4473
return
@@ -67,6 +96,12 @@ def on_message_return_handler(message):
6796
if parsed_message.get('type') == INIT:
6897

6998
on_connect_promise = Promise.resolve(True)
99+
100+
if self.on_connect:
101+
on_connect_promise = Promise.resolve(self.on_connect(
102+
parsed_message.get('payload'), self.ws
103+
))
104+
70105
nonlocal.on_init_resolve(on_connect_promise)
71106

72107
def init_success_promise_handler(result):
@@ -99,6 +134,13 @@ def subscription_start_promise_handler(init_result):
99134
}
100135
promised_params = Promise.resolve(base_params)
101136

137+
if self.on_subscribe:
138+
promised_params = Promise.resolve(self.on_subscribe(
139+
parsed_message,
140+
base_params,
141+
self.ws
142+
))
143+
102144
if self.connection_subscriptions.get(sub_id):
103145
self.unsubscribe(self.connection_subscriptions[sub_id])
104146
del self.connection_subscriptions[sub_id]

0 commit comments

Comments
 (0)