Skip to content

Commit d3f4f2c

Browse files
committed
[tests] Add WebSocket consumer tests #677
Implement tests for RadiusBatchConsumer to verify authentication, authorization, and group messaging logic. Increases coverage for consumers.py to 84%. Fixes #677
1 parent 2646a5c commit d3f4f2c

File tree

3 files changed

+187
-7
lines changed

3 files changed

+187
-7
lines changed

.github/workflows/ci.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ jobs:
3131
- python-version: "3.13"
3232
django-version: django~=4.2.0
3333

34+
permissions:
35+
pull-requests: write
36+
contents: read
37+
3438
steps:
3539
- uses: actions/checkout@v6
3640
with:
@@ -99,6 +103,12 @@ jobs:
99103
if: ${{ failure() }}
100104
run: cat geckodriver.log
101105

106+
- name: Pytest coverage comment
107+
if: ${{ success() }}
108+
uses: MishaKav/pytest-coverage-comment@v1.2.0
109+
with:
110+
pytest-coverage-path: coverage.xml
111+
102112
- name: Upload Coverage
103113
if: ${{ !cancelled() && steps.deps.conclusion == 'success' }}
104114
uses: coverallsapp/github-action@v2

openwisp_radius/consumers.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,9 @@ def _user_can_access_batch(self, user, batch_id):
1212
if user.is_superuser:
1313
return RadiusBatch.objects.filter(pk=batch_id).exists()
1414
# For non-superusers, check their managed organizations
15-
try:
16-
RadiusBatch.objects.filter(
17-
pk=batch_id, organization__in=user.organizations_managed
18-
).exists()
19-
return True
20-
except ObjectDoesNotExist:
21-
return False
15+
return RadiusBatch.objects.filter(
16+
pk=batch_id, organization__in=user.organizations_managed
17+
).exists()
2218

2319
async def connect(self):
2420
self.batch_id = self.scope["url_route"]["kwargs"]["batch_id"]
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
from asgiref.sync import async_to_sync
2+
from channels.routing import URLRouter
3+
from channels.testing import WebsocketCommunicator
4+
from django.contrib.auth import get_user_model
5+
from django.test import TransactionTestCase
6+
from django.urls import re_path
7+
8+
from openwisp_users.tests.utils import TestOrganizationMixin
9+
10+
from ..consumers import RadiusBatchConsumer
11+
from ..utils import load_model
12+
from . import CreateRadiusObjectsMixin
13+
14+
User = get_user_model()
15+
RadiusBatch = load_model("RadiusBatch")
16+
17+
application = URLRouter(
18+
[
19+
re_path(
20+
r"^ws/radius/batch/(?P<batch_id>[^/]+)/$",
21+
RadiusBatchConsumer.as_asgi(),
22+
),
23+
]
24+
)
25+
26+
27+
class TestRadiusBatchConsumer(
28+
CreateRadiusObjectsMixin, TestOrganizationMixin, TransactionTestCase
29+
):
30+
31+
TEST_PASSWORD = "test_password" # noqa: S105
32+
33+
def _create_test_data(self):
34+
org = self._create_org()
35+
user = self._get_admin()
36+
user.is_staff = True
37+
user.is_superuser = True
38+
user.save()
39+
40+
batch = self._create_radius_batch(
41+
name="test-batch",
42+
strategy="prefix",
43+
prefix="test-",
44+
organization=org,
45+
)
46+
return org, user, batch
47+
48+
def _create_staff_user(self, org):
49+
user = User.objects.create_user(
50+
username="staff_user",
51+
email="staff@test.com",
52+
password=self.TEST_PASSWORD,
53+
is_staff=True,
54+
)
55+
org.add_user(user, is_admin=True)
56+
return user
57+
58+
def _create_regular_user(self):
59+
return User.objects.create_user(
60+
username="regular_user",
61+
email="regular@test.com",
62+
password=self.TEST_PASSWORD,
63+
is_staff=False,
64+
)
65+
66+
def test_websocket_connect_superuser(self):
67+
_, user, batch = self._create_test_data()
68+
69+
async def test():
70+
communicator = WebsocketCommunicator(
71+
application,
72+
f"/ws/radius/batch/{batch.pk}/",
73+
)
74+
communicator.scope["user"] = user
75+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
76+
77+
connected, _ = await communicator.connect()
78+
assert connected is True
79+
await communicator.disconnect()
80+
81+
async_to_sync(test)()
82+
83+
def test_websocket_connect_staff_with_permission(self):
84+
org, _, batch = self._create_test_data()
85+
staff_user = self._create_staff_user(org)
86+
87+
async def test():
88+
communicator = WebsocketCommunicator(
89+
application,
90+
f"/ws/radius/batch/{batch.pk}/",
91+
)
92+
communicator.scope["user"] = staff_user
93+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
94+
95+
connected, _ = await communicator.connect()
96+
assert connected is True
97+
await communicator.disconnect()
98+
99+
async_to_sync(test)()
100+
101+
def test_websocket_reject_unauthenticated(self):
102+
_, _, batch = self._create_test_data()
103+
104+
async def test():
105+
communicator = WebsocketCommunicator(
106+
application,
107+
f"/ws/radius/batch/{batch.pk}/",
108+
)
109+
from django.contrib.auth.models import AnonymousUser
110+
111+
communicator.scope["user"] = AnonymousUser()
112+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
113+
114+
connected, _ = await communicator.connect()
115+
assert connected is False
116+
117+
async_to_sync(test)()
118+
119+
def test_websocket_reject_non_staff(self):
120+
_, _, batch = self._create_test_data()
121+
regular_user = self._create_regular_user()
122+
123+
async def test():
124+
communicator = WebsocketCommunicator(
125+
application,
126+
f"/ws/radius/batch/{batch.pk}/",
127+
)
128+
communicator.scope["user"] = regular_user
129+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
130+
131+
connected, _ = await communicator.connect()
132+
assert connected is False
133+
134+
async_to_sync(test)()
135+
136+
def test_websocket_reject_no_permission(self):
137+
_, _, batch = self._create_test_data()
138+
139+
staff_user = User.objects.create_user(
140+
username="staff_no_access",
141+
email="staff2@test.com",
142+
password=self.TEST_PASSWORD,
143+
is_staff=True,
144+
)
145+
146+
async def test():
147+
communicator = WebsocketCommunicator(
148+
application,
149+
f"/ws/radius/batch/{batch.pk}/",
150+
)
151+
communicator.scope["user"] = staff_user
152+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
153+
154+
connected, _ = await communicator.connect()
155+
assert connected is False
156+
157+
async_to_sync(test)()
158+
159+
def test_websocket_group_connection(self):
160+
_, user, batch = self._create_test_data()
161+
162+
async def test():
163+
communicator = WebsocketCommunicator(
164+
application,
165+
f"/ws/radius/batch/{batch.pk}/",
166+
)
167+
communicator.scope["user"] = user
168+
communicator.scope["url_route"] = {"kwargs": {"batch_id": str(batch.pk)}}
169+
170+
connected, _ = await communicator.connect()
171+
assert connected is True
172+
await communicator.disconnect()
173+
174+
async_to_sync(test)()

0 commit comments

Comments
 (0)