Skip to content
81 changes: 81 additions & 0 deletions karton/core/karton.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Base library for karton subsystems.
"""

import abc
import argparse
import sys
Expand Down Expand Up @@ -351,6 +352,86 @@ def loop(self) -> None:
if task:
self.internal_process(task)

def loop_crashed_tasks(self) -> None:
    """
    Blocking loop that consumes crashed tasks.
    It's preferable for debugging issues that occurred with your service.

    Consumer.loop_crashed_tasks is different from Consumer.loop:
    - It does not rely on `karton.queue`. It finds crashed tasks in
      `karton.task`. So RUN ONLY ONE REPLICA to avoid race conditions
      and large resource consumption.
    - It does not rely on task_timeout.
    - It does not register new binds.
    - It does not shut down other instances on binds / version mismatch.
    - It does not listen to the queue in the traditional way / it does
      not subscribe. It looks for tasks in the `CRASHED` state.
    - It does not increment `TASK_CRASHED` metrics.
    - It reimplements `Consumer.internal_process` in a simplified way.

    The loop runs until ``self.shutdown`` becomes true. Each iteration
    scans all tasks known to the backend, picks the first one that is in
    the ``CRASHED`` state and whose ``receiver`` header equals
    ``self.identity``, and runs ``self.process`` on it (wrapped in the
    pre/post hooks). The task is then stored as ``FINISHED``, or as
    ``CRASHED`` again (with the formatted traceback in ``task.error``)
    if processing raised.

    :meta private:
    """
    self.log.info("Service %s started in crash-consume mode", self.identity)

    with self.graceful_killer():
        while not self.shutdown:
            # Full scan of the task store on every iteration: pick the
            # first crashed task addressed to this consumer, if any.
            all_tasks = self.backend.iter_all_tasks(parse_resources=False)
            task = next(
                (
                    candidate
                    for candidate in all_tasks
                    if candidate.status == TaskState.CRASHED
                    and candidate.headers.get("receiver") == self.identity
                ),
                None,
            )
            if task is None:
                self.log.warning(
                    "Crashed task for consumer %s not found. "
                    "Sleeping and trying again...",
                    self.identity,
                )
                time.sleep(5)
                continue

            self.current_task = task
            self.log_handler.set_task(self.current_task)

            exception_str = None

            try:
                self.log.info("Received new task - %s", self.current_task.uid)
                # NOTE: no transition to STARTED is made here; the task
                # keeps its CRASHED status while it is being processed.

                self._run_pre_hooks()

                saved_exception = None
                try:
                    self.process(self.current_task)
                except Exception as exc:
                    # Remember the error so post-hooks can inspect it,
                    # then let the outer handler record the traceback.
                    saved_exception = exc
                    raise
                finally:
                    self._run_post_hooks(saved_exception)

                self.log.info("Task done - %s", self.current_task.uid)
            except Exception:
                exc_info = sys.exc_info()
                exception_str = traceback.format_exception(*exc_info)
                self.log.exception(
                    "Failed to process task - %s", self.current_task.uid
                )
            finally:
                self.backend.increment_metrics(
                    KartonMetrics.TASK_CONSUMED, self.identity
                )

                task_state = TaskState.FINISHED

                # report the task status as crashed
                # if an exception was caught while processing
                if exception_str is not None:
                    task_state = TaskState.CRASHED
                    self.current_task.error = exception_str

                # NOTE(review): a task that fails again is stored as
                # CRASHED and will be selected again by the next scan
                # without any delay - confirm this is intended.
                self.backend.set_task_status(self.current_task, task_state)


class LogConsumer(KartonServiceBase):
"""
Expand Down