Cron Offset Tasks

This commit is contained in:
Ariel Rin 2025-01-10 12:10:49 +00:00
parent a5971314f5
commit 60998bffc2
11 changed files with 341 additions and 4 deletions

View File

@ -1,5 +1,4 @@
from django.apps import AppConfig from django.apps import AppConfig
from django.core.checks import Warning, Error, register
class AllianceAuthConfig(AppConfig): class AllianceAuthConfig(AppConfig):

View File

@ -0,0 +1,3 @@
"""
Alliance Auth Crontab Utilities
"""

View File

@ -0,0 +1,14 @@
"""
Crontab App Config
"""
from django.apps import AppConfig
class CrontabConfig(AppConfig):
"""
Crontab App Config
"""
name = "allianceauth.crontab"
label = "crontab"

View File

@ -0,0 +1,23 @@
from random import random
from django.db import models
from django.utils.translation import gettext_lazy as _
from solo.models import SingletonModel
def random_default() -> float:
return random()
class CronOffset(SingletonModel):
minute = models.FloatField(_("Minute Offset"), default=random_default)
hour = models.FloatField(_("Hour Offset"), default=random_default)
day_of_month = models.FloatField(_("Day of Month Offset"), default=random_default)
month_of_year = models.FloatField(_("Month of Year Offset"), default=random_default)
day_of_week = models.FloatField(_("Day of Week Offset"), default=random_default)
def __str__(self) -> str:
return "Cron Offsets"
class Meta:
verbose_name = "Cron Offsets"

View File

@ -0,0 +1,63 @@
from django.core.exceptions import ObjectDoesNotExist
from django_celery_beat.schedulers import (
DatabaseScheduler
)
from django_celery_beat.models import CrontabSchedule
from django.db.utils import OperationalError, ProgrammingError
from celery import schedules
from celery.utils.log import get_logger
from allianceauth.crontab.models import CronOffset
from allianceauth.crontab.utils import offset_cron
logger = get_logger(__name__)
class OffsetDatabaseScheduler(DatabaseScheduler):
"""
Customization of Django Celery Beat, Database Scheduler
Takes the Celery Schedule from local.py and applies our AA Framework Cron Offset, if apply_offset is true
Otherwise it passes it through as normal
"""
def update_from_dict(self, mapping):
s = {}
try:
cron_offset = CronOffset.get_solo()
except (OperationalError, ProgrammingError, ObjectDoesNotExist) as exc:
# This is just incase we haven't migrated yet or something
logger.warning(
"OffsetDatabaseScheduler: Could not fetch CronOffset (%r). "
"Defering to DatabaseScheduler",
exc
)
return super().update_from_dict(mapping)
for name, entry_fields in mapping.items():
try:
apply_offset = entry_fields.pop("apply_offset", False)
entry = self.Entry.from_entry(name, app=self.app, **entry_fields)
if entry.model.enabled and apply_offset:
schedule_obj = entry.schedule
if isinstance(schedule_obj, schedules.crontab):
offset_cs = CrontabSchedule.from_schedule(offset_cron(schedule_obj))
offset_cs, created = CrontabSchedule.objects.get_or_create(
minute=offset_cs.minute,
hour=offset_cs.hour,
day_of_month=offset_cs.day_of_month,
month_of_year=offset_cs.month_of_year,
day_of_week=offset_cs.day_of_week,
timezone=offset_cs.timezone,
)
entry.model.crontab = offset_cs
entry.model.save()
logger.debug(f"Offset applied for '{name}' due to 'apply_offset' = True.")
s[name] = entry
except Exception as e:
logger.exception("Error updating schedule for %s: %r", name, e)
self.schedule.update(s)

View File

View File

@ -0,0 +1,63 @@
from unittest.mock import patch
from django.test import TestCase
from allianceauth.crontab.models import CronOffset
class CronOffsetModelTest(TestCase):
def test_cron_offset_is_singleton(self):
"""
Test that CronOffset is indeed a singleton and that
multiple calls to get_solo() return the same instance.
"""
offset1 = CronOffset.get_solo()
offset2 = CronOffset.get_solo()
# They should be the exact same object in memory
self.assertEqual(offset1.pk, offset2.pk)
def test_default_values_random(self):
"""
Test that the default values are set via random_default() when
no explicit value is provided. We'll patch 'random.random' to
produce predictable output.
"""
with patch('allianceauth.crontab.models.random', return_value=0.1234):
# Force creation of a new CronOffset by clearing the existing one
CronOffset.objects.all().delete()
offset = CronOffset.get_solo() # This triggers creation
# All fields should be 0.1234, because we patched random()
self.assertAlmostEqual(offset.minute, 0.1234)
self.assertAlmostEqual(offset.hour, 0.1234)
self.assertAlmostEqual(offset.day_of_month, 0.1234)
self.assertAlmostEqual(offset.month_of_year, 0.1234)
self.assertAlmostEqual(offset.day_of_week, 0.1234)
def test_update_offset_values(self):
"""
Test that we can update the offsets and retrieve them.
"""
offset = CronOffset.get_solo()
offset.minute = 0.5
offset.hour = 0.25
offset.day_of_month = 0.75
offset.month_of_year = 0.99
offset.day_of_week = 0.33
offset.save()
# Retrieve again to ensure changes persist
saved_offset = CronOffset.get_solo()
self.assertEqual(saved_offset.minute, 0.5)
self.assertEqual(saved_offset.hour, 0.25)
self.assertEqual(saved_offset.day_of_month, 0.75)
self.assertEqual(saved_offset.month_of_year, 0.99)
self.assertEqual(saved_offset.day_of_week, 0.33)
def test_str_representation(self):
"""
Verify the __str__ method returns 'Cron Offsets'.
"""
offset = CronOffset.get_solo()
self.assertEqual(str(offset), "Cron Offsets")

