Use Django's cache framework for task keys.

Remove deprecated only_one decorator.

Prevent including task_self repr in key name.

Because some tasks are nested in a class, they receive a `task_self` argument instead of the normal `self`, which the celery_once package doesn't recognize and therefore doesn't strip out of the key.
This commit is contained in:
Adarnof 2018-03-23 14:43:15 -04:00
parent 73e6f576f4
commit f3f156bf57
3 changed files with 43 additions and 38 deletions

View File

@ -12,11 +12,8 @@ app = Celery('{{ project_name }}')
# the configuration object to child processes.
app.config_from_object('django.conf:settings')
# Configure celery_once to take its task locks via the Django cache
# framework (DjangoBackend) instead of connecting to Redis directly,
# so no backend-specific settings are needed here.
app.conf.ONCE = {
    'backend': 'allianceauth.services.tasks.DjangoBackend',
    'settings': {}
}
# Load task modules from all registered Django app configs.

View File

@ -1,12 +1,13 @@
import logging
import redis
from celery import shared_task
from celery import shared_task, Task
from django.contrib.auth.models import User
from .hooks import ServicesHook
from celery_once import QueueOnce as BaseTask
from celery_once import QueueOnce as BaseTask, AlreadyQueued
from celery_once.helpers import now_unix, queue_once_key
from django.core.cache import cache
from inspect import getcallargs
REDIS_CLIENT = redis.Redis()
logger = logging.getLogger(__name__)
@ -15,32 +16,41 @@ class QueueOnce(BaseTask):
once = BaseTask.once
once['graceful'] = True
def get_key(self, args=None, kwargs=None):
    """
    Generate the lock key from the name of the task (e.g. 'tasks.example')
    and the call's args/kwargs.

    The bound task instance is stripped from the resolved call arguments
    so its repr never ends up in the key name: celery_once only knows to
    drop ``self``, but tasks nested inside a class receive the task as
    ``task_self`` instead, so both names are checked here.
    """
    # Optional 'keys' entry restricts which call args feed the key.
    restrict_to = self.once.get('keys', None)
    args = args or {}
    kwargs = kwargs or {}
    # Map positional/keyword args onto self.run's parameter names.
    call_args = getcallargs(self.run, *args, **kwargs)
    if isinstance(call_args.get('self'), Task):
        del call_args['self']
    if isinstance(call_args.get('task_self'), Task):
        del call_args['task_self']
    return queue_once_key(self.name, call_args, restrict_to)
def _dec(run_func):
"""Decorator."""
def _caller(*args, **kwargs):
"""Caller."""
ret_value = None
have_lock = False
lock = REDIS_CLIENT.lock(key, timeout=timeout)
try:
have_lock = lock.acquire(blocking=False)
if have_lock:
ret_value = run_func(*args, **kwargs)
finally:
if have_lock:
lock.release()
class DjangoBackend:
    """
    celery_once locking backend that stores task locks in Django's
    cache framework instead of talking to Redis directly.
    """

    def __init__(self, settings):
        # No backend-specific settings: the cache itself is configured
        # globally through Django's CACHES setting.
        pass

    @staticmethod
    def raise_or_lock(key, timeout):
        """
        Acquire the lock named `key` for `timeout` seconds, or raise
        AlreadyQueued with the remaining seconds if it is already held.
        """
        now = now_unix()
        result = cache.get(key)
        if result:
            remaining = int(result) - now
            if remaining > 0:
                raise AlreadyQueued(remaining)
            # NOTE(review): a present-but-expired entry is neither raised
            # on nor refreshed here; this relies on the cache evicting the
            # key itself via the timeout below — confirm that is intended.
        else:
            # Store the expiry timestamp; the cache evicts the entry on
            # its own after `timeout` seconds.
            cache.set(key, now + timeout, timeout)

    @staticmethod
    def clear_lock(key):
        """Release the lock named `key`."""
        return cache.delete(key)
@shared_task(bind=True)

View File

@ -1,4 +1,4 @@
import os, sys
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
@ -7,17 +7,15 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
from django.conf import settings # noqa
app = Celery('devauth')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
app.config_from_object('django.conf:settings')
# Configure celery_once to take its task locks via the Django cache
# framework rather than a direct Redis connection.
app.conf.ONCE = {
    'backend': 'allianceauth.services.tasks.DjangoBackend',
    'settings': {}
}
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)