diff --git a/allianceauth/project_template/project_name/celery.py b/allianceauth/project_template/project_name/celery.py index c55a67d0..6f1ce67e 100644 --- a/allianceauth/project_template/project_name/celery.py +++ b/allianceauth/project_template/project_name/celery.py @@ -22,7 +22,7 @@ app.conf.task_default_priority = 5 # anything called with the task.delay() will app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen app.conf.ONCE = { - 'backend': 'allianceauth.services.tasks.DjangoBackend', + 'backend': 'allianceauth.services.celery_once.backends.DjangoBackend', 'settings': {} } diff --git a/allianceauth/services/celery_once/__init__.py b/allianceauth/services/celery_once/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/allianceauth/services/celery_once/backends.py b/allianceauth/services/celery_once/backends.py new file mode 100644 index 00000000..e84fdc06 --- /dev/null +++ b/allianceauth/services/celery_once/backends.py @@ -0,0 +1,19 @@ +from celery_once import AlreadyQueued +from django.core.cache import cache + + +class DjangoBackend: + """Locking backend for celery once.""" + + def __init__(self, settings): + pass + + @staticmethod + def raise_or_lock(key, timeout): + acquired = cache.add(key=key, value="lock", timeout=timeout) + if not acquired: + raise AlreadyQueued(int(cache.ttl(key))) + + @staticmethod + def clear_lock(key): + return cache.delete(key) diff --git a/allianceauth/services/celery_once/tasks.py b/allianceauth/services/celery_once/tasks.py new file mode 100644 index 00000000..4751ccc4 --- /dev/null +++ b/allianceauth/services/celery_once/tasks.py @@ -0,0 +1,8 @@ +from celery_once import QueueOnce as BaseTask + + +class QueueOnce(BaseTask): + """QueueOnce class with custom defaults.""" + + once = BaseTask.once + once["graceful"] = True diff --git a/allianceauth/services/celery_once/tests.py b/allianceauth/services/celery_once/tests.py new file mode 100644 index 00000000..cc48d69f --- /dev/null +++ b/allianceauth/services/celery_once/tests.py @@ -0,0 +1,47 @@ +from celery_once import AlreadyQueued +from django.core.cache import cache +from django.test import TestCase + +from allianceauth.services.celery_once.backends import DjangoBackend + + +class TestDjangoBackend(TestCase): + TEST_KEY = "my-django-backend-test-key" + TIMEOUT = 1800 + + def setUp(self) -> None: + cache.delete(self.TEST_KEY) + self.backend = DjangoBackend(dict()) + + def test_can_get_lock(self): + """ + when lock can be acquired + then set it with timeout + """ + self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) + self.assertIsNotNone(cache.get(self.TEST_KEY)) + self.assertAlmostEqual(cache.ttl(self.TEST_KEY), self.TIMEOUT, delta=2) + + def test_when_cant_get_lock_raise_exception(self): + """ + when lock can bot be acquired + then raise AlreadyQueued exception with countdown + """ + self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) + + try: + self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) + except Exception as ex: + self.assertIsInstance(ex, AlreadyQueued) + self.assertAlmostEqual(ex.countdown, self.TIMEOUT, delta=2) + + def test_can_clear_lock(self): + """ + when a lock exists + then can get a new lock after clearing it + """ + self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) + + self.backend.clear_lock(self.TEST_KEY) + self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) + self.assertIsNotNone(cache.get(self.TEST_KEY)) diff --git a/allianceauth/services/tasks.py b/allianceauth/services/tasks.py index 58947577..4fb29e52 100644 --- a/allianceauth/services/tasks.py +++ b/allianceauth/services/tasks.py @@ -3,33 +3,11 @@ import logging from celery import shared_task from django.contrib.auth.models import User from .hooks import ServicesHook -from celery_once import QueueOnce as BaseTask, AlreadyQueued -from django.core.cache import cache - +from .celery_once.tasks import QueueOnce # noqa: F401 - for backwards compatibility logger = logging.getLogger(__name__) -class QueueOnce(BaseTask): - once = BaseTask.once - once['graceful'] = True - - -class DjangoBackend: - def __init__(self, settings): - pass - - @staticmethod - def raise_or_lock(key, timeout): - acquired = cache.add(key=key, value="lock", timeout=timeout) - if not acquired: - raise AlreadyQueued(int(cache.ttl(key))) - - @staticmethod - def clear_lock(key): - return cache.delete(key) - - @shared_task(bind=True) def validate_services(self, pk): user = User.objects.get(pk=pk) @@ -38,7 +16,7 @@ def validate_services(self, pk): for svc in ServicesHook.get_services(): try: svc.validate_user(user) - except: + except Exception: logger.exception(f'Exception running validate_user for services module {svc} on user {user}') diff --git a/allianceauth/services/tests/test_tasks.py b/allianceauth/services/tests/test_tasks.py index 06257a1f..a923e15c 100644 --- a/allianceauth/services/tests/test_tasks.py +++ b/allianceauth/services/tests/test_tasks.py @@ -1,15 +1,10 @@ from unittest import mock -from celery_once import AlreadyQueued - -from django.core.cache import cache from django.test import override_settings, TestCase from allianceauth.tests.auth_utils import AuthUtils from allianceauth.services.tasks import validate_services, update_groups_for_user -from ..tasks import DjangoBackend - @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True) class ServicesTasksTestCase(TestCase): @@ -46,46 +41,3 @@ class ServicesTasksTestCase(TestCase): self.assertTrue(svc.update_groups.called) args, _ = svc.update_groups.call_args self.assertEqual(self.member, args[0]) # Assert correct user - - -class TestDjangoBackend(TestCase): - - TEST_KEY = "my-django-backend-test-key" - TIMEOUT = 1800 - - def setUp(self) -> None: - cache.delete(self.TEST_KEY) - self.backend = DjangoBackend(dict()) - - def test_can_get_lock(self): - """ - when lock can be acquired - then set it with timetout - """ - self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) - self.assertIsNotNone(cache.get(self.TEST_KEY)) - self.assertAlmostEqual(cache.ttl(self.TEST_KEY), self.TIMEOUT, delta=2) - - def test_when_cant_get_lock_raise_exception(self): - """ - when lock can bot be acquired - then raise AlreadyQueued exception with countdown - """ - self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) - - try: - self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) - except Exception as ex: - self.assertIsInstance(ex, AlreadyQueued) - self.assertAlmostEqual(ex.countdown, self.TIMEOUT, delta=2) - - def test_can_clear_lock(self): - """ - when a lock exists - then can get a new lock after clearing it - """ - self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) - - self.backend.clear_lock(self.TEST_KEY) - self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT) - self.assertIsNotNone(cache.get(self.TEST_KEY)) diff --git a/tests/celery.py b/tests/celery.py index d43e8a52..11ae735e 100644 --- a/tests/celery.py +++ b/tests/celery.py @@ -21,7 +21,7 @@ app.conf.task_default_priority = 5 # anything called with the task.delay() will app.conf.worker_prefetch_multiplier = 1 # only prefetch single tasks at a time on the workers so that prio tasks happen app.conf.ONCE = { - 'backend': 'allianceauth.services.tasks.DjangoBackend', + 'backend': 'allianceauth.services.celery_once.backends.DjangoBackend', 'settings': {} }