Skip to content

Commit abe4772

Browse files
committed
Almost working verison. Minor tweaks..still returning null data
1 parent a508a20 commit abe4772

2 files changed

Lines changed: 43 additions & 27 deletions

File tree

subscriptions/subscription_manager.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import redis
22
import gevent
3+
import json
34
from types import FunctionType
45
from promise import Promise
56
from graphql import parse, validate, specified_rules, value_from_ast, execute
@@ -9,23 +10,24 @@
910
class RedisPubsub(object):
1011

1112
def __init__(self, host='localhost', port=6379, *args, **kwargs):
12-
redis.connection.socket = gevent.socket # may not need this -- test
13+
redis.connection.socket = gevent.socket
1314
self.redis = redis.StrictRedis(host, port, *args, **kwargs)
1415
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
1516
self.subscriptions = {}
1617
self.sub_id_counter = 0
1718
self.greenlet = None
1819

1920
def publish(self, trigger_name, message):
20-
self.redis.publish(trigger_name, message)
21+
self.redis.publish(trigger_name, json.dumps(message))
2122
return True
2223

2324
def subscribe(self, trigger_name, on_message_handler, options):
24-
trigger = {}
25-
trigger[trigger_name] = on_message_handler
26-
self.pubsub.subscribe(**trigger)
25+
self.pubsub.subscribe(trigger_name)
2726
if not self.greenlet:
28-
self.greenlet = gevent.spawn(self.wait_and_get_message)
27+
self.greenlet = gevent.spawn(
28+
self.wait_and_get_message,
29+
on_message_handler
30+
)
2931
self.sub_id_counter += 1
3032
self.subscriptions[self.sub_id_counter] = trigger_name
3133
return Promise.resolve(self.sub_id_counter)
@@ -37,9 +39,11 @@ def unsubscribe(self, sub_id):
3739
if not self.subscriptions:
3840
self.greenlet = self.greenlet.kill()
3941

40-
def wait_and_get_message(self):
42+
def wait_and_get_message(self, on_message_handler):
4143
while True:
42-
self.pubsub.get_message()
44+
message = self.pubsub.get_message()
45+
if message:
46+
on_message_handler(json.loads(message['data']))
4347
gevent.sleep(.001) # may not need this sleep call - test
4448

4549

@@ -69,7 +73,7 @@ def subscribe(self, query, operation_name, callback, variables, context,
6973
self.schema,
7074
parsed_query,
7175
# TODO: Need to create/add subscriptionHasSingleRootField
72-
# rule
76+
# rule from apollo subscription manager package
7377
rules=specified_rules
7478
)
7579

@@ -86,21 +90,23 @@ def subscribe(self, query, operation_name, callback, variables, context,
8690
root_field = definition.selection_set.selections[0]
8791
subscription_name = root_field.name.value
8892

89-
fields = self.schema.get_subscription_type().get_fields()
93+
fields = self.schema.get_subscription_type().fields
9094

9195
for arg in root_field.arguments:
9296

93-
arg_definition = [arg_def for arg_def in
94-
fields[subscription_name].args if
95-
arg_def.name == arg.name.value][0]
97+
arg_definition = [
98+
arg_def for _, arg_def in
99+
fields.get(subscription_name).args.iteritems() if
100+
arg_def.out_name == arg.name.value
101+
][0]
96102

97-
args[arg_definition.name] = value_from_ast(
103+
args[arg_definition.out_name] = value_from_ast(
98104
arg.value,
99105
arg_definition.type,
100106
variables=variables
101107
)
102108

103-
if self.setup_funcs[subscription_name]:
109+
if self.setup_funcs.get(subscription_name):
104110
trigger_map = self.setup_funcs[subscription_name](
105111
query,
106112
operation_name,
@@ -126,14 +132,14 @@ def subscribe(self, query, operation_name, callback, variables, context,
126132
'channel_options',
127133
{}
128134
)
129-
filter_func = trigger_map[trigger_name].get(
130-
'filter_func',
131-
True
135+
filter = trigger_map[trigger_name].get(
136+
'filter',
137+
lambda arg1, arg2: True
132138
)
133139

134140
def on_message(root_value):
135141

136-
def context_promise_handler():
142+
def context_promise_handler(result):
137143
if isinstance(context, FunctionType):
138144
return context()
139145
else:
@@ -142,11 +148,11 @@ def context_promise_handler():
142148
def filter_func_promise_handler(context):
143149
return Promise.all([
144150
context,
145-
filter_func(root_value, context)
151+
filter(root_value, context)
146152
])
147153

148154
def context_do_execute_handler(result):
149-
root_value, do_execute = result
155+
context, do_execute = result
150156
if not do_execute:
151157
return
152158
execute(
@@ -156,17 +162,18 @@ def context_do_execute_handler(result):
156162
context,
157163
variables,
158164
operation_name
159-
).then(
160-
lambda data: callback(None, data)
161165
)
162166

163167
return Promise.resolve(
168+
True
164169
).then(
165170
context_promise_handler
166171
).then(
167172
filter_func_promise_handler
168173
).then(
169174
context_do_execute_handler
175+
).then(
176+
lambda data: callback(None, data)
170177
).catch(
171178
lambda error: callback(error)
172179
)

subscriptions/subscription_transport_ws.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ def on_message_return_handler(message):
6767
if parsed_message.get('type') == INIT:
6868

6969
on_connect_promise = Promise.resolve(True)
70-
7170
nonlocal.on_init_resolve(on_connect_promise)
7271

7372
def init_success_promise_handler(result):
@@ -100,7 +99,7 @@ def subscription_start_promise_handler(init_result):
10099
}
101100
promised_params = Promise.resolve(base_params)
102101

103-
if self.connection_subscriptions[sub_id]:
102+
if self.connection_subscriptions.get(sub_id):
104103
self.unsubscribe(self.connection_subscriptions[sub_id])
105104
del self.connection_subscriptions[sub_id]
106105

@@ -168,16 +167,26 @@ def error_catch_handler(e):
168167
error_catch_handler
169168
)
170169

170+
# Promise from init statement (line 54)
171+
# seems to reset between if statements
172+
# not sure if this behavior is correct or
173+
# not per promises A spec...need to
174+
# investigate
175+
nonlocal.on_init_resolve(Promise.resolve(True))
176+
171177
self.connection_context['init_promise'].then(
172178
subscription_start_promise_handler)
173179

174180
elif parsed_message.get('type') == SUBSCRIPTION_END:
175181

176182
def subscription_end_promise_handler(result):
177-
if self.connection_subscriptions[sub_id]:
183+
if self.connection_subscriptions.get(sub_id):
178184
self.unsubscribe(self.connection_subscriptions[sub_id])
179185
del self.connection_subscriptions[sub_id]
180186

187+
# same rationale as above
188+
nonlocal.on_init_resolve(Promise.resolve(True))
189+
181190
self.connection_context['init_promise'].then(
182191
subscription_end_promise_handler
183192
)
@@ -216,7 +225,7 @@ def send_subscription_success(self, sub_id):
216225
self.ws.send(json.dumps(message))
217226

218227
def send_init_result(self, result):
219-
self.ws.send(json.dumps(result)) # may need to use promise here
228+
self.ws.send(json.dumps(result))
220229
if result.get('type') == INIT_FAIL:
221230
self.ws.close(1011)
222231

0 commit comments

Comments
 (0)