mirror of
https://gitlab.com/allianceauth/allianceauth.git
synced 2025-07-09 12:30:15 +02:00
Merge branch 'randomdelay' into 'master'
Spread esi tasks over 10 minutes See merge request allianceauth/allianceauth!1666
This commit is contained in:
commit
795a7e006f
@ -1,4 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
|
from random import randint
|
||||||
|
|
||||||
from celery import shared_task
|
from celery import shared_task
|
||||||
|
|
||||||
@ -9,7 +10,8 @@ from . import providers
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
TASK_PRIORITY = 7
|
TASK_PRIORITY = 7
|
||||||
CHUNK_SIZE = 500
|
CHARACTER_AFFILIATION_CHUNK_SIZE = 500
|
||||||
|
EVEONLINE_TASK_JITTER = 600
|
||||||
|
|
||||||
|
|
||||||
def chunks(lst, n):
|
def chunks(lst, n):
|
||||||
@ -19,13 +21,13 @@ def chunks(lst, n):
|
|||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def update_corp(corp_id):
|
def update_corp(corp_id: int) -> None:
|
||||||
"""Update given corporation from ESI"""
|
"""Update given corporation from ESI"""
|
||||||
EveCorporationInfo.objects.update_corporation(corp_id)
|
EveCorporationInfo.objects.update_corporation(corp_id)
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def update_alliance(alliance_id):
|
def update_alliance(alliance_id: int) -> None:
|
||||||
"""Update given alliance from ESI"""
|
"""Update given alliance from ESI"""
|
||||||
EveAllianceInfo.objects.update_alliance(alliance_id).populate_alliance()
|
EveAllianceInfo.objects.update_alliance(alliance_id).populate_alliance()
|
||||||
|
|
||||||
@ -37,23 +39,30 @@ def update_character(character_id: int) -> None:
|
|||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def run_model_update():
|
def run_model_update() -> None:
|
||||||
"""Update all alliances, corporations and characters from ESI"""
|
"""Update all alliances, corporations and characters from ESI"""
|
||||||
|
|
||||||
#update existing corp models
|
# Queue update tasks for Known Corporation Models
|
||||||
for corp in EveCorporationInfo.objects.all().values('corporation_id'):
|
for corp in EveCorporationInfo.objects.all().values('corporation_id'):
|
||||||
update_corp.apply_async(args=[corp['corporation_id']], priority=TASK_PRIORITY)
|
update_corp.apply_async(
|
||||||
|
args=[corp['corporation_id']],
|
||||||
|
priority=TASK_PRIORITY,
|
||||||
|
countdown=randint(1, EVEONLINE_TASK_JITTER))
|
||||||
|
|
||||||
# update existing alliance models
|
# Queue update tasks for Known Alliance Models
|
||||||
for alliance in EveAllianceInfo.objects.all().values('alliance_id'):
|
for alliance in EveAllianceInfo.objects.all().values('alliance_id'):
|
||||||
update_alliance.apply_async(args=[alliance['alliance_id']], priority=TASK_PRIORITY)
|
update_alliance.apply_async(
|
||||||
|
args=[alliance['alliance_id']],
|
||||||
|
priority=TASK_PRIORITY,
|
||||||
|
countdown=randint(1, EVEONLINE_TASK_JITTER))
|
||||||
|
|
||||||
# update existing character models
|
# Queue update tasks for Known Character Models
|
||||||
character_ids = EveCharacter.objects.all().values_list('character_id', flat=True)
|
character_ids = EveCharacter.objects.all().values_list('character_id', flat=True)
|
||||||
for character_ids_chunk in chunks(character_ids, CHUNK_SIZE):
|
for character_ids_chunk in chunks(character_ids, CHARACTER_AFFILIATION_CHUNK_SIZE):
|
||||||
update_character_chunk.apply_async(
|
update_character_chunk.apply_async(
|
||||||
args=[character_ids_chunk], priority=TASK_PRIORITY
|
args=[character_ids_chunk],
|
||||||
)
|
priority=TASK_PRIORITY,
|
||||||
|
countdown=randint(1, EVEONLINE_TASK_JITTER))
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
@ -68,8 +77,9 @@ def update_character_chunk(character_ids_chunk: list):
|
|||||||
logger.info("Failed to bulk update characters. Attempting single updates")
|
logger.info("Failed to bulk update characters. Attempting single updates")
|
||||||
for character_id in character_ids_chunk:
|
for character_id in character_ids_chunk:
|
||||||
update_character.apply_async(
|
update_character.apply_async(
|
||||||
args=[character_id], priority=TASK_PRIORITY
|
args=[character_id],
|
||||||
)
|
priority=TASK_PRIORITY,
|
||||||
|
countdown=randint(1, EVEONLINE_TASK_JITTER))
|
||||||
return
|
return
|
||||||
|
|
||||||
affiliations = {
|
affiliations = {
|
||||||
@ -107,5 +117,5 @@ def update_character_chunk(character_ids_chunk: list):
|
|||||||
|
|
||||||
if corp_changed or alliance_changed or name_changed:
|
if corp_changed or alliance_changed or name_changed:
|
||||||
update_character.apply_async(
|
update_character.apply_async(
|
||||||
args=[character.get('character_id')], priority=TASK_PRIORITY
|
args=[character.get('character_id')],
|
||||||
)
|
priority=TASK_PRIORITY)
|
||||||
|
@ -84,7 +84,7 @@ class TestUpdateTasks(TestCase):
|
|||||||
@override_settings(CELERY_ALWAYS_EAGER=True)
|
@override_settings(CELERY_ALWAYS_EAGER=True)
|
||||||
@patch('allianceauth.eveonline.providers.esi_client_factory')
|
@patch('allianceauth.eveonline.providers.esi_client_factory')
|
||||||
@patch('allianceauth.eveonline.tasks.providers')
|
@patch('allianceauth.eveonline.tasks.providers')
|
||||||
@patch('allianceauth.eveonline.tasks.CHUNK_SIZE', 2)
|
@patch('allianceauth.eveonline.tasks.CHARACTER_AFFILIATION_CHUNK_SIZE', 2)
|
||||||
class TestRunModelUpdate(TransactionTestCase):
|
class TestRunModelUpdate(TransactionTestCase):
|
||||||
def test_should_run_updates(self, mock_providers, mock_esi_client_factory):
|
def test_should_run_updates(self, mock_providers, mock_esi_client_factory):
|
||||||
# given
|
# given
|
||||||
@ -139,7 +139,7 @@ class TestRunModelUpdate(TransactionTestCase):
|
|||||||
@patch('allianceauth.eveonline.tasks.update_character', wraps=update_character)
|
@patch('allianceauth.eveonline.tasks.update_character', wraps=update_character)
|
||||||
@patch('allianceauth.eveonline.providers.esi_client_factory')
|
@patch('allianceauth.eveonline.providers.esi_client_factory')
|
||||||
@patch('allianceauth.eveonline.tasks.providers')
|
@patch('allianceauth.eveonline.tasks.providers')
|
||||||
@patch('allianceauth.eveonline.tasks.CHUNK_SIZE', 2)
|
@patch('allianceauth.eveonline.tasks.CHARACTER_AFFILIATION_CHUNK_SIZE', 2)
|
||||||
class TestUpdateCharacterChunk(TestCase):
|
class TestUpdateCharacterChunk(TestCase):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _updated_character_ids(spy_update_character) -> set:
|
def _updated_character_ids(spy_update_character) -> set:
|
||||||
|
@ -168,6 +168,49 @@ example.apply_async(priority=3)
|
|||||||
For defining a priority to tasks, you cannot use the convenient shortcut ``delay()``, but instead need to start a task with ``apply_async()``, which also requires you to pass parameters to your task function differently. Please check out the `official docs <https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async>`_ for details.
|
For defining a priority to tasks, you cannot use the convenient shortcut ``delay()``, but instead need to start a task with ``apply_async()``, which also requires you to pass parameters to your task function differently. Please check out the `official docs <https://docs.celeryproject.org/en/stable/reference/celery.app.task.html#celery.app.task.Task.apply_async>`_ for details.
|
||||||
:::
|
:::
|
||||||
|
|
||||||
|
## Rate-Limiting and Smoothing of Task Execution
|
||||||
|
|
||||||
|
Large numbers of installs running the same crontab (ie. `0 * * * *`) can all slam an external service at the same time.
|
||||||
|
|
||||||
|
Consider Artificially smoothing out your tasks with a few methods
|
||||||
|
|
||||||
|
### Offset Crontabs
|
||||||
|
|
||||||
|
Avoid running your tasks on the hour or other nice neat human numbers, consider 23 minutes on the hour instead of at zero (`28 * * * *`)
|
||||||
|
|
||||||
|
### Subset Tasks
|
||||||
|
|
||||||
|
Slice your tasks needed up into more manageable chunks and run them more often. 1/10th of your tasks run 10x more often will return the same end result with less peak loads on external services and your task queue.
|
||||||
|
|
||||||
|
### Celery ETA/Countdown
|
||||||
|
|
||||||
|
Scatter your tasks across a larger window using <https://docs.celeryq.dev/en/latest/userguide/calling.html#eta-and-countdown>
|
||||||
|
|
||||||
|
This example will queue up tasks across the next 10 minutes, trickling them into your workers (and the external service)
|
||||||
|
|
||||||
|
```python
|
||||||
|
for corp in EveCorporationInfo.objects.all().values('corporation_id'):
|
||||||
|
update_corp.apply_async(args=[corp['corporation_id']], priority=TASK_PRIORITY)
|
||||||
|
update_corp.apply_async(
|
||||||
|
args=[corp['corporation_id']],
|
||||||
|
priority=TASK_PRIORITY,
|
||||||
|
countdown=randint(1, 600))
|
||||||
|
```
|
||||||
|
|
||||||
|
### Celery Rate Limits
|
||||||
|
|
||||||
|
Celery Rate Limits come with a small catch, its _per worker_, you may have to be either very conservative or have these configurable by the end user if they varied their worker count.
|
||||||
|
|
||||||
|
<https://docs.celeryq.dev/en/latest/userguide/tasks.html#Task.rate_limit>
|
||||||
|
|
||||||
|
This example of 10 Tasks per Minute will result in ~100 tasks per minute at 10 Workers
|
||||||
|
|
||||||
|
```python
|
||||||
|
@shared_task(rate_limit="10/m")
|
||||||
|
def update_charactercorporationhistory(character_id: int) -> None:
|
||||||
|
"""Update CharacterCorporationHistory models from ESI"""
|
||||||
|
```
|
||||||
|
|
||||||
## What special features should I be aware of?
|
## What special features should I be aware of?
|
||||||
|
|
||||||
Every Alliance Auth installation will come with a couple of special celery related features "out-of-the-box" that you can make use of in your apps.
|
Every Alliance Auth installation will come with a couple of special celery related features "out-of-the-box" that you can make use of in your apps.
|
||||||
@ -192,6 +235,6 @@ You can use it like so:
|
|||||||
|
|
||||||
Please see the [official documentation](https://pypi.org/project/celery_once/) of celery-once for details.
|
Please see the [official documentation](https://pypi.org/project/celery_once/) of celery-once for details.
|
||||||
|
|
||||||
### task priorities
|
### Task Priorities
|
||||||
|
|
||||||
Alliance Auth is using task priorities to enable priority-based scheduling of task execution. Please see [How can I use priorities for tasks?](#how-can-i-use-priorities-for-tasks) for details.
|
Alliance Auth is using task priorities to enable priority-based scheduling of task execution. Please see [How can I use priorities for tasks?](#how-can-i-use-priorities-for-tasks) for details.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user