From f3f156bf570a0e73b1223a30ba215a0c17406ce1 Mon Sep 17 00:00:00 2001 From: Adarnof Date: Fri, 23 Mar 2018 14:43:15 -0400 Subject: [PATCH] Use Django's cache framework for task keys. Remove depreciated only_one decorator. Prevent including task_self repr in key name. Because some tasks are nested in a class, they use a task_self argument instead of the normal self which the celery_once package doesn't recognize to strip out. --- .../project_template/project_name/celery.py | 7 +-- allianceauth/services/tasks.py | 60 +++++++++++-------- tests/celery.py | 14 ++--- 3 files changed, 43 insertions(+), 38 deletions(-) diff --git a/allianceauth/project_template/project_name/celery.py b/allianceauth/project_template/project_name/celery.py index aaed7fb4..247c5361 100644 --- a/allianceauth/project_template/project_name/celery.py +++ b/allianceauth/project_template/project_name/celery.py @@ -12,11 +12,8 @@ app = Celery('{{ project_name }}') # the configuration object to child processes. app.config_from_object('django.conf:settings') app.conf.ONCE = { - 'backend': 'celery_once.backends.Redis', - 'settings': { - 'url': 'redis://localhost:6379/0', - 'default_timeout': 60 * 60 - } + 'backend': 'allianceauth.services.tasks.DjangoBackend', + 'settings': {} } # Load task modules from all registered Django app configs. diff --git a/allianceauth/services/tasks.py b/allianceauth/services/tasks.py index 545aa7a9..5e812adf 100644 --- a/allianceauth/services/tasks.py +++ b/allianceauth/services/tasks.py @@ -1,12 +1,13 @@ import logging -import redis -from celery import shared_task +from celery import shared_task, Task from django.contrib.auth.models import User from .hooks import ServicesHook -from celery_once import QueueOnce as BaseTask +from celery_once import QueueOnce as BaseTask, AlreadyQueued +from celery_once.helpers import now_unix, queue_once_key +from django.core.cache import cache +from inspect import getcallargs -REDIS_CLIENT = redis.Redis() logger = logging.getLogger(__name__) @@ -15,32 +16,41 @@ class QueueOnce(BaseTask): once = BaseTask.once once['graceful'] = True + def get_key(self, args=None, kwargs=None): + """ + Generate the key from the name of the task (e.g. 'tasks.example') and + args/kwargs. + """ + restrict_to = self.once.get('keys', None) + args = args or {} + kwargs = kwargs or {} + call_args = getcallargs(self.run, *args, **kwargs) -# http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html -def only_one(function=None, key="", timeout=None): - """Enforce only one celery task at a time.""" + if isinstance(call_args.get('self'), Task): + del call_args['self'] + if isinstance(call_args.get('task_self'), Task): + del call_args['task_self'] + return queue_once_key(self.name, call_args, restrict_to) - def _dec(run_func): - """Decorator.""" - def _caller(*args, **kwargs): - """Caller.""" - ret_value = None - have_lock = False - lock = REDIS_CLIENT.lock(key, timeout=timeout) - try: - have_lock = lock.acquire(blocking=False) - if have_lock: - ret_value = run_func(*args, **kwargs) - finally: - if have_lock: - lock.release() +class DjangoBackend: + def __init__(self, settings): + pass - return ret_value + @staticmethod + def raise_or_lock(key, timeout): + now = now_unix() + result = cache.get(key) + if result: + remaining = int(result) - now + if remaining > 0: + raise AlreadyQueued(remaining) + else: + cache.set(key, now + timeout, timeout) - return _caller - - return _dec(function) if function is not None else _dec + @staticmethod + def clear_lock(key): + return cache.delete(key) @shared_task(bind=True) diff --git a/tests/celery.py b/tests/celery.py index 220dbb35..ee162f68 100644 --- a/tests/celery.py +++ b/tests/celery.py @@ -1,4 +1,4 @@ -import os, sys +import os from celery import Celery # set the default Django settings module for the 'celery' program. @@ -7,17 +7,15 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings') from django.conf import settings # noqa app = Celery('devauth') -app.conf.ONCE = { - 'backend': 'celery_once.backends.Redis', - 'settings': { - 'url': 'redis://localhost:6379/0', - 'default_timeout': 60 * 60 - } -} # Using a string here means the worker don't have to serialize # the configuration object to child processes. app.config_from_object('django.conf:settings') +app.conf.ONCE = { + 'backend': 'allianceauth.services.tasks.DjangoBackend', + 'settings': {} +} # Load task modules from all registered Django app configs. app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) +