From 537e8cf51f0f582d67bd6cf7974d2c080edf3233 Mon Sep 17 00:00:00 2001 From: Juha Louhiranta Date: Mon, 2 Feb 2026 18:15:56 +0200 Subject: [PATCH] fix: parse IP address without the port number IP address type doesn't accept port numbers in elastic. Refs: HL-1614 --- .../api/v1/application_batch_views.py | 4 +-- .../api/v1/talpa_integration_views.py | 4 +-- backend/benefit/common/tests/test_utils.py | 19 ---------- backend/benefit/common/utils.py | 9 ----- .../tests/test_get_remote_address.py | 35 +++++++++++++++++++ backend/shared/shared/audit_log/utils.py | 16 +++++++++ backend/shared/shared/audit_log/viewsets.py | 15 +++----- 7 files changed, 59 insertions(+), 43 deletions(-) create mode 100644 backend/shared/shared/audit_log/tests/test_get_remote_address.py create mode 100644 backend/shared/shared/audit_log/utils.py diff --git a/backend/benefit/applications/api/v1/application_batch_views.py b/backend/benefit/applications/api/v1/application_batch_views.py index 9ffd914235..539d75d5f4 100755 --- a/backend/benefit/applications/api/v1/application_batch_views.py +++ b/backend/benefit/applications/api/v1/application_batch_views.py @@ -34,9 +34,9 @@ from calculator.enums import InstalmentStatus from common.authentications import RobotBasicAuthentication from common.permissions import BFIsHandler -from common.utils import get_request_ip_address from shared.audit_log import audit_logging from shared.audit_log.enums import Operation +from shared.audit_log.utils import get_remote_address from shared.audit_log.viewsets import AuditLoggingModelViewSet LOGGER = logging.getLogger(__name__) @@ -221,7 +221,7 @@ def talpa_export_batch(self, request, *args, **kwargs) -> HttpResponse: filename=file_name ) - ip_address = get_request_ip_address(request) + ip_address = get_remote_address(request) for a in applications_for_csv: audit_logging.log( diff --git a/backend/benefit/applications/api/v1/talpa_integration_views.py b/backend/benefit/applications/api/v1/talpa_integration_views.py index feee3bd4f2..567f88164c 100644 --- a/backend/benefit/applications/api/v1/talpa_integration_views.py +++ b/backend/benefit/applications/api/v1/talpa_integration_views.py @@ -16,9 +16,9 @@ from applications.models import Application from calculator.enums import InstalmentStatus from common.authentications import RobotBasicAuthentication -from common.utils import get_request_ip_address from shared.audit_log import audit_logging from shared.audit_log.enums import Operation +from shared.audit_log.utils import get_remote_address LOGGER = logging.getLogger(__name__) @@ -45,7 +45,7 @@ def post(self, request, *args, **kwargs): def process_callback(self, data, request): if data["status"] in ["Success", "Failure"]: - ip_address = get_request_ip_address(request) + ip_address = get_remote_address(request) self._handle_successful_applications( data["successful_applications"], ip_address, diff --git a/backend/benefit/common/tests/test_utils.py b/backend/benefit/common/tests/test_utils.py index 5a22e60c96..0de0a53ed6 100644 --- a/backend/benefit/common/tests/test_utils.py +++ b/backend/benefit/common/tests/test_utils.py @@ -6,14 +6,12 @@ import pytest from dateutil.relativedelta import relativedelta from django.core.files.uploadedfile import SimpleUploadedFile -from django.http import HttpRequest from common.utils import ( date_range_overlap, days360, duration_in_months, get_date_range_end_with_days360, - get_request_ip_address, hash_file, ) @@ -150,20 +148,3 @@ def test_hash_file(): # Assert that the actual hash matches the expected hash assert actual_hash == expected_hash - - -def test_get_request_ip_address_with_x_forwarded_for(): - request = HttpRequest() - request.META["HTTP_X_FORWARDED_FOR"] = "192.168.1.1, 192.168.1.2" - assert get_request_ip_address(request) == "192.168.1.1" - - -def test_get_request_ip_address_with_remote_addr(): - request = HttpRequest() - request.META["REMOTE_ADDR"] = "192.168.1.1" - assert get_request_ip_address(request) == "192.168.1.1" - - -def test_get_request_ip_address_with_no_ip(): - request = HttpRequest() - assert get_request_ip_address(request) is None diff --git a/backend/benefit/common/utils.py b/backend/benefit/common/utils.py index 9f11f7c20f..1408136432 100644 --- a/backend/benefit/common/utils.py +++ b/backend/benefit/common/utils.py @@ -445,12 +445,3 @@ def hash_file(file: File) -> str: # Return the hexadecimal representation of the hash return sha256.hexdigest() - - -def get_request_ip_address(request: HttpRequest) -> str | None: - """Get the IP address of a request""" - x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") - if x_forwarded_for: - return x_forwarded_for.split(",")[0] - - return request.META.get("REMOTE_ADDR", None) diff --git a/backend/shared/shared/audit_log/tests/test_get_remote_address.py b/backend/shared/shared/audit_log/tests/test_get_remote_address.py new file mode 100644 index 0000000000..88a5d0dbbe --- /dev/null +++ b/backend/shared/shared/audit_log/tests/test_get_remote_address.py @@ -0,0 +1,35 @@ +from unittest.mock import Mock + +import pytest +from django.http import HttpRequest + +from shared.audit_log.utils import get_remote_address + + +@pytest.mark.parametrize( + "x_forwarded_for", [True, False] +) +@pytest.mark.parametrize( + "remote_address, expected_ip", + [ + # IPv4 cases + ("12.23.34.45", "12.23.34.45"), + ("12.23.34.45:1234", "12.23.34.45"), + # IPv6 cases + ("2001:db8::1", "2001:db8::1"), + ("[2001:db8::1]", "2001:db8::1"), + ("[2001:db8::1]:1234", "2001:db8::1"), + ("2001:db8:0:0:0:0:0:1", "2001:db8:0:0:0:0:0:1"), + ("[2001:db8:0:0:0:0:0:1]", "2001:db8:0:0:0:0:0:1"), + ("[2001:db8:0:0:0:0:0:1]:1234", "2001:db8:0:0:0:0:0:1"), + # IPv4-mapped IPv6 cases + ("::ffff:192.0.2.1", "::ffff:192.0.2.1"), + ("[::ffff:192.0.2.1]:1234", "::ffff:192.0.2.1"), + ], +) +def test_get_remote_address(remote_address, expected_ip, x_forwarded_for): + req_mock = Mock( + headers={"x-forwarded-for": remote_address} if x_forwarded_for else {}, + META={"REMOTE_ADDR": remote_address} if not x_forwarded_for else {}, + ) + assert get_remote_address(req_mock) == expected_ip diff --git a/backend/shared/shared/audit_log/utils.py b/backend/shared/shared/audit_log/utils.py new file mode 100644 index 0000000000..798e3d38d2 --- /dev/null +++ b/backend/shared/shared/audit_log/utils.py @@ -0,0 +1,16 @@ +def get_remote_address(request): + forwarded_for = request.headers.get("x-forwarded-for", "") + client_ip = forwarded_for.split(",")[0] or None + + if not client_ip: + client_ip = request.META.get("REMOTE_ADDR") + + if client_ip: + # Strip port from ip address if present + if "[" in client_ip: + # Bracketed IPv6 like [2001:db8::1]:1234 + client_ip = client_ip.lstrip("[").split("]")[0] + elif "." in client_ip and client_ip.count(":") == 1: + # IPv4 with port + client_ip = client_ip.split(":")[0] + return client_ip diff --git a/backend/shared/shared/audit_log/viewsets.py b/backend/shared/shared/audit_log/viewsets.py index 656bede256..efc4b7cfca 100644 --- a/backend/shared/shared/audit_log/viewsets.py +++ b/backend/shared/shared/audit_log/viewsets.py @@ -13,6 +13,7 @@ from shared.audit_log import audit_logging from shared.audit_log.enums import Operation, Status +from shared.audit_log.utils import get_remote_address User = get_user_model() @@ -38,7 +39,7 @@ def _log_permission_denied(self): self._get_operation(), self._get_target(), Status.FORBIDDEN, - ip_address=self._get_ip_address(), + ip_address=get_remote_address(self.request), ) def update(self, request, *args, **kwargs): @@ -104,7 +105,7 @@ def record_action( actor_backend, operation, target or self._get_target(), - ip_address=self._get_ip_address(), + ip_address=get_remote_address(self.request), additional_information=additional_information, ) except (NotAuthenticated, PermissionDenied): @@ -114,7 +115,7 @@ def record_action( operation, target or self._get_target(), Status.FORBIDDEN, - ip_address=self._get_ip_address(), + ip_address=get_remote_address(self.request), additional_information=additional_information, ) raise @@ -148,11 +149,3 @@ def _get_target_object(self): def _unfiltered_queryset(self): return self.get_queryset().model.objects.all() - - def _get_ip_address(self) -> str: - x_forwarded_for = self.request.META.get("HTTP_X_FORWARDED_FOR") - if x_forwarded_for: - ip = x_forwarded_for.split(",")[0] - else: - ip = self.request.META.get("REMOTE_ADDR") - return ip