Skip to content

Commit bbe82f7

Browse files
committed
Cleaned up subscription_transport_ws code
1 parent e50e50c commit bbe82f7

1 file changed

Lines changed: 133 additions & 125 deletions

File tree

subscriptions/subscription_transport_ws.py

Lines changed: 133 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -44,145 +44,150 @@ class nonlocal:
4444
on_init_resolve = None
4545
on_init_reject = None
4646

47-
def _init_promise_handler(resolve, reject):
47+
def init_promise_handler(resolve, reject):
4848
nonlocal.on_init_resolve = resolve
4949
nonlocal.on_init_reject = reject
5050

51-
self.connection_context['init_promise'] = Promise(_init_promise_handler)
52-
53-
try:
54-
parsed_message = json.loads(message)
55-
except Exception as e:
56-
self.send_subscription_fail(
57-
None,
58-
{'errors': [{'message': str(e)}]}
59-
)
60-
sub_id = parsed_message.get('id')
61-
62-
if parsed_message.get('type') == INIT:
63-
64-
on_connect_promise = Promise.resolve(True)
65-
nonlocal.on_init_resolve(on_connect_promise)
66-
67-
def _init_success_handler(result):
68-
if not result:
69-
raise TypeError('Prohibited connection!')
70-
return {'type': INIT_SUCCESS}
71-
72-
self.connection_context['init_promise'].then(
73-
_init_success_handler
74-
).catch(
75-
lambda error: {
76-
'type': INIT_FAIL,
77-
'error': str(error)}
78-
).then(
79-
lambda result: self.send_init_result(result)
80-
)
81-
82-
elif parsed_message.get('type') == SUBSCRIPTION_START:
83-
84-
def _subscription_start_handler(init_result):
85-
base_params = {
86-
'query': parsed_message.get('query'),
87-
'variables': parsed_message.get('variables'),
88-
'operation_name': parsed_message.get('operation_name'),
89-
'context': init_result if isinstance(
90-
init_result, dict) else {},
91-
'format_response': None,
92-
'format_error': None,
93-
'callback': None
94-
}
95-
promised_params = Promise.resolve(base_params)
96-
97-
if self.connection_subscriptions[sub_id]:
98-
self.unsubscribe(self.connection_subscriptions[sub_id])
99-
del self.connection_subscriptions[sub_id]
100-
101-
def _promised_params_handler(params):
102-
if not isinstance(params, dict):
103-
error = 'Invalid params returned from\
104-
OnSubscribe! Return value must\
105-
be an dict'
106-
self.send_subscription_fail(sub_id, {
107-
'errors': [{'message': error}]
108-
})
109-
raise TypeError(error)
110-
111-
def _params_callback(error, result):
112-
if not error:
113-
self.send_subscription_data(sub_id, result)
114-
elif error.errors:
115-
self.send_subscription_data(sub_id, {
116-
'errors': error.errors
51+
self.connection_context['init_promise'] = Promise(init_promise_handler)
52+
53+
def on_message_return_handler(msg):
54+
try:
55+
parsed_message = json.loads(message)
56+
except Exception as e:
57+
self.send_subscription_fail(
58+
None,
59+
{'errors': [{'message': str(e)}]}
60+
)
61+
62+
sub_id = parsed_message.get('id')
63+
64+
if parsed_message.get('type') == INIT:
65+
66+
on_connect_promise = Promise.resolve(True)
67+
68+
nonlocal.on_init_resolve(on_connect_promise)
69+
70+
def init_success_promise_handler(result):
71+
if not result:
72+
raise TypeError('Prohibited connection!')
73+
return {'type': INIT_SUCCESS}
74+
75+
self.connection_context['init_promise'].then(
76+
init_success_promise_handler
77+
).catch(
78+
lambda error: {
79+
'type': INIT_FAIL,
80+
'error': str(error)}
81+
).then(
82+
lambda result: self.send_init_result(result)
83+
)
84+
85+
elif parsed_message.get('type') == SUBSCRIPTION_START:
86+
87+
def subscription_start_promise_handler(init_result):
88+
base_params = {
89+
'query': parsed_message.get('query'),
90+
'variables': parsed_message.get('variables'),
91+
'operation_name': parsed_message.get('operation_name'),
92+
'context': init_result if isinstance(
93+
init_result, dict) else {},
94+
'format_response': None,
95+
'format_error': None,
96+
'callback': None
97+
}
98+
promised_params = Promise.resolve(base_params)
99+
100+
if self.connection_subscriptions[sub_id]:
101+
self.unsubscribe(self.connection_subscriptions[sub_id])
102+
del self.connection_subscriptions[sub_id]
103+
104+
def promised_params_handler(params):
105+
if not isinstance(params, dict):
106+
error = 'Invalid params returned from\
107+
OnSubscribe! Return value must\
108+
be an dict'
109+
self.send_subscription_fail(sub_id, {
110+
'errors': [{'message': error}]
111+
})
112+
raise TypeError(error)
113+
114+
def params_callback(error, result):
115+
if not error:
116+
self.send_subscription_data(sub_id, result)
117+
elif error.errors:
118+
self.send_subscription_data(sub_id, {
119+
'errors': error.errors
120+
})
121+
elif error.message:
122+
self.send_subscription_data(sub_id, {
123+
'errors': [{'message': error.message}]
124+
})
125+
elif error.get('message'):
126+
self.send_subscription_data(sub_id, {
127+
'errors': [{'message': error.get('message')}]
128+
})
129+
else:
130+
self.send_subscription_data(sub_id, {
131+
'errors': [{'message': str(error)}]
132+
})
133+
134+
params['callback'] = params_callback
135+
136+
return self.subscription_manager.subscribe(**params)
137+
138+
def graphql_sub_id_promise_handler(graphql_sub_id):
139+
self.connection_subscriptions[sub_id] = graphql_sub_id
140+
self.send_subscription_success(sub_id)
141+
142+
def error_catch_handler(e):
143+
if e.errors:
144+
self.send_subscription_fail(sub_id, {
145+
'errors': e.errors
117146
})
118-
elif error.message:
119-
self.send_subscription_data(sub_id, {
120-
'errors': [{'message': error.message}]
147+
elif e.message:
148+
self.send_subscription_fail(sub_id, {
149+
'errors': [{'message': e.message}]
121150
})
122-
elif error.get('message'):
123-
self.send_subscription_data(sub_id, {
124-
'errors': [{'message': error.get('message')}]
151+
elif e.get('message'):
152+
self.send_subscription_fail(sub_id, {
153+
'errors': [{'message': e.get('message')}]
125154
})
126155
else:
127-
self.send_subscription_data(sub_id, {
128-
'errors': [{'message': str(error)}]
156+
self.send_subscription_fail(sub_id, {
157+
'errors': [{'message': str(e)}]
129158
})
130159

131-
params['callback'] = _params_callback
132-
133-
return self.subscription_manager.subscribe(**params)
134-
135-
def _graphql_sub_id_handler(graphql_sub_id):
136-
self.connection_subscriptions[sub_id] = graphql_sub_id
137-
self.send_subscription_success(sub_id)
138-
139-
def _error_catch_handler(e):
140-
if e.errors:
141-
self.send_subscription_fail(sub_id, {
142-
'errors': e.errors
143-
})
144-
elif e.message:
145-
self.send_subscription_fail(sub_id, {
146-
'errors': [{'message': e.message}]
147-
})
148-
elif e.get('message'):
149-
self.send_subscription_fail(sub_id, {
150-
'errors': [{'message': e.get('message')}]
151-
})
152-
else:
153-
self.send_subscription_fail(sub_id, {
154-
'errors': [{'message': str(e)}]
155-
})
156-
157-
promised_params.then(
158-
_promised_params_handler
159-
).then(
160-
_graphql_sub_id_handler
161-
).catch(
162-
_error_catch_handler
163-
)
160+
promised_params.then(
161+
promised_params_handler
162+
).then(
163+
graphql_sub_id_promise_handler
164+
).catch(
165+
error_catch_handler
166+
)
167+
168+
self.connection_context['init_promise'].then(
169+
subscription_start_promise_handler)
164170

165-
self.connection_context['init_promise'].then(
166-
_init_promise_handler)
171+
elif parsed_message.get('type') == SUBSCRIPTION_END:
167172

168-
elif parsed_message.get('type') == SUBSCRIPTION_END:
173+
def subscription_end_promise_handler(result):
174+
if self.connection_subscriptions[sub_id]:
175+
self.unsubscribe(self.connection_subscriptions[sub_id])
176+
del self.connection_subscriptions[sub_id]
169177

170-
def _subscription_end_handler(result):
171-
if isinstance(self.connection_subscriptions[sub_id], None):
172-
self.unsubscribe(self.connection_subscriptions[sub_id])
173-
del self.connection_subscriptions[sub_id]
178+
self.connection_context['init_promise'].then(
179+
subscription_end_promise_handler
180+
)
174181

175-
self.connection_context['init_promise'].then(
176-
_subscription_end_handler
177-
)
182+
else:
178183

179-
else:
184+
self.send_subscription_fail(sub_id, {
185+
'errors': [{
186+
'message': 'Invalid message type!'
187+
}]
188+
})
180189

181-
self.send_subscription_fail(sub_id, {
182-
'errors': [{
183-
'message': 'Invalid message type!'
184-
}]
185-
})
190+
return on_message_return_handler()
186191

187192
def send_subscription_data(self, sub_id, payload):
188193
message = {
@@ -201,7 +206,10 @@ def send_subscription_fail(self, sub_id, payload):
201206
self.ws.send(json.dumps(message))
202207

203208
def send_subscription_success(self, sub_id):
204-
message = {'type': SUBSCRIPTION_SUCCESS, 'id': sub_id}
209+
message = {
210+
'type': SUBSCRIPTION_SUCCESS,
211+
'id': sub_id
212+
}
205213
self.ws.send(json.dumps(message))
206214

207215
def send_init_result(self, result):

0 commit comments

Comments
 (0)