Move celery once config into own package

This commit is contained in:
Erik Kalkoken 2023-02-28 15:16:51 +01:00
parent b149baa4e5
commit 0dd47e72bc
8 changed files with 78 additions and 74 deletions

View File

@ -22,7 +22,7 @@ app.conf.task_default_priority = 5 # anything called with the task.delay() will
app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen
app.conf.ONCE = { app.conf.ONCE = {
'backend': 'allianceauth.services.tasks.DjangoBackend', 'backend': 'allianceauth.services.celery_once.backends.DjangoBackend',
'settings': {} 'settings': {}
} }

View File

@ -0,0 +1,19 @@
from celery_once import AlreadyQueued
from django.core.cache import cache
class DjangoBackend:
"""Locking backend for celery once."""
def __init__(self, settings):
pass
@staticmethod
def raise_or_lock(key, timeout):
acquired = cache.add(key=key, value="lock", timeout=timeout)
if not acquired:
raise AlreadyQueued(int(cache.ttl(key)))
@staticmethod
def clear_lock(key):
return cache.delete(key)

View File

@ -0,0 +1,8 @@
from celery_once import QueueOnce as BaseTask
class QueueOnce(BaseTask):
"""QueueOnce class with custom defaults."""
once = BaseTask.once
once["graceful"] = True

View File

@ -0,0 +1,47 @@
from celery_once import AlreadyQueued
from django.core.cache import cache
from django.test import TestCase
from allianceauth.services.celery_once.backends import DjangoBackend
class TestDjangoBackend(TestCase):
TEST_KEY = "my-django-backend-test-key"
TIMEOUT = 1800
def setUp(self) -> None:
cache.delete(self.TEST_KEY)
self.backend = DjangoBackend(dict())
def test_can_get_lock(self):
"""
when lock can be acquired
then set it with timeout
"""
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
self.assertIsNotNone(cache.get(self.TEST_KEY))
self.assertAlmostEqual(cache.ttl(self.TEST_KEY), self.TIMEOUT, delta=2)
def test_when_cant_get_lock_raise_exception(self):
"""
when lock can bot be acquired
then raise AlreadyQueued exception with countdown
"""
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
try:
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
except Exception as ex:
self.assertIsInstance(ex, AlreadyQueued)
self.assertAlmostEqual(ex.countdown, self.TIMEOUT, delta=2)
def test_can_clear_lock(self):
"""
when a lock exists
then can get a new lock after clearing it
"""
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
self.backend.clear_lock(self.TEST_KEY)
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
self.assertIsNotNone(cache.get(self.TEST_KEY))

View File

@ -3,33 +3,11 @@ import logging
from celery import shared_task from celery import shared_task
from django.contrib.auth.models import User from django.contrib.auth.models import User
from .hooks import ServicesHook from .hooks import ServicesHook
from celery_once import QueueOnce as BaseTask, AlreadyQueued from .celery_once.tasks import QueueOnce # noqa: F401 - for backwards compatibility
from django.core.cache import cache
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class QueueOnce(BaseTask):
once = BaseTask.once
once['graceful'] = True
class DjangoBackend:
def __init__(self, settings):
pass
@staticmethod
def raise_or_lock(key, timeout):
acquired = cache.add(key=key, value="lock", timeout=timeout)
if not acquired:
raise AlreadyQueued(int(cache.ttl(key)))
@staticmethod
def clear_lock(key):
return cache.delete(key)
@shared_task(bind=True) @shared_task(bind=True)
def validate_services(self, pk): def validate_services(self, pk):
user = User.objects.get(pk=pk) user = User.objects.get(pk=pk)
@ -38,7 +16,7 @@ def validate_services(self, pk):
for svc in ServicesHook.get_services(): for svc in ServicesHook.get_services():
try: try:
svc.validate_user(user) svc.validate_user(user)
except: except Exception:
logger.exception(f'Exception running validate_user for services module {svc} on user {user}') logger.exception(f'Exception running validate_user for services module {svc} on user {user}')

View File

@ -1,15 +1,10 @@
from unittest import mock from unittest import mock
from celery_once import AlreadyQueued
from django.core.cache import cache
from django.test import override_settings, TestCase from django.test import override_settings, TestCase
from allianceauth.tests.auth_utils import AuthUtils from allianceauth.tests.auth_utils import AuthUtils
from allianceauth.services.tasks import validate_services, update_groups_for_user from allianceauth.services.tasks import validate_services, update_groups_for_user
from ..tasks import DjangoBackend
@override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True) @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True)
class ServicesTasksTestCase(TestCase): class ServicesTasksTestCase(TestCase):
@ -46,46 +41,3 @@ class ServicesTasksTestCase(TestCase):
self.assertTrue(svc.update_groups.called) self.assertTrue(svc.update_groups.called)
args, _ = svc.update_groups.call_args args, _ = svc.update_groups.call_args
self.assertEqual(self.member, args[0]) # Assert correct user self.assertEqual(self.member, args[0]) # Assert correct user
class TestDjangoBackend(TestCase):
TEST_KEY = "my-django-backend-test-key"
TIMEOUT = 1800
def setUp(self) -> None:
cache.delete(self.TEST_KEY)
self.backend = DjangoBackend(dict())
def test_can_get_lock(self):
"""
when lock can be acquired
then set it with timetout
"""
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
self.assertIsNotNone(cache.get(self.TEST_KEY))
self.assertAlmostEqual(cache.ttl(self.TEST_KEY), self.TIMEOUT, delta=2)
def test_when_cant_get_lock_raise_exception(self):
"""
when lock can bot be acquired
then raise AlreadyQueued exception with countdown
"""
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
try:
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
except Exception as ex:
self.assertIsInstance(ex, AlreadyQueued)
self.assertAlmostEqual(ex.countdown, self.TIMEOUT, delta=2)
def test_can_clear_lock(self):
"""
when a lock exists
then can get a new lock after clearing it
"""
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
self.backend.clear_lock(self.TEST_KEY)
self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
self.assertIsNotNone(cache.get(self.TEST_KEY))

View File

@ -21,7 +21,7 @@ app.conf.task_default_priority = 5 # anything called with the task.delay() will
app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen
app.conf.ONCE = { app.conf.ONCE = {
'backend': 'allianceauth.services.tasks.DjangoBackend', 'backend': 'allianceauth.services.celery_once.backends.DjangoBackend',
'settings': {} 'settings': {}
} }