Skip to content

Fixes #3364: task completion cache should not be updated after run() completes#3366

Open
telzhov wants to merge 2 commits intospotify:masterfrom
telzhov:fixes-3364
Open

Fixes #3364: task completion cache should not be updated after run() completes#3366
telzhov wants to merge 2 commits intospotify:masterfrom
telzhov:fixes-3364

Conversation

@telzhov
Copy link

@telzhov telzhov commented Oct 25, 2025

Description

worker.py:

if not self.check_complete_on_run:
    # update the cache
    if self.task_completion_cache is not None:
        self.task_completion_cache[self.task.task_id] = True
    status = DONE
elif self.check_complete(self.task):
    status = DONE

replaced with

if not self.check_complete_on_run or self.check_complete(self.task):
    status = DONE

Motivation and Context

Fixes #3364. The task completion cache should not be updated immediately after run() completes, since completing run() doesn't guarantee that all output targets will also complete. Shouldn't the cache be updated only using the check_complete_cached() function?

Have you tested this? If so, how?

Tested with a debugger using the code example from #3364

@telzhov telzhov requested review from a team and dlstadther as code owners October 25, 2025 20:43
@telzhov
Copy link
Author

telzhov commented Oct 26, 2025

Also updated test/worker_test.py:

-            self.assertEqual(a0.complete_count, 2)
-            self.assertEqual(a1.complete_count, 2)
-            self.assertEqual(a2.complete_count, 2)
+            self.assertEqual(a0.complete_count, 3)
+            self.assertEqual(a1.complete_count, 3)
+            self.assertEqual(a2.complete_count, 3)

'3' seems to be correct, because a downstream task with dynamic requitements seems to have a redundant call to complete() of an upstream task (have no idea why). Try this, for example:

from luigi import Task, build

class A(Task):
    has_run = False
    complete_count = 0

    def run(self):
        self.has_run = True
        print(f'A(): run() is done')

    def complete(self):
        self.complete_count += 1
        print(f'A(): calls to complete() = {self.complete_count}, complete = {self.has_run}')
        return self.has_run

class B(Task):
    has_run = False

    def run(self):
        yield A()
        self.has_run = True

    def complete(self):
        return self.has_run

build([B()], log_level='ERROR', local_scheduler=True)

it prints

A(): calls to complete() = 1, complete = False
A(): calls to complete() = 2, complete = False
A(): run() is done
A(): calls to complete() = 3, complete = True

Now switch on cache_task_completion in /etc/luigi/luigi.cfg, and you get

A(): calls to complete() = 1, complete = False
A(): calls to complete() = 2, complete = False
A(): run() is done

so the '2' value in the worker_test.py is because A's completeness is just not get checked after its run() completes

Again, you'll find in the next test code snippet this assertion

with Worker(scheduler=self.sch, worker_id='2', cache_task_completion=False) as w:
    ...
    self.assertEqual(a12.complete_count, 3)

So it's known that for disabled cache_task_completion the upstream's complete() gets called three times instead of two

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

'cache_task_completion = true' causes luigi to ignore tasks' completion

1 participant