Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
from calculator.enums import InstalmentStatus
from common.authentications import RobotBasicAuthentication
from common.permissions import BFIsHandler
from common.utils import get_request_ip_address
from shared.audit_log import audit_logging
from shared.audit_log.enums import Operation
from shared.audit_log.utils import get_remote_address
from shared.audit_log.viewsets import AuditLoggingModelViewSet

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -221,7 +221,7 @@ def talpa_export_batch(self, request, *args, **kwargs) -> HttpResponse:
filename=file_name
)

ip_address = get_request_ip_address(request)
ip_address = get_remote_address(request)

for a in applications_for_csv:
audit_logging.log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from applications.models import Application
from calculator.enums import InstalmentStatus
from common.authentications import RobotBasicAuthentication
from common.utils import get_request_ip_address
from shared.audit_log import audit_logging
from shared.audit_log.enums import Operation
from shared.audit_log.utils import get_remote_address

LOGGER = logging.getLogger(__name__)

Expand All @@ -45,7 +45,7 @@ def post(self, request, *args, **kwargs):

def process_callback(self, data, request):
if data["status"] in ["Success", "Failure"]:
ip_address = get_request_ip_address(request)
ip_address = get_remote_address(request)
self._handle_successful_applications(
data["successful_applications"],
ip_address,
Expand Down
19 changes: 0 additions & 19 deletions backend/benefit/common/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@
import pytest
from dateutil.relativedelta import relativedelta
from django.core.files.uploadedfile import SimpleUploadedFile
from django.http import HttpRequest

from common.utils import (
date_range_overlap,
days360,
duration_in_months,
get_date_range_end_with_days360,
get_request_ip_address,
hash_file,
)

Expand Down Expand Up @@ -150,20 +148,3 @@ def test_hash_file():

# Assert that the actual hash matches the expected hash
assert actual_hash == expected_hash


def test_get_request_ip_address_with_x_forwarded_for():
request = HttpRequest()
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.1.1, 192.168.1.2"
assert get_request_ip_address(request) == "192.168.1.1"


def test_get_request_ip_address_with_remote_addr():
request = HttpRequest()
request.META["REMOTE_ADDR"] = "192.168.1.1"
assert get_request_ip_address(request) == "192.168.1.1"


def test_get_request_ip_address_with_no_ip():
request = HttpRequest()
assert get_request_ip_address(request) is None
9 changes: 0 additions & 9 deletions backend/benefit/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,12 +445,3 @@ def hash_file(file: File) -> str:

# Return the hexadecimal representation of the hash
return sha256.hexdigest()


def get_request_ip_address(request: HttpRequest) -> str | None:
"""Get the IP address of a request"""
x_forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
return x_forwarded_for.split(",")[0]

return request.META.get("REMOTE_ADDR", None)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from unittest.mock import Mock

import pytest
from django.http import HttpRequest

from shared.audit_log.utils import get_remote_address


@pytest.mark.parametrize(
"x_forwarded_for", [True, False]
)
@pytest.mark.parametrize(
"remote_address, expected_ip",
[
# IPv4 cases
("12.23.34.45", "12.23.34.45"),
("12.23.34.45:1234", "12.23.34.45"),
# IPv6 cases
("2001:db8::1", "2001:db8::1"),
("[2001:db8::1]", "2001:db8::1"),
("[2001:db8::1]:1234", "2001:db8::1"),
("2001:db8:0:0:0:0:0:1", "2001:db8:0:0:0:0:0:1"),
("[2001:db8:0:0:0:0:0:1]", "2001:db8:0:0:0:0:0:1"),
("[2001:db8:0:0:0:0:0:1]:1234", "2001:db8:0:0:0:0:0:1"),
# IPv4-mapped IPv6 cases
("::ffff:192.0.2.1", "::ffff:192.0.2.1"),
("[::ffff:192.0.2.1]:1234", "::ffff:192.0.2.1"),
],
)
def test_get_remote_address(remote_address, expected_ip, x_forwarded_for):
req_mock = Mock(
headers={"x-forwarded-for": remote_address} if x_forwarded_for else {},
META={"REMOTE_ADDR": remote_address} if not x_forwarded_for else {},
)
assert get_remote_address(req_mock) == expected_ip
16 changes: 16 additions & 0 deletions backend/shared/shared/audit_log/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def get_remote_address(request):
forwarded_for = request.headers.get("x-forwarded-for", "")
client_ip = forwarded_for.split(",")[0] or None

if not client_ip:
client_ip = request.META.get("REMOTE_ADDR")

if client_ip:
# Strip port from ip address if present
if "[" in client_ip:
# Bracketed IPv6 like [2001:db8::1]:1234
client_ip = client_ip.lstrip("[").split("]")[0]
elif "." in client_ip and client_ip.count(":") == 1:
# IPv4 with port
client_ip = client_ip.split(":")[0]
Comment on lines +12 to +15
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The port-stripping logic fails for unbracketed IPv4-mapped IPv6 addresses (e.g., ::ffff:192.0.2.1:8000) because it incorrectly checks for a single colon, leaving the port attached.
Severity: MEDIUM

Suggested Fix

Update the logic to correctly identify and strip ports from unbracketed IPv6 addresses. This could involve checking for a dot and multiple colons, then finding the last colon to reliably split the host from the port, regardless of the IP address format.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: backend/shared/shared/audit_log/utils.py#L12-L15

Potential issue: The logic to strip ports from IP addresses does not correctly handle
unbracketed IPv4-mapped IPv6 addresses that contain a port, such as
`::ffff:192.0.2.1:8000`. The condition `elif "." in client_ip and client_ip.count(":")
== 1` fails because such addresses contain multiple colons. As a result, the port is not
stripped. This causes an IP address with a port to be sent to Elasticsearch, which
cannot index it as an `ip` type, leading to the silent failure of audit log persistence
and data loss.

Did we get this right? 👍 / 👎 to inform future reviews.

return client_ip
15 changes: 4 additions & 11 deletions backend/shared/shared/audit_log/viewsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from shared.audit_log import audit_logging
from shared.audit_log.enums import Operation, Status
from shared.audit_log.utils import get_remote_address

User = get_user_model()

Expand All @@ -38,7 +39,7 @@ def _log_permission_denied(self):
self._get_operation(),
self._get_target(),
Status.FORBIDDEN,
ip_address=self._get_ip_address(),
ip_address=get_remote_address(self.request),
)

def update(self, request, *args, **kwargs):
Expand Down Expand Up @@ -104,7 +105,7 @@ def record_action(
actor_backend,
operation,
target or self._get_target(),
ip_address=self._get_ip_address(),
ip_address=get_remote_address(self.request),
additional_information=additional_information,
)
except (NotAuthenticated, PermissionDenied):
Expand All @@ -114,7 +115,7 @@ def record_action(
operation,
target or self._get_target(),
Status.FORBIDDEN,
ip_address=self._get_ip_address(),
ip_address=get_remote_address(self.request),
additional_information=additional_information,
)
raise
Expand Down Expand Up @@ -148,11 +149,3 @@ def _get_target_object(self):

def _unfiltered_queryset(self):
return self.get_queryset().model.objects.all()

def _get_ip_address(self) -> str:
x_forwarded_for = self.request.META.get("HTTP_X_FORWARDED_FOR")
if x_forwarded_for:
ip = x_forwarded_for.split(",")[0]
else:
ip = self.request.META.get("REMOTE_ADDR")
return ip
Loading