From 033802bca157846a0fe002139044346926943d46 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Tue, 7 Jan 2025 12:55:13 +0300 Subject: [PATCH 1/6] Experimental feature: loop for crashed tasks --- karton/core/karton.py | 78 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/karton/core/karton.py b/karton/core/karton.py index 5d819b9c..1b069de4 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -346,6 +346,84 @@ def loop(self) -> None: if task: self.internal_process(task) + def loop_crashed_tasks(self) -> None: + """ + Blocking loop that consumes crashed tasks. + It's preferable for debugging issueses occured with your service. + + Consumer.loop_crashed_tasks is different from Consumer.loop: + - It does not rely on `karton.queue`. It finds crashed doc in `karton.task`. + So RUN ONLY ONE REPLICA to avoid race condition + and large resource consumption. + - It does not rely in task_timeout. + - It does not register new binds. + - It does not shut down another instances on binds / version mismatch. + - It does not listen queue in a traditional way / it dows not subsribe. + It looks for tasks in `CRASHED` state. + - It does not increment `TASK_CRASHED` metrics. + - It reimplements `Cunsumer.internal_process` in simplified way. + + :meta private: + """ + self.log.info("Service %s started in crash-consume mode", self.identity) + + with self.graceful_killer(): + while not self.shutdown: + + task: Task + for task in self.backend.iter_all_tasks(parse_resources=False): + if task.headers["receiver"] == self.identity: + break + else: + self.log.warning( + "Crashed task for consumer %s not found." + "Sleeping and trying again..." % (self.identity,) + ) + time.sleep(5) + continue + + self.current_task = task + self.log_handler.set_task(self.current_task) + + exception_str = None + + try: + self.log.info("Received new task - %s", self.current_task.uid) + self.backend.set_task_status(self.current_task, TaskState.STARTED) + + self._run_pre_hooks() + + saved_exception = None + try: + self.process(self.current_task) + except Exception as exc: + saved_exception = exc + raise + finally: + self._run_post_hooks(saved_exception) + + self.log.info("Task done - %s", self.current_task.uid) + except Exception: + exc_info = sys.exc_info() + exception_str = traceback.format_exception(*exc_info) + self.log.exception( + "Failed to process task - %s", self.current_task.uid + ) + finally: + self.backend.increment_metrics( + KartonMetrics.TASK_CONSUMED, self.identity + ) + + task_state = TaskState.FINISHED + + # report the task status as crashed + # if an exception was caught while processing + if exception_str is not None: + task_state = TaskState.CRASHED + self.current_task.error = exception_str + + self.backend.set_task_status(self.current_task, task_state) + class LogConsumer(KartonServiceBase): """ From 6073d1a3cd881dbb9918c8cd5a40267d1c045222 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Tue, 7 Jan 2025 13:18:03 +0300 Subject: [PATCH 2/6] Do not make task STARTED for not losing the task in this status --- karton/core/karton.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karton/core/karton.py b/karton/core/karton.py index 1b069de4..ddf01c46 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -389,7 +389,7 @@ def loop_crashed_tasks(self) -> None: try: self.log.info("Received new task - %s", self.current_task.uid) - self.backend.set_task_status(self.current_task, TaskState.STARTED) + #self.backend.set_task_status(self.current_task, TaskState.STARTED) self._run_pre_hooks() From 944a4cbcbf577fc26b73fc0e307f34aca4a8e44d Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Tue, 7 Jan 2025 13:20:52 +0300 Subject: [PATCH 3/6] linting --- karton/core/karton.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karton/core/karton.py b/karton/core/karton.py index ddf01c46..1414331c 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -389,7 +389,7 @@ def loop_crashed_tasks(self) -> None: try: self.log.info("Received new task - %s", self.current_task.uid) - #self.backend.set_task_status(self.current_task, TaskState.STARTED) + # self.backend.set_task_status(self.current_task, TaskState.STARTED) self._run_pre_hooks() From 4d1dbcadcd44a1015979eed5745bed5b1f140f42 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Fri, 21 Feb 2025 10:33:44 +0300 Subject: [PATCH 4/6] Fix cases when there is no 'receiver' field in headers yet --- karton/core/karton.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/karton/core/karton.py b/karton/core/karton.py index 361fdf7e..d65f98ef 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -1,6 +1,7 @@ """ Base library for karton subsystems. """ + import abc import argparse import sys @@ -377,7 +378,7 @@ def loop_crashed_tasks(self) -> None: task: Task for task in self.backend.iter_all_tasks(parse_resources=False): - if task.headers["receiver"] == self.identity: + if task.headers.get("receiver") == self.identity: break else: self.log.warning( From ed00175b7ccffac4a7053301250b58bc61aebd6d Mon Sep 17 00:00:00 2001 From: Rakovskij Stanislav Date: Wed, 2 Apr 2025 10:52:41 +0300 Subject: [PATCH 5/6] Add check that task.status is TaskState.CRASHED --- karton/system/system.py | 83 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/karton/system/system.py b/karton/system/system.py index 844c0875..2389d102 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -324,6 +324,89 @@ def loop(self) -> None: if not self.enable_router: time.sleep(1) # Avoid a busy loop + def loop_crashed_tasks(self) -> None: + """ + Blocking loop that consumes crashed tasks. + It's preferable for debugging issueses occured with your service. + Consumer.loop_crashed_tasks is different from Consumer.loop: + - It does not rely on `karton.queue`. It finds crashed doc in `karton.task`. + So RUN ONLY ONE REPLICA to avoid race condition + and large resource consumption. + - It does not rely in task_timeout. + - It does not register new binds. + - It does not shut down another instances on binds / version mismatch. + - It does not listen queue in a traditional way / it dows not subsribe. + It looks for tasks in `CRASHED` state. + - It does not increment `TASK_CRASHED` metrics. + - It reimplements `Cunsumer.internal_process` in simplified way. + :meta private: + """ + import sys + import traceback + + from karton.core.backend import KartonMetrics + + self.log.info("Service %s started in crash-consume mode", self.identity) + + with self.graceful_killer(): + while not self.shutdown: + + task: Task + for task in self.backend.iter_all_tasks(parse_resources=False): + if task.status != TaskState.CRASHED: + continue + if task.headers.get("receiver") == self.identity: + break + else: + self.log.warning( + "Crashed task for consumer %s not found." + "Sleeping and trying again..." % (self.identity,) + ) + time.sleep(5) + continue + + self.current_task = task + self.log_handler.set_task(self.current_task) + + exception_str = None + + try: + self.log.info("Received new task - %s", self.current_task.uid) + # self.backend.set_task_status(self.current_task, TaskState.STARTED) + + self._run_pre_hooks() + + saved_exception = None + try: + self.process(self.current_task) + except Exception as exc: + saved_exception = exc + raise + finally: + self._run_post_hooks(saved_exception) + + self.log.info("Task done - %s", self.current_task.uid) + except Exception: + exc_info = sys.exc_info() + exception_str = traceback.format_exception(*exc_info) + self.log.exception( + "Failed to process task - %s", self.current_task.uid + ) + finally: + self.backend.increment_metrics( + KartonMetrics.TASK_CONSUMED, self.identity + ) + + task_state = TaskState.FINISHED + + # report the task status as crashed + # if an exception was caught while processing + if exception_str is not None: + task_state = TaskState.CRASHED + self.current_task.error = exception_str + + self.backend.set_task_status(self.current_task, task_state) + @classmethod def args_parser(cls) -> argparse.ArgumentParser: parser = super().args_parser() From d0912f81dc5d8a9aa33560e71c487ed4092a2925 Mon Sep 17 00:00:00 2001 From: Stanislav Rakovsky Date: Wed, 2 Apr 2025 10:56:01 +0300 Subject: [PATCH 6/6] Delete wrong-placed code in system.py & add required condition of TaskState.CRASHED to karton.py --- karton/core/karton.py | 2 + karton/system/system.py | 83 ----------------------------------------- 2 files changed, 2 insertions(+), 83 deletions(-) diff --git a/karton/core/karton.py b/karton/core/karton.py index d65f98ef..858b0c4b 100644 --- a/karton/core/karton.py +++ b/karton/core/karton.py @@ -378,6 +378,8 @@ def loop_crashed_tasks(self) -> None: task: Task for task in self.backend.iter_all_tasks(parse_resources=False): + if task.status != TaskState.CRASHED: + continue if task.headers.get("receiver") == self.identity: break else: diff --git a/karton/system/system.py b/karton/system/system.py index 2389d102..844c0875 100644 --- a/karton/system/system.py +++ b/karton/system/system.py @@ -324,89 +324,6 @@ def loop(self) -> None: if not self.enable_router: time.sleep(1) # Avoid a busy loop - def loop_crashed_tasks(self) -> None: - """ - Blocking loop that consumes crashed tasks. - It's preferable for debugging issueses occured with your service. - Consumer.loop_crashed_tasks is different from Consumer.loop: - - It does not rely on `karton.queue`. It finds crashed doc in `karton.task`. - So RUN ONLY ONE REPLICA to avoid race condition - and large resource consumption. - - It does not rely in task_timeout. - - It does not register new binds. - - It does not shut down another instances on binds / version mismatch. - - It does not listen queue in a traditional way / it dows not subsribe. - It looks for tasks in `CRASHED` state. - - It does not increment `TASK_CRASHED` metrics. - - It reimplements `Cunsumer.internal_process` in simplified way. - :meta private: - """ - import sys - import traceback - - from karton.core.backend import KartonMetrics - - self.log.info("Service %s started in crash-consume mode", self.identity) - - with self.graceful_killer(): - while not self.shutdown: - - task: Task - for task in self.backend.iter_all_tasks(parse_resources=False): - if task.status != TaskState.CRASHED: - continue - if task.headers.get("receiver") == self.identity: - break - else: - self.log.warning( - "Crashed task for consumer %s not found." - "Sleeping and trying again..." % (self.identity,) - ) - time.sleep(5) - continue - - self.current_task = task - self.log_handler.set_task(self.current_task) - - exception_str = None - - try: - self.log.info("Received new task - %s", self.current_task.uid) - # self.backend.set_task_status(self.current_task, TaskState.STARTED) - - self._run_pre_hooks() - - saved_exception = None - try: - self.process(self.current_task) - except Exception as exc: - saved_exception = exc - raise - finally: - self._run_post_hooks(saved_exception) - - self.log.info("Task done - %s", self.current_task.uid) - except Exception: - exc_info = sys.exc_info() - exception_str = traceback.format_exception(*exc_info) - self.log.exception( - "Failed to process task - %s", self.current_task.uid - ) - finally: - self.backend.increment_metrics( - KartonMetrics.TASK_CONSUMED, self.identity - ) - - task_state = TaskState.FINISHED - - # report the task status as crashed - # if an exception was caught while processing - if exception_str is not None: - task_state = TaskState.CRASHED - self.current_task.error = exception_str - - self.backend.set_task_status(self.current_task, task_state) - @classmethod def args_parser(cls) -> argparse.ArgumentParser: parser = super().args_parser()