Improve celery infos on Dashboard

This commit is contained in:
Erik Kalkoken
2022-02-26 05:15:30 +00:00
committed by Ariel Rin
parent ff7c9c48f3
commit 01164777ed
17 changed files with 623 additions and 52 deletions

View File

@@ -0,0 +1,153 @@
import datetime as dt
from collections import namedtuple
from typing import Optional, List
from redis import Redis
from pytz import utc
from django.core.cache import cache
_TaskCounts = namedtuple(
"_TaskCounts", ["succeeded", "retried", "failed", "total", "earliest_task", "hours"]
)
def dashboard_results(hours: int) -> _TaskCounts:
"""Counts of all task events within the given timeframe."""
def earliest_if_exists(events: EventSeries, earliest: dt.datetime) -> list:
my_earliest = events.first_event(earliest=earliest)
return [my_earliest] if my_earliest else []
earliest = dt.datetime.utcnow() - dt.timedelta(hours=hours)
earliest_events = list()
succeeded = SucceededTaskSeries()
succeeded_count = succeeded.count(earliest=earliest)
earliest_events += earliest_if_exists(succeeded, earliest)
retried = RetriedTaskSeries()
retried_count = retried.count(earliest=earliest)
earliest_events += earliest_if_exists(retried, earliest)
failed = FailedTaskSeries()
failed_count = failed.count(earliest=earliest)
earliest_events += earliest_if_exists(failed, earliest)
return _TaskCounts(
succeeded=succeeded_count,
retried=retried_count,
failed=failed_count,
total=succeeded_count + retried_count + failed_count,
earliest_task=min(earliest_events) if earliest_events else None,
hours=hours,
)
class EventSeries:
"""Base class for recording and analysing a series of events.
This class must be inherited from and the child class must define KEY_ID.
"""
_ROOT_KEY = "ALLIANCEAUTH_TASK_SERIES"
def __init__(
self,
redis: Redis = None,
) -> None:
if type(self) == EventSeries:
raise TypeError("Can not instantiate base class.")
if not hasattr(self, "KEY_ID"):
raise ValueError("KEY_ID not defined")
self._redis = cache.get_master_client() if not redis else redis
if not isinstance(self._redis, Redis):
raise TypeError(
"This class requires a Redis client, but none was provided "
"and the default Django cache backend is not Redis either."
)
@property
def _key_counter(self):
return f"{self._ROOT_KEY}_{self.KEY_ID}_COUNTER"
@property
def _key_sorted_set(self):
return f"{self._ROOT_KEY}_{self.KEY_ID}_SORTED_SET"
def add(self, event_time: dt.datetime = None) -> None:
"""Add event.
Args:
- event_time: timestamp of event. Will use current time if not specified.
"""
if not event_time:
event_time = dt.datetime.utcnow()
id = self._redis.incr(self._key_counter)
self._redis.zadd(self._key_sorted_set, {id: event_time.timestamp()})
def all(self) -> List[dt.datetime]:
"""List of all known events."""
return [
event[1]
for event in self._redis.zrangebyscore(
self._key_sorted_set,
"-inf",
"+inf",
withscores=True,
score_cast_func=self._cast_scores_to_dt,
)
]
def clear(self) -> None:
"""Clear all events."""
self._redis.delete(self._key_sorted_set)
self._redis.delete(self._key_counter)
def count(self, earliest: dt.datetime = None, latest: dt.datetime = None) -> int:
"""Count of events, can be restricted to given timeframe.
Args:
- earliest: Date of first events to count(inclusive), or -infinite if not specified
- latest: Date of last events to count(inclusive), or +infinite if not specified
"""
min = "-inf" if not earliest else earliest.timestamp()
max = "+inf" if not latest else latest.timestamp()
return self._redis.zcount(self._key_sorted_set, min=min, max=max)
def first_event(self, earliest: dt.datetime = None) -> Optional[dt.datetime]:
"""Date/Time of first event. Returns `None` if series has no events.
Args:
- earliest: Date of first events to count(inclusive), or any if not specified
"""
min = "-inf" if not earliest else earliest.timestamp()
event = self._redis.zrangebyscore(
self._key_sorted_set,
min,
"+inf",
withscores=True,
start=0,
num=1,
score_cast_func=self._cast_scores_to_dt,
)
if not event:
return None
return event[0][1]
@staticmethod
def _cast_scores_to_dt(score) -> dt.datetime:
return dt.datetime.fromtimestamp(float(score), tz=utc)
class SucceededTaskSeries(EventSeries):
"""A task has succeeded."""
KEY_ID = "SUCCEEDED"
class RetriedTaskSeries(EventSeries):
"""A task has been retried."""
KEY_ID = "RETRIED"
class FailedTaskSeries(EventSeries):
"""A task has failed."""
KEY_ID = "FAILED"

