From 60998bffc2dd3b05815e402d9fb7dd4eb0f917c4 Mon Sep 17 00:00:00 2001 From: Ariel Rin Date: Fri, 10 Jan 2025 12:10:49 +0000 Subject: [PATCH] Cron Offset Tasks --- allianceauth/apps.py | 1 - allianceauth/crontab/__init__.py | 3 + allianceauth/crontab/apps.py | 14 ++++ allianceauth/crontab/models.py | 23 ++++++ allianceauth/crontab/schedulers.py | 63 +++++++++++++++ allianceauth/crontab/tests/__init__.py | 0 allianceauth/crontab/tests/test_models.py | 63 +++++++++++++++ allianceauth/crontab/tests/test_utils.py | 80 +++++++++++++++++++ allianceauth/crontab/utils.py | 41 ++++++++++ .../project_name/settings/base.py | 6 +- docs/development/tech_docu/celery.md | 51 +++++++++++- 11 files changed, 341 insertions(+), 4 deletions(-) create mode 100644 allianceauth/crontab/__init__.py create mode 100644 allianceauth/crontab/apps.py create mode 100644 allianceauth/crontab/models.py create mode 100644 allianceauth/crontab/schedulers.py create mode 100644 allianceauth/crontab/tests/__init__.py create mode 100644 allianceauth/crontab/tests/test_models.py create mode 100644 allianceauth/crontab/tests/test_utils.py create mode 100644 allianceauth/crontab/utils.py diff --git a/allianceauth/apps.py b/allianceauth/apps.py index 098f50ba..1191fed9 100644 --- a/allianceauth/apps.py +++ b/allianceauth/apps.py @@ -1,5 +1,4 @@ from django.apps import AppConfig -from django.core.checks import Warning, Error, register class AllianceAuthConfig(AppConfig): diff --git a/allianceauth/crontab/__init__.py b/allianceauth/crontab/__init__.py new file mode 100644 index 00000000..c621f6d2 --- /dev/null +++ b/allianceauth/crontab/__init__.py @@ -0,0 +1,3 @@ +""" +Alliance Auth Crontab Utilities +""" diff --git a/allianceauth/crontab/apps.py b/allianceauth/crontab/apps.py new file mode 100644 index 00000000..e47e39ed --- /dev/null +++ b/allianceauth/crontab/apps.py @@ -0,0 +1,14 @@ +""" +Crontab App Config +""" + +from django.apps import AppConfig + + +class CrontabConfig(AppConfig): + """ + Crontab App Config + """ + + name = "allianceauth.crontab" + label = "crontab" diff --git a/allianceauth/crontab/models.py b/allianceauth/crontab/models.py new file mode 100644 index 00000000..f23b35ea --- /dev/null +++ b/allianceauth/crontab/models.py @@ -0,0 +1,23 @@ +from random import random +from django.db import models +from django.utils.translation import gettext_lazy as _ +from solo.models import SingletonModel + + +def random_default() -> float: + return random() + + +class CronOffset(SingletonModel): + + minute = models.FloatField(_("Minute Offset"), default=random_default) + hour = models.FloatField(_("Hour Offset"), default=random_default) + day_of_month = models.FloatField(_("Day of Month Offset"), default=random_default) + month_of_year = models.FloatField(_("Month of Year Offset"), default=random_default) + day_of_week = models.FloatField(_("Day of Week Offset"), default=random_default) + + def __str__(self) -> str: + return "Cron Offsets" + + class Meta: + verbose_name = "Cron Offsets" diff --git a/allianceauth/crontab/schedulers.py b/allianceauth/crontab/schedulers.py new file mode 100644 index 00000000..0b36eead --- /dev/null +++ b/allianceauth/crontab/schedulers.py @@ -0,0 +1,63 @@ +from django.core.exceptions import ObjectDoesNotExist +from django_celery_beat.schedulers import ( + DatabaseScheduler +) +from django_celery_beat.models import CrontabSchedule +from django.db.utils import OperationalError, ProgrammingError + +from celery import schedules +from celery.utils.log import get_logger + +from allianceauth.crontab.models import CronOffset +from allianceauth.crontab.utils import offset_cron + +logger = get_logger(__name__) + + +class OffsetDatabaseScheduler(DatabaseScheduler): + """ + Customization of Django Celery Beat, Database Scheduler + Takes the Celery Schedule from local.py and applies our AA Framework Cron Offset, if apply_offset is true + Otherwise it passes it through as normal + """ + def update_from_dict(self, mapping): + s = {} + + try: + cron_offset = CronOffset.get_solo() + except (OperationalError, ProgrammingError, ObjectDoesNotExist) as exc: + # This is just incase we haven't migrated yet or something + logger.warning( + "OffsetDatabaseScheduler: Could not fetch CronOffset (%r). " + "Defering to DatabaseScheduler", + exc + ) + return super().update_from_dict(mapping) + + for name, entry_fields in mapping.items(): + try: + apply_offset = entry_fields.pop("apply_offset", False) + entry = self.Entry.from_entry(name, app=self.app, **entry_fields) + + if entry.model.enabled and apply_offset: + schedule_obj = entry.schedule + if isinstance(schedule_obj, schedules.crontab): + offset_cs = CrontabSchedule.from_schedule(offset_cron(schedule_obj)) + offset_cs, created = CrontabSchedule.objects.get_or_create( + minute=offset_cs.minute, + hour=offset_cs.hour, + day_of_month=offset_cs.day_of_month, + month_of_year=offset_cs.month_of_year, + day_of_week=offset_cs.day_of_week, + timezone=offset_cs.timezone, + ) + entry.model.crontab = offset_cs + entry.model.save() + logger.debug(f"Offset applied for '{name}' due to 'apply_offset' = True.") + + s[name] = entry + + except Exception as e: + logger.exception("Error updating schedule for %s: %r", name, e) + + self.schedule.update(s) diff --git a/allianceauth/crontab/tests/__init__.py b/allianceauth/crontab/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/allianceauth/crontab/tests/test_models.py b/allianceauth/crontab/tests/test_models.py new file mode 100644 index 00000000..cb92e667 --- /dev/null +++ b/allianceauth/crontab/tests/test_models.py @@ -0,0 +1,63 @@ +from unittest.mock import patch +from django.test import TestCase + +from allianceauth.crontab.models import CronOffset + + +class CronOffsetModelTest(TestCase): + def test_cron_offset_is_singleton(self): + """ + Test that CronOffset is indeed a singleton and that + multiple calls to get_solo() return the same instance. + """ + offset1 = CronOffset.get_solo() + offset2 = CronOffset.get_solo() + + # They should be the exact same object in memory + self.assertEqual(offset1.pk, offset2.pk) + + def test_default_values_random(self): + """ + Test that the default values are set via random_default() when + no explicit value is provided. We'll patch 'random.random' to + produce predictable output. + """ + with patch('allianceauth.crontab.models.random', return_value=0.1234): + # Force creation of a new CronOffset by clearing the existing one + CronOffset.objects.all().delete() + + offset = CronOffset.get_solo() # This triggers creation + + # All fields should be 0.1234, because we patched random() + self.assertAlmostEqual(offset.minute, 0.1234) + self.assertAlmostEqual(offset.hour, 0.1234) + self.assertAlmostEqual(offset.day_of_month, 0.1234) + self.assertAlmostEqual(offset.month_of_year, 0.1234) + self.assertAlmostEqual(offset.day_of_week, 0.1234) + + def test_update_offset_values(self): + """ + Test that we can update the offsets and retrieve them. + """ + offset = CronOffset.get_solo() + offset.minute = 0.5 + offset.hour = 0.25 + offset.day_of_month = 0.75 + offset.month_of_year = 0.99 + offset.day_of_week = 0.33 + offset.save() + + # Retrieve again to ensure changes persist + saved_offset = CronOffset.get_solo() + self.assertEqual(saved_offset.minute, 0.5) + self.assertEqual(saved_offset.hour, 0.25) + self.assertEqual(saved_offset.day_of_month, 0.75) + self.assertEqual(saved_offset.month_of_year, 0.99) + self.assertEqual(saved_offset.day_of_week, 0.33) + + def test_str_representation(self): + """ + Verify the __str__ method returns 'Cron Offsets'. + """ + offset = CronOffset.get_solo() + self.assertEqual(str(offset), "Cron Offsets") diff --git a/allianceauth/crontab/tests/test_utils.py b/allianceauth/crontab/tests/test_utils.py new file mode 100644 index 00000000..48592c99 --- /dev/null +++ b/allianceauth/crontab/tests/test_utils.py @@ -0,0 +1,80 @@ +# myapp/tests/test_tasks.py + +import logging +from unittest.mock import patch +from django.test import TestCase +from django.db import ProgrammingError +from celery.schedules import crontab + +from allianceauth.crontab.utils import offset_cron +from allianceauth.crontab.models import CronOffset + +logger = logging.getLogger(__name__) + + +class TestOffsetCron(TestCase): + + def test_offset_cron_normal(self): + """ + Test that offset_cron modifies the minute/hour fields + based on the CronOffset values when everything is normal. + """ + # We'll create a mock CronOffset instance + mock_offset = CronOffset(minute=0.5, hour=0.5) + + # Our initial crontab schedule + original_schedule = crontab( + minute=[0, 5, 55], + hour=[0, 3, 23], + day_of_month='*', + month_of_year='*', + day_of_week='*' + ) + + # Patch CronOffset.get_solo to return our mock offset + with patch('allianceauth.crontab.models.CronOffset.get_solo', return_value=mock_offset): + new_schedule = offset_cron(original_schedule) + + # Check the new minute/hour + # minute 0 -> 0 + round(60 * 0.5) = 30 % 60 = 30 + # minute 5 -> 5 + 30 = 35 % 60 = 35 + # minute 55 -> 55 + 30 = 85 % 60 = 25 --> sorted => 25,30,35 + self.assertEqual(new_schedule._orig_minute, '25,30,35') + + # hour 0 -> 0 + round(24 * 0.5) = 12 % 24 = 12 + # hour 3 -> 3 + 12 = 15 % 24 = 15 + # hour 23 -> 23 + 12 = 35 % 24 = 11 --> sorted => 11,12,15 + self.assertEqual(new_schedule._orig_hour, '11,12,15') + + # Check that other fields are unchanged + self.assertEqual(new_schedule._orig_day_of_month, '*') + self.assertEqual(new_schedule._orig_month_of_year, '*') + self.assertEqual(new_schedule._orig_day_of_week, '*') + + def test_offset_cron_programming_error(self): + """ + Test that if a ProgrammingError is raised (e.g. before migrations), + offset_cron just returns the original schedule. + """ + original_schedule = crontab(minute=[0, 15, 30], hour=[1, 2, 3]) + + # Force get_solo to raise ProgrammingError + with patch('allianceauth.crontab.models.CronOffset.get_solo', side_effect=ProgrammingError()): + new_schedule = offset_cron(original_schedule) + + # Should return the original schedule unchanged + self.assertEqual(new_schedule, original_schedule) + + def test_offset_cron_unexpected_exception(self): + """ + Test that if any other exception is raised, offset_cron + also returns the original schedule, and logs the error. + """ + original_schedule = crontab(minute='0', hour='0') + + # Force get_solo to raise a generic Exception + with patch('allianceauth.crontab.models.CronOffset.get_solo', side_effect=Exception("Something bad")): + new_schedule = offset_cron(original_schedule) + + # Should return the original schedule unchanged + self.assertEqual(new_schedule, original_schedule) diff --git a/allianceauth/crontab/utils.py b/allianceauth/crontab/utils.py new file mode 100644 index 00000000..150664ca --- /dev/null +++ b/allianceauth/crontab/utils.py @@ -0,0 +1,41 @@ +from celery.schedules import crontab +import logging +from allianceauth.crontab.models import CronOffset +from django.db import ProgrammingError + + +logger = logging.getLogger(__name__) + + +def offset_cron(schedule: crontab) -> crontab: + """Take a crontab and apply a series of precalculated offsets to spread out tasks execution on remote resources + + Args: + schedule (crontab): celery.schedules.crontab() + + Returns: + crontab: A crontab with offsetted Minute and Hour fields + """ + + try: + cron_offset = CronOffset.get_solo() + new_minute = [(m + (round(60 * cron_offset.minute))) % 60 for m in schedule.minute] + new_hour = [(m + (round(24 * cron_offset.hour))) % 24 for m in schedule.hour] + + return crontab( + minute=",".join(str(m) for m in sorted(new_minute)), + hour=",".join(str(h) for h in sorted(new_hour)), + day_of_month=schedule._orig_day_of_month, + month_of_year=schedule._orig_month_of_year, + day_of_week=schedule._orig_day_of_week) + + except ProgrammingError as e: + # If this is called before migrations are run hand back the default schedule + # These offsets are stored in a Singleton Model, + logger.error(e) + return schedule + + except Exception as e: + # We absolutely cant fail to hand back a schedule + logger.error(e) + return schedule diff --git a/allianceauth/project_template/project_name/settings/base.py b/allianceauth/project_template/project_name/settings/base.py index 9e8d6957..56e7c493 100644 --- a/allianceauth/project_template/project_name/settings/base.py +++ b/allianceauth/project_template/project_name/settings/base.py @@ -43,6 +43,7 @@ INSTALLED_APPS = [ 'allianceauth.theme.flatly', 'allianceauth.theme.materia', "allianceauth.custom_css", + 'allianceauth.crontab', 'sri', ] @@ -51,7 +52,7 @@ SECRET_KEY = "wow I'm a really bad default secret key" # Celery configuration BROKER_URL = 'redis://localhost:6379/0' -CELERYBEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" +CELERYBEAT_SCHEDULER = "allianceauth.crontab.schedulers.OffsetDatabaseScheduler" CELERYBEAT_SCHEDULE = { 'esi_cleanup_callbackredirect': { 'task': 'esi.tasks.cleanup_callbackredirect', @@ -64,10 +65,12 @@ CELERYBEAT_SCHEDULE = { 'run_model_update': { 'task': 'allianceauth.eveonline.tasks.run_model_update', 'schedule': crontab(minute='0', hour="*/6"), + 'apply_offset': True }, 'check_all_character_ownership': { 'task': 'allianceauth.authentication.tasks.check_all_character_ownership', 'schedule': crontab(minute='0', hour='*/4'), + 'apply_offset': True }, 'analytics_daily_stats': { 'task': 'allianceauth.analytics.tasks.analytics_daily_stats', @@ -75,6 +78,7 @@ CELERYBEAT_SCHEDULE = { } } + # Build paths inside the project like this: os.path.join(BASE_DIR, ...) PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) BASE_DIR = os.path.dirname(PROJECT_DIR) diff --git a/docs/development/tech_docu/celery.md b/docs/development/tech_docu/celery.md index 3f39d299..b9571170 100644 --- a/docs/development/tech_docu/celery.md +++ b/docs/development/tech_docu/celery.md @@ -123,6 +123,7 @@ Example setting: CELERYBEAT_SCHEDULE['structures_update_all_structures'] = { 'task': 'structures.tasks.update_all_structures', 'schedule': crontab(minute='*/30'), + 'apply_offset': True, } ``` @@ -130,6 +131,7 @@ CELERYBEAT_SCHEDULE['structures_update_all_structures'] = { - `'task'`: Name of your task (full path) - `'schedule'`: Schedule definition (see Celery documentation on [Periodic Tasks](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) for details) +- `'apply_offset'`: Boolean, Apply a Delay unique to the install, in order to reduce impact on ESI. See [Apply Offset](#apply-offset) ## How can I use priorities for tasks? @@ -174,9 +176,54 @@ Large numbers of installs running the same crontab (ie. `0 * * * *`) can all sla Consider Artificially smoothing out your tasks with a few methods -### Offset Crontabs +### Apply Offset -Avoid running your tasks on the hour or other nice neat human numbers, consider 23 minutes on the hour instead of at zero (`28 * * * *`) +`allianceauth.crontab` contains a series of Offsets stored in the DB that are both static for an install, but random across all AA installs. + +This enables us to spread our load on ESI (or other external resources) across a greater window, making it unlikely that two installs will hit ESI at the same time. + +Tasks defined in local.py, can have `'apply_offset': True` added to their Task Definition + +```python +CELERYBEAT_SCHEDULE['taskname'] = { + 'task': 'module.tasks.task', + 'schedule': crontab(minute='*/30'), + 'apply_offset': True, +} +``` + +Tasks added to directly to Django Celery Beat Models (Using a Management Task etc) can pass their Cron Schedule through offset_cron(crontab) + +```{eval-rst} +.. automodule:: allianceauth.crontab.utils + :members: + :undoc-members: +``` + +```python +from django_celery_beat.models import CrontabSchedule, PeriodicTask +from celery.schedules import crontab + +schedule = CrontabSchedule.from_schedule(offset_cron(crontab(minute='0', hour='0'))) + +schedule, created = CrontabSchedule.objects.get_or_create( + minute=schedule.minute, + hour=schedule.hour, + day_of_month=schedule.day_of_month, + month_of_year=schedule.month_of_year, + day_of_week=schedule.day_of_week, + timezone=schedule.timezone, +) + +PeriodicTask.objects.update_or_create( + task='module.tasks.task', + defaults={ + 'crontab': schedule, + 'name': 'task name', + 'enabled': True + } +) +``` ### Subset Tasks