diff --git a/allianceauth/project_template/project_name/celery.py b/allianceauth/project_template/project_name/celery.py index dc9b4d39..aaed7fb4 100644 --- a/allianceauth/project_template/project_name/celery.py +++ b/allianceauth/project_template/project_name/celery.py @@ -11,6 +11,13 @@ app = Celery('{{ project_name }}') # Using a string here means the worker don't have to serialize # the configuration object to child processes. app.config_from_object('django.conf:settings') +app.conf.ONCE = { + 'backend': 'celery_once.backends.Redis', + 'settings': { + 'url': 'redis://localhost:6379/0', + 'default_timeout': 60 * 60 + } +} # Load task modules from all registered Django app configs. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) diff --git a/allianceauth/services/modules/discord/tasks.py b/allianceauth/services/modules/discord/tasks.py index 86bb06f2..8995e267 100644 --- a/allianceauth/services/modules/discord/tasks.py +++ b/allianceauth/services/modules/discord/tasks.py @@ -9,6 +9,7 @@ from requests.exceptions import HTTPError from allianceauth.services.hooks import NameFormatter from .manager import DiscordOAuthManager, DiscordApiBackoff from .models import DiscordUser +from allianceauth.services.tasks import QueueOnce logger = logging.getLogger(__name__) @@ -58,7 +59,7 @@ class DiscordTasks: return True @staticmethod - @shared_task(bind=True, name='discord.update_groups') + @shared_task(bind=True, name='discord.update_groups', base=QueueOnce) def update_groups(task_self, pk): user = User.objects.get(pk=pk) logger.debug("Updating discord groups for user %s" % user) @@ -99,7 +100,7 @@ class DiscordTasks: DiscordTasks.update_groups.delay(discord_user.user.pk) @staticmethod - @shared_task(bind=True, name='discord.update_nickname') + @shared_task(bind=True, name='discord.update_nickname', base=QueueOnce) def update_nickname(task_self, pk): user = User.objects.get(pk=pk) logger.debug("Updating discord nickname for user %s" % user) diff --git a/allianceauth/services/modules/discourse/tasks.py b/allianceauth/services/modules/discourse/tasks.py index 43cd4dd6..262d1525 100644 --- a/allianceauth/services/modules/discourse/tasks.py +++ b/allianceauth/services/modules/discourse/tasks.py @@ -6,6 +6,7 @@ from celery import shared_task from allianceauth.notifications import notify from allianceauth.services.hooks import NameFormatter +from allianceauth.services.tasks import QueueOnce from .manager import DiscourseManager from .models import DiscourseUser @@ -40,7 +41,7 @@ class DiscourseTasks: return False @staticmethod - @shared_task(bind=True, name='discourse.update_groups') + @shared_task(bind=True, name='discourse.update_groups', base=QueueOnce) def update_groups(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating discourse groups for user %s" % user) diff --git a/allianceauth/services/modules/mumble/tasks.py b/allianceauth/services/modules/mumble/tasks.py index 541a0990..02450e93 100644 --- a/allianceauth/services/modules/mumble/tasks.py +++ b/allianceauth/services/modules/mumble/tasks.py @@ -3,7 +3,7 @@ import logging from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from celery import shared_task - +from allianceauth.services.tasks import QueueOnce from .models import MumbleUser logger = logging.getLogger(__name__) @@ -26,7 +26,7 @@ class MumbleTasks: MumbleUser.objects.all().delete() @staticmethod - @shared_task(bind=True, name="mumble.update_groups") + @shared_task(bind=True, name="mumble.update_groups", base=QueueOnce) def update_groups(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating mumble groups for user %s" % user) diff --git a/allianceauth/services/modules/openfire/tasks.py b/allianceauth/services/modules/openfire/tasks.py index 4d6bcf8d..3993b6b6 100644 --- a/allianceauth/services/modules/openfire/tasks.py +++ b/allianceauth/services/modules/openfire/tasks.py @@ -4,7 +4,7 @@ from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from allianceauth.notifications import notify from celery import shared_task - +from allianceauth.services.tasks import QueueOnce from allianceauth.services.modules.openfire.manager import OpenfireManager from allianceauth.services.hooks import NameFormatter from .models import OpenfireUser @@ -40,7 +40,7 @@ class OpenfireTasks: OpenfireUser.objects.all().delete() @staticmethod - @shared_task(bind=True, name="openfire.update_groups") + @shared_task(bind=True, name="openfire.update_groups", base=QueueOnce) def update_groups(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating jabber groups for user %s" % user) diff --git a/allianceauth/services/modules/phpbb3/tasks.py b/allianceauth/services/modules/phpbb3/tasks.py index 107ead86..6c9b7503 100644 --- a/allianceauth/services/modules/phpbb3/tasks.py +++ b/allianceauth/services/modules/phpbb3/tasks.py @@ -3,7 +3,7 @@ import logging from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from celery import shared_task - +from allianceauth.services.tasks import QueueOnce from allianceauth.notifications import notify from allianceauth.services.hooks import NameFormatter from .manager import Phpbb3Manager @@ -35,7 +35,7 @@ class Phpbb3Tasks: return False @staticmethod - @shared_task(bind=True, name="phpbb3.update_groups") + @shared_task(bind=True, name="phpbb3.update_groups", base=QueueOnce) def update_groups(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating phpbb3 groups for user %s" % user) diff --git a/allianceauth/services/modules/seat/tasks.py b/allianceauth/services/modules/seat/tasks.py index 0ed4e2a1..e0f61186 100644 --- a/allianceauth/services/modules/seat/tasks.py +++ b/allianceauth/services/modules/seat/tasks.py @@ -3,7 +3,7 @@ import logging from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from celery import shared_task - +from allianceauth.services.tasks import QueueOnce from allianceauth.notifications import notify from allianceauth.services.hooks import NameFormatter from .manager import SeatManager @@ -34,7 +34,7 @@ class SeatTasks: return False @staticmethod - @shared_task(bind=True) + @shared_task(bind=True, name='seat.update_roles', base=QueueOnce) def update_roles(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating SeAT roles for user %s" % user) diff --git a/allianceauth/services/modules/smf/tasks.py b/allianceauth/services/modules/smf/tasks.py index 91b45732..847fb52c 100644 --- a/allianceauth/services/modules/smf/tasks.py +++ b/allianceauth/services/modules/smf/tasks.py @@ -3,7 +3,7 @@ import logging from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from celery import shared_task - +from allianceauth.services.tasks import QueueOnce from allianceauth.notifications import notify from allianceauth.services.hooks import NameFormatter from .manager import SmfManager @@ -39,7 +39,7 @@ class SmfTasks: SmfUser.objects.all().delete() @staticmethod - @shared_task(bind=True, name="smf.update_groups") + @shared_task(bind=True, name="smf.update_groups", base=QueueOnce) def update_groups(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating smf groups for user %s" % user) diff --git a/allianceauth/services/modules/teamspeak3/tasks.py b/allianceauth/services/modules/teamspeak3/tasks.py index 0d2fa98d..96779e3b 100644 --- a/allianceauth/services/modules/teamspeak3/tasks.py +++ b/allianceauth/services/modules/teamspeak3/tasks.py @@ -3,7 +3,7 @@ import logging from django.contrib.auth.models import User from django.core.exceptions import ObjectDoesNotExist from celery import shared_task - +from allianceauth.services.tasks import QueueOnce from allianceauth.notifications import notify from allianceauth.services.hooks import NameFormatter from .manager import Teamspeak3Manager @@ -56,7 +56,7 @@ class Teamspeak3Tasks: logger.info("Teamspeak3 disabled") @staticmethod - @shared_task(bind=True, name="teamspeak3.update_groups") + @shared_task(bind=True, name="teamspeak3.update_groups", base=QueueOnce) def update_groups(self, pk): user = User.objects.get(pk=pk) logger.debug("Updating user %s teamspeak3 groups" % user) diff --git a/allianceauth/services/tasks.py b/allianceauth/services/tasks.py index d3bcdd16..545aa7a9 100644 --- a/allianceauth/services/tasks.py +++ b/allianceauth/services/tasks.py @@ -4,12 +4,18 @@ import redis from celery import shared_task from django.contrib.auth.models import User from .hooks import ServicesHook +from celery_once import QueueOnce as BaseTask REDIS_CLIENT = redis.Redis() logger = logging.getLogger(__name__) +class QueueOnce(BaseTask): + once = BaseTask.once + once['graceful'] = True + + # http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html def only_one(function=None, key="", timeout=None): """Enforce only one celery task at a time.""" diff --git a/setup.py b/setup.py index 882a439f..049a34ca 100644 --- a/setup.py +++ b/setup.py @@ -14,6 +14,7 @@ install_requires = [ 'redis', 'celery>=4.0.2', + 'celery_once', 'django>=1.11', 'django-bootstrap-form', diff --git a/tests/celery.py b/tests/celery.py index 6492058a..220dbb35 100644 --- a/tests/celery.py +++ b/tests/celery.py @@ -7,6 +7,13 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') from django.conf import settings # noqa app = Celery('devauth') +app.conf.ONCE = { + 'backend': 'celery_once.backends.Redis', + 'settings': { + 'url': 'redis://localhost:6379/0', + 'default_timeout': 60 * 60 + } +} # Using a string here means the worker don't have to serialize # the configuration object to child processes.