|
1 | 1 | # graphql-python-subscriptions |
| 2 | +#### (Work in Progress!) |
2 | 3 | A port of apollographql subscriptions for python, using gevent websockets and redis |
3 | 4 |
|
4 | | -This is a implementation of apollographql subscriptions-transport-ws and graphql-subscriptions in Python. |
5 | | -It currently implements a pubsub using redis.py and uses gevent for concurrency. It also makes heavy use of |
6 | | -syrusakbary/promise python implementation to mirror the logic in the apollo-graphql libraries. |
| 5 | +This is a implementation of apollographql [subscriptions-transport-ws](https://github.com/apollographql/subscriptions-transport-ws) and [graphql-subscriptions](https://github.com/apollographql/graphql-subscriptions) in Python. It currently implements a pubsub using [redis-py](https://github.com/andymccurdy/redis-py) and uses [gevent-websockets](https://bitbucket.org/noppo/gevent-websocket) for concurrency. It also makes heavy use of |
| 6 | +[syrusakbary/promise](https://github.com/syrusakbary/promise) python implementation to mirror the logic in the apollo-graphql libraries. |
7 | 7 |
|
8 | | -Meant to be used in conjunction with graphql-python / graphene server and apollo-graphql client. |
| 8 | +Meant to be used in conjunction with [graphql-python](https://github.com/graphql-python) / [graphene](http://graphene-python.org/) server and [apollo-client](http://dev.apollodata.com/) for graphql. |
9 | 9 |
|
10 | 10 | Very initial implementation. Currently only works with Python 2. No tests yet. |
11 | 11 |
|
12 | 12 | ## Installation |
13 | 13 | ``` |
14 | 14 | $ pip install graphql-subscriptions |
15 | 15 | ``` |
| 16 | + |
| 17 | +## API |
| 18 | +### RedisPubsub(self, host='localhost', port=6379, *args, **kwargs) |
| 19 | +#### Arguments |
| 20 | +- `host`: Redis server instance |
| 21 | +- `port`: Redis server port |
| 22 | +- `args, kwargs`: additional position and keyword args will be passed |
| 23 | + to Redis-py constructor |
| 24 | + |
| 25 | +#### Methods |
| 26 | +- `publish(self, trigger_name, message)`: Trigger name is the subscription |
| 27 | + or pubsub channel; message is the mutation object or message that will end |
| 28 | + up being passed to the subscription root_value; this method called inside of |
| 29 | + mutation resolve function |
| 30 | +- `subscribe(self, trigger_name, on_message_handler, options)`: Trigger name |
| 31 | + is the subscription or pubsub channel; on_message_handler is the callback |
| 32 | + that will be triggered on each mutation; this method called by subscription |
| 33 | + manager |
| 34 | +- `unsubscribe(self, sub_id)`: Sub_id is the subscription ID that is being |
| 35 | + tracked by the pubsub instance -- returned from the `subscribe` method |
| 36 | + and called by the subscription manager |
| 37 | +- `wait_and_get_message(self)`: Called by subscribe method during first |
| 38 | + subscription; run in a separate greenlet and calls Redis `get_message()` |
| 39 | + method to constantly poll for new messages on pubsub channels |
| 40 | +- `handle_message(self, message)`: Called by pubsub when a message is |
| 41 | + received on a channel; check all existing pubsub subscriptons and calls |
| 42 | + `on_message_handler()` for all matches |
| 43 | + |
| 44 | +### SubscriptionManager(self, schema, pubsub, setup_funcs={}) |
| 45 | +#### Arguments |
| 46 | +- `schema`: graphql schema instance |
| 47 | +- `pubsub`: any pubsub instance with publish, subscribe, and unsubscribe |
| 48 | + methods (in this case an instance of the RedisPubsub class) |
| 49 | +- `setup_funcs`: dictionary of setup functions that map from subscription |
| 50 | + name to a map of pubsub channel names and their filter functions; |
| 51 | + kwargs parameters are: `query, operation_name, callback, variables, |
| 52 | + context, format_error, format_response, args, subscription_name` |
| 53 | + |
| 54 | + |
| 55 | + example: |
| 56 | + ``` |
| 57 | + def new_user(**kwargs): |
| 58 | + args = kwargs.get('args') |
| 59 | + return { |
| 60 | + 'new_user_channel': { |
| 61 | + 'filter': lambda user, context: user.active == args.active |
| 62 | + } |
| 63 | + } |
| 64 | +
|
| 65 | + setup_funcs = {'new_user': new_user} |
| 66 | + ``` |
| 67 | + |
| 68 | +#### Methods |
| 69 | +- `publish(self, trigger_name, payload)`: Trigger name is the subscription |
| 70 | + or pubsub channel; payload is the mutation object or message that will |
| 71 | + end up being passed to the subscription root_value; method called inside of mutation resolve function |
| 72 | +- `subscribe(self, query, operation_name, callback, variables, context, |
| 73 | + format_error, format_response)`: Called by ApolloSubscriptionServer upon |
| 74 | + receiving a new subscription from a websocket. Arguments parsed by |
| 75 | + ApolloSubscriptionServer from graphql subscription query |
| 76 | +- `unsubscribe(self, sub_id)`: Sub_id is the subscription ID that is being |
| 77 | + tracked by the subscription manager instance -- returned from the |
| 78 | + `subscribe()` method and called by the ApolloSubscriptionServer |
| 79 | + |
| 80 | +### ApolloSubscriptionServer(self, subscription_manager, websocket, keep_alive=None, on_subscribe=None, on_unsubscribe=None, on_connect=None, on_disconnect=None) |
| 81 | +#### Arguments |
| 82 | +- `subscription_manager`: TODO |
| 83 | +- `websocket`: TODO |
| 84 | +- `keep_alive`: TODO |
| 85 | +- `on_subscribe, on_unsubscribe, on_connect, on_disconnet`: TODO |
| 86 | + |
| 87 | +#### Methods |
| 88 | +- TODO |
| 89 | + |
| 90 | +## Example Usage |
| 91 | +#### Server (using Flask and Flask-Sockets): |
| 92 | + |
| 93 | +``` |
| 94 | +from flask import Flask |
| 95 | +from flask_sqlalchemy import SQLAlchemy |
| 96 | +from flask_sockets import Sockets |
| 97 | +from graphql_subscriptions import SubscriptionManager, RedisPubsub |
| 98 | +from graphql_subscriptions import ApolloSubscriptionServer |
| 99 | +
|
| 100 | +app = Flask(__name__) |
| 101 | +
|
| 102 | +# using Flask Sockets here, but could use gevent-websocket directly |
| 103 | +# to create a websocket app |
| 104 | +sockets = Sockets(app) |
| 105 | +
|
| 106 | +# instantiate pubsub |
| 107 | +pubsub = RedisPubsub() |
| 108 | +
|
| 109 | +# create schema using graphene or another python graphql library |
| 110 | +# not showing models or schema design here for brevity |
| 111 | +schema = graphene.Schema( |
| 112 | + query=Query, |
| 113 | + mutation=Mutation, |
| 114 | + subscription=Subscription |
| 115 | +) |
| 116 | +
|
| 117 | +# instantiate subscription manager object--passing in schema and pubsub |
| 118 | +subscription_mgr = SubscriptionManager(schema, pubsub) |
| 119 | +
|
| 120 | +# using Flask Sockets here, on each new connection instantiate a |
| 121 | +# subscription app / server--passing in subscription manger and websocket |
| 122 | +@sockets.route('/socket') |
| 123 | +def socket_channel(websocket): |
| 124 | + subscription_server = ApolloSubscriptionServer(subscription_mgr, websocket) |
| 125 | + subscription_server.handle() |
| 126 | + return [] |
| 127 | +
|
| 128 | +if __name__ == "__main__": |
| 129 | +
|
| 130 | + # using gevent webserver here so multiple connections can be |
| 131 | + # maintained concurrently -- gevent websocket spawns a new |
| 132 | + # greenlet for each request and forwards to flask app or socket app |
| 133 | + # depending on request type |
| 134 | + from geventwebsocket import WebSocketServer |
| 135 | +
|
| 136 | + server = WebSocketServer(('', 5000), app) |
| 137 | + print ' Serving at host 0.0.0.0:5000...\n' |
| 138 | + server.serve_forever() |
| 139 | +``` |
| 140 | + |
| 141 | +Of course on the server you have to "publish" each time you have a mutation (in this case to a redis channel). That would look something like this (using graphene / sql-alchemy): |
| 142 | + |
| 143 | + |
| 144 | +``` |
| 145 | +class AddUser(graphene.ClientIDMutation): |
| 146 | +
|
| 147 | + class Input: |
| 148 | + username = graphene.String(required=True) |
| 149 | + email = graphene.String() |
| 150 | +
|
| 151 | + ok = graphene.Boolean() |
| 152 | + user = graphene.Field(lambda: User) |
| 153 | +
|
| 154 | + @classmethod |
| 155 | + def mutate_and_get_payload(cls, args, context, info): |
| 156 | + _input = args.copy() |
| 157 | + del _input['clientMutationId'] |
| 158 | + new_user = UserModel(**_input) |
| 159 | + db.session.add(new_user) |
| 160 | + db.session.commit() |
| 161 | + ok = True |
| 162 | + # publish result of mutation to pubsub |
| 163 | + if pubsub.subscriptions: |
| 164 | + pubsub.publish('users', new_user.as_dict()) |
| 165 | + return AddUser(ok=ok, user=new_user) |
| 166 | +
|
| 167 | +class Subscription(graphene.ObjectType): |
| 168 | + users = graphene_sqlalchemy.SQLAlchemyConnectionField( |
| 169 | + User, |
| 170 | + active=graphene.Boolean() |
| 171 | + ) |
| 172 | +
|
| 173 | + # mutation oject that was published will be passed as |
| 174 | + # root_value of subscription |
| 175 | + def resolve_users(self, args, context, info): |
| 176 | + query = User.get_query(context) |
| 177 | + return query.filter_by(id=info.root_value.get('id')) |
| 178 | +``` |
| 179 | + |
| 180 | +#### Client (using Apollo Client library): |
| 181 | +First create create network interface and and client instances and |
| 182 | +then wrap them in a subscription client instance |
| 183 | +``` |
| 184 | +import ReactDOM from 'react-dom' |
| 185 | +import { ApolloProvider } from 'react-apollo' |
| 186 | +import ApolloClient, { createNetworkInterface } from 'apollo-client' |
| 187 | +import { SubscriptionClient, addGraphQLSubscriptions } from 'subscriptions-transport-ws' |
| 188 | +
|
| 189 | +import ChatApp from './screens/ChatApp' |
| 190 | +
|
| 191 | +const networkInterface = createNetworkInterface({ |
| 192 | + uri: 'http://localhost:5000/graphql' |
| 193 | +}) |
| 194 | +
|
| 195 | +const wsClient = new SubscriptionClient(`ws://localhost:5000/socket`, { |
| 196 | + reconnect: true |
| 197 | +}) |
| 198 | +
|
| 199 | +const networkInterfaceWithSubscriptions = addGraphQLSubscriptions( |
| 200 | + networkInterface, |
| 201 | + wsClient, |
| 202 | +) |
| 203 | +
|
| 204 | +const client = new ApolloClient({ |
| 205 | + dataIdFromObject: o => o.id, |
| 206 | + networkInterface: networkInterfaceWithSubscriptions |
| 207 | +}) |
| 208 | +
|
| 209 | +ReactDOM.render( |
| 210 | + <ApolloProvider client={client}> |
| 211 | + <ChatApp /> |
| 212 | + </ApolloProvider>, |
| 213 | + document.getElementById('root') |
| 214 | +) |
| 215 | +``` |
| 216 | +Build a simple component and then call subscribeToMore method on the |
| 217 | +returned data object from the inital graphql query |
| 218 | +``` |
| 219 | +
|
| 220 | +import React from 'react' |
| 221 | +import { graphql } from 'react-apollo' |
| 222 | +import gql from 'graphql-tag' |
| 223 | +import ListBox from '../components/ListBox' |
| 224 | +
|
| 225 | +const SUBSCRIPTION_QUERY = gql` |
| 226 | + subscription newUsers { |
| 227 | + users(active: true) { |
| 228 | + edges { |
| 229 | + node { |
| 230 | + id |
| 231 | + username |
| 232 | + } |
| 233 | + } |
| 234 | + } |
| 235 | + } |
| 236 | +` |
| 237 | +
|
| 238 | +const LIST_BOX_QUERY = gql` |
| 239 | + query AllUsers { |
| 240 | + users(active: true) { |
| 241 | + edges { |
| 242 | + node { |
| 243 | + id |
| 244 | + username |
| 245 | + } |
| 246 | + } |
| 247 | + } |
| 248 | + } |
| 249 | +` |
| 250 | +
|
| 251 | +class ChatListBox extends React.Component { |
| 252 | +
|
| 253 | + componentWillReceiveProps(newProps) { |
| 254 | + if (!newProps.data.loading) { |
| 255 | + if (this.subscription) { |
| 256 | + return |
| 257 | + } |
| 258 | + this.subscription = newProps.data.subscribeToMore({ |
| 259 | + document: SUBSCRIPTION_QUERY, |
| 260 | + updateQuery: (previousResult, {subscriptionData}) => { |
| 261 | + const newUser = subscriptionData.data.users.edges |
| 262 | + const newResult = { |
| 263 | + users: { |
| 264 | + edges: [ |
| 265 | + ...previousResult.users.edges, |
| 266 | + ...newUser |
| 267 | + ] |
| 268 | + } |
| 269 | + } |
| 270 | + return newResult |
| 271 | + }, |
| 272 | + onError: (err) => console.error(err) |
| 273 | + }) |
| 274 | + } |
| 275 | + } |
| 276 | +
|
| 277 | + render() { |
| 278 | + return <ListBox data={this.props.data} /> |
| 279 | + } |
| 280 | +} |
| 281 | +
|
| 282 | +const ChatListBoxWithData = graphql(LIST_BOX_QUERY)(ChatListBox) |
| 283 | +
|
| 284 | +export default ChatListBoxWithData |
| 285 | +
|
| 286 | +``` |
0 commit comments