View File

@@ -0,0 +1,42 @@
from celery.signals import task_failure, task_retry, task_success, worker_ready
from django.conf import settings
from .event_series import FailedTaskSeries, RetriedTaskSeries, SucceededTaskSeries
def reset_counters():
"""Reset all counters for the celery status."""
SucceededTaskSeries().clear()
FailedTaskSeries().clear()
RetriedTaskSeries().clear()
def is_enabled() -> bool:
return not bool(
getattr(settings, "ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED", False)
)
@worker_ready.connect
def reset_counters_when_celery_restarted(*args, **kwargs):
if is_enabled():
reset_counters()
@task_success.connect
def record_task_succeeded(*args, **kwargs):
if is_enabled():
SucceededTaskSeries().add()
@task_retry.connect
def record_task_retried(*args, **kwargs):
if is_enabled():
RetriedTaskSeries().add()
@task_failure.connect
def record_task_failed(*args, **kwargs):
if is_enabled():
FailedTaskSeries().add()

View File

@@ -0,0 +1,222 @@
import datetime as dt
from pytz import utc
from django.test import TestCase
from django.utils.timezone import now
from allianceauth.authentication.task_statistics.event_series import (
EventSeries,
FailedTaskSeries,
RetriedTaskSeries,
SucceededTaskSeries,
dashboard_results,
)
class TestEventSeries(TestCase):
"""Testing EventSeries class."""
class IncompleteEvents(EventSeries):
"""Child class without KEY ID"""
class MyEventSeries(EventSeries):
KEY_ID = "TEST"
def test_should_create_object(self):
# when
events = self.MyEventSeries()
# then
self.assertIsInstance(events, self.MyEventSeries)
def test_should_abort_when_redis_client_invalid(self):
with self.assertRaises(TypeError):
self.MyEventSeries(redis="invalid")
def test_should_not_allow_instantiation_of_base_class(self):
with self.assertRaises(TypeError):
EventSeries()
def test_should_not_allow_creating_child_class_without_key_id(self):
with self.assertRaises(ValueError):
self.IncompleteEvents()
def test_should_add_event(self):
# given
events = self.MyEventSeries()
events.clear()
# when
events.add()
# then
result = events.all()
self.assertEqual(len(result), 1)
self.assertAlmostEqual(result[0], now(), delta=dt.timedelta(seconds=30))
def test_should_add_event_with_specified_time(self):
# given
events = self.MyEventSeries()
events.clear()
my_time = dt.datetime(2021, 11, 1, 12, 15, tzinfo=utc)
# when
events.add(my_time)
# then
result = events.all()
self.assertEqual(len(result), 1)
self.assertAlmostEqual(result[0], my_time, delta=dt.timedelta(seconds=30))
def test_should_count_events(self):
# given
events = self.MyEventSeries()
events.clear()
events.add()
events.add()
# when
result = events.count()
# then
self.assertEqual(result, 2)
def test_should_count_zero(self):
# given
events = self.MyEventSeries()
events.clear()
# when
result = events.count()
# then
self.assertEqual(result, 0)
def test_should_count_events_within_timeframe_1(self):
# given
events = self.MyEventSeries()
events.clear()
events.add(dt.datetime(2021, 12, 1, 12, 0, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 10, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 15, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 30, tzinfo=utc))
# when
result = events.count(
earliest=dt.datetime(2021, 12, 1, 12, 8, tzinfo=utc),
latest=dt.datetime(2021, 12, 1, 12, 17, tzinfo=utc),
)
# then
self.assertEqual(result, 2)
def test_should_count_events_within_timeframe_2(self):
# given
events = self.MyEventSeries()
events.clear()
events.add(dt.datetime(2021, 12, 1, 12, 0, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 10, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 15, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 30, tzinfo=utc))
# when
result = events.count(earliest=dt.datetime(2021, 12, 1, 12, 8))
# then
self.assertEqual(result, 3)
def test_should_count_events_within_timeframe_3(self):
# given
events = self.MyEventSeries()
events.clear()
events.add(dt.datetime(2021, 12, 1, 12, 0, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 10, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 15, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 30, tzinfo=utc))
# when
result = events.count(latest=dt.datetime(2021, 12, 1, 12, 12))
# then
self.assertEqual(result, 2)
def test_should_clear_events(self):
# given
events = self.MyEventSeries()
events.clear()
events.add()
events.add()
# when
events.clear()
# then
self.assertEqual(events.count(), 0)
def test_should_return_date_of_first_event(self):
# given
events = self.MyEventSeries()
events.clear()
events.add(dt.datetime(2021, 12, 1, 12, 0, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 10, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 15, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 30, tzinfo=utc))
# when
result = events.first_event()
# then
self.assertEqual(result, dt.datetime(2021, 12, 1, 12, 0, tzinfo=utc))
def test_should_return_date_of_first_event_with_range(self):
# given
events = self.MyEventSeries()
events.clear()
events.add(dt.datetime(2021, 12, 1, 12, 0, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 10, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 15, tzinfo=utc))
events.add(dt.datetime(2021, 12, 1, 12, 30, tzinfo=utc))
# when
result = events.first_event(
earliest=dt.datetime(2021, 12, 1, 12, 8, tzinfo=utc)
)
# then
self.assertEqual(result, dt.datetime(2021, 12, 1, 12, 10, tzinfo=utc))
def test_should_return_all_events(self):
# given
events = self.MyEventSeries()
events.clear()
events.add()
events.add()
# when
results = events.all()
# then
self.assertEqual(len(results), 2)
class TestDashboardResults(TestCase):
def test_should_return_counts_for_given_timeframe_only(self):
# given
earliest_task = now() - dt.timedelta(minutes=15)
succeeded = SucceededTaskSeries()
succeeded.clear()
succeeded.add(now() - dt.timedelta(hours=1, seconds=1))
succeeded.add(earliest_task)
succeeded.add()
succeeded.add()
retried = RetriedTaskSeries()
retried.clear()
retried.add(now() - dt.timedelta(hours=1, seconds=1))
retried.add(now() - dt.timedelta(seconds=30))
retried.add()
failed = FailedTaskSeries()
failed.clear()
failed.add(now() - dt.timedelta(hours=1, seconds=1))
failed.add()
# when
results = dashboard_results(hours=1)
# then
self.assertEqual(results.succeeded, 3)
self.assertEqual(results.retried, 2)
self.assertEqual(results.failed, 1)
self.assertEqual(results.total, 6)
self.assertEqual(results.earliest_task, earliest_task)
def test_should_work_with_no_data(self):
# given
succeeded = SucceededTaskSeries()
succeeded.clear()
retried = RetriedTaskSeries()
retried.clear()
failed = FailedTaskSeries()
failed.clear()
# when
results = dashboard_results(hours=1)
# then
self.assertEqual(results.succeeded, 0)
self.assertEqual(results.retried, 0)
self.assertEqual(results.failed, 0)
self.assertEqual(results.total, 0)
self.assertIsNone(results.earliest_task)

