Merge branch 'feature-show-running-tasks' into 'master'

Show running tasks on dashboard

See merge request allianceauth/allianceauth!1515
This commit is contained in:
Ariel Rin 2023-08-01 10:15:43 +00:00
commit 5d6a4ab1a9
9 changed files with 189 additions and 37 deletions

View File

@ -1,18 +1,26 @@
from collections import namedtuple """Counters for Task Statistics."""
import datetime as dt import datetime as dt
from typing import NamedTuple, Optional
from .event_series import EventSeries from .event_series import EventSeries
from .helpers import ItemCounter
# Global series for counting task events.
"""Global series for counting task events."""
succeeded_tasks = EventSeries("SUCCEEDED_TASKS") succeeded_tasks = EventSeries("SUCCEEDED_TASKS")
retried_tasks = EventSeries("RETRIED_TASKS") retried_tasks = EventSeries("RETRIED_TASKS")
failed_tasks = EventSeries("FAILED_TASKS") failed_tasks = EventSeries("FAILED_TASKS")
running_tasks = ItemCounter("running_tasks")
_TaskCounts = namedtuple( class _TaskCounts(NamedTuple):
"_TaskCounts", ["succeeded", "retried", "failed", "total", "earliest_task", "hours"] succeeded: int
) retried: int
failed: int
total: int
earliest_task: Optional[dt.datetime]
hours: int
running: int
def dashboard_results(hours: int) -> _TaskCounts: def dashboard_results(hours: int) -> _TaskCounts:
@ -23,13 +31,14 @@ def dashboard_results(hours: int) -> _TaskCounts:
return [my_earliest] if my_earliest else [] return [my_earliest] if my_earliest else []
earliest = dt.datetime.utcnow() - dt.timedelta(hours=hours) earliest = dt.datetime.utcnow() - dt.timedelta(hours=hours)
earliest_events = list() earliest_events = []
succeeded_count = succeeded_tasks.count(earliest=earliest) succeeded_count = succeeded_tasks.count(earliest=earliest)
earliest_events += earliest_if_exists(succeeded_tasks, earliest) earliest_events += earliest_if_exists(succeeded_tasks, earliest)
retried_count = retried_tasks.count(earliest=earliest) retried_count = retried_tasks.count(earliest=earliest)
earliest_events += earliest_if_exists(retried_tasks, earliest) earliest_events += earliest_if_exists(retried_tasks, earliest)
failed_count = failed_tasks.count(earliest=earliest) failed_count = failed_tasks.count(earliest=earliest)
earliest_events += earliest_if_exists(failed_tasks, earliest) earliest_events += earliest_if_exists(failed_tasks, earliest)
running_count = running_tasks.value()
return _TaskCounts( return _TaskCounts(
succeeded=succeeded_count, succeeded=succeeded_count,
retried=retried_count, retried=retried_count,
@ -37,4 +46,5 @@ def dashboard_results(hours: int) -> _TaskCounts:
total=succeeded_count + retried_count + failed_count, total=succeeded_count + retried_count + failed_count,
earliest_task=min(earliest_events) if earliest_events else None, earliest_task=min(earliest_events) if earliest_events else None,
hours=hours, hours=hours,
running=running_count,
) )

View File

