@@ -12,7 +12,7 @@ class RedisPubsub(object):
1212 def __init__ (self , host = 'localhost' , port = 6379 , * args , ** kwargs ):
1313 redis .connection .socket = gevent .socket
1414 self .redis = redis .StrictRedis (host , port , * args , ** kwargs )
15- self .pubsub = self .redis .pubsub (ignore_subscribe_messages = True )
15+ self .pubsub = self .redis .pubsub ()
1616 self .subscriptions = {}
1717 self .sub_id_counter = 0
1818 self .greenlet = None
@@ -22,30 +22,45 @@ def publish(self, trigger_name, message):
2222 return True
2323
2424 def subscribe (self , trigger_name , on_message_handler , options ):
25- self .pubsub .subscribe (trigger_name )
25+ try :
26+ if trigger_name not in self .subscriptions .values ()[0 ]:
27+ self .pubsub .subscribe (trigger_name )
28+ except IndexError :
29+ self .pubsub .subscribe (trigger_name )
30+ self .subscriptions [self .sub_id_counter ] = [
31+ trigger_name ,
32+ on_message_handler
33+ ]
34+ self .sub_id_counter += 1
2635 if not self .greenlet :
2736 self .greenlet = gevent .spawn (
28- self .wait_and_get_message ,
29- on_message_handler
37+ self .wait_and_get_message
3038 )
31- self .sub_id_counter += 1
32- self .subscriptions [self .sub_id_counter ] = trigger_name
3339 return Promise .resolve (self .sub_id_counter )
3440
3541 def unsubscribe (self , sub_id ):
36- trigger_name = self .subscriptions [sub_id ]
42+ trigger_name , on_message_handler = self .subscriptions [sub_id ]
3743 del self .subscriptions [sub_id ]
38- self .pubsub .unsubscribe (trigger_name )
44+ try :
45+ if trigger_name not in self .subscriptions .values ()[0 ]:
46+ self .pubsub .subscribe (trigger_name )
47+ except IndexError :
48+ self .pubsub .subscribe (trigger_name )
3949 if not self .subscriptions :
4050 self .greenlet = self .greenlet .kill ()
4151
42- def wait_and_get_message (self , on_message_handler ):
52+ def wait_and_get_message (self ):
4353 while True :
44- message = self .pubsub .get_message ()
54+ message = self .pubsub .get_message (ignore_subscribe_messages = True )
4555 if message :
46- on_message_handler ( json . loads (message [ 'data' ]) )
56+ self . handle_message (message )
4757 gevent .sleep (.001 ) # may not need this sleep call - test
4858
59+ def handle_message (self , message ):
60+ for sub_id , trigger_map in self .subscriptions .iteritems ():
61+ if trigger_map [0 ] == message ['channel' ]:
62+ trigger_map [1 ](json .loads (message ['data' ]))
63+
4964
5065class ValidationError (Exception ):
5166
@@ -155,14 +170,15 @@ def context_do_execute_handler(result):
155170 context , do_execute = result
156171 if not do_execute :
157172 return
158- execute (
159- self .schema ,
160- parsed_query ,
161- root_value ,
162- context ,
163- variables ,
164- operation_name
165- )
173+ else :
174+ return execute (
175+ self .schema ,
176+ parsed_query ,
177+ root_value ,
178+ context ,
179+ variables ,
180+ operation_name
181+ )
166182
167183 return Promise .resolve (
168184 True
@@ -173,9 +189,9 @@ def context_do_execute_handler(result):
173189 ).then (
174190 context_do_execute_handler
175191 ).then (
176- lambda data : callback (None , data )
192+ lambda result : callback (None , result )
177193 ).catch (
178- lambda error : callback (error )
194+ lambda error : callback (error , None )
179195 )
180196
181197 subscription_promises .append (
0 commit comments