Skip to content

Commit e50e50c

Browse files
committed
Cleaned up subscription_manager code and promises a bit
1 parent d825b41 commit e50e50c

1 file changed

Lines changed: 33 additions & 25 deletions

File tree

subscriptions/subscription_manager.py

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
class RedisPubsub(object):
1010

1111
def __init__(self, host='localhost', port=6379, *args, **kwargs):
12+
redis.connection.socket = gevent.socket # may not need this -- test
1213
self.redis = redis.StrictRedis(host, port, *args, **kwargs)
1314
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
1415
self.subscriptions = {}
@@ -38,8 +39,8 @@ def unsubscribe(self, sub_id):
3839

3940
def wait_and_get_message(self):
4041
while True:
41-
self.pubsub.get_message() # might need to monkey patch
42-
gevent.sleep(.001) # may not need this sleep call
42+
self.pubsub.get_message()
43+
gevent.sleep(.001) # may not need this sleep call - test
4344

4445

4546
class ValidationError(Exception):
@@ -91,7 +92,7 @@ def subscribe(self, query, operation_name, callback, variables, context,
9192

9293
arg_definition = [arg_def for arg_def in
9394
fields[subscription_name].args if
94-
arg_def == arg.name.value][0]
95+
arg_def.name == arg.name.value][0]
9596

9697
args[arg_definition.name] = value_from_ast(
9798
arg.value,
@@ -131,49 +132,56 @@ def subscribe(self, query, operation_name, callback, variables, context,
131132
)
132133

133134
def on_message(root_value):
134-
if isinstance(context, FunctionType):
135-
context_promise = Promise(
136-
lambda resolve: resolve(context())
137-
)
138-
else:
139-
context_promise = Promise.resolve(context)
140135

141-
def context_promise_then(context):
142-
if not filter_func(root_value, context):
143-
return
136+
def context_promise_handler():
137+
if isinstance(context, FunctionType):
138+
return context()
139+
else:
140+
return context
141+
142+
def filter_func_promise_handler(context):
143+
return Promise.all([
144+
context,
145+
filter_func(root_value, context)
146+
])
144147

148+
def context_do_execute_handler(result):
149+
root_value, do_execute = result
150+
if not do_execute:
151+
return
145152
execute(
146153
self.schema,
147154
parsed_query,
148155
root_value,
149156
context,
150157
variables,
151158
operation_name
159+
).then(
160+
lambda data: callback(None, data)
152161
)
153162

154-
context_promise.then(
155-
context_promise_then
163+
return Promise.resolve(
164+
).then(
165+
context_promise_handler
156166
).then(
157-
lambda data: callback(None, data)
167+
filter_func_promise_handler
168+
).then(
169+
context_do_execute_handler
158170
).catch(
159171
lambda error: callback(error)
160172
)
161173

162-
subs_promise = Promise(
163-
lambda resolve: resolve(self.pubsub.subscribe(
174+
subscription_promises.append(
175+
self.pubsub.subscribe(
164176
trigger_name,
165177
on_message,
166178
channel_options
167-
))
168-
)
169-
170-
subs_promise.then(
171-
lambda id: self.subscriptions[
172-
external_subscription_id].append(id)
179+
).then(
180+
lambda id: self.subscriptions[
181+
external_subscription_id].append(id)
182+
)
173183
)
174184

175-
subscription_promises.append(subs_promise)
176-
177185
return Promise.all(subscription_promises).then(
178186
lambda result: external_subscription_id
179187
)

0 commit comments

Comments
 (0)