Skip to content

Commit 7d94540

Browse files
dee077pandafy
authored andcommitted
[feature] Added WebSocket endpoint which broadcasts all location updates #1157
This websocket endpoint is needed for the implementation of dynamic mobile maps. Closes #1157 --------- Co-authored-by: Gagan Deep <pandafy.dev@gmail.com>
1 parent ef85ba3 commit 7d94540

4 files changed

Lines changed: 184 additions & 58 deletions

File tree

openwisp_controller/geo/apps.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
1+
import json
2+
3+
import channels.layers
14
import swapper
5+
from asgiref.sync import async_to_sync
26
from django.conf import settings
37
from django.db import transaction
48
from django.db.models import Case, Count, Sum, When
9+
from django.db.models.signals import post_save
510
from django.utils.translation import gettext_lazy as _
611
from django_loci.apps import LociConfig
712
from swapper import get_model_name

openwisp_controller/geo/channels/consumers.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import swapper
2-
from django_loci.channels.base import BaseLocationBroadcast
2+
from asgiref.sync import async_to_sync
3+
from django_loci.channels.base import BaseCommonLocationBroadcast, BaseLocationBroadcast
34

45
Location = swapper.load_model("geo", "Location")
56

@@ -17,3 +18,25 @@ def is_authorized(self, user, location):
1718
):
1819
return False
1920
return result
21+
22+
23+
class CommonLocationBroadcast(BaseCommonLocationBroadcast):
24+
model = Location
25+
26+
def join_groups(self, user):
27+
"""
28+
Subscribe user to all organizations they manage or bypass if superuser.
29+
"""
30+
if user.is_superuser:
31+
super().join_groups(user)
32+
self.group_name = [self.group_name]
33+
return
34+
self.group_name = []
35+
for org in user.organizations_managed:
36+
group = f"loci.mobile-location.organization.{org}"
37+
self.group_name.append(group)
38+
async_to_sync(self.channel_layer.group_add)(group, self.channel_name)
39+
40+
def disconnect(self, close_code):
41+
for group in getattr(self, "group_name", []):
42+
async_to_sync(self.channel_layer.group_discard)(group, self.channel_name)

openwisp_controller/geo/channels/routing.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,27 @@
22
from channels.routing import ProtocolTypeRouter, URLRouter
33
from channels.security.websocket import AllowedHostsOriginValidator
44
from django.urls import path
5-
from django_loci.channels.base import location_broadcast_path
5+
from django_loci.channels.base import (
6+
common_location_broadcast_path,
7+
location_broadcast_path,
8+
)
69
from openwisp_notifications.websockets.routing import (
710
get_routes as get_notification_routes,
811
)
912

10-
from .consumers import LocationBroadcast
13+
from .consumers import CommonLocationBroadcast, LocationBroadcast
1114

1215

1316
def get_routes():
1417
return [
1518
path(
1619
location_broadcast_path, LocationBroadcast.as_asgi(), name="LocationChannel"
17-
)
20+
),
21+
path(
22+
common_location_broadcast_path,
23+
CommonLocationBroadcast.as_asgi(),
24+
name="CommonLocationChannel",
25+
),
1826
]
1927

2028

Lines changed: 144 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,44 @@
1-
import importlib
1+
import asyncio
22
import os
3+
from contextlib import suppress
34
from unittest import skipIf
45

56
import pytest
67
from channels.db import database_sync_to_async
78
from channels.routing import ProtocolTypeRouter
8-
from channels.testing import WebsocketCommunicator
99
from django.conf import settings
10-
from django.contrib.auth import get_user_model, login
10+
from django.contrib.auth import get_permission_codename, get_user_model
1111
from django.contrib.auth.models import Permission
12-
from django.http.request import HttpRequest
1312
from django.utils.module_loading import import_string
13+
from django_loci.tests import TestChannelsMixin
1414
from swapper import load_model
1515

16+
from openwisp_controller.geo.channels.consumers import (
17+
CommonLocationBroadcast,
18+
LocationBroadcast,
19+
)
20+
from openwisp_users.tests.utils import TestOrganizationMixin
21+
1622
from .utils import TestGeoMixin
1723

1824
Device = load_model("config", "Device")
1925
Location = load_model("geo", "Location")
2026
DeviceLocation = load_model("geo", "DeviceLocation")
2127
User = get_user_model()
2228
OrganizationUser = load_model("openwisp_users", "OrganizationUser")
29+
Group = load_model("openwisp_users", "Group")
2330

2431

