From 765619f3232ec844feaf20bae8fd1a6be890adf6 Mon Sep 17 00:00:00 2001 From: Janne Forsman Date: Thu, 13 Mar 2025 18:06:00 +0200 Subject: [PATCH] feat: Update AdditionalSignReal additional_information and update content_s if possible Refs: LIIK-744, LIIK-745 --- traffic_control/admin/__init__.py | 1 + .../admin/parking_zone_update_info.py | 9 + .../additional_sign_info_enrich.py | 432 ++++++++++++++++++ .../commands/parking_zone_update.py | 45 ++ .../migrations/0089_parkingzoneupdateinfo.py | 25 + traffic_control/models/__init__.py | 1 + .../models/parking_zone_update_info.py | 15 + .../test_additional_sign_info_enrich.py | 274 +++++++++++ .../commands/test_parking_zone_update.py | 26 ++ 9 files changed, 828 insertions(+) create mode 100644 traffic_control/admin/parking_zone_update_info.py create mode 100644 traffic_control/analyze_utils/additional_sign_info_enrich.py create mode 100644 traffic_control/management/commands/parking_zone_update.py create mode 100644 traffic_control/migrations/0089_parkingzoneupdateinfo.py create mode 100644 traffic_control/models/parking_zone_update_info.py create mode 100644 traffic_control/tests/analyze_utils/test_additional_sign_info_enrich.py create mode 100644 traffic_control/tests/management/commands/test_parking_zone_update.py diff --git a/traffic_control/admin/__init__.py b/traffic_control/admin/__init__.py index 2f80945e..fd57e9f8 100644 --- a/traffic_control/admin/__init__.py +++ b/traffic_control/admin/__init__.py @@ -19,6 +19,7 @@ PortalTypeAdmin, ) from traffic_control.admin.owner import OwnerAdmin +from traffic_control.admin.parking_zone_update_info import ParkingZoneUpdateInfoAdmin from traffic_control.admin.plan import PlanAdmin from traffic_control.admin.road_marking import ( RoadMarkingPlanAdmin, diff --git a/traffic_control/admin/parking_zone_update_info.py b/traffic_control/admin/parking_zone_update_info.py new file mode 100644 index 00000000..59e904ed --- /dev/null +++ b/traffic_control/admin/parking_zone_update_info.py @@ -0,0 +1,9 @@ +from django.contrib import admin + +from traffic_control.models import ParkingZoneUpdateInfo + + +@admin.register(ParkingZoneUpdateInfo) +class ParkingZoneUpdateInfoAdmin(admin.ModelAdmin): + list_display = ("id", "start_time", "end_time", "database_update") + ordering = ("-start_time",) diff --git a/traffic_control/analyze_utils/additional_sign_info_enrich.py b/traffic_control/analyze_utils/additional_sign_info_enrich.py new file mode 100644 index 00000000..0828a714 --- /dev/null +++ b/traffic_control/analyze_utils/additional_sign_info_enrich.py @@ -0,0 +1,432 @@ +import csv +import re +from datetime import datetime, timezone +from typing import Any, Dict, List, NamedTuple, Tuple, Union + +from django.db import transaction + +from traffic_control.models import AdditionalSignReal, ParkingZoneUpdateInfo, TrafficControlDeviceType + + +class AdditionalSignInfoUpdateInfo(NamedTuple): + additional_sign_id: str + location: str + device_type_code: str + old_additional_information: str + new_additional_information: str + streetsmart_link: str + admin_link: str + content_schema: Union[Dict[str, Any], None] + errors: List[str] + + +def _get_admin_link(additional_sign_id: str) -> str: + """TDOO now just hardcoded production admin link""" + return f"https://cityinfra.api.hel.fi/en/admin/traffic_control/additionalsignreal/{additional_sign_id}/change/" + + +def _get_content_schema_update( + additional_sign: AdditionalSignReal, content_s: Union[Dict[str, str], None] +) -> Union[Dict, None]: + """Do not get content schema update if additional sign has it already""" + if not additional_sign.content_s and additional_sign.missing_content is True: + dt = TrafficControlDeviceType.objects.get(code=additional_sign.device_type.code) + if dt.content_schema and content_s: + return content_s + return None + + +def _get_error_info(additional_sign: AdditionalSignReal, errors: List[str]) -> AdditionalSignInfoUpdateInfo: + return AdditionalSignInfoUpdateInfo( + additional_sign_id=str(additional_sign.id), + location=additional_sign.location.ewkt, + device_type_code=additional_sign.device_type.code, + old_additional_information=additional_sign.additional_information, + new_additional_information="", + streetsmart_link=additional_sign.attachment_url, + admin_link=_get_admin_link(str(additional_sign.id)), + content_schema=None, + errors=errors, + ) + + +def _get_success_info( + additional_sign: AdditionalSignReal, new_additional_information, content_s: Union[Dict[str, str], None] = None +) -> AdditionalSignInfoUpdateInfo: + return AdditionalSignInfoUpdateInfo( + additional_sign_id=str(additional_sign.id), + location=additional_sign.location.ewkt, + device_type_code=additional_sign.device_type.code, + old_additional_information=additional_sign.additional_information, + new_additional_information=new_additional_information, + streetsmart_link=additional_sign.attachment_url, + admin_link=_get_admin_link(str(additional_sign.id)), + content_schema=_get_content_schema_update(additional_sign, content_s), + errors=[], + ) + + +SUPPORTED_DTYPE_CODES = [ + "H20.71", + "H20.71S", + "H20.72", + "H20.72S", + "H20.73", + "H20.73S", + "H20.74", + "H20.74S", + "H20.75", + "H20.75S", + "H20.8", + "H20.8S", +] + + +TEXT_NUMBERCODE_PATTERN = re.compile(r"text:\s*(.*?);\s*numbercode:\s*(.*)") +PERMIT_REPLACEMENT_PATTERN = re.compile(r"(\b\w+\b)\s*/(\b\w+\b)") +ZONE_LIMIT_VALUE_PATTERN = re.compile(r"(\d+)\s+(\d+)\s+(min|h)|(\d+)\s*(\d+)(min|h|mın)") +ALLOWED_PERMIT_VALUES = [ + "A", + "B", + "C", + "D", + "E", + "F", + "H", + "I", + "J", + "K", + "L", + "M", + "N", + "O", + "P", + "Z", + "A/B", + "A/F", + "B/C", + "C/E", + "F/H", + "H/L", + "I/J", + "J/K", +] + + +ALLOWED_ZONE_VALUES = ["1", "2", "3"] +ALLOWED_UNIT_VALUES = ["min", "h"] +H20_71X_BASIC_ERROR_STR = "Could not get zone and permit from additional_information" +H20_72X_BASIC_ERROR_STR = "Could not get zone, limit and unit from additional_information" +TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR = "text and numbercode not found from additional_information" +H20_71_FORMAT_STR = "Vyöhyke/Zon {zone}. Ei koske P-tunnuksella/Gäller ej med P-tecknet {permit}" +H20_71S_FORMAT_STR = "Vyöhyke/Zon {zone}. Maksu ei koske P-tunnuksella/Avgiften gäller ej med P-tecknet {permit}" +H20_8_FORMAT_STR = "Ei koske P-tunnuksella/Gäller ej med P-tecknet {permit}" +H20_8S_FORMAT_STR = "Maksu ei koske P-tunnuksella/Avgiften gäller ej med P-tecknet {permit}" + + +def h20_73_handler(additional_sign: AdditionalSignReal): + return basic_str_handler(additional_sign, "Vyöhyke/Zon 1") + + +def h20_74_handler(additional_sign: AdditionalSignReal): + return basic_str_handler(additional_sign, "Vyöhyke/Zon 2") + + +def h20_75_handler(additional_sign: AdditionalSignReal): + return basic_str_handler(additional_sign, "Vyöhyke/Zon 3") + + +def h20_8_handler(additional_sign: AdditionalSignReal): + return _get_h20_8x_update_info(additional_sign, H20_8_FORMAT_STR) + + +def h20_8s_handler(additional_sign: AdditionalSignReal) -> AdditionalSignInfoUpdateInfo: + return _get_h20_8x_update_info(additional_sign, H20_8S_FORMAT_STR) + + +def h20_71_handler(additional_sign: AdditionalSignReal) -> AdditionalSignInfoUpdateInfo: + return _get_h20_71x_update_info(additional_sign, H20_71_FORMAT_STR) + + +def h20_71s_handler(additional_sign: AdditionalSignReal) -> AdditionalSignInfoUpdateInfo: + return _get_h20_71x_update_info(additional_sign, H20_71S_FORMAT_STR) + + +def h20_72_handler(additional_sign: AdditionalSignReal) -> AdditionalSignInfoUpdateInfo: + return get_h20_72x_update_info( + additional_sign, "Vyöhyke/Zon {zone}. Kertamaksu enintään/Engångsbetalning max {limit} {unit}" + ) + + +def h20_72s_handler(additional_sign: AdditionalSignReal) -> AdditionalSignInfoUpdateInfo: + return get_h20_72x_update_info( + additional_sign, "Vyöhyke/Zon {zone}. Kertamaksu enintään/Engångsbetalning max {limit} {unit}" + ) + + +def get_h20_72x_update_info(additional_sign: AdditionalSignReal, text_format_str: str) -> AdditionalSignInfoUpdateInfo: + text_value, numbercode_value = _get_text_and_numbercode_values(additional_sign.additional_information) + if text_value is None or numbercode_value is None: + return _get_error_info(additional_sign, [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR]) + + zone, limit, unit = _get_zone_limit_and_unit_from_h20_72_text(text_value) + if zone is None and limit is None and unit is None: + return _get_error_info(additional_sign, [H20_72X_BASIC_ERROR_STR]) + + errors = [] + if zone not in ALLOWED_ZONE_VALUES: + errors.append(f"Parsed zone is not allowed: {zone}") + if unit not in ALLOWED_UNIT_VALUES: + errors.append(f"Parsed unit is not allowed: {unit}") + if errors: + return _get_error_info(additional_sign, errors) + + return _get_success_info( + additional_sign, + _make_additional_information_str(text_format_str.format(zone=zone, limit=limit, unit=unit), numbercode_value), + {"zone": zone, "limit": limit, "unit": unit}, + ) + + +def _get_h20_71x_update_info(additional_sign: AdditionalSignReal, text_format_str: str) -> AdditionalSignInfoUpdateInfo: + text_value, numbercode_value = _get_text_and_numbercode_values(additional_sign.additional_information) + if text_value is None and numbercode_value is None: + return _get_error_info(additional_sign, [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR]) + + zone, permit_code = _get_zone_and_permit_from_h20_71x_text(text_value) + if zone is None and permit_code is None: + return _get_error_info(additional_sign, [H20_71X_BASIC_ERROR_STR]) + + errors = [] + if permit_code not in ALLOWED_PERMIT_VALUES: + errors.append(f"Parsed permit code is not allowed: {permit_code}") + if zone not in ALLOWED_ZONE_VALUES: + errors.append(f"Parsed zone is not allowed: {zone}") + if errors: + return _get_error_info(additional_sign, errors) + + return _get_success_info( + additional_sign, + _make_additional_information_str( + _h20_71x_additional_sign_text(zone, permit_code, text_format_str), numbercode_value + ), + {"zone": zone, "permit": permit_code}, + ) + + +def _get_h20_8x_update_info(additional_sign: AdditionalSignReal, text_format_str: str) -> AdditionalSignInfoUpdateInfo: + text_value, numbercode_value = _get_text_and_numbercode_values(additional_sign.additional_information) + + if text_value is None or numbercode_value is None: + return _get_error_info(additional_sign, [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR]) + + permit_code = _get_permit_from_h20_8_text(text_value) + if not permit_code: + return _get_error_info(additional_sign, ["Could not get permit code from additional_information"]) + else: + if permit_code not in ALLOWED_PERMIT_VALUES: + return _get_error_info(additional_sign, [f"Parsed permit code is not allowed: {permit_code}"]) + else: + return _get_success_info( + additional_sign, + _make_additional_information_str( + _h20_8_additional_sign_text(permit_code, text_format_str), numbercode_value + ), + {"permit": permit_code}, + ) + + +def _h20_8_additional_sign_text(permit_code: str, text_format_str: str) -> str: + return text_format_str.format(permit=permit_code) + + +def _h20_71x_additional_sign_text(zone: str, permit_code: str, text_format_str: str) -> str: + return text_format_str.format(zone=zone, permit=permit_code) + + +def _get_permit_from_h20_8_text(text: str) -> Union[str, None]: + stripped_text = text.replace("unreadable", "").strip() + # after this there should be only one substring left that should be the permit code + # if not then it should be reported as an error + if len(stripped_text.split()) != 1: + return None + + return _map_permit_code(stripped_text.upper()) + + +def _get_zone_and_permit_from_h20_71x_text(text: str) -> Tuple[str | None, str | None]: + """Known formats are '; and ' ' + Any other is considered to be erroneous data. + """ + parts = re.sub(PERMIT_REPLACEMENT_PATTERN, r"\1/\2", text).replace(";", " ").split() + if len(parts) != 2: + return None, None + + return parts[0].strip(), _map_permit_code(parts[1].strip().upper()) + + +def _get_zone_limit_and_unit_from_h20_72_text(text: str) -> Tuple[str | None, str | None, str | None]: + pre_cleaned = text.replace(";", " ") + match = re.match(ZONE_LIMIT_VALUE_PATTERN, pre_cleaned) + if match: + if match.group(1): + # format with spaces eg. 1 20 min + zone = match.group(1) + limit = match.group(2) + unit = match.group(3) + else: + # format without spaces eg. 1 20min + zone = match.group(4) + limit = match.group(5) + unit = match.group(6) + + return zone, limit, _map_unit(unit) + + return None, None, None + + +def _map_permit_code(permit_code: str) -> str: + """Currently just map 0 to O. Streetscan data has interpreted many O to 0""" + if permit_code == "0": + return "O" + return permit_code + + +def _map_unit(unit: str) -> str: + """in streetscan data what is supposed to be min is actually mın (with dotless I)""" + if unit == "mın": + return "min" + return unit + + +def basic_str_handler(additional_sign: AdditionalSignReal, hardcoded_text: str) -> AdditionalSignInfoUpdateInfo: + text_value, number_value = _get_text_and_numbercode_values(additional_sign.additional_information) + if text_value is not None and number_value is not None: + return _get_success_info(additional_sign, _make_additional_information_str(hardcoded_text, number_value)) + + return _get_error_info(additional_sign, [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR]) + + +def _get_default_queryset(): + """Exclude is so that those simple Vyökyke additional signs are not handled twice""" + return ( + AdditionalSignReal.objects.filter(device_type__code__in=SUPPORTED_DTYPE_CODES) + .exclude(additional_information__icontains="vyöhyke") + .exclude(additional_information__icontains="Ei koske P-tunnuksella") + .select_related("device_type") + ) + + +def _get_text_and_numbercode_values(additional_info_str: str) -> Tuple[Any, Any]: + match = re.search(TEXT_NUMBERCODE_PATTERN, _pre_clean_additional_info(additional_info_str)) + if match: + return match.group(1), match.group(2) + return None, None + + +def _pre_clean_additional_info(additional_info_str: str) -> str: + # handle case where permit code is 'A /B' + pre_cleaned_text = additional_info_str.replace("\n", "") + return pre_cleaned_text + + +def _make_additional_information_str(text_value: str, numbercode_value: str) -> str: + return f"text:{text_value}; numbercode:{numbercode_value}" + + +HANDLERS_BY_DTYPE_CODE = { + "H20.71": h20_71_handler, + "H20.71S": h20_71s_handler, + "H20.72": h20_72_handler, + "H20.72S": h20_72s_handler, + "H20.73": h20_73_handler, + "H20.73S": h20_73_handler, + "H20.74": h20_74_handler, + "H20.74S": h20_74_handler, + "H20.75": h20_75_handler, + "H20.75S": h20_75_handler, + "H20.8": h20_8_handler, + "H20.8S": h20_8s_handler, +} + + +def get_update_infos(qset=None): + if qset is None: + qset = _get_default_queryset() + + update_infos = [] + for additional_sign in qset: + dtype_code = additional_sign.device_type.code + if not HANDLERS_BY_DTYPE_CODE.get(dtype_code): + update_infos.append(_get_error_info(additional_sign, [[f"Device type: {dtype_code} not supported"]])) + else: + update_infos.append(HANDLERS_BY_DTYPE_CODE[dtype_code](additional_sign)) + + return update_infos + + +def get_error_infos( + update_infos: List[AdditionalSignInfoUpdateInfo], order_by="device_type_code" +) -> List[AdditionalSignInfoUpdateInfo]: + return sorted((filter(lambda x: x.errors, update_infos)), key=lambda x: getattr(x, order_by)) + + +def get_success_infos( + update_infos: List[AdditionalSignInfoUpdateInfo], order_by="device_type_code" +) -> List[AdditionalSignInfoUpdateInfo]: + return sorted((filter(lambda x: not x.errors, update_infos)), key=lambda x: getattr(x, order_by)) + + +def do_database_update( + update_infos: List[AdditionalSignInfoUpdateInfo], update_errors: List[AdditionalSignInfoUpdateInfo] +) -> Union[ParkingZoneUpdateInfo, None]: + def get_update_params(): + ud_params = {"additional_information": update_info.new_additional_information} + if update_info.content_schema: + ud_params["content_s"] = update_info.content_schema + ud_params["missing_content"] = False + return ud_params + + with transaction.atomic(): + start_time = datetime.now(timezone.utc) + for update_info in update_infos: + AdditionalSignReal.objects.filter(id=update_info.additional_sign_id).update(**get_update_params()) + end_time = datetime.now(timezone.utc) + return ParkingZoneUpdateInfo.objects.create( + start_time=start_time, + end_time=end_time, + update_infos=update_infos, + database_update=True, + update_errors=update_errors, + ) + + +def write_infos_to_csv(update_infos: List[AdditionalSignInfoUpdateInfo], file_path): + headers = [ + "additional_sign_id", + "location", + "device_type_code", + "old_additional_information", + "new_additional_information", + "streetsmart_link", + "admin_link", + "errors", + ] + with open(file_path, "w") as csv_file: + writer = csv.writer(csv_file, delimiter=";") + writer.writerow(headers) + writer.writerows(update_infos) + + +def _rows_for_csv(update_infos: List[AdditionalSignInfoUpdateInfo]): + for update_info in update_infos: + yield [ + update_info.additional_sign_id, + update_info.location, + update_info.device_type_code, + update_info.old_additional_information, + update_info.new_additional_information, + update_info.streetsmart_link, + update_info.admin_link, + ";".join(update_info.errors), + ] diff --git a/traffic_control/management/commands/parking_zone_update.py b/traffic_control/management/commands/parking_zone_update.py new file mode 100644 index 00000000..c3731325 --- /dev/null +++ b/traffic_control/management/commands/parking_zone_update.py @@ -0,0 +1,45 @@ +from datetime import datetime, timezone + +from django.core.management.base import BaseCommand + +from traffic_control.analyze_utils.additional_sign_info_enrich import ( + do_database_update, + get_error_infos, + get_success_infos, + get_update_infos, +) +from traffic_control.models import ParkingZoneUpdateInfo + + +class Command(BaseCommand): + help = "Update AdditionalSignReal additional_information field. Also json schema updated if that can be done." + + def add_arguments(self, parser): + parser.add_argument( + "-u", + "--update", + action="store_true", + dest="update", + default=False, + help="Actually do updates on the AdditionalSignReals", + ) + + def handle(self, *args, **options): + start_time = datetime.now(timezone.utc) + update_infos = get_update_infos() + update_errors = get_error_infos(update_infos) + update_success = get_success_infos(update_infos) + + if options["update"]: + self.stdout.write("Updating additional_informations and schemas..") + do_database_update(update_success, update_errors) + else: + self.stdout.write("Checking what to update...") + end_time = datetime.now(timezone.utc) + ParkingZoneUpdateInfo.objects.create( + start_time=start_time, + end_time=end_time, + update_infos=update_success, + update_errors=update_errors, + database_update=False, + ) diff --git a/traffic_control/migrations/0089_parkingzoneupdateinfo.py b/traffic_control/migrations/0089_parkingzoneupdateinfo.py new file mode 100644 index 00000000..1943a14c --- /dev/null +++ b/traffic_control/migrations/0089_parkingzoneupdateinfo.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.18 on 2025-03-20 10:09 + +from django.db import migrations, models +import uuid + + +class Migration(migrations.Migration): + + dependencies = [ + ('traffic_control', '0088_plan_decision_url'), + ] + + operations = [ + migrations.CreateModel( + name='ParkingZoneUpdateInfo', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('start_time', models.DateTimeField()), + ('end_time', models.DateTimeField()), + ('update_infos', models.JSONField(null=True)), + ('update_errors', models.JSONField(null=True)), + ('database_update', models.BooleanField(default=False)), + ], + ), + ] diff --git a/traffic_control/models/__init__.py b/traffic_control/models/__init__.py index c65f5ba3..40f42d2a 100644 --- a/traffic_control/models/__init__.py +++ b/traffic_control/models/__init__.py @@ -29,6 +29,7 @@ PortalType, ) from traffic_control.models.operational_area import GroupOperationalArea, OperationalArea +from traffic_control.models.parking_zone_update_info import ParkingZoneUpdateInfo from traffic_control.models.plan import Plan from traffic_control.models.responsible_entity import GroupResponsibleEntity, ResponsibleEntity from traffic_control.models.road_marking import ( diff --git a/traffic_control/models/parking_zone_update_info.py b/traffic_control/models/parking_zone_update_info.py new file mode 100644 index 00000000..de1583d0 --- /dev/null +++ b/traffic_control/models/parking_zone_update_info.py @@ -0,0 +1,15 @@ +import uuid + +from django.db import models + + +class ParkingZoneUpdateInfo(models.Model): + """Just a small informative db table to track changes done when programmatically updating + AdditionalSignReal.additional_information""" + + id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) + start_time = models.DateTimeField() + end_time = models.DateTimeField() + update_infos = models.JSONField(blank=False, null=True) + update_errors = models.JSONField(blank=False, null=True) + database_update = models.BooleanField(default=False) diff --git a/traffic_control/tests/analyze_utils/test_additional_sign_info_enrich.py b/traffic_control/tests/analyze_utils/test_additional_sign_info_enrich.py new file mode 100644 index 00000000..bda2946e --- /dev/null +++ b/traffic_control/tests/analyze_utils/test_additional_sign_info_enrich.py @@ -0,0 +1,274 @@ +import pytest + +from traffic_control.analyze_utils.additional_sign_info_enrich import ( + do_database_update, + get_update_infos, + H20_8_FORMAT_STR, + H20_8S_FORMAT_STR, + H20_71_FORMAT_STR, + H20_71S_FORMAT_STR, + H20_71X_BASIC_ERROR_STR, + H20_72X_BASIC_ERROR_STR, + TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR, +) +from traffic_control.tests.factories import AdditionalSignRealFactory, TrafficControlDeviceTypeFactory + + +def _get_h20_71x_params(dtype_code): + def get_text_part(zone, permit): + if dtype_code == "H20.71": + return H20_71_FORMAT_STR.format(zone=zone, permit=permit) + elif dtype_code == "H20.71S": + return H20_71S_FORMAT_STR.format(zone=zone, permit=permit) + + return ( + ( + dtype_code, + "text:;numbercode:", + "", + [H20_71X_BASIC_ERROR_STR], + ), + ( + dtype_code, + "text:2;numbercode:", + "", + [H20_71X_BASIC_ERROR_STR], + ), + ( + dtype_code, + "", + "", + [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR], + ), + ( + dtype_code, + "text:6 A; numbercode:", + "", + ["Parsed zone is not allowed: 6"], + ), + ( + dtype_code, + "text:3 ASD; numbercode:", + "", + ["Parsed permit code is not allowed: ASD"], + ), + ( + dtype_code, + "text:6 ASD; numbercode:", + "", + ["Parsed permit code is not allowed: ASD", "Parsed zone is not allowed: 6"], + ), + ( + dtype_code, + "text:3 A/B; numbercode:asda", + f"text:{get_text_part(3, 'A/B')}; numbercode:asda", + [], + ), + ( + dtype_code, + "text:3; a/b; numbercode:asda", + f"text:{get_text_part(3, 'A/B')}; numbercode:asda", + [], + ), + ( + dtype_code, + "text:3 A\n/B; numbercode:asda", + f"text:{get_text_part(3, 'A/B')}; numbercode:asda", + [], + ), + ( + dtype_code, + "text:3 ; A/\nB; numbercode:asda", + f"text:{get_text_part(3, 'A/B')}; numbercode:asda", + [], + ), + ) + + +def _get_h20_72x_params(dtype_code): + return ( + (dtype_code, "text:;numbercode:", "", [H20_72X_BASIC_ERROR_STR]), + (dtype_code, "text:2;numbercode:", "", [H20_72X_BASIC_ERROR_STR]), + (dtype_code, "text:2 60;numbercode:", "", [H20_72X_BASIC_ERROR_STR]), + ( + dtype_code, + "", + "", + [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR], + ), + ( + dtype_code, + "text:6 60 min; numbercode:", + "", + ["Parsed zone is not allowed: 6"], + ), + ( + dtype_code, + "text:2 60 sekunttei; numbercode:", + "", + [H20_72X_BASIC_ERROR_STR], + ), + ( + dtype_code, + "text:2 60 min; numbercode:", + "text:Vyöhyke/Zon 2. Kertamaksu enintään/Engångsbetalning max 60 min; numbercode:", + [], + ), + ( + dtype_code, + "text:2 60min; numbercode:", + "text:Vyöhyke/Zon 2. Kertamaksu enintään/Engångsbetalning max 60 min; numbercode:", + [], + ), + ( + dtype_code, + "text:3;50min; numbercode:", + "text:Vyöhyke/Zon 3. Kertamaksu enintään/Engångsbetalning max 50 min; numbercode:", + [], + ), + ) + + +def _get_h20_73x_params(dtype_code): + return ( + ( + dtype_code, + "", + "", + [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR], + ), + ( + dtype_code, + "text:;numbercode:", + "text:Vyöhyke/Zon 1; numbercode:", + [], + ), + ( + dtype_code, + "text:anything;numbercode:anything", + "text:Vyöhyke/Zon 1; numbercode:anything", + [], + ), + ) + + +def _get_h20_74x_params(dtype_code): + return ( + ( + dtype_code, + "", + "", + [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR], + ), + ( + dtype_code, + "text:;numbercode:", + "text:Vyöhyke/Zon 2; numbercode:", + [], + ), + ( + dtype_code, + "text:anything;numbercode:anything", + "text:Vyöhyke/Zon 2; numbercode:anything", + [], + ), + ) + + +def _get_h20_75x_params(dtype_code): + return ( + ( + dtype_code, + "", + "", + [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR], + ), + ( + dtype_code, + "text:;numbercode:", + "text:Vyöhyke/Zon 3; numbercode:", + [], + ), + ( + dtype_code, + "text:anything;numbercode:anything", + "text:Vyöhyke/Zon 3; numbercode:anything", + [], + ), + ) + + +def _get_h20_8x_params(dtype_code): + def get_text_part(permit): + if dtype_code == "H20.8": + return H20_8_FORMAT_STR.format(permit=permit) + elif dtype_code == "H20.8S": + return H20_8S_FORMAT_STR.format(permit=permit) + + return ( + ( + dtype_code, + "", + "", + [TEXT_NUMBERCODE_NOT_FOUND_ERROR_STR], + ), + (dtype_code, "text:;numbercode:", "", ["Could not get permit code from additional_information"]), + (dtype_code, "text: NOTALLOWED; numbercode:", "", ["Parsed permit code is not allowed: NOTALLOWED"]), + (dtype_code, "text: A/B; numbercode:", f"text:{get_text_part('A/B')}; numbercode:", []), + (dtype_code, "text: a/B; numbercode:", f"text:{get_text_part('A/B')}; numbercode:", []), + (dtype_code, "text: \na/\nb; numbercode:", f"text:{get_text_part('A/B')}; numbercode:", []), + ) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "dtype_code, additional_information, expected_new_info, expected_errors", + _get_h20_71x_params("H20.71") + + _get_h20_71x_params("H20.71S") + + _get_h20_72x_params("H20.72") + + _get_h20_72x_params("H20.72S") + + _get_h20_73x_params("H20.73") + + _get_h20_73x_params("H20.73S") + + _get_h20_74x_params("H20.74") + + _get_h20_74x_params("H20.74S") + + _get_h20_75x_params("H20.75") + + _get_h20_75x_params("H20.75S") + + _get_h20_8x_params("H20.8") + + _get_h20_8x_params("H20.8S"), +) +def test_additional_sign_info_enrich(dtype_code, additional_information, expected_new_info, expected_errors): + adsr = AdditionalSignRealFactory(device_type__code=dtype_code, additional_information=additional_information) + update_info = get_update_infos() + assert len(update_info) == 1 + + assert update_info[0].additional_sign_id == str(adsr.id) + assert update_info[0].device_type_code == dtype_code + assert update_info[0].old_additional_information == additional_information + assert update_info[0].new_additional_information == expected_new_info + assert update_info[0].streetsmart_link == adsr.attachment_url + assert update_info[0].errors == expected_errors + + +@pytest.mark.django_db +def test_additional_sign_info_do_database_update(): + dt = TrafficControlDeviceTypeFactory(code="H20.8", content_schema={"jotain": "jossain"}) + adsr1 = AdditionalSignRealFactory( + device_type=dt, additional_information="text: A/B; numbercode:", content_s=None, missing_content=True + ) + adsr2 = AdditionalSignRealFactory( + device_type=dt, additional_information="text: A/B; numbercode:", content_s={"test": "me"}, missing_content=False + ) + update_infos = get_update_infos() + pz_update_info_o = do_database_update(update_infos, []) + adsr1.refresh_from_db() + assert adsr1.missing_content is False + assert adsr1.content_s == {"permit": "A/B"} + assert adsr1.additional_information == "text:Ei koske P-tunnuksella/Gäller ej med P-tecknet A/B; numbercode:" + + adsr2.refresh_from_db() + assert adsr2.missing_content is False + assert adsr2.content_s == {"test": "me"} + assert adsr2.additional_information == "text:Ei koske P-tunnuksella/Gäller ej med P-tecknet A/B; numbercode:" + + assert pz_update_info_o.database_update is True + assert pz_update_info_o.update_infos == update_infos diff --git a/traffic_control/tests/management/commands/test_parking_zone_update.py b/traffic_control/tests/management/commands/test_parking_zone_update.py new file mode 100644 index 00000000..6b10629e --- /dev/null +++ b/traffic_control/tests/management/commands/test_parking_zone_update.py @@ -0,0 +1,26 @@ +import pytest +from django.core.management import call_command + +from traffic_control.models import ParkingZoneUpdateInfo +from traffic_control.tests.factories import AdditionalSignRealFactory, TrafficControlDeviceTypeFactory + + +@pytest.mark.parametrize("update", (True, False)) +@pytest.mark.django_db +def test_parking_zone_update(update): + dt = TrafficControlDeviceTypeFactory(code="H20.8", content_schema={"jotain": "jossain"}) + adsr1 = AdditionalSignRealFactory( + device_type=dt, additional_information="text: A/B; numbercode:", content_s=None, missing_content=True + ) + adsr2 = AdditionalSignRealFactory( + device_type=dt, additional_information="text: A/B; numbercode:", content_s={"test": "me"}, missing_content=False + ) + + call_command("parking_zone_update", update=update) + adsr1.refresh_from_db() + adsr2.refresh_from_db() + + assert ParkingZoneUpdateInfo.objects.count() == 1 + parking_zone_info = ParkingZoneUpdateInfo.objects.first() + assert len(parking_zone_info.update_infos) == 2 + assert parking_zone_info.database_update == update