2023-02-28 15:16:51 +01:00

20 lines
468 B
Python

from celery_once import AlreadyQueued
from django.core.cache import cache
class DjangoBackend:
"""Locking backend for celery once."""
def __init__(self, settings):
pass
@staticmethod
def raise_or_lock(key, timeout):
acquired = cache.add(key=key, value="lock", timeout=timeout)
if not acquired:
raise AlreadyQueued(int(cache.ttl(key)))
@staticmethod
def clear_lock(key):
return cache.delete(key)