View File

@ -0,0 +1,80 @@
# myapp/tests/test_tasks.py
import logging
from unittest.mock import patch
from django.test import TestCase
from django.db import ProgrammingError
from celery.schedules import crontab
from allianceauth.crontab.utils import offset_cron
from allianceauth.crontab.models import CronOffset
logger = logging.getLogger(__name__)
class TestOffsetCron(TestCase):
def test_offset_cron_normal(self):
"""
Test that offset_cron modifies the minute/hour fields
based on the CronOffset values when everything is normal.
"""
# We'll create a mock CronOffset instance
mock_offset = CronOffset(minute=0.5, hour=0.5)
# Our initial crontab schedule
original_schedule = crontab(
minute=[0, 5, 55],
hour=[0, 3, 23],
day_of_month='*',
month_of_year='*',
day_of_week='*'
)
# Patch CronOffset.get_solo to return our mock offset
with patch('allianceauth.crontab.models.CronOffset.get_solo', return_value=mock_offset):
new_schedule = offset_cron(original_schedule)
# Check the new minute/hour
# minute 0 -> 0 + round(60 * 0.5) = 30 % 60 = 30
# minute 5 -> 5 + 30 = 35 % 60 = 35
# minute 55 -> 55 + 30 = 85 % 60 = 25 --> sorted => 25,30,35
self.assertEqual(new_schedule._orig_minute, '25,30,35')
# hour 0 -> 0 + round(24 * 0.5) = 12 % 24 = 12
# hour 3 -> 3 + 12 = 15 % 24 = 15
# hour 23 -> 23 + 12 = 35 % 24 = 11 --> sorted => 11,12,15
self.assertEqual(new_schedule._orig_hour, '11,12,15')
# Check that other fields are unchanged
self.assertEqual(new_schedule._orig_day_of_month, '*')
self.assertEqual(new_schedule._orig_month_of_year, '*')
self.assertEqual(new_schedule._orig_day_of_week, '*')
def test_offset_cron_programming_error(self):
"""
Test that if a ProgrammingError is raised (e.g. before migrations),
offset_cron just returns the original schedule.
"""
original_schedule = crontab(minute=[0, 15, 30], hour=[1, 2, 3])
# Force get_solo to raise ProgrammingError
with patch('allianceauth.crontab.models.CronOffset.get_solo', side_effect=ProgrammingError()):
new_schedule = offset_cron(original_schedule)
# Should return the original schedule unchanged
self.assertEqual(new_schedule, original_schedule)
def test_offset_cron_unexpected_exception(self):
"""
Test that if any other exception is raised, offset_cron
also returns the original schedule, and logs the error.
"""
original_schedule = crontab(minute='0', hour='0')
# Force get_solo to raise a generic Exception
with patch('allianceauth.crontab.models.CronOffset.get_solo', side_effect=Exception("Something bad")):
new_schedule = offset_cron(original_schedule)
# Should return the original schedule unchanged
self.assertEqual(new_schedule, original_schedule)

View File

@ -0,0 +1,41 @@
from celery.schedules import crontab
import logging
from allianceauth.crontab.models import CronOffset
from django.db import ProgrammingError
logger = logging.getLogger(__name__)
def offset_cron(schedule: crontab) -> crontab:
"""Take a crontab and apply a series of precalculated offsets to spread out tasks execution on remote resources
Args:
schedule (crontab): celery.schedules.crontab()
Returns:
crontab: A crontab with offsetted Minute and Hour fields
"""
try:
cron_offset = CronOffset.get_solo()
new_minute = [(m + (round(60 * cron_offset.minute))) % 60 for m in schedule.minute]
new_hour = [(m + (round(24 * cron_offset.hour))) % 24 for m in schedule.hour]
return crontab(
minute=",".join(str(m) for m in sorted(new_minute)),
hour=",".join(str(h) for h in sorted(new_hour)),
day_of_month=schedule._orig_day_of_month,
month_of_year=schedule._orig_month_of_year,
day_of_week=schedule._orig_day_of_week)
except ProgrammingError as e:
# If this is called before migrations are run hand back the default schedule
# These offsets are stored in a Singleton Model,
logger.error(e)
return schedule
except Exception as e:
# We absolutely cant fail to hand back a schedule
logger.error(e)
return schedule

View File

