2023-02-28 15:16:51 +01:00

48 lines
1.5 KiB
Python

from celery_once import AlreadyQueued
from django.core.cache import cache
from django.test import TestCase
from allianceauth.services.celery_once.backends import DjangoBackend
class TestDjangoBackend(TestCase):
    """Tests for the ``DjangoBackend`` lock backend used with celery_once.

    Each test works on a single cache key (``TEST_KEY``), which is removed
    from the Django cache before every test so that runs do not affect
    each other.
    """

    # Cache key under which the lock is created during the tests.
    TEST_KEY = "my-django-backend-test-key"
    # Lock timeout passed to the backend (compared against the cache TTL below).
    TIMEOUT = 1800

    def setUp(self) -> None:
        """Start every test without an existing lock and with a fresh backend."""
        cache.delete(self.TEST_KEY)
        self.backend = DjangoBackend({})

    def test_can_get_lock(self):
        """
        when lock can be acquired
        then set it with timeout
        """
        self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)

        self.assertIsNotNone(cache.get(self.TEST_KEY))
        # NOTE(review): ``cache.ttl`` is not part of Django's base cache API;
        # presumably provided by the configured cache backend - confirm.
        # The delta tolerates the time elapsed between locking and checking.
        self.assertAlmostEqual(
            cache.ttl(self.TEST_KEY), self.TIMEOUT, delta=2
        )

    def test_when_cant_get_lock_raise_exception(self):
        """
        when lock cannot be acquired
        then raise AlreadyQueued exception with countdown
        """
        self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)

        # assertRaises makes the test fail when no exception is raised at
        # all; a plain try/except would silently pass in that case.
        with self.assertRaises(AlreadyQueued) as raised:
            self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)

        self.assertAlmostEqual(
            raised.exception.countdown, self.TIMEOUT, delta=2
        )

    def test_can_clear_lock(self):
        """
        when a lock exists
        then can get a new lock after clearing it
        """
        self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
        self.backend.clear_lock(self.TEST_KEY)

        # Must not raise AlreadyQueued, because the first lock was cleared.
        self.backend.raise_or_lock(self.TEST_KEY, self.TIMEOUT)
        self.assertIsNotNone(cache.get(self.TEST_KEY))