Use celery_once to prevent repeat task queueing.

Prevent group updates from being queued multiple times per user.

Default graceful to prevent raising exceptions.
This commit is contained in:
Adarnof 2018-03-19 20:39:27 -04:00
parent 20236cab8a
commit 73e6f576f4
12 changed files with 38 additions and 15 deletions

View File

@ -11,6 +11,13 @@ app = Celery('{{ project_name }}')
# Using a string here means the worker don't have to serialize # Using a string here means the worker don't have to serialize
# the configuration object to child processes. # the configuration object to child processes.
app.config_from_object('django.conf:settings') app.config_from_object('django.conf:settings')
app.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
# Load task modules from all registered Django app configs. # Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

View File

@ -9,6 +9,7 @@ from requests.exceptions import HTTPError
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from .manager import DiscordOAuthManager, DiscordApiBackoff from .manager import DiscordOAuthManager, DiscordApiBackoff
from .models import DiscordUser from .models import DiscordUser
from allianceauth.services.tasks import QueueOnce
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -58,7 +59,7 @@ class DiscordTasks:
return True return True
@staticmethod @staticmethod
@shared_task(bind=True, name='discord.update_groups') @shared_task(bind=True, name='discord.update_groups', base=QueueOnce)
def update_groups(task_self, pk): def update_groups(task_self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating discord groups for user %s" % user) logger.debug("Updating discord groups for user %s" % user)
@ -99,7 +100,7 @@ class DiscordTasks:
DiscordTasks.update_groups.delay(discord_user.user.pk) DiscordTasks.update_groups.delay(discord_user.user.pk)
@staticmethod @staticmethod
@shared_task(bind=True, name='discord.update_nickname') @shared_task(bind=True, name='discord.update_nickname', base=QueueOnce)
def update_nickname(task_self, pk): def update_nickname(task_self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating discord nickname for user %s" % user) logger.debug("Updating discord nickname for user %s" % user)

View File

@ -6,6 +6,7 @@ from celery import shared_task
from allianceauth.notifications import notify from allianceauth.notifications import notify
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from allianceauth.services.tasks import QueueOnce
from .manager import DiscourseManager from .manager import DiscourseManager
from .models import DiscourseUser from .models import DiscourseUser
@ -40,7 +41,7 @@ class DiscourseTasks:
return False return False
@staticmethod @staticmethod
@shared_task(bind=True, name='discourse.update_groups') @shared_task(bind=True, name='discourse.update_groups', base=QueueOnce)
def update_groups(self, pk): def update_groups(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating discourse groups for user %s" % user) logger.debug("Updating discourse groups for user %s" % user)

View File

@ -3,7 +3,7 @@ import logging
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from celery import shared_task from celery import shared_task
from allianceauth.services.tasks import QueueOnce
from .models import MumbleUser from .models import MumbleUser
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -26,7 +26,7 @@ class MumbleTasks:
MumbleUser.objects.all().delete() MumbleUser.objects.all().delete()
@staticmethod @staticmethod
@shared_task(bind=True, name="mumble.update_groups") @shared_task(bind=True, name="mumble.update_groups", base=QueueOnce)
def update_groups(self, pk): def update_groups(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating mumble groups for user %s" % user) logger.debug("Updating mumble groups for user %s" % user)

View File

@ -4,7 +4,7 @@ from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from allianceauth.notifications import notify from allianceauth.notifications import notify
from celery import shared_task from celery import shared_task
from allianceauth.services.tasks import QueueOnce
from allianceauth.services.modules.openfire.manager import OpenfireManager from allianceauth.services.modules.openfire.manager import OpenfireManager
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from .models import OpenfireUser from .models import OpenfireUser
@ -40,7 +40,7 @@ class OpenfireTasks:
OpenfireUser.objects.all().delete() OpenfireUser.objects.all().delete()
@staticmethod @staticmethod
@shared_task(bind=True, name="openfire.update_groups") @shared_task(bind=True, name="openfire.update_groups", base=QueueOnce)
def update_groups(self, pk): def update_groups(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating jabber groups for user %s" % user) logger.debug("Updating jabber groups for user %s" % user)

View File

@ -3,7 +3,7 @@ import logging
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from celery import shared_task from celery import shared_task
from allianceauth.services.tasks import QueueOnce
from allianceauth.notifications import notify from allianceauth.notifications import notify
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from .manager import Phpbb3Manager from .manager import Phpbb3Manager
@ -35,7 +35,7 @@ class Phpbb3Tasks:
return False return False
@staticmethod @staticmethod
@shared_task(bind=True, name="phpbb3.update_groups") @shared_task(bind=True, name="phpbb3.update_groups", base=QueueOnce)
def update_groups(self, pk): def update_groups(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating phpbb3 groups for user %s" % user) logger.debug("Updating phpbb3 groups for user %s" % user)

View File

@ -3,7 +3,7 @@ import logging
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from celery import shared_task from celery import shared_task
from allianceauth.services.tasks import QueueOnce
from allianceauth.notifications import notify from allianceauth.notifications import notify
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from .manager import SeatManager from .manager import SeatManager
@ -34,7 +34,7 @@ class SeatTasks:
return False return False
@staticmethod @staticmethod
@shared_task(bind=True) @shared_task(bind=True, name='seat.update_roles', base=QueueOnce)
def update_roles(self, pk): def update_roles(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating SeAT roles for user %s" % user) logger.debug("Updating SeAT roles for user %s" % user)

View File

@ -3,7 +3,7 @@ import logging
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from celery import shared_task from celery import shared_task
from allianceauth.services.tasks import QueueOnce
from allianceauth.notifications import notify from allianceauth.notifications import notify
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from .manager import SmfManager from .manager import SmfManager
@ -39,7 +39,7 @@ class SmfTasks:
SmfUser.objects.all().delete() SmfUser.objects.all().delete()
@staticmethod @staticmethod
@shared_task(bind=True, name="smf.update_groups") @shared_task(bind=True, name="smf.update_groups", base=QueueOnce)
def update_groups(self, pk): def update_groups(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating smf groups for user %s" % user) logger.debug("Updating smf groups for user %s" % user)

View File

@ -3,7 +3,7 @@ import logging
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist
from celery import shared_task from celery import shared_task
from allianceauth.services.tasks import QueueOnce
from allianceauth.notifications import notify from allianceauth.notifications import notify
from allianceauth.services.hooks import NameFormatter from allianceauth.services.hooks import NameFormatter
from .manager import Teamspeak3Manager from .manager import Teamspeak3Manager
@ -56,7 +56,7 @@ class Teamspeak3Tasks:
logger.info("Teamspeak3 disabled") logger.info("Teamspeak3 disabled")
@staticmethod @staticmethod
@shared_task(bind=True, name="teamspeak3.update_groups") @shared_task(bind=True, name="teamspeak3.update_groups", base=QueueOnce)
def update_groups(self, pk): def update_groups(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
logger.debug("Updating user %s teamspeak3 groups" % user) logger.debug("Updating user %s teamspeak3 groups" % user)

View File

@ -4,12 +4,18 @@ import redis
from celery import shared_task from celery import shared_task
from django.contrib.auth.models import User from django.contrib.auth.models import User
from .hooks import ServicesHook from .hooks import ServicesHook
from celery_once import QueueOnce as BaseTask
REDIS_CLIENT = redis.Redis() REDIS_CLIENT = redis.Redis()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class QueueOnce(BaseTask):
once = BaseTask.once
once['graceful'] = True
# http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html # http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html
def only_one(function=None, key="", timeout=None): def only_one(function=None, key="", timeout=None):
"""Enforce only one celery task at a time.""" """Enforce only one celery task at a time."""

View File

@ -14,6 +14,7 @@ install_requires = [
'redis', 'redis',
'celery>=4.0.2', 'celery>=4.0.2',
'celery_once',
'django>=1.11', 'django>=1.11',
'django-bootstrap-form', 'django-bootstrap-form',

View File

@ -7,6 +7,13 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
from django.conf import settings # noqa from django.conf import settings # noqa
app = Celery('devauth') app = Celery('devauth')
app.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://localhost:6379/0',
'default_timeout': 60 * 60
}
}
# Using a string here means the worker don't have to serialize # Using a string here means the worker don't have to serialize
# the configuration object to child processes. # the configuration object to child processes.