@ -43,6 +43,7 @@ INSTALLED_APPS = [
'allianceauth.theme.flatly', 'allianceauth.theme.flatly',
'allianceauth.theme.materia', 'allianceauth.theme.materia',
"allianceauth.custom_css", "allianceauth.custom_css",
'allianceauth.crontab',
'sri', 'sri',
] ]
@ -51,7 +52,7 @@ SECRET_KEY = "wow I'm a really bad default secret key"
# Celery configuration # Celery configuration
BROKER_URL = 'redis://localhost:6379/0' BROKER_URL = 'redis://localhost:6379/0'
CELERYBEAT_SCHEDULER = "django_celery_beat.schedulers.DatabaseScheduler" CELERYBEAT_SCHEDULER = "allianceauth.crontab.schedulers.OffsetDatabaseScheduler"
CELERYBEAT_SCHEDULE = { CELERYBEAT_SCHEDULE = {
'esi_cleanup_callbackredirect': { 'esi_cleanup_callbackredirect': {
'task': 'esi.tasks.cleanup_callbackredirect', 'task': 'esi.tasks.cleanup_callbackredirect',
@ -64,10 +65,12 @@ CELERYBEAT_SCHEDULE = {
'run_model_update': { 'run_model_update': {
'task': 'allianceauth.eveonline.tasks.run_model_update', 'task': 'allianceauth.eveonline.tasks.run_model_update',
'schedule': crontab(minute='0', hour="*/6"), 'schedule': crontab(minute='0', hour="*/6"),
'apply_offset': True
}, },
'check_all_character_ownership': { 'check_all_character_ownership': {
'task': 'allianceauth.authentication.tasks.check_all_character_ownership', 'task': 'allianceauth.authentication.tasks.check_all_character_ownership',
'schedule': crontab(minute='0', hour='*/4'), 'schedule': crontab(minute='0', hour='*/4'),
'apply_offset': True
}, },
'analytics_daily_stats': { 'analytics_daily_stats': {
'task': 'allianceauth.analytics.tasks.analytics_daily_stats', 'task': 'allianceauth.analytics.tasks.analytics_daily_stats',
@ -75,6 +78,7 @@ CELERYBEAT_SCHEDULE = {
} }
} }
# Build paths inside the project like this: os.path.join(BASE_DIR, ...) # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) PROJECT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
BASE_DIR = os.path.dirname(PROJECT_DIR) BASE_DIR = os.path.dirname(PROJECT_DIR)

View File

@ -123,6 +123,7 @@ Example setting:
CELERYBEAT_SCHEDULE['structures_update_all_structures'] = { CELERYBEAT_SCHEDULE['structures_update_all_structures'] = {
'task': 'structures.tasks.update_all_structures', 'task': 'structures.tasks.update_all_structures',
'schedule': crontab(minute='*/30'), 'schedule': crontab(minute='*/30'),
'apply_offset': True,
} }
``` ```
@ -130,6 +131,7 @@ CELERYBEAT_SCHEDULE['structures_update_all_structures'] = {
- `'task'`: Name of your task (full path) - `'task'`: Name of your task (full path)
- `'schedule'`: Schedule definition (see Celery documentation on [Periodic Tasks](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) for details) - `'schedule'`: Schedule definition (see Celery documentation on [Periodic Tasks](https://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html) for details)
- `'apply_offset'`: Boolean, Apply a Delay unique to the install, in order to reduce impact on ESI. See [Apply Offset](#apply-offset)
## How can I use priorities for tasks? ## How can I use priorities for tasks?
@ -174,9 +176,54 @@ Large numbers of installs running the same crontab (ie. `0 * * * *`) can all sla
Consider Artificially smoothing out your tasks with a few methods Consider Artificially smoothing out your tasks with a few methods
### Offset Crontabs ### Apply Offset
Avoid running your tasks on the hour or other nice neat human numbers, consider 23 minutes on the hour instead of at zero (`28 * * * *`) `allianceauth.crontab` contains a series of Offsets stored in the DB that are both static for an install, but random across all AA installs.
This enables us to spread our load on ESI (or other external resources) across a greater window, making it unlikely that two installs will hit ESI at the same time.
Tasks defined in local.py, can have `'apply_offset': True` added to their Task Definition
```python
CELERYBEAT_SCHEDULE['taskname'] = {
'task': 'module.tasks.task',
'schedule': crontab(minute='*/30'),
'apply_offset': True,
}
```
Tasks added to directly to Django Celery Beat Models (Using a Management Task etc) can pass their Cron Schedule through offset_cron(crontab)
```{eval-rst}
.. automodule:: allianceauth.crontab.utils
:members:
:undoc-members:
```
```python
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from celery.schedules import crontab
schedule = CrontabSchedule.from_schedule(offset_cron(crontab(minute='0', hour='0')))
schedule, created = CrontabSchedule.objects.get_or_create(
minute=schedule.minute,
hour=schedule.hour,
day_of_month=schedule.day_of_month,
month_of_year=schedule.month_of_year,
day_of_week=schedule.day_of_week,
timezone=schedule.timezone,
)
PeriodicTask.objects.update_or_create(
task='module.tasks.task',
defaults={
'crontab': schedule,
'name': 'task name',
'enabled': True
}
)
```
### Subset Tasks ### Subset Tasks