Add Celery Priorities

This commit is contained in:
Aaron Kable 2020-03-26 02:20:02 +00:00 committed by Ariel Rin
parent 32e0621b0a
commit 8861ec0a61
5 changed files with 31 additions and 10 deletions

View File

@ -7,6 +7,7 @@ from .models import EveCorporationInfo
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TASK_PRIORITY = 7
@shared_task @shared_task
def update_corp(corp_id): def update_corp(corp_id):
@ -27,11 +28,12 @@ def update_character(character_id):
def run_model_update(): def run_model_update():
# update existing corp models # update existing corp models
for corp in EveCorporationInfo.objects.all().values('corporation_id'): for corp in EveCorporationInfo.objects.all().values('corporation_id'):
update_corp.delay(corp['corporation_id']) update_corp.apply_async(args=[corp['corporation_id']], priority=TASK_PRIORITY)
# update existing alliance models # update existing alliance models
for alliance in EveAllianceInfo.objects.all().values('alliance_id'): for alliance in EveAllianceInfo.objects.all().values('alliance_id'):
update_alliance.delay(alliance['alliance_id']) update_alliance.apply_async(args=[alliance['alliance_id']], priority=TASK_PRIORITY)
#update existing character models
for character in EveCharacter.objects.all().values('character_id'): for character in EveCharacter.objects.all().values('character_id'):
update_character.delay(character['character_id']) update_character.apply_async(args=[character['character_id']], priority=TASK_PRIORITY)

View File

@ -80,28 +80,28 @@ class TestTasks(TestCase):
character_name='character.name', character_name='character.name',
corporation_id='character.corp.id', corporation_id='character.corp.id',
corporation_name='character.corp.name', corporation_name='character.corp.name',
corporation_ticker='character.corp.ticker', corporation_ticker='c.c.t', # max 5 chars
alliance_id='character.alliance.id', alliance_id='character.alliance.id',
alliance_name='character.alliance.name', alliance_name='character.alliance.name',
) )
run_model_update() run_model_update()
self.assertEqual(mock_update_corp.delay.call_count, 1) self.assertEqual(mock_update_corp.apply_async.call_count, 1)
self.assertEqual( self.assertEqual(
int(mock_update_corp.delay.call_args[0][0]), int(mock_update_corp.apply_async.call_args[1]['args'][0]),
2345 2345
) )
self.assertEqual(mock_update_alliance.delay.call_count, 1) self.assertEqual(mock_update_alliance.apply_async.call_count, 1)
self.assertEqual( self.assertEqual(
int(mock_update_alliance.delay.call_args[0][0]), int(mock_update_alliance.apply_async.call_args[1]['args'][0]),
3456 3456
) )
self.assertEqual(mock_update_character.delay.call_count, 1) self.assertEqual(mock_update_character.apply_async.call_count, 1)
self.assertEqual( self.assertEqual(
int(mock_update_character.delay.call_args[0][0]), int(mock_update_character.apply_async.call_args[1]['args'][0]),
1234 1234
) )

View File

@ -11,6 +11,15 @@ app = Celery('{{ project_name }}')
# Using a string here means the worker don't have to serialize # Using a string here means the worker don't have to serialize
# the configuration object to child processes. # the configuration object to child processes.
app.config_from_object('django.conf:settings') app.config_from_object('django.conf:settings')
# setup priorities ( 0 Highest, 9 Lowest )
app.conf.broker_transport_options = {
'priority_steps': list(range(10)), # setup que to have 10 steps
'queue_order_strategy': 'priority', # setup que to use prio sorting
}
app.conf.task_default_priority = 5 # anything called with the task.delay() will be given normal priority (5)
app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen
app.conf.ONCE = { app.conf.ONCE = {
'backend': 'allianceauth.services.tasks.DjangoBackend', 'backend': 'allianceauth.services.tasks.DjangoBackend',
'settings': {} 'settings': {}

View File

@ -11,6 +11,15 @@ app = Celery('devauth')
# Using a string here means the worker don't have to serialize # Using a string here means the worker don't have to serialize
# the configuration object to child processes. # the configuration object to child processes.
app.config_from_object('django.conf:settings') app.config_from_object('django.conf:settings')
# setup priorities ( 0 Highest, 9 Lowest )
app.conf.broker_transport_options = {
'priority_steps': list(range(10)), # setup que to have 10 steps
'queue_order_strategy': 'priority', # setup que to use prio sorting
}
app.conf.task_default_priority = 5 # anything called with the task.delay() will be given normal priority (5)
app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen
app.conf.ONCE = { app.conf.ONCE = {
'backend': 'allianceauth.services.tasks.DjangoBackend', 'backend': 'allianceauth.services.tasks.DjangoBackend',
'settings': {} 'settings': {}

View File

@ -20,3 +20,4 @@ commands =
all: coverage run runtests.py -v 2 all: coverage run runtests.py -v 2
all: coverage report -m all: coverage report -m
core: coverage run runtests.py allianceauth.authentication.tests.test_app_settings -v 2 core: coverage run runtests.py allianceauth.authentication.tests.test_app_settings -v 2
all: coverage xml