Skip to content

Commit 039c5c3

Browse files
fix: [#35] use index_name in transformer (#38)
* fix: [#35] use index_name in transformer
* fix: [#35] delete index_name from .env
* fix: [#35] updates after code review
* fix: [#35] batch size improved

Co-authored-by: Tobias Schweizer <tobias.schweizer@switch.ch>
1 parent 68fae1c commit 039c5c3

File tree

2 files changed

+25
-14
lines changed

2 files changed

+25
-14
lines changed

src/tasks.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ def __init__(self) -> None:
7777

7878
@celery_app.task(base=TransformTask, bind=True, ignore_result=True)
7979
def transform_batch(self: Any, batch: list[HarvestEventQueue], index_name: str) -> Any:
80+
if not self.client.indices.exists(index=index_name):
81+
raise ValueError(f'Index {index_name} does not exist in OpenSearch')
82+
8083
# transform to JSON and normalize
8184

8285
# Error handling: if an error is thrown, psycopg will roll back the whole transaction and the whole batch fails because the exception is re-raised,

src/transform.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,13 @@
1717
dictConfig(LOGGING_CONFIG)
1818
logger = logging.getLogger(__name__)
1919

20-
BATCH_SIZE = os.environ.get('CELERY_BATCH_SIZE')
21-
if not BATCH_SIZE or not BATCH_SIZE.isnumeric():
22-
raise ValueError('Missing or invalid CELERY_BATCH_SIZE environment variable')
20+
BATCH_SIZE_DEFAULT = 125
21+
batch_size_raw = os.environ.get('CELERY_BATCH_SIZE', BATCH_SIZE_DEFAULT)
22+
23+
try:
24+
BATCH_SIZE = int(batch_size_raw)
25+
except (TypeError, ValueError):
26+
raise ValueError('CELERY_BATCH_SIZE should be an integer')
2327

2428
tags_metadata = [
2529
{
@@ -122,7 +126,6 @@ class HarvestRunCloseRequest(BaseModel):
122126
class HarvestRunCloseResponse(BaseModel):
123127
id: str = Field(description='ID of the closed harvest run')
124128

125-
126129
def get_latest_harvest_run_in_db(harvest_url: str) -> HarvestRunGetResponse:
127130
with psycopg.connect(dbname=postgres_config.user, user=postgres_config.user, host=postgres_config.address,
128131
password=postgres_config.password, port=postgres_config.port, row_factory=dict_row) as conn:
@@ -330,29 +333,32 @@ def get_config_from_db() -> list[EndpointConfig]:
330333
raise HTTPException(status_code=500, detail=str(e))
331334

332335

333-
def create_jobs_in_queue(harvest_run_id: str) -> int:
336+
def create_jobs_in_queue(
337+
harvest_run_id: str,
338+
index_name: str
339+
) -> int:
334340
"""
335341
Creates and enqueues transformation jobs from harvest_events table.
336342
337-
:param harvest_run_id: ID of the harvest run the harvest events belong to..
343+
:param harvest_run_id: ID of the harvest run the harvest events belong to.
344+
:param index_name: Name of the OpenSearch index to use.
338345
:return: Number of batches scheduled for processing.
339346
"""
340347

341348
batch: list[HarvestEventQueue] = []
342349
tasks = 0
343350
offset = 0
344-
limit = int(BATCH_SIZE) if BATCH_SIZE else 250
351+
limit = BATCH_SIZE
345352
fetch = True
346353

347-
logger.info(f'Preparing jobs')
354+
logger.info(f'Preparing jobs for index: {index_name}')
355+
348356

349357
with psycopg.connect(dbname=postgres_config.user, user=postgres_config.user, host=postgres_config.address,
350358
password=postgres_config.password, port=postgres_config.port, row_factory=dict_row) as conn:
351359

352-
cur = conn.cursor()
353-
354-
# print(db)
355360

361+
cur = conn.cursor()
356362
while fetch:
357363

358364
cur.execute("""
@@ -396,7 +402,7 @@ def create_jobs_in_queue(harvest_run_id: str) -> int:
396402

397403
# https://docs.celeryq.dev/en/stable/getting-started/first-steps-with-celery.html#keeping-results
398404
logger.info(f'Putting batch of {len(batch)} in queue with offset {offset}')
399-
transform_batch.delay(batch, 'test_datacite')
405+
transform_batch.delay(batch, index_name)
400406
tasks += 1
401407

402408
# increment offset by limit
@@ -411,11 +417,13 @@ def create_jobs_in_queue(harvest_run_id: str) -> int:
411417

412418
@app.get('/index', tags=['index'])
413419
def init_index(
414-
harvest_run_id: str = Query(default=None, description='Id of the harvest run to be indexed')) -> IndexGetResponse:
420+
harvest_run_id: str = Query(default=None, description='Id of the harvest run to be indexed'),
421+
index_name: str = Query(default=None, description='Name of the OpenSearch index to use for indexing')
422+
) -> IndexGetResponse:
415423
# this long-running method is synchronous and runs in an external threadpool, see https://fastapi.tiangolo.com/async/#path-operation-functions
416424
# this way, it does not block the server
417425
try:
418-
results = create_jobs_in_queue(harvest_run_id)
426+
results = create_jobs_in_queue(harvest_run_id, index_name)
419427
except Exception as e:
420428
logger.exception("Indexing failed")
421429
raise HTTPException(status_code=500, detail=str(e))

0 commit comments

Comments (0)