55from promise import Promise
66from graphql import parse , validate , specified_rules , value_from_ast , execute
77from graphql .language .ast import OperationDefinition
8+ # TODO: replace graphene dependency aith my own utils
9+ # folder and function
10+ from graphene .utils .str_converters import to_snake_case
811
912
1013class RedisPubsub (object ):
11-
1214 def __init__ (self , host = 'localhost' , port = 6379 , * args , ** kwargs ):
1315 redis .connection .socket = gevent .socket
1416 self .redis = redis .StrictRedis (host , port , * args , ** kwargs )
@@ -29,13 +31,10 @@ def subscribe(self, trigger_name, on_message_handler, options):
2931 except IndexError :
3032 self .pubsub .subscribe (trigger_name )
3133 self .subscriptions [self .sub_id_counter ] = [
32- trigger_name ,
33- on_message_handler
34+ trigger_name , on_message_handler
3435 ]
3536 if not self .greenlet :
36- self .greenlet = gevent .spawn (
37- self .wait_and_get_message
38- )
37+ self .greenlet = gevent .spawn (self .wait_and_get_message )
3938 return Promise .resolve (self .sub_id_counter )
4039
4140 def unsubscribe (self , sub_id ):
@@ -63,14 +62,12 @@ def handle_message(self, message):
6362
6463
6564class ValidationError (Exception ):
66-
6765 def __init__ (self , errors ):
6866 self .errors = errors
6967 self .message = 'Subscription query has validation errors'
7068
7169
7270class SubscriptionManager (object ):
73-
7471 def __init__ (self , schema , pubsub , setup_funcs = {}):
7572 self .schema = schema
7673 self .pubsub = pubsub
@@ -89,8 +86,7 @@ def subscribe(self, query, operation_name, callback, variables, context,
8986 parsed_query ,
9087 # TODO: Need to create/add subscriptionHasSingleRootField
9188 # rule from apollo subscription manager package
92- rules = specified_rules
93- )
89+ rules = specified_rules )
9490
9591 if errors :
9692 return Promise .reject (ValidationError (errors ))
@@ -110,29 +106,25 @@ def subscribe(self, query, operation_name, callback, variables, context,
110106 for arg in root_field .arguments :
111107
112108 arg_definition = [
113- arg_def for _ , arg_def in
114- fields .get (subscription_name ). args . iteritems () if
115- arg_def .out_name == arg .name .value
109+ arg_def
110+ for _ , arg_def in fields .get (subscription_name )
111+ . args . iteritems () if arg_def .out_name == arg .name .value
116112 ][0 ]
117113
118114 args [arg_definition .out_name ] = value_from_ast (
119- arg .value ,
120- arg_definition .type ,
121- variables = variables
122- )
123-
124- if self .setup_funcs .get (subscription_name ):
125- trigger_map = self .setup_funcs [subscription_name ](
126- query ,
127- operation_name ,
128- callback ,
129- variables ,
130- context ,
131- format_error ,
132- format_response ,
133- args ,
134- subscription_name
135- )
115+ arg .value , arg_definition .type , variables = variables )
116+
117+ if self .setup_funcs .get (to_snake_case (subscription_name )):
118+ trigger_map = self .setup_funcs [to_snake_case (subscription_name )](
119+ query = query ,
120+ operation_name = operation_name ,
121+ callback = callback ,
122+ variables = variables ,
123+ context = context ,
124+ format_error = format_error ,
125+ format_response = format_response ,
126+ args = args ,
127+ subscription_name = subscription_name )
136128 else :
137129 trigger_map = {}
138130 trigger_map [subscription_name ] = {}
@@ -143,71 +135,53 @@ def subscribe(self, query, operation_name, callback, variables, context,
143135 subscription_promises = []
144136
145137 for trigger_name in trigger_map .viewkeys ():
146- channel_options = trigger_map [trigger_name ].get (
147- 'channel_options' ,
148- {}
149- )
150- filter = trigger_map [trigger_name ].get (
151- 'filter' ,
152- lambda arg1 , arg2 : True
153- )
138+ try :
139+ channel_options = trigger_map [trigger_name ].get (
140+ 'channel_options' , {})
141+ filter = trigger_map [trigger_name ].get ('filter' ,
142+ lambda arg1 , arg2 : True )
143+ # TODO: Think about this some more...Apollo library
144+ # let's all messages through by default, even if
145+ # the users incorrectly uses the setup_funcs (do not
146+ # use 'filter' or 'channel_options' keys); I think it
147+ # would be better to raise an exception here
148+ except AttributeError :
149+ channel_options = {}
150+ filter = lambda arg1 , arg2 : True
154151
155152 def on_message (root_value ):
156-
157153 def context_promise_handler (result ):
158154 if isinstance (context , FunctionType ):
159155 return context ()
160156 else :
161157 return context
162158
163159 def filter_func_promise_handler (context ):
164- return Promise .all ([
165- context ,
166- filter (root_value , context )
167- ])
160+ return Promise .all ([context , filter (root_value , context )])
168161
169162 def context_do_execute_handler (result ):
170163 context , do_execute = result
171164 if not do_execute :
172165 return
173166 else :
174- return execute (
175- self .schema ,
176- parsed_query ,
177- root_value ,
178- context ,
179- variables ,
180- operation_name
181- )
182-
183- return Promise .resolve (
184- True
185- ).then (
186- context_promise_handler
187- ).then (
188- filter_func_promise_handler
189- ).then (
190- context_do_execute_handler
191- ).then (
192- lambda result : callback (None , result )
193- ).catch (
194- lambda error : callback (error , None )
195- )
167+ return execute (self .schema , parsed_query , root_value ,
168+ context , variables , operation_name )
169+
170+ return Promise .resolve (True ).then (
171+ context_promise_handler ).then (
172+ filter_func_promise_handler ).then (
173+ context_do_execute_handler ).then (
174+ lambda result : callback (None , result )).catch (
175+ lambda error : callback (error , None ))
196176
197177 subscription_promises .append (
198- self .pubsub .subscribe (
199- trigger_name ,
200- on_message ,
201- channel_options
202- ).then (
203- lambda id : self .subscriptions [
204- external_subscription_id ].append (id )
205- )
206- )
178+ self .pubsub .
179+ subscribe (trigger_name , on_message , channel_options ).then (
180+ lambda id : self .subscriptions [external_subscription_id ].append (id )
181+ ))
207182
208183 return Promise .all (subscription_promises ).then (
209- lambda result : external_subscription_id
210- )
184+ lambda result : external_subscription_id )
211185
212186 def unsubscribe (self , sub_id ):
213187 for internal_id in self .subscriptions .get (sub_id ):
0 commit comments