View File

@@ -0,0 +1,93 @@
from unittest.mock import patch
from celery.exceptions import Retry
from django.test import TestCase, override_settings
from allianceauth.authentication.task_statistics.event_series import (
FailedTaskSeries,
RetriedTaskSeries,
SucceededTaskSeries,
)
from allianceauth.authentication.task_statistics.signals import (
reset_counters,
is_enabled,
)
from allianceauth.eveonline.tasks import update_character
@override_settings(
CELERY_ALWAYS_EAGER=True, ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED=False
)
class TestTaskSignals(TestCase):
fixtures = ["disable_analytics"]
def test_should_record_successful_task(self):
# given
events = SucceededTaskSeries()
events.clear()
# when
with patch(
"allianceauth.eveonline.tasks.EveCharacter.objects.update_character"
) as mock_update:
mock_update.return_value = None
update_character.delay(1)
# then
self.assertEqual(events.count(), 1)
def test_should_record_retried_task(self):
# given
events = RetriedTaskSeries()
events.clear()
# when
with patch(
"allianceauth.eveonline.tasks.EveCharacter.objects.update_character"
) as mock_update:
mock_update.side_effect = Retry
update_character.delay(1)
# then
self.assertEqual(events.count(), 1)
def test_should_record_failed_task(self):
# given
events = FailedTaskSeries()
events.clear()
# when
with patch(
"allianceauth.eveonline.tasks.EveCharacter.objects.update_character"
) as mock_update:
mock_update.side_effect = RuntimeError
update_character.delay(1)
# then
self.assertEqual(events.count(), 1)
@override_settings(ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED=False)
class TestResetCounters(TestCase):
def test_should_reset_counters(self):
# given
succeeded = SucceededTaskSeries()
succeeded.clear()
succeeded.add()
retried = RetriedTaskSeries()
retried.clear()
retried.add()
failed = FailedTaskSeries()
failed.clear()
failed.add()
# when
reset_counters()
# then
self.assertEqual(succeeded.count(), 0)
self.assertEqual(retried.count(), 0)
self.assertEqual(failed.count(), 0)
class TestIsEnabled(TestCase):
@override_settings(ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED=False)
def test_enabled(self):
self.assertTrue(is_enabled())
@override_settings(ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED=True)
def test_disabled(self):
self.assertFalse(is_enabled())