2532
@skipIf(os.environ.get("SAMPLE_APP", False), "Running tests on SAMPLE_APP")
26-
class TestChannels(TestGeoMixin):
33+
class TestChannels(TestGeoMixin, TestChannelsMixin, TestOrganizationMixin):
34+
location_consumer = LocationBroadcast
35+
common_location_consumer = CommonLocationBroadcast
2736
application = import_string(getattr(settings, "ASGI_APPLICATION"))
2837
object_model = Device
2938
location_model = Location
3039
object_location_model = DeviceLocation
3140
user_model = get_user_model()
3241

33-
def _force_login(self, user, backend=None):
34-
engine = importlib.import_module(settings.SESSION_ENGINE)
35-
request = HttpRequest()
36-
request.session = engine.SessionStore()
37-
login(request, user, backend)
38-
request.session.save()
39-
return request.session
40-
41-
async def _get_request_dict(self, pk=None, user=None):
42-
if not pk:
43-
location = await database_sync_to_async(self._create_location)(
44-
is_mobile=True
45-
)
46-
await database_sync_to_async(self._create_object_location)(
47-
location=location
48-
)
49-
pk = location.pk
50-
path = "/ws/loci/location/{0}/".format(pk)
51-
session = None
52-
if user:
53-
session = await database_sync_to_async(self._force_login)(user)
54-
return {"pk": pk, "path": path, "session": session}
55-
56-
def _get_communicator(self, request_vars, user=None):
57-
communicator = WebsocketCommunicator(self.application, request_vars["path"])
58-
if user:
59-
communicator.scope.update(
60-
{
61-
"user": user,
62-
"session": request_vars["session"],
63-
"url_route": {"kwargs": {"pk": request_vars["pk"]}},
64-
}
65-
)
66-
return communicator
67-
6842
@pytest.mark.asyncio
6943
@pytest.mark.django_db(transaction=True)
7044
async def test_consumer_staff_but_no_change_permission(self):
@@ -74,37 +48,153 @@ async def test_consumer_staff_but_no_change_permission(self):
7448
location = await database_sync_to_async(self._create_location)(is_mobile=True)
7549
await database_sync_to_async(self._create_object_location)(location=location)
7650
pk = location.pk
77-
request_vars = await self._get_request_dict(user=user, pk=pk)
78-
communicator = self._get_communicator(request_vars, user)
51+
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
52+
communicator = self._get_specific_location_communicator(request_vars, user)
7953
connected, _ = await communicator.connect()
8054
assert not connected
8155
await communicator.disconnect()
8256
# add permission to change location and repeat
83-
perm = await database_sync_to_async(
84-
(
85-
await database_sync_to_async(Permission.objects.filter)(
86-
name="Can change location"
87-
)
88-
).first
89-
)()
57+
perm = await Permission.objects.filter(
58+
codename=f"change_{self.location_model._meta.model_name}",
59+
content_type__app_label=self.location_model._meta.app_label,
60+
).afirst()
9061
await database_sync_to_async(user.user_permissions.add)(perm)
9162
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
92-
request_vars = await self._get_request_dict(user=user, pk=pk)
93-
communicator = self._get_communicator(request_vars, user)
63+
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
64+
communicator = self._get_specific_location_communicator(request_vars, user)
9465
connected, _ = await communicator.connect()
9566
assert not connected
9667
await communicator.disconnect()
9768
# add user to organization
9869
await database_sync_to_async(OrganizationUser.objects.create)(
99-
organization=location.organization, user=user, is_admin=True
70+
organization=location.organization,
71+
user=user,
72+
is_admin=True,
10073
)
10174
await database_sync_to_async(location.organization.save)()
10275
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
103-
request_vars = await self._get_request_dict(user=user, pk=pk)
104-
communicator = self._get_communicator(request_vars, user)
76+
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
77+
communicator = self._get_specific_location_communicator(request_vars, user)
10578
connected, _ = await communicator.connect()
10679
assert connected
10780
await communicator.disconnect()
10881

