diff --git a/allianceauth/eveonline/tasks.py b/allianceauth/eveonline/tasks.py index ce2b4219..657e63f1 100644 --- a/allianceauth/eveonline/tasks.py +++ b/allianceauth/eveonline/tasks.py @@ -1,4 +1,5 @@ import logging +from random import randint from celery import shared_task @@ -9,7 +10,8 @@ from . import providers logger = logging.getLogger(__name__) TASK_PRIORITY = 7 -CHUNK_SIZE = 500 +CHARACTER_AFFILIATION_CHUNK_SIZE = 500 +EVEONLINE_TASK_JITTER = 600 def chunks(lst, n): @@ -19,13 +21,13 @@ def chunks(lst, n): @shared_task -def update_corp(corp_id): +def update_corp(corp_id: int) -> None: """Update given corporation from ESI""" EveCorporationInfo.objects.update_corporation(corp_id) @shared_task -def update_alliance(alliance_id): +def update_alliance(alliance_id: int) -> None: """Update given alliance from ESI""" EveAllianceInfo.objects.update_alliance(alliance_id).populate_alliance() @@ -37,23 +39,30 @@ def update_character(character_id: int) -> None: @shared_task -def run_model_update(): +def run_model_update() -> None: """Update all alliances, corporations and characters from ESI""" - #update existing corp models + # Queue update tasks for Known Corporation Models for corp in EveCorporationInfo.objects.all().values('corporation_id'): - update_corp.apply_async(args=[corp['corporation_id']], priority=TASK_PRIORITY) + update_corp.apply_async( + args=[corp['corporation_id']], + priority=TASK_PRIORITY, + countdown=randint(1, EVEONLINE_TASK_JITTER)) - # update existing alliance models + # Queue update tasks for Known Alliance Models for alliance in EveAllianceInfo.objects.all().values('alliance_id'): - update_alliance.apply_async(args=[alliance['alliance_id']], priority=TASK_PRIORITY) + update_alliance.apply_async( + args=[alliance['alliance_id']], + priority=TASK_PRIORITY, + countdown=randint(1, EVEONLINE_TASK_JITTER)) - # update existing character models + # Queue update tasks for Known Character Models character_ids = EveCharacter.objects.all().values_list('character_id', flat=True) - for character_ids_chunk in chunks(character_ids, CHUNK_SIZE): + for character_ids_chunk in chunks(character_ids, CHARACTER_AFFILIATION_CHUNK_SIZE): update_character_chunk.apply_async( - args=[character_ids_chunk], priority=TASK_PRIORITY - ) + args=[character_ids_chunk], + priority=TASK_PRIORITY, + countdown=randint(1, EVEONLINE_TASK_JITTER)) @shared_task @@ -68,8 +77,9 @@ def update_character_chunk(character_ids_chunk: list): logger.info("Failed to bulk update characters. Attempting single updates") for character_id in character_ids_chunk: update_character.apply_async( - args=[character_id], priority=TASK_PRIORITY - ) + args=[character_id], + priority=TASK_PRIORITY, + countdown=randint(1, EVEONLINE_TASK_JITTER)) return affiliations = { @@ -107,5 +117,5 @@ def update_character_chunk(character_ids_chunk: list): if corp_changed or alliance_changed or name_changed: update_character.apply_async( - args=[character.get('character_id')], priority=TASK_PRIORITY - ) + args=[character.get('character_id')], + priority=TASK_PRIORITY) diff --git a/allianceauth/eveonline/tests/test_tasks.py b/allianceauth/eveonline/tests/test_tasks.py index 78054e8f..26dc9d90 100644 --- a/allianceauth/eveonline/tests/test_tasks.py +++ b/allianceauth/eveonline/tests/test_tasks.py @@ -84,7 +84,7 @@ class TestUpdateTasks(TestCase): @override_settings(CELERY_ALWAYS_EAGER=True) @patch('allianceauth.eveonline.providers.esi_client_factory') @patch('allianceauth.eveonline.tasks.providers') -@patch('allianceauth.eveonline.tasks.CHUNK_SIZE', 2) +@patch('allianceauth.eveonline.tasks.CHARACTER_AFFILIATION_CHUNK_SIZE', 2) class TestRunModelUpdate(TransactionTestCase): def test_should_run_updates(self, mock_providers, mock_esi_client_factory): # given @@ -139,7 +139,7 @@ class TestRunModelUpdate(TransactionTestCase): @patch('allianceauth.eveonline.tasks.update_character', wraps=update_character) @patch('allianceauth.eveonline.providers.esi_client_factory') @patch('allianceauth.eveonline.tasks.providers') -@patch('allianceauth.eveonline.tasks.CHUNK_SIZE', 2) +@patch('allianceauth.eveonline.tasks.CHARACTER_AFFILIATION_CHUNK_SIZE', 2) class TestUpdateCharacterChunk(TestCase): @staticmethod def _updated_character_ids(spy_update_character) -> set: diff --git a/docs/development/tech_docu/celery.md b/docs/development/tech_docu/celery.md index f92a0cb1..3f39d299 100644 --- a/docs/development/tech_docu/celery.md +++ b/docs/development/tech_docu/celery.md @@ -168,6 +168,49 @@ example.apply_async(priority=3) For defining a priority to tasks, you cannot use the convenient shortcut ``delay()``, but instead need to start a task with ``apply_async()``, which also requires you to pass parameters to your task function differently. Please check out the `official docs `_ for details. ::: +## Rate-Limiting and Smoothing of Task Execution + +Large numbers of installs running the same crontab (ie. `0 * * * *`) can all slam an external service at the same time. + +Consider Artificially smoothing out your tasks with a few methods + +### Offset Crontabs + +Avoid running your tasks on the hour or other nice neat human numbers, consider 23 minutes on the hour instead of at zero (`28 * * * *`) + +### Subset Tasks + +Slice your tasks needed up into more manageable chunks and run them more often. 1/10th of your tasks run 10x more often will return the same end result with less peak loads on external services and your task queue. + +### Celery ETA/Countdown + +Scatter your tasks across a larger window using + +This example will queue up tasks across the next 10 minutes, trickling them into your workers (and the external service) + +```python +for corp in EveCorporationInfo.objects.all().values('corporation_id'): + update_corp.apply_async(args=[corp['corporation_id']], priority=TASK_PRIORITY) + update_corp.apply_async( + args=[corp['corporation_id']], + priority=TASK_PRIORITY, + countdown=randint(1, 600)) +``` + +### Celery Rate Limits + +Celery Rate Limits come with a small catch, its _per worker_, you may have to be either very conservative or have these configurable by the end user if they varied their worker count. + + + +This example of 10 Tasks per Minute will result in ~100 tasks per minute at 10 Workers + +```python +@shared_task(rate_limit="10/m") +def update_charactercorporationhistory(character_id: int) -> None: + """Update CharacterCorporationHistory models from ESI""" +``` + ## What special features should I be aware of? Every Alliance Auth installation will come with a couple of special celery related features "out-of-the-box" that you can make use of in your apps. @@ -192,6 +235,6 @@ You can use it like so: Please see the [official documentation](https://pypi.org/project/celery_once/) of celery-once for details. -### task priorities +### Task Priorities Alliance Auth is using task priorities to enable priority-based scheduling of task execution. Please see [How can I use priorities for tasks?](#how-can-i-use-priorities-for-tasks) for details.