@ -1,3 +1,5 @@
"""Event series for Task Statistics."""
import datetime as dt import datetime as dt
import logging import logging
from typing import List, Optional from typing import List, Optional
@ -73,8 +75,8 @@ class EventSeries:
""" """
if not event_time: if not event_time:
event_time = dt.datetime.utcnow() event_time = dt.datetime.utcnow()
id = self._redis.incr(self._key_counter) my_id = self._redis.incr(self._key_counter)
self._redis.zadd(self._key_sorted_set, {id: event_time.timestamp()}) self._redis.zadd(self._key_sorted_set, {my_id: event_time.timestamp()})
def all(self) -> List[dt.datetime]: def all(self) -> List[dt.datetime]:
"""List of all known events.""" """List of all known events."""
@ -101,9 +103,9 @@ class EventSeries:
- earliest: Date of first events to count(inclusive), or -infinite if not specified - earliest: Date of first events to count(inclusive), or -infinite if not specified
- latest: Date of last events to count(inclusive), or +infinite if not specified - latest: Date of last events to count(inclusive), or +infinite if not specified
""" """
min = "-inf" if not earliest else earliest.timestamp() minimum = "-inf" if not earliest else earliest.timestamp()
max = "+inf" if not latest else latest.timestamp() maximum = "+inf" if not latest else latest.timestamp()
return self._redis.zcount(self._key_sorted_set, min=min, max=max) return self._redis.zcount(self._key_sorted_set, min=minimum, max=maximum)
def first_event(self, earliest: dt.datetime = None) -> Optional[dt.datetime]: def first_event(self, earliest: dt.datetime = None) -> Optional[dt.datetime]:
"""Date/Time of first event. Returns `None` if series has no events. """Date/Time of first event. Returns `None` if series has no events.
@ -111,10 +113,10 @@ class EventSeries:
Args: Args:
- earliest: Date of first events to count(inclusive), or any if not specified - earliest: Date of first events to count(inclusive), or any if not specified
""" """
min = "-inf" if not earliest else earliest.timestamp() minimum = "-inf" if not earliest else earliest.timestamp()
event = self._redis.zrangebyscore( event = self._redis.zrangebyscore(
self._key_sorted_set, self._key_sorted_set,
min, minimum,
"+inf", "+inf",
withscores=True, withscores=True,
start=0, start=0,

View File

@ -0,0 +1,44 @@
"""Helpers for Task Statistics."""
from typing import Optional
from django.core.cache import cache
class ItemCounter:
"""A process safe item counter."""
CACHE_KEY_BASE = "allianceauth-item-counter"
DEFAULT_CACHE_TIMEOUT = 24 * 3600
def __init__(self, name: str) -> None:
if not name:
raise ValueError("Must define a name")
self._name = str(name)
@property
def _cache_key(self) -> str:
return f"{self.CACHE_KEY_BASE}-{self._name}"
def reset(self, init_value: int = 0):
"""Reset counter to initial value."""
cache.set(self._cache_key, init_value, self.DEFAULT_CACHE_TIMEOUT)
def incr(self, delta: int = 1):
"""Increment counter by delta."""
try:
cache.incr(self._cache_key, delta)
except ValueError:
pass
def decr(self, delta: int = 1):
"""Decrement counter by delta."""
try:
cache.decr(self._cache_key, delta)
except ValueError:
pass
def value(self) -> Optional[int]:
"""Return current value or None if not yet initialized."""
return cache.get(self._cache_key)

View File

@ -1,14 +1,15 @@
"""Signals for Task Statistics."""
from celery.signals import ( from celery.signals import (
task_failure, task_failure, task_internal_error, task_postrun, task_prerun, task_retry,
task_internal_error, task_success, worker_ready,
task_retry,
task_success,
worker_ready
) )
from django.conf import settings from django.conf import settings
from .counters import failed_tasks, retried_tasks, succeeded_tasks from .counters import (
failed_tasks, retried_tasks, running_tasks, succeeded_tasks,
)
def reset_counters(): def reset_counters():
@ -16,9 +17,11 @@ def reset_counters():
succeeded_tasks.clear() succeeded_tasks.clear()
failed_tasks.clear() failed_tasks.clear()
retried_tasks.clear() retried_tasks.clear()
running_tasks.reset()
def is_enabled() -> bool: def is_enabled() -> bool:
"""Return True if task statistics are enabled, else return False."""
return not bool( return not bool(
getattr(settings, "ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED", False) getattr(settings, "ALLIANCEAUTH_DASHBOARD_TASK_STATISTICS_DISABLED", False)
) )
@ -52,3 +55,15 @@ def record_task_failed(*args, **kwargs):
def record_task_internal_error(*args, **kwargs): def record_task_internal_error(*args, **kwargs):
if is_enabled(): if is_enabled():
failed_tasks.add() failed_tasks.add()
@task_prerun.connect
def record_task_prerun(*args, **kwargs):
if is_enabled():
running_tasks.incr()
@task_postrun.connect
def record_task_postrun(*args, **kwargs):
if is_enabled():
running_tasks.decr()

View File

@ -8,25 +8,31 @@ from allianceauth.authentication.task_statistics.counters import (
succeeded_tasks, succeeded_tasks,
retried_tasks, retried_tasks,
failed_tasks, failed_tasks,
running_tasks,
) )
class TestDashboardResults(TestCase): class TestDashboardResults(TestCase):
def test_should_return_counts_for_given_timeframe_only(self): def test_should_return_counts_for_given_time_frame_only(self):
# given # given
earliest_task = now() - dt.timedelta(minutes=15) earliest_task = now() - dt.timedelta(minutes=15)
succeeded_tasks.clear() succeeded_tasks.clear()
succeeded_tasks.add(now() - dt.timedelta(hours=1, seconds=1)) succeeded_tasks.add(now() - dt.timedelta(hours=1, seconds=1))
succeeded_tasks.add(earliest_task) succeeded_tasks.add(earliest_task)
succeeded_tasks.add() succeeded_tasks.add()
succeeded_tasks.add() succeeded_tasks.add()
retried_tasks.clear() retried_tasks.clear()
retried_tasks.add(now() - dt.timedelta(hours=1, seconds=1)) retried_tasks.add(now() - dt.timedelta(hours=1, seconds=1))
retried_tasks.add(now() - dt.timedelta(seconds=30)) retried_tasks.add(now() - dt.timedelta(seconds=30))
retried_tasks.add() retried_tasks.add()
failed_tasks.clear() failed_tasks.clear()
failed_tasks.add(now() - dt.timedelta(hours=1, seconds=1)) failed_tasks.add(now() - dt.timedelta(hours=1, seconds=1))
failed_tasks.add() failed_tasks.add()
running_tasks.reset(8)
# when # when
results = dashboard_results(hours=1) results = dashboard_results(hours=1)
# then # then
@ -35,12 +41,14 @@ class TestDashboardResults(TestCase):
self.assertEqual(results.failed, 1) self.assertEqual(results.failed, 1)
self.assertEqual(results.total, 6) self.assertEqual(results.total, 6)
self.assertEqual(results.earliest_task, earliest_task) self.assertEqual(results.earliest_task, earliest_task)
self.assertEqual(results.running, 8)
def test_should_work_with_no_data(self): def test_should_work_with_no_data(self):
# given # given
succeeded_tasks.clear() succeeded_tasks.clear()
retried_tasks.clear() retried_tasks.clear()
failed_tasks.clear() failed_tasks.clear()
running_tasks.reset()
# when # when
results = dashboard_results(hours=1) results = dashboard_results(hours=1)
# then # then
@ -49,3 +57,4 @@ class TestDashboardResults(TestCase):
self.assertEqual(results.failed, 0) self.assertEqual(results.failed, 0)
self.assertEqual(results.total, 0) self.assertEqual(results.total, 0)
self.assertIsNone(results.earliest_task) self.assertIsNone(results.earliest_task)
self.assertEqual(results.running, 0)

View File

@ -0,0 +1,74 @@
from unittest import TestCase
from allianceauth.authentication.task_statistics.helpers import ItemCounter
COUNTER_NAME = "test-counter"
class TestItemCounter(TestCase):
def test_can_create_counter(self):
# when
counter = ItemCounter(COUNTER_NAME)
# then
self.assertIsInstance(counter, ItemCounter)
def test_can_reset_counter_to_default(self):
# given
counter = ItemCounter(COUNTER_NAME)
# when
counter.reset()
# then
self.assertEqual(counter.value(), 0)
def test_can_reset_counter_to_custom_value(self):
# given
counter = ItemCounter(COUNTER_NAME)
# when
counter.reset(42)
# then
self.assertEqual(counter.value(), 42)
def test_can_increment_counter_by_default(self):
# given
counter = ItemCounter(COUNTER_NAME)
counter.reset(0)
# when
counter.incr()
# then
self.assertEqual(counter.value(), 1)
def test_can_increment_counter_by_custom_value(self):
# given
counter = ItemCounter(COUNTER_NAME)
counter.reset(0)
# when
counter.incr(8)
# then
self.assertEqual(counter.value(), 8)
def test_can_decrement_counter_by_default(self):
# given
counter = ItemCounter(COUNTER_NAME)
counter.reset(9)
# when
counter.decr()
# then
self.assertEqual(counter.value(), 8)
def test_can_decrement_counter_by_custom_value(self):
# given
counter = ItemCounter(COUNTER_NAME)
counter.reset(9)
# when
counter.decr(8)
# then
self.assertEqual(counter.value(), 1)
def test_can_decrement_counter_below_zero(self):
# given
counter = ItemCounter(COUNTER_NAME)
counter.reset(0)
# when
counter.decr(1)
# then
self.assertEqual(counter.value(), -1)

View File

@ -22,11 +22,12 @@ from allianceauth.eveonline.tasks import update_character
class TestTaskSignals(TestCase): class TestTaskSignals(TestCase):
fixtures = ["disable_analytics"] fixtures = ["disable_analytics"]
def test_should_record_successful_task(self): def setUp(self) -> None:
# given
succeeded_tasks.clear() succeeded_tasks.clear()
retried_tasks.clear() retried_tasks.clear()
failed_tasks.clear() failed_tasks.clear()
def test_should_record_successful_task(self):
# when # when
with patch( with patch(
"allianceauth.eveonline.tasks.EveCharacter.objects.update_character" "allianceauth.eveonline.tasks.EveCharacter.objects.update_character"
@ -39,10 +40,6 @@ class TestTaskSignals(TestCase):
self.assertEqual(failed_tasks.count(), 0) self.assertEqual(failed_tasks.count(), 0)
def test_should_record_retried_task(self): def test_should_record_retried_task(self):
# given
succeeded_tasks.clear()
retried_tasks.clear()
failed_tasks.clear()
# when # when
with patch( with patch(
"allianceauth.eveonline.tasks.EveCharacter.objects.update_character" "allianceauth.eveonline.tasks.EveCharacter.objects.update_character"
@ -55,10 +52,6 @@ class TestTaskSignals(TestCase):
self.assertEqual(retried_tasks.count(), 1) self.assertEqual(retried_tasks.count(), 1)
def test_should_record_failed_task(self): def test_should_record_failed_task(self):
# given
succeeded_tasks.clear()
retried_tasks.clear()
failed_tasks.clear()
# when # when
with patch( with patch(
"allianceauth.eveonline.tasks.EveCharacter.objects.update_character" "allianceauth.eveonline.tasks.EveCharacter.objects.update_character"

View File

@ -92,8 +92,11 @@
{% include "allianceauth/admin-status/celery_bar_partial.html" with label="failed" level="danger" tasks_count=tasks_failed %} {% include "allianceauth/admin-status/celery_bar_partial.html" with label="failed" level="danger" tasks_count=tasks_failed %}
</div> </div>
<p> <p>
{% blocktranslate with running_count=tasks_running|default_if_none:"?"|intcomma %}
{{ running_count }} running |
{% endblocktranslate %}
{% blocktranslate with queue_length=task_queue_length|default_if_none:"?"|intcomma %} {% blocktranslate with queue_length=task_queue_length|default_if_none:"?"|intcomma %}
{{ queue_length }} queued tasks {{ queue_length }} queued
{% endblocktranslate %} {% endblocktranslate %}
</p> </p>
</div> </div>

View File

@ -54,7 +54,8 @@ def status_overview() -> dict:
"tasks_failed": 0, "tasks_failed": 0,
"tasks_total": 0, "tasks_total": 0,
"tasks_hours": 0, "tasks_hours": 0,
"earliest_task": None "earliest_task": None,
"tasks_running": 0
} }
response.update(_current_notifications()) response.update(_current_notifications())
response.update(_current_version_summary()) response.update(_current_version_summary())
@ -72,7 +73,8 @@ def _celery_stats() -> dict:
"tasks_failed": results.failed, "tasks_failed": results.failed,
"tasks_total": results.total, "tasks_total": results.total,
"tasks_hours": results.hours, "tasks_hours": results.hours,
"earliest_task": results.earliest_task "earliest_task": results.earliest_task,
"tasks_running": results.running,
} }