4 | 4 | import logging |
5 | 5 | from cronitor import State, Monitor |
6 | 6 | import cronitor |
| 7 | +import functools |
| 8 | +import shutil |
| 9 | +import tempfile |
7 | 10 | import sys |
8 | 11 |
9 | 12 | logger = logging.getLogger(__name__) |
@@ -51,10 +54,17 @@ def initialize(app, celerybeat_only=False, api_key=None): # type: (celery.Celer |
51 | 54 | global ping_monitor_on_retry |
52 | 55 |
53 | 56 | def celerybeat_startup(sender, **kwargs): # type: (celery.beat.Service, Dict) -> None |
54 | | - scheduler = sender.get_scheduler() # type: celery.beat.Scheduler |
55 | | - schedules = scheduler.get_schedule() |
| 57 | + # To avoid recursion, since restarting celerybeat will result in this |
| 58 | + # signal being called again, we disconnect the signal. |
| 59 | + beat_init.disconnect(celerybeat_startup, dispatch_uid=1) |
| 60 | + |
| 61 | + # Must use the cached_property from scheduler so as not to re-open the shelve database |
| 62 | + scheduler = sender.scheduler # type: celery.beat.Scheduler |
| 63 | + # Also need to use the property here, including for django-celery-beat |
| 64 | + schedules = scheduler.schedule |
56 | 65 | monitors = [] # type: List[Dict[str, str]] |
57 | 66 |
| 67 | + add_periodic_task_deferred = [] |
58 | 68 | for name in schedules: |
59 | 69 | if name.startswith('celery.'): |
60 | 70 | continue |
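The hunk above changes three things at once: the handler disconnects itself up front (restarting beat re-fires beat_init), it reads the scheduler through the cached properties so the shelve database behind the schedule is not opened a second time, and it queues the add_periodic_task calls instead of applying them immediately. A condensed sketch of that pattern, not the full patch; the module-level app and the connect call with dispatch_uid=1 are assumptions, since both sit outside the hunks shown here:

import functools

from celery import Celery
from celery.signals import beat_init

app = Celery('example')  # stand-in for the app passed to initialize()


def celerybeat_startup(sender, **kwargs):  # sender is a celery.beat.Service
    # Disconnect first: restarting beat later re-fires beat_init, and this
    # keeps the handler from calling itself recursively.
    beat_init.disconnect(celerybeat_startup, dispatch_uid=1)

    scheduler = sender.scheduler    # cached_property: reuses the already-open shelve
    schedules = scheduler.schedule  # property access; also works for django-celery-beat

    deferred = []
    for name, entry in schedules.items():
        if name.startswith('celery.'):
            continue  # skip celery's internal entries
        headers = {'x-cronitor-celerybeat-name': name}  # only the header visible in the hunk
        # Defer the call; it is applied only after beat has been stopped (or,
        # for django-celery-beat, right away -- see the second hunk).
        deferred.append(functools.partial(
            app.add_periodic_task, entry.schedule,
            # setting headers on the signature works better than in the
            # periodic task options (per the comment in the patch)
            app.tasks.get(entry.task).s().set(headers=headers),
            args=entry.args, kwargs=entry.kwargs,
            name=entry.name, **(entry.options or {})))
    return deferred  # handed to the restart logic sketched after the next hunk


# Assumed registration: the patch disconnects with dispatch_uid=1, so the
# connect call (not shown in these hunks) presumably uses the same uid.
beat_init.connect(celerybeat_startup, dispatch_uid=1)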
@@ -95,21 +105,37 @@ def celerybeat_startup(sender, **kwargs): # type: (celery.beat.Service, Dict) - |
95 | 105 | 'x-cronitor-celerybeat-name': name, |
96 | 106 | }) |
97 | 107 |
98 | | - app.add_periodic_task(entry.schedule, |
| 108 | + add_periodic_task_deferred.append( |
| 109 | + functools.partial(app.add_periodic_task, |
| 110 | + entry.schedule, |
99 | 111 | # Setting headers in the signature |
100 | | - # works better then in periodic task options |
| 112 | + # works better than in periodic task options |
101 | 113 | app.tasks.get(entry.task).s().set(headers=headers), |
102 | 114 | args=entry.args, kwargs=entry.kwargs, |
103 | 115 | name=entry.name, **(entry.options or {})) |
| 116 | + ) |
| 117 | + |
| 118 | + if isinstance(sender.scheduler, celery.beat.PersistentScheduler): |
| 119 | + # The celerybeat-schedule file with shelve gets corrupted really easily, so we need |
| 120 | + # to set up a tempfile instead. |
| 121 | + new_schedule = tempfile.NamedTemporaryFile() |
| 122 | + with open(sender.schedule_filename, 'rb') as current_schedule: |
| 123 | + shutil.copyfileobj(current_schedule, new_schedule) |
| 124 | + # We need to stop and restart celerybeat to get the task updates in place. |
| 125 | + # This isn't ideal, but seems to work. |
| 126 | + |
| 127 | + sender.stop() |
| 128 | + # Now, actually add all the periodic tasks to overwrite beat with the headers |
| 129 | + for task in add_periodic_task_deferred: |
| 130 | + task() |
| 131 | + # Then, restart celerybeat, on the new schedule file (copied from the old one) |
| 132 | + app.Beat(schedule=new_schedule.name).run() |
104 | 133 |
105 | | - # To avoid recursion, since restarting celerybeat will result in this |
106 | | - # signal being called again, we disconnect the signal. |
107 | | - beat_init.disconnect(celerybeat_startup, dispatch_uid=1) |
| 134 | + else: |
| 135 | + # For django-celery, etc., we don't need to stop and restart celerybeat |
| 136 | + for task in add_periodic_task_deferred: |
| 137 | + task() |
108 | 138 |
109 | | - # We need to stop and restart celerybeat to get the task updates in place. |
110 | | - # This isn't ideal, but seems to work. |
111 | | - sender.stop() |
112 | | - app.Beat().run() |
113 | 139 | logger.debug("[Cronitor] creating monitors: %s", [m['key'] for m in monitors]) |
114 | 140 | Monitor.put(monitors) |
115 | 141 |
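Reduced to its essentials, the branch above copies the shelve-backed schedule to a temporary file, stops beat, applies the deferred add_periodic_task calls, and restarts beat on the copy; DB-backed schedulers only need the deferred calls applied. A standalone sketch of that flow under those assumptions (the helper name is illustrative, since in the patch this logic lives inline in celerybeat_startup, and the flush() is an addition here to make sure the copied bytes are on disk before beat reopens the file):

import shutil
import tempfile

import celery.beat


def apply_and_restart(app, sender, deferred):
    # sender is the celery.beat.Service passed to the beat_init handler
    if isinstance(sender.scheduler, celery.beat.PersistentScheduler):
        # The stock celerybeat-schedule shelve file corrupts easily, so work
        # against a temporary copy instead of the original.
        new_schedule = tempfile.NamedTemporaryFile()
        with open(sender.schedule_filename, 'rb') as current_schedule:
            shutil.copyfileobj(current_schedule, new_schedule)
        new_schedule.flush()  # not in the patch; ensures the copy is readable

        sender.stop()            # stop the running beat service
        for task in deferred:
            task()               # re-register the entries, now carrying cronitor headers
        # Restart beat against the copied schedule file (this call blocks).
        app.Beat(schedule=new_schedule.name).run()
    else:
        # django-celery-beat and other DB-backed schedulers: no restart needed.
        for task in deferred:
            task()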
@@ -175,4 +201,3 @@ def ping_monitor_on_retry(sender, # type: celery.Task |
175 | 201 | return |
176 | 202 |
177 | 203 | monitor.ping(state=State.FAIL, series=sender.request.id, message=str(reason)) |
178 | | - |
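The ping closing this last hunk records the retry as a failed run in the task's own series. A minimal standalone equivalent with the cronitor client, using placeholder keys and messages:

import cronitor
from cronitor import Monitor, State

cronitor.api_key = 'cronitor-api-key'     # placeholder
monitor = Monitor('send-welcome-email')   # placeholder monitor key
monitor.ping(state=State.FAIL,
             series='8d9f3c2a-task-uuid',  # ties the ping to one task execution
             message='Retry: connection timeout')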