Skip to content

Commit 9e85074

Browse files
committed
add no timescale option
1 parent 806c6d2 commit 9e85074

18 files changed

+385
-88
lines changed

database/db_job.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from uuid import UUID
1010
from psycopg.rows import dict_row
1111

12-
from ..helper.database import Database
12+
from ..helper.database import Database, DatabaseConfiguration
1313
from ..helper.error import QueuerError
1414
from ..helper.sql import SQLLoader, run_ddl
1515
from ..model.job import Job
@@ -25,7 +25,7 @@ class JobDBHandler:
2525
def __init__(
2626
self,
2727
db_connection: Database,
28-
with_table_drop: bool = False,
28+
db_config: DatabaseConfiguration,
2929
encryption_key: str = "",
3030
):
3131
"""Initialize job database handler."""
@@ -35,12 +35,13 @@ def __init__(
3535
raise ValueError("Database connection is not established")
3636

3737
self.encryption_key: str = encryption_key
38+
self.with_timescale: bool = db_config.with_timescale
3839

3940
sql_loader: SQLLoader = SQLLoader()
40-
sql_loader.load_notify_sql(self.db.instance, force=with_table_drop)
41-
sql_loader.load_job_sql(self.db.instance, force=with_table_drop)
41+
sql_loader.load_notify_sql(self.db.instance, force=db_config.with_table_drop)
42+
sql_loader.load_job_sql(self.db.instance, force=db_config.with_table_drop)
4243

43-
if with_table_drop:
44+
if db_config.with_table_drop:
4445
self.drop_tables()
4546

4647
self.create_table()
@@ -72,7 +73,7 @@ def create_table(self) -> None:
7273
if self.db.instance is None:
7374
raise ValueError("Database connection is not established")
7475

75-
run_ddl(self.db.instance, "SELECT init_job();")
76+
run_ddl(self.db.instance, f"SELECT init_job({self.with_timescale});")
7677

7778
def drop_tables(self) -> None:
7879
"""Drop job tables with DDL deadlock protection."""
@@ -379,9 +380,15 @@ def add_retention_archive(self, days: int) -> None:
379380
if self.db.instance is None:
380381
raise ValueError("Database connection is not established")
381382

382-
with self.db.instance.cursor() as cur:
383-
cur.execute("SELECT add_retention_archive(%s);", (days,))
384-
self.db.instance.commit()
383+
try:
384+
with self.db.instance.cursor() as cur:
385+
cur.execute(
386+
"SELECT add_retention_archive(%s, %s);", (days, self.with_timescale)
387+
)
388+
self.db.instance.commit()
389+
except Exception as e:
390+
self.db.instance.rollback()
391+
raise e
385392

386393
def remove_retention_archive(self) -> None:
387394
"""
@@ -391,9 +398,15 @@ def remove_retention_archive(self) -> None:
391398
if self.db.instance is None:
392399
raise ValueError("Database connection is not established")
393400

394-
with self.db.instance.cursor() as cur:
395-
cur.execute("SELECT remove_retention_archive();")
396-
self.db.instance.commit()
401+
try:
402+
with self.db.instance.cursor() as cur:
403+
cur.execute(
404+
"SELECT remove_retention_archive(%s);", (self.with_timescale,)
405+
)
406+
self.db.instance.commit()
407+
except Exception as e:
408+
self.db.instance.rollback()
409+
raise e
397410

398411
def select_job_from_archive(self, rid: UUID) -> Optional[Job]:
399412
"""

database/db_job_test.py

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,14 @@
99

1010
from .db_job import JobDBHandler
1111
from .db_worker import WorkerDBHandler
12-
from ..helper.test_database import DatabaseTestMixin
12+
from ..helper.test_database import TimescaleTestMixin, PostgresTestMixin
1313
from ..model.job import Job, JobStatus, new_job
1414
from ..model.options import Options, Schedule
1515
from ..model.options_on_error import OnError, RetryBackoff
1616
from ..model.worker import new_worker
1717

1818

19-
class TestJobDBHandler(DatabaseTestMixin, unittest.TestCase):
19+
class TestJobDBHandler(TimescaleTestMixin, unittest.TestCase):
2020
"""Test JobDBHandler matching Go's dbJob_test.go exactly."""
2121

2222
@classmethod
@@ -33,7 +33,7 @@ def setUp(self):
3333
# NewJobDBHandler
3434
def test_new_job_db_handler_valid_call(self):
3535
"""Test valid call to create new JobDBHandler."""
36-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
36+
job_db_handler = JobDBHandler(self.db, self.db_config)
3737

3838
self.assertIsNotNone(
3939
job_db_handler, "Expected JobDBHandler to return a non-nil instance"
@@ -55,7 +55,7 @@ def test_new_job_db_handler_valid_call(self):
5555
def test_new_job_db_handler_invalid_call_with_nil_database(self):
5656
"""Test invalid call with None database."""
5757
with self.assertRaises(AttributeError) as context:
58-
JobDBHandler(None, with_table_drop=True) # type: ignore
58+
JobDBHandler(None, self.db_config) # type: ignore
5959
# The error should be about accessing 'instance' attribute on NoneType
6060
self.assertIn(
6161
"'NoneType' object has no attribute 'instance'", str(context.exception)
@@ -64,27 +64,27 @@ def test_new_job_db_handler_invalid_call_with_nil_database(self):
6464
# CheckTableExistance
6565
def test_check_table_existence(self):
6666
"""Test checking job table existence."""
67-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
67+
job_db_handler = JobDBHandler(self.db, self.db_config)
6868

6969
exists = job_db_handler.check_tables_existence()
7070
self.assertTrue(exists, "Expected job table to exist")
7171

7272
# CreateTable
7373
def test_create_table(self):
7474
"""Test creating job table."""
75-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
75+
job_db_handler = JobDBHandler(self.db, self.db_config)
7676
job_db_handler.create_table()
7777

7878
# DropTable
7979
def test_drop_table(self):
8080
"""Test dropping job table."""
81-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
81+
job_db_handler = JobDBHandler(self.db, self.db_config)
8282
job_db_handler.drop_tables()
8383

8484
# InsertJob
8585
def test_insert_job(self):
8686
"""Test inserting a job."""
87-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
87+
job_db_handler = JobDBHandler(self.db, self.db_config)
8888

8989
job = new_job("TestTask", None)
9090

@@ -107,7 +107,7 @@ def test_insert_job(self):
107107
# InsertJobTx
108108
def test_insert_job_tx(self):
109109
"""Test inserting a job using transaction."""
110-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
110+
job_db_handler = JobDBHandler(self.db, self.db_config)
111111

112112
job = new_job("TestTask", None)
113113

@@ -132,7 +132,7 @@ def test_insert_job_tx(self):
132132
# BatchInsertJobs (all 4 sub-tests)
133133
def test_batch_insert_jobs_successful(self):
134134
"""Test successful batch insert jobs."""
135-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
135+
job_db_handler = JobDBHandler(self.db, self.db_config)
136136

137137
job_count = 5
138138
jobs: List[Job] = []
@@ -144,7 +144,7 @@ def test_batch_insert_jobs_successful(self):
144144

145145
def test_batch_insert_jobs_with_error_options(self):
146146
"""Test successful batch insert jobs with error options."""
147-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
147+
job_db_handler = JobDBHandler(self.db, self.db_config)
148148

149149
job_count = 5
150150
jobs: List[Job] = []
@@ -164,7 +164,7 @@ def test_batch_insert_jobs_with_error_options(self):
164164

165165
def test_batch_insert_jobs_with_schedule_options(self):
166166
"""Test successful batch insert jobs with schedule options."""
167-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
167+
job_db_handler = JobDBHandler(self.db, self.db_config)
168168

169169
job_count = 5
170170
jobs: List[Job] = []
@@ -184,7 +184,7 @@ def test_batch_insert_jobs_with_schedule_options(self):
184184

185185
def test_batch_insert_jobs_with_parameters(self):
186186
"""Test successful batch insert jobs with parameters."""
187-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
187+
job_db_handler = JobDBHandler(self.db, self.db_config)
188188

189189
job_count = 5
190190
jobs: List[Job] = []
@@ -210,7 +210,7 @@ def test_update_jobs_initial(self):
210210
self.assertIsNotNone(updated_worker, "Expected worker to be updated")
211211

212212
# Now we can proceed with the job insertion and update
213-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
213+
job_db_handler = JobDBHandler(self.db, self.db_config)
214214

215215
job = new_job("TestTask", None)
216216
inserted_job = job_db_handler.insert_job(job)
@@ -246,7 +246,7 @@ def test_update_jobs_initial(self):
246246
# UpdateJobFinal
247247
def test_update_job_final(self):
248248
"""Test updating job final."""
249-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
249+
job_db_handler = JobDBHandler(self.db, self.db_config)
250250

251251
job = new_job("TestTask", None)
252252
inserted_job = job_db_handler.insert_job(job)
@@ -271,7 +271,7 @@ def test_update_job_final(self):
271271
# TestUpdateStaleJobs
272272
def test_update_stale_jobs(self):
273273
"""Test updating stale jobs functionality."""
274-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
274+
job_db_handler = JobDBHandler(self.db, self.db_config)
275275

276276
updated_count = job_db_handler.update_stale_jobs()
277277

@@ -282,7 +282,7 @@ def test_update_stale_jobs(self):
282282
# DeleteJob
283283
def test_delete_job(self):
284284
"""Test deleting a job from archive."""
285-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
285+
job_db_handler = JobDBHandler(self.db, self.db_config)
286286

287287
job = new_job("TestTask", None)
288288
inserted_job = job_db_handler.insert_job(job)
@@ -303,7 +303,7 @@ def test_delete_job(self):
303303
# SelectJob
304304
def test_select_job(self):
305305
"""Test selecting a job."""
306-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
306+
job_db_handler = JobDBHandler(self.db, self.db_config)
307307

308308
job = new_job("TestTask", None)
309309
inserted_job = job_db_handler.insert_job(job)
@@ -322,7 +322,7 @@ def test_select_job(self):
322322
def test_select_all_jobs(self):
323323
"""Test selecting all jobs."""
324324
new_job_count = 5
325-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
325+
job_db_handler = JobDBHandler(self.db, self.db_config)
326326

327327
for i in range(new_job_count):
328328
job = new_job(f"TestJob{i}", None)
@@ -346,7 +346,7 @@ def test_select_all_jobs_by_worker_rid(self):
346346
"""Test selecting jobs by worker RID."""
347347
# This test requires worker functionality
348348
worker_db_handler = WorkerDBHandler(self.db, with_table_drop=True)
349-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
349+
job_db_handler = JobDBHandler(self.db, self.db_config)
350350

351351
# Create a worker with available tasks
352352
worker = new_worker("TestWorker", 2)
@@ -388,7 +388,7 @@ def test_select_all_jobs_by_search(self):
388388
new_job_count_search = 5
389389
new_job_count_other = 3
390390

391-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
391+
job_db_handler = JobDBHandler(self.db, self.db_config)
392392

393393
# Insert multiple jobs with different names
394394
for _ in range(new_job_count_search):
@@ -419,7 +419,7 @@ def test_select_all_jobs_by_search(self):
419419
# AddRetentionArchive
420420
def test_add_retention_archive(self):
421421
"""Test adding retention policy for archive cleanup"""
422-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
422+
job_db_handler = JobDBHandler(self.db, self.db_config)
423423

424424
# Test adding retention policy with 30 days
425425
retention_days = 30
@@ -429,7 +429,7 @@ def test_add_retention_archive(self):
429429
# RemoveRetentionArchive
430430
def test_remove_retention_archive(self):
431431
"""Test removing retention policy for archive cleanup"""
432-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
432+
job_db_handler = JobDBHandler(self.db, self.db_config)
433433

434434
# First add a retention policy
435435
job_db_handler.add_retention_archive(30)
@@ -441,7 +441,7 @@ def test_remove_retention_archive(self):
441441
# SelectJobFromArchive
442442
def test_select_job_from_archive(self):
443443
"""Test selecting job from archive."""
444-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
444+
job_db_handler = JobDBHandler(self.db, self.db_config)
445445

446446
job = new_job("TestTask", None)
447447
inserted_job = job_db_handler.insert_job(job)
@@ -471,7 +471,7 @@ def test_select_job_from_archive(self):
471471
# SelectAllJobsFromArchive
472472
def test_select_all_jobs_from_archive(self):
473473
"""Test selecting all jobs from archive."""
474-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
474+
job_db_handler = JobDBHandler(self.db, self.db_config)
475475

476476
new_job_count = 5
477477
for i in range(new_job_count):
@@ -506,7 +506,7 @@ def test_select_all_jobs_from_archive_by_search(self):
506506
new_job_count_search = 5
507507
new_job_count_other = 3
508508

509-
job_db_handler = JobDBHandler(self.db, with_table_drop=True)
509+
job_db_handler = JobDBHandler(self.db, self.db_config)
510510

511511
# Insert multiple jobs with different names
512512
for _ in range(new_job_count_search):
@@ -550,7 +550,7 @@ def test_select_all_jobs_from_archive_by_search(self):
550550
def test_update_job_final_encrypted(self):
551551
"""Test updating job final with encryption"""
552552
job_db_handler = JobDBHandler(
553-
self.db, with_table_drop=True, encryption_key="test-encryption-key"
553+
self.db, self.db_config, encryption_key="test-encryption-key"
554554
)
555555

556556
task_name = "TestTask"
@@ -578,7 +578,7 @@ def test_update_job_final_encrypted(self):
578578
def test_select_job_encrypted(self):
579579
"""Test selecting job with encryption"""
580580
job_db_handler = JobDBHandler(
581-
self.db, with_table_drop=True, encryption_key="test-encryption-key"
581+
self.db, self.db_config, encryption_key="test-encryption-key"
582582
)
583583

584584
task_name = "TestTask"
@@ -608,7 +608,7 @@ def test_select_job_encrypted(self):
608608
def test_select_all_jobs_encrypted(self):
609609
"""Test selecting all jobs with encryption"""
610610
job_db_handler = JobDBHandler(
611-
self.db, with_table_drop=True, encryption_key="test-encryption-key"
611+
self.db, self.db_config, encryption_key="test-encryption-key"
612612
)
613613

614614
new_job_count = 5

database/db_listener_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010

1111
from .db_listener import QueuerListener, new_queuer_db_listener
1212
from ..helper.database import DatabaseConfiguration
13-
from ..helper.test_database import DatabaseTestMixin
13+
from ..helper.test_database import TimescaleTestMixin
1414
from ..helper.error import QueuerError
1515

1616

17-
class TestQueuerListenerWithContainer(DatabaseTestMixin):
17+
class TestQueuerListenerWithContainer(TimescaleTestMixin):
1818
"""Test class for async QueuerListener functionality using real PostgreSQL containers."""
1919

2020
def setup_method(self, method: Any = None):

database/db_master_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
import uuid
88

99
from .db_master import MasterDBHandler
10-
from ..helper.test_database import DatabaseTestMixin
10+
from ..helper.test_database import TimescaleTestMixin
1111
from ..model.master import MasterSettings
1212
from ..model.worker import Worker
1313

1414

15-
class TestMasterDBHandler(DatabaseTestMixin, unittest.TestCase):
15+
class TestMasterDBHandler(TimescaleTestMixin, unittest.TestCase):
1616
"""Test cases for MasterDBHandler. Mirrors Go's dbMaster_test.go."""
1717

1818
@classmethod

database/db_worker_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
from datetime import datetime, timedelta, timezone
99

1010
from .db_worker import WorkerDBHandler
11-
from ..helper.test_database import DatabaseTestMixin
11+
from ..helper.test_database import TimescaleTestMixin
1212
from ..model.worker import Worker, WorkerStatus
1313

1414

15-
class TestWorkerDBHandler(DatabaseTestMixin, unittest.TestCase):
15+
class TestWorkerDBHandler(TimescaleTestMixin, unittest.TestCase):
1616
"""Test WorkerDBHandler with real database using testcontainers."""
1717

1818
@classmethod

0 commit comments

Comments
 (0)