Skip to content

Commit 87cec35

Browse files
committed
Add requirements file and initial implementation modules
1 parent 92fdd55 commit 87cec35

3 files changed

Lines changed: 407 additions & 0 deletions

File tree

requirements.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
appdirs==1.4.0
2+
gevent==1.2.1
3+
gevent-websocket==0.9.5
4+
greenlet==0.4.12
5+
packaging==16.8
6+
Promises==0.6.27
7+
pyparsing==2.1.10
8+
redis==2.10.5
9+
six==1.10.0

subscription_manager.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
import redis
2+
import gevent
3+
from types import FunctionType
4+
from promise import Promise
5+
from graphql import parse, validate, specified_rules, value_from_ast, execute
6+
from graphql.language.ast import OperationDefinition
7+
8+
9+
class RedisPubsub(object):
10+
11+
def __init__(self, host='localhost', port=6379, *args, **kwargs):
12+
self.redis = redis.StrictRedis(host, port, *args, **kwargs)
13+
self.pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
14+
self.subscriptions = {}
15+
self.sub_id_counter = 0
16+
self.greenlet = None
17+
18+
def publish(self, trigger_name, message):
19+
self.redis.publish(trigger_name, message)
20+
return True
21+
22+
def subscribe(self, trigger_name, on_message_handler, options):
23+
trigger = {}
24+
trigger[trigger_name] = on_message_handler
25+
self.pubsub.subscribe(**trigger)
26+
if not self.greenlet:
27+
self.greenlet = gevent.spawn(self.wait_and_get_message)
28+
self.sub_id_counter += 1
29+
self.subscriptions[self.sub_id_counter] = trigger_name
30+
return Promise.resolve(self.sub_id_counter)
31+
32+
def unsubscribe(self, sub_id):
33+
trigger_name = self.subscriptions[sub_id]
34+
del self.subscriptions[sub_id]
35+
self.pubsub.unsubscribe(trigger_name)
36+
if not self.subscriptions:
37+
self.greenlet = self.greenlet.kill()
38+
39+
def wait_and_get_message(self):
40+
while True:
41+
self.pubsub.get_message()
42+
gevent.sleep(.001)
43+
44+
45+
class ValidationError(Exception):
46+
47+
def __init__(self, errors):
48+
self.errors = errors
49+
self.message = 'Subscription query has validation errors'
50+
51+
52+
class SubscriptionManager(object):
53+
54+
def __init__(self, schema, pubsub, setup_funcs={}):
55+
self.schema = schema
56+
self.pubsub = pubsub
57+
self.setup_funcs = setup_funcs
58+
self.subscriptions = {}
59+
self.max_subscription_id = 0
60+
61+
def publish(self, trigger_name, payload):
62+
self.pubsub.publish(trigger_name, payload)
63+
64+
def subscribe(self, query, operation_name, callback, variables, context,
65+
format_error, format_response):
66+
parsed_query = parse(query)
67+
errors = validate(
68+
self.schema,
69+
parsed_query,
70+
# TODO: Need to create/add subscriptionHasSingleRootField
71+
# rule
72+
rules=specified_rules
73+
)
74+
75+
if errors:
76+
return Promise.reject(ValidationError(errors))
77+
78+
args = {}
79+
80+
subscription_name = ''
81+
82+
for definition in parsed_query.definitions:
83+
84+
if isinstance(definition, OperationDefinition):
85+
root_field = definition.selection_set.selections[0]
86+
subscription_name = root_field.name.value
87+
88+
fields = self.schema.get_subscription_type().get_fields()
89+
90+
for arg in root_field.arguments:
91+
92+
arg_definition = [arg_def for arg_def in
93+
fields[subscription_name].args if
94+
arg_def == arg.name.value][0]
95+
96+
args[arg_definition.name] = value_from_ast(
97+
arg.value,
98+
arg_definition.type,
99+
variables=variables
100+
)
101+
102+
if self.setup_funcs[subscription_name]:
103+
trigger_map = self.setup_funcs[subscription_name](
104+
query,
105+
operation_name,
106+
callback,
107+
variables,
108+
context,
109+
format_error,
110+
format_response,
111+
args,
112+
subscription_name
113+
)
114+
else:
115+
trigger_map = {}
116+
trigger_map[subscription_name] = {}
117+
118+
external_subscription_id = self.max_subscription_id
119+
self.max_subscription_id += 1
120+
self.subscriptions[external_subscription_id] = []
121+
subscription_promises = []
122+
123+
for trigger_name in trigger_map.keys():
124+
channel_options = trigger_map[trigger_name].get(
125+
'channel_options',
126+
{}
127+
)
128+
filter_func = trigger_map[trigger_name].get(
129+
'filter_func',
130+
True
131+
)
132+
133+
def on_message(root_value):
134+
if isinstance(context, FunctionType):
135+
context_promise = Promise(
136+
lambda resolve: resolve(context())
137+
)
138+
else:
139+
context_promise = Promise.resolve(context)
140+
141+
def context_promise_then(context):
142+
if not filter_func(root_value, context):
143+
return
144+
145+
execute(
146+
self.schema,
147+
parsed_query,
148+
root_value,
149+
context,
150+
variables,
151+
operation_name
152+
)
153+
154+
context_promise.then(
155+
context_promise_then
156+
).then(
157+
lambda data: callback(None, data)
158+
).catch(
159+
lambda error: callback(error)
160+
)
161+
162+
subs_promise = Promise(
163+
lambda resolve: resolve(self.pubsub.subscribe(
164+
trigger_name,
165+
on_message,
166+
channel_options
167+
))
168+
)
169+
170+
subs_promise.then(
171+
lambda id: self.subscriptions[
172+
external_subscription_id].append(id)
173+
)
174+
175+
subscription_promises.append(subs_promise)
176+
177+
return Promise.all(subscription_promises).then(
178+
lambda result: external_subscription_id
179+
)
180+
181+
def unsubscribe(self, sub_id):
182+
for internal_id in self.subscriptions.get(sub_id):
183+
self.pubsub.unsubscribe(internal_id)
184+
self.subscriptions.pop(sub_id, None)

0 commit comments

Comments
 (0)