1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
- Dynamic version in user agent [#328](https://github.com/datagouv/hydra/pull/328) and [#337](https://github.com/datagouv/hydra/pull/337)
- Add performance tests for csv to geojson and geojson to pmtiles conversion on geographical big data, and CLI commands to convert geographical files locally [#319](https://github.com/datagouv/hydra/pull/319)
- Allow crawling of non datagouv URLs with CLI [#312](https://github.com/datagouv/hydra/pull/312)
+ - Fix CRUD return types and reduce DB queries when cleaning up statuses [#331](https://github.com/datagouv/hydra/pull/331)

## 2.4.1 (2025-09-03)

2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -284,7 +284,7 @@ async def _fake_check(
"geojson_url": "https://example.org/file.geojson" if pmtiles_url else None,
"geojson_size": 1024 if geojson_url else None,
}
- check: dict = await Check.insert(data=data, returning="*")
+ check: dict = await Check.insert(data=data, returning="*", as_dict=True)
data["id"] = check["id"]
if check.get("dataset_id"):
data["dataset_id"] = check["dataset_id"]
4 changes: 2 additions & 2 deletions tests/test_crawl/test_crawl.py
@@ -687,10 +687,10 @@ async def test_wrong_url_in_catalog(
if url_changed:
r = await Resource.get(resource_id=RESOURCE_ID, column_name="url")
assert r["url"] == new_url
- check = await Check.get_by_resource_id(RESOURCE_ID)
+ check = await Check.get_by_resource_id(RESOURCE_ID, as_dict=True)
assert check.get("parsing_finished_at")
else:
- check = await Check.get_by_resource_id(RESOURCE_ID)
+ check = await Check.get_by_resource_id(RESOURCE_ID, as_dict=True)
assert check["status"] == 404


11 changes: 7 additions & 4 deletions udata_hydra/analysis/csv.py
@@ -113,7 +113,9 @@ async def analyse_csv(
table_name = hashlib.md5(url.encode("utf-8")).hexdigest()
timer.mark("download-file")

check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
check = await Check.update(
check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, as_dict=True
) # type: ignore

# Launch csv-detective against given file
try:
@@ -156,7 +158,7 @@ async def analyse_csv(
resource_id=resource_id,
debug_insert=debug_insert,
)
check = await Check.update(check["id"], {"parsing_table": table_name})
check = await Check.update(check["id"], {"parsing_table": table_name}, as_dict=True) # type: ignore
timer.mark("csv-to-db")

try:
@@ -200,7 +202,8 @@ async def analyse_csv(
{
"parsing_finished_at": datetime.now(timezone.utc),
},
- )
+ as_dict=True,
+ ) # type: ignore
await csv_to_db_index(table_name, csv_inspection, check)

except (ParseException, IOException) as e:
@@ -442,7 +445,7 @@ async def csv_to_db(
await db.execute(q, *data.values())


- async def csv_to_db_index(table_name: str, inspection: dict, check: Record) -> None:
+ async def csv_to_db_index(table_name: str, inspection: dict, check: dict) -> None:
"""Store meta info about a converted CSV table in `DATABASE_URL_CSV.tables_index`"""
db = await context.pool("csv")
q = "INSERT INTO tables_index(parsing_table, csv_detective, resource_id, url) VALUES($1, $2, $3, $4)"
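In this file, `csv_to_db_index` is now typed against a plain `dict`, so `analyse_csv` requests one from `Check.update` via `as_dict=True`. A minimal sketch of the practical difference, with `MappingProxyType` standing in for an asyncpg `Record` (illustrative only, not from the PR): a `Record` is a read-only mapping, while the `dict` copy returned under `as_dict=True` can be mutated freely.

```python
from types import MappingProxyType

# Stand-in for an asyncpg Record (a read-only mapping); real ones come from a DB fetch.
record = MappingProxyType({"id": 1, "parsing_table": None})

check = dict(record)             # what as_dict=True hands back, per this PR
check["parsing_table"] = "ab12"  # fine on a dict...
# record["parsing_table"] = "ab12"  # ...would raise TypeError on the read-only mapping
```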
7 changes: 5 additions & 2 deletions udata_hydra/analysis/geojson.py
@@ -67,7 +67,9 @@ async def analyse_geojson(
)
timer.mark("download-file")

check = await Check.update(check["id"], {"parsing_started_at": datetime.now(timezone.utc)})
check = await Check.update(
check["id"], {"parsing_started_at": datetime.now(timezone.utc)}, as_dict=True
) # type: ignore

# Convert to PMTiles
try:
@@ -93,7 +95,8 @@
"pmtiles_url": pmtiles_url,
"pmtiles_size": pmtiles_size,
},
- )
+ as_dict=True,
+ ) # type: ignore

except (ParseException, IOException) as e:
check = await handle_parse_exception(e, None, check)
28 changes: 13 additions & 15 deletions udata_hydra/cli.py
@@ -185,7 +185,7 @@ async def check_resource(resource_id: str, method: str = "get", force_analysis:
@cli(name="analyse-resource")
async def analyse_resource_cli(resource_id: str):
"""Trigger a resource analysis, mainly useful for local debug (with breakpoints)"""
- check: Record | None = await Check.get_by_resource_id(resource_id)
+ check: Record | None = await Check.get_by_resource_id(resource_id) # type: ignore
if not check:
log.error("Could not find a check linked to the specified resource ID")
return
@@ -210,21 +210,19 @@ async def analyse_csv_cli(

# Try to get check from check_id
if check_id:
- record = await Check.get_by_id(int(check_id), with_deleted=True)
- check = dict(record) if record else None
+ check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True) # type: ignore

# Try to get check from URL
if not check and url:
- records = await Check.get_by_url(url)
- if records:
- if len(records) > 1:
+ checks: list[Record] | None = await Check.get_by_url(url) # type: ignore
+ if checks:
+ if len(checks) > 1:
log.warning(f"Multiple checks found for URL {url}, using the latest one")
- check = dict(records[0])
+ check = checks[0]

# Try to get check from resource_id
if not check and resource_id:
- record = await Check.get_by_resource_id(resource_id)
- check = dict(record) if record else None
+ check: Record | None = await Check.get_by_resource_id(resource_id) # type: ignore

# We cannot get a check, it's an external URL analysis, we need to create a temporary check
if not check and url:
@@ -239,13 +237,13 @@
"timeout": False,
},
returning="*",
- )
+ ) # type: ignore

elif not check:
log.error("Could not find a check for the specified parameters")
return

- await analyse_csv(check=check, debug_insert=debug_insert)
+ await analyse_csv(check=dict(check), debug_insert=debug_insert)
log.info("CSV analysis completed")

if url and tmp_resource_id:
@@ -259,7 +257,7 @@
await csv_pool.execute(f"DELETE FROM tables_index WHERE parsing_table='{table_hash}'")

# Clean up the temporary resource and temporary check from catalog
- check = await Check.get_by_resource_id(tmp_resource_id)
+ check: Record | None = await Check.get_by_resource_id(tmp_resource_id) # type: ignore
if check:
await Check.delete(check["id"])
await Resource.delete(resource_id=tmp_resource_id, hard_delete=True)
@@ -284,14 +282,14 @@ async def analyse_geojson_cli(
assert check_id or url or resource_id
check = None
if check_id:
- check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True)
+ check: Record | None = await Check.get_by_id(int(check_id), with_deleted=True) # type: ignore
if not check and url:
- checks: list[Record] | None = await Check.get_by_url(url)
+ checks: list[Record] | None = await Check.get_by_url(url) # type: ignore
if checks and len(checks) > 1:
log.warning(f"Multiple checks found for URL {url}, using the latest one")
check = checks[0] if checks else None
if not check and resource_id:
- check: Record | None = await Check.get_by_resource_id(resource_id)
+ check: Record | None = await Check.get_by_resource_id(resource_id) # type: ignore
if not check:
if check_id:
log.error("Could not retrieve the specified check")
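Design note on the CLI hunks: checks stay as asyncpg `Record`s throughout the CLI and are converted exactly once, at the call boundary that requires a `dict` (`analyse_csv(check=dict(check))`). A hedged sketch of that convert-at-the-boundary pattern, with illustrative names that are not from the repo:

```python
from typing import Any, Mapping

def analyse(check: dict) -> None:
    """Stand-in for downstream code typed against plain dicts, like analyse_csv."""
    print(check.get("id"))

def cli_flow(record_like: Mapping[str, Any]) -> None:
    # Read from the fetched mapping as-is...
    if record_like["id"]:
        # ...and convert once, right where a dict is required.
        analyse(dict(record_like))

cli_flow({"id": 42})
```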
12 changes: 5 additions & 7 deletions udata_hydra/crawl/preprocess_check_data.py
@@ -1,8 +1,6 @@
import json
from datetime import datetime, timezone

- from asyncpg import Record
- 
from udata_hydra.crawl.calculate_next_check import calculate_next_check_date
from udata_hydra.crawl.helpers import get_content_type_from_header, is_valid_status
from udata_hydra.db.check import Check
@@ -26,14 +24,14 @@ async def preprocess_check_data(dataset_id: str, check_data: dict) -> tuple[dict

check_data["resource_id"] = str(check_data["resource_id"])

- last_check: dict | None = None
- last_check_record: Record | None = await Check.get_by_resource_id(check_data["resource_id"])
- if last_check_record:
- last_check = dict(last_check_record)
+ last_check: dict | None = await Check.get_by_resource_id(
+ check_data["resource_id"], as_dict=True
+ ) # type: ignore

has_changed: bool = await has_check_changed(check_data, last_check)
check_data["next_check_at"] = calculate_next_check_date(has_changed, last_check, None)
- new_check: dict = await Check.insert(data=check_data, returning="*")
+ 
+ new_check: dict = await Check.insert(data=check_data, returning="*", as_dict=True) # type: ignore

if has_changed:
queue.enqueue(
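Condensed before/after of the retrieval pattern this hunk changes, taken from the diff above; the wrapper functions are only here to make the sketch self-contained:

```python
from udata_hydra.db.check import Check

async def before(resource_id: str) -> dict | None:
    # Old pattern: fetch a Record, then convert by hand
    record = await Check.get_by_resource_id(resource_id)
    return dict(record) if record else None

async def after(resource_id: str) -> dict | None:
    # New pattern: one call; the CRUD layer performs the Record -> dict conversion
    return await Check.get_by_resource_id(resource_id, as_dict=True)  # type: ignore
```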
75 changes: 50 additions & 25 deletions udata_hydra/db/check.py
@@ -13,8 +13,24 @@
class Check:
"""Represents a check in the "checks" DB table"""

+ @staticmethod
+ def _convert_to_dict_if_needed(result: Record | None, as_dict: bool) -> Record | dict | None:
+ if as_dict and result:
+ return dict(result)
+ return result
+ 
+ @staticmethod
+ def _convert_list_to_dict_if_needed(
+ results: list[Record], as_dict: bool
+ ) -> list[Record] | list[dict]:
+ if as_dict:
+ return [dict(result) for result in results]
+ return results

@classmethod
- async def get_by_id(cls, check_id: int, with_deleted: bool = False) -> Record | None:
+ async def get_by_id(
+ cls, check_id: int, with_deleted: bool = False, as_dict: bool = False
+ ) -> Record | dict | None:
pool = await context.pool()
async with pool.acquire() as connection:
q = """
Expand All @@ -24,12 +40,13 @@ async def get_by_id(cls, check_id: int, with_deleted: bool = False) -> Record |
"""
if not with_deleted:
q += " AND catalog.deleted = FALSE"
- return await connection.fetchrow(q, check_id)
+ result = await connection.fetchrow(q, check_id)
+ return cls._convert_to_dict_if_needed(result, as_dict)

@classmethod
async def get_by_resource_id(
- cls, resource_id: str, with_deleted: bool = False
- ) -> Record | None:
+ cls, resource_id: str, with_deleted: bool = False, as_dict: bool = False
+ ) -> Record | dict | None:
pool = await context.pool()
async with pool.acquire() as connection:
q = """
Expand All @@ -39,23 +56,25 @@ async def get_by_resource_id(
"""
if not with_deleted:
q += " AND catalog.deleted = FALSE"
- return await connection.fetchrow(q, resource_id)
+ result = await connection.fetchrow(q, resource_id)
+ return cls._convert_to_dict_if_needed(result, as_dict)

@classmethod
- async def get_by_url(cls, url: str) -> list[Record]:
+ async def get_by_url(cls, url: str, as_dict: bool = False) -> list[Record] | list[dict]:
pool = await context.pool()
async with pool.acquire() as connection:
q = """
SELECT * FROM checks
WHERE url = $1
ORDER BY created_at DESC
"""
- return await connection.fetch(q, url)
+ results = await connection.fetch(q, url)
+ return cls._convert_list_to_dict_if_needed(results, as_dict)

@classmethod
async def get_latest(
- cls, url: str | None = None, resource_id: str | None = None
- ) -> Record | None:
+ cls, url: str | None = None, resource_id: str | None = None, as_dict: bool = False
+ ) -> Record | dict | None:
column: str = "url" if url else "resource_id"
pool = await context.pool()
async with pool.acquire() as connection:
Expand All @@ -66,10 +85,13 @@ async def get_latest(
WHERE catalog.{column} = $1
AND checks.id = catalog.last_check
"""
- return await connection.fetchrow(q, url or resource_id)
+ result = await connection.fetchrow(q, url or resource_id)
+ return cls._convert_to_dict_if_needed(result, as_dict)

@classmethod
- async def get_all(cls, url: str | None = None, resource_id: str | None = None) -> list[Record]:
+ async def get_all(
+ cls, url: str | None = None, resource_id: str | None = None, as_dict: bool = False
+ ) -> list[Record] | list[dict]:
column: str = "url" if url else "resource_id"
pool = await context.pool()
async with pool.acquire() as connection:
Expand All @@ -81,12 +103,13 @@ async def get_all(cls, url: str | None = None, resource_id: str | None = None) -
AND catalog.{column} = checks.{column}
ORDER BY created_at DESC
"""
- return await connection.fetch(q, url or resource_id)
+ results = await connection.fetch(q, url or resource_id)
+ return cls._convert_list_to_dict_if_needed(results, as_dict)

@classmethod
async def get_group_by_for_date(
- cls, column: str, date: date, page_size: int = 20
- ) -> list[Record]:
+ cls, column: str, date: date, page_size: int = 20, as_dict: bool = False
+ ) -> list[Record] | list[dict]:
pool = await context.pool()
async with pool.acquire() as connection:
q = f"""
Expand All @@ -97,22 +120,20 @@ async def get_group_by_for_date(
ORDER BY count desc
LIMIT $2
"""
- return await connection.fetch(q, date, page_size)
+ results = await connection.fetch(q, date, page_size)
+ return cls._convert_list_to_dict_if_needed(results, as_dict)

@classmethod
async def insert(cls, data: dict, returning: str = "id") -> dict:
async def insert(cls, data: dict, returning: str = "id", as_dict: bool = True) -> Record | dict:
"""
Insert a new check in DB, associate it with the resource and return the check dict, optionally associated with the resource dataset_id.
This uses the info from the last check of the same resource.

- Note: Returns dict instead of Record because this method performs additional operations beyond simple insertion (joins with catalog table, adds dataset_id).
"""
json_data = convert_dict_values_to_json(data)
q1: str = compute_insert_query(table_name="checks", data=json_data, returning=returning)
pool = await context.pool()
async with pool.acquire() as connection:
last_check: Record = await connection.fetchrow(q1, *json_data.values())
- last_check_dict = dict(last_check)
q2 = (
"""UPDATE catalog SET last_check = $1 WHERE resource_id = $2 RETURNING dataset_id"""
)
Expand All @@ -121,17 +142,21 @@ async def insert(cls, data: dict, returning: str = "id") -> dict:
)
# Add the dataset_id arg to the check response, if we can, and if it's asked
if returning in ["*", "dataset_id"] and updated_resource:
+ last_check_dict = dict(last_check)
last_check_dict["dataset_id"] = updated_resource["dataset_id"]
- return last_check_dict
+ return last_check_dict if as_dict else last_check
+ return dict(last_check) if as_dict else last_check

@classmethod
- async def update(cls, check_id: int, data: dict) -> Record | None:
- """Update a check in DB with new data and return the check id in DB"""
- return await update_table_record(table_name="checks", record_id=check_id, data=data)
+ async def update(cls, check_id: int, data: dict, as_dict: bool = False) -> Record | dict | None:
+ check: Record | None = await update_table_record(
+ table_name="checks", record_id=check_id, data=data
+ )
+ return cls._convert_to_dict_if_needed(check, as_dict)

@classmethod
- async def delete(cls, check_id: int) -> int:
+ async def delete(cls, check_id: int) -> None:
pool = await context.pool()
async with pool.acquire() as connection:
q = """DELETE FROM checks WHERE id = $1"""
- return await connection.fetch(q, check_id)
+ await connection.execute(q, check_id)
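Hedged usage sketch of the `Check` API after this PR. Method names and the `as_dict` flag come from the diff above; the connection pool and event loop setup are assumed, not shown. Note the asymmetric defaults: `insert` returns a `dict` unless told otherwise, while the getters and `update` default to asyncpg `Record`s:

```python
from udata_hydra.db.check import Check

async def demo(url: str, data: dict) -> None:
    record = await Check.get_by_id(42)                  # asyncpg Record (default)
    check = await Check.get_by_id(42, as_dict=True)     # plain dict
    checks = await Check.get_by_url(url, as_dict=True)  # list[dict]
    new = await Check.insert(data=data, returning="*")  # dict (as_dict defaults to True)
    updated = await Check.update(new["id"], {"status": 200}, as_dict=True)  # dict
    await Check.delete(new["id"])                       # now returns None
```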