82+
@pytest.mark.asyncio
83+
@pytest.mark.django_db(transaction=True)
84+
async def test_common_location_consumer_staff_but_no_change_permission(self):
85+
user = await database_sync_to_async(self._create_user)(is_staff=True)
86+
location = await database_sync_to_async(self._create_location)(is_mobile=True)
87+
await database_sync_to_async(self._create_object_location)(location=location)
88+
pk = location.pk
89+
request_vars = await self._get_common_location_request_dict(pk=pk, user=user)
90+
communicator = self._get_common_location_communicator(request_vars, user)
91+
connected, _ = await communicator.connect()
92+
assert not connected
93+
await communicator.disconnect()
94+
# After granting change permission, the user can connect to the common
95+
# location endpoint, but must receive updates only for locations
96+
# belonging to their organization.
97+
perm = await Permission.objects.filter(
98+
codename=f"change_{self.location_model._meta.model_name}",
99+
content_type__app_label=self.location_model._meta.app_label,
100+
).afirst()
101+
await database_sync_to_async(user.user_permissions.add)(perm)
102+
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
103+
request_vars = await self._get_common_location_request_dict(pk=pk, user=user)
104+
communicator = self._get_common_location_communicator(request_vars, user)
105+
connected, _ = await communicator.connect()
106+
assert connected
107+
await communicator.disconnect()
108+
109+
@pytest.mark.asyncio
110+
@pytest.mark.django_db(transaction=True)
111+
async def test_common_location_org_isolation(self):
112+
administrator = await Group.objects.acreate(name="Administrator")
113+
perm = await Permission.objects.filter(
114+
codename=get_permission_codename("change", self.location_model._meta),
115+
).afirst()
116+
await administrator.permissions.aadd(perm)
117+
org1 = await database_sync_to_async(self._get_org)(org_name="test1")
118+
org2 = await database_sync_to_async(self._get_org)(org_name="test2")
119+
org1_location = await database_sync_to_async(self._create_location)(
120+
is_mobile=True, organization=org1
121+
)
122+
org2_location = await database_sync_to_async(self._create_location)(
123+
is_mobile=True, organization=org2
124+
)
125+
org1_user = await database_sync_to_async(self._create_administrator)(
126+
organizations=[org1],
127+
username="user1",
128+
password="password",
129+
email="user1@test.org",
130+
)
131+
org2_user = await database_sync_to_async(self._create_administrator)(
132+
organizations=[org2],
133+
username="user2",
134+
password="password",
135+
email="user2@test.org",
136+
)
137+
admin = await database_sync_to_async(self._get_admin)()
138+
org1_communicator = self._get_common_location_communicator(
139+
await self._get_common_location_request_dict(
140+
pk=org1_location.pk, user=org1_user
141+
),
142+
org1_user,
143+
)
144+
org2_communicator = self._get_common_location_communicator(
145+
await self._get_common_location_request_dict(
146+
pk=org2_location.pk, user=org2_user
147+
),
148+
org2_user,
149+
)
150+
admin_communicator = self._get_common_location_communicator(
151+
await self._get_common_location_request_dict(
152+
pk=org1_location.pk, user=admin
153+
),
154+
admin,
155+
)
156+
connected, _ = await org1_communicator.connect()
157+
assert connected
158+
connected, _ = await org2_communicator.connect()
159+
assert connected
160+
connected, _ = await admin_communicator.connect()
161+
assert connected
162+
163+
# Updating co-ordinates for org1_location should notify org1_user and admin,
164+
await self._save_location(str(org1_location.pk))
165+
org1_response = await org1_communicator.receive_json_from(timeout=1)
166+
assert org1_response["id"] == str(org1_location.pk)
167+
admin_response = await admin_communicator.receive_json_from(timeout=1)
168+
assert admin_response["id"] == str(org1_location.pk)
169+
with pytest.raises(asyncio.TimeoutError):
170+
await org2_communicator.receive_json_from(timeout=1)
171+
172+
with suppress(asyncio.CancelledError):
173+
await org2_communicator.disconnect()
174+
175+
org2_communicator = self._get_common_location_communicator(
176+
await self._get_common_location_request_dict(
177+
pk=org2_location.pk, user=org2_user
178+
),
179+
org2_user,
180+
)
181+
connected, _ = await org2_communicator.connect()
182+
assert connected
183+
184+
# Updating co-ordinates for org2_location should notify org2_user and admin,
185+
await self._save_location(str(org2_location.pk))
186+
org2_response = await org2_communicator.receive_json_from(timeout=1)
187+
assert org2_response["id"] == str(org2_location.pk)
188+
admin_response = await admin_communicator.receive_json_from(timeout=1)
189+
assert admin_response["id"] == str(org2_location.pk)
190+
with pytest.raises(asyncio.TimeoutError):
191+
await org1_communicator.receive_json_from(timeout=1)
192+
193+
# The task is been cancelled if not completed in the given timeout
194+
with suppress(asyncio.CancelledError):
195+
await org1_communicator.disconnect()
196+
await org2_communicator.disconnect()
197+
await admin_communicator.disconnect()
198+
109199
def test_asgi_application_router(self):
110200
assert isinstance(self.application, ProtocolTypeRouter)

0 commit comments

Comments
 (0)