Merge remote-tracking branch 'upstream/master' into v2.9.x

This commit is contained in:
Joel Falknau 2021-08-20 14:41:38 +10:00
commit 392a0c4dcb
11 changed files with 224 additions and 50 deletions

View File

@ -40,9 +40,6 @@ class AuthGroupInlineAdmin(admin.StackedInline):
kwargs["queryset"] = Group.objects.order_by(Lower('name'))
return super().formfield_for_manytomany(db_field, request, **kwargs)
def has_add_permission(self, request, obj=None):
return False
def has_delete_permission(self, request, obj=None):
return False
@ -139,7 +136,7 @@ class GroupAdmin(admin.ModelAdmin):
_member_count.admin_order_field = 'member_count'
def has_leader(self, obj):
return obj.authgroup.group_leaders.exists()
return obj.authgroup.group_leaders.exists() or obj.authgroup.group_leader_groups.exists()
has_leader.boolean = True
@ -174,6 +171,13 @@ class GroupAdmin(admin.ModelAdmin):
kwargs["queryset"] = Permission.objects.select_related("content_type").all()
return super().formfield_for_manytomany(db_field, request, **kwargs)
def save_formset(self, request, form, formset, change):
for inline_form in formset:
ag_instance = inline_form.save(commit=False)
ag_instance.group = form.instance
ag_instance.save()
formset.save()
class Group(BaseGroup):
class Meta:

View File

@ -55,7 +55,6 @@ class RequestLog(models.Model):
return user.profile.main_character
class AuthGroup(models.Model):
"""
Extends Django Group model with a one-to-one field
@ -106,7 +105,8 @@ class AuthGroup(models.Model):
help_text="States listed here will have the ability to join this group provided "
"they have the proper permissions.")
description = models.TextField(max_length=512, blank=True, help_text="Short description <i>(max. 512 characters)</i> of the group shown to users.")
description = models.TextField(max_length=512, blank=True, help_text="Short description <i>(max. 512 characters)"
"</i> of the group shown to users.")
def __str__(self):
return self.group.name

View File

@ -48,6 +48,7 @@ class TestGroupAdmin(TestCase):
cls.group_2 = Group.objects.create(name='Group 2')
cls.group_2.authgroup.description = 'Internal Group'
cls.group_2.authgroup.internal = True
cls.group_2.authgroup.group_leader_groups.add(cls.group_1)
cls.group_2.authgroup.save()
# group 3 - has leader
@ -237,10 +238,14 @@ class TestGroupAdmin(TestCase):
result = self.modeladmin._member_count(obj)
self.assertEqual(result, expected)
def test_has_leader(self):
def test_has_leader_user(self):
result = self.modeladmin.has_leader(self.group_1)
self.assertTrue(result)
def test_has_leader_group(self):
result = self.modeladmin.has_leader(self.group_2)
self.assertTrue(result)
def test_properties_1(self):
expected = ['Default']
result = self.modeladmin._properties(self.group_1)

View File

@ -1,4 +1,35 @@
from django.contrib import admin
from .models import Notification
admin.site.register(Notification)
@admin.register(Notification)
class NotificationAdmin(admin.ModelAdmin):
list_display = ("timestamp", "_main", "_state", "title", "level", "viewed")
list_select_related = ("user", "user__profile__main_character", "user__profile__state")
list_filter = (
"level",
"timestamp",
"user__profile__state",
('user__profile__main_character', admin.RelatedOnlyFieldListFilter),
)
ordering = ("-timestamp", )
search_fields = ["user__username", "user__profile__main_character__character_name"]
def _main(self, obj):
try:
return obj.user.profile.main_character
except AttributeError:
return obj.user
_main.admin_order_field = "user__profile__main_character__character_name"
def _state(self, obj):
return obj.user.profile.state
_state.admin_order_field = "user__profile__state__name"
def has_change_permission(self, request, obj=None):
return False
def has_add_permission(self, request) -> bool:
return False

View File

@ -12,21 +12,20 @@ class NotificationHandler(logging.Handler):
try:
perm = Permission.objects.get(codename="logging_notifications")
message = record.getMessage()
if record.exc_text:
message += "\n\n"
message = message + record.exc_text
users = User.objects.filter(
Q(groups__permissions=perm) | Q(user_permissions=perm) | Q(is_superuser=True)).distinct()
for user in users:
notify(
user,
"%s [%s:%s]" % (record.levelname, record.funcName, record.lineno),
level=str([item[0] for item in Notification.LEVEL_CHOICES if item[1] == record.levelname][0]),
message=message
)
except Permission.DoesNotExist:
pass
return
message = record.getMessage()
if record.exc_text:
message += "\n\n"
message = message + record.exc_text
users = User.objects.filter(
Q(groups__permissions=perm) | Q(user_permissions=perm) | Q(is_superuser=True)).distinct()
for user in users:
notify(
user,
"%s [%s:%s]" % (record.levelname, record.funcName, record.lineno),
level=Notification.Level.from_old_name(record.levelname),
message=message
)

View File

@ -12,7 +12,7 @@ class NotificationQuerySet(models.QuerySet):
"""Custom QuerySet for Notification model"""
def update(self, *args, **kwargs):
# overriden update to ensure cache is invaidated on very call
"""Override update to ensure cache is invalidated on very call."""
super().update(*args, **kwargs)
user_pks = set(self.select_related("user").values_list('user__pk', flat=True))
for user_pk in user_pks:
@ -43,6 +43,8 @@ class NotificationManager(models.Manager):
if not message:
message = title
if level not in self.model.Level:
level = self.model.Level.INFO
obj = self.create(user=user, title=title, message=message, level=level)
logger.info("Created notification %s", obj)
return obj

View File

@ -0,0 +1,18 @@
# Generated by Django 3.1.12 on 2021-07-01 21:22
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('notifications', '0004_performance_tuning'),
]
operations = [
migrations.AlterField(
model_name='notification',
name='level',
field=models.CharField(choices=[('danger', 'danger'), ('warning', 'warning'), ('info', 'info'), ('success', 'success')], default='info', max_length=10),
),
]

View File

@ -2,6 +2,7 @@ import logging
from django.db import models
from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy as _
from .managers import NotificationManager
@ -14,16 +15,42 @@ class Notification(models.Model):
NOTIFICATIONS_MAX_PER_USER_DEFAULT = 50
NOTIFICATIONS_REFRESH_TIME_DEFAULT = 30
LEVEL_CHOICES = (
('danger', 'CRITICAL'),
('danger', 'ERROR'),
('warning', 'WARN'),
('info', 'INFO'),
('success', 'DEBUG'),
)
class Level(models.TextChoices):
"""A notification level."""
DANGER = 'danger', _('danger') #:
WARNING = 'warning', _('warning') #:
INFO = 'info', _('info') #:
SUCCESS = 'success', _('success') #:
@classmethod
def from_old_name(cls, name: str) -> object:
"""Map old name to enum.
Raises ValueError for invalid names.
"""
name_map = {
"CRITICAL": cls.DANGER,
"ERROR": cls.DANGER,
"WARN": cls.WARNING,
"INFO": cls.INFO,
"DEBUG": cls.SUCCESS,
}
try:
return name_map[name]
except KeyError:
raise ValueError(f"Unknown name: {name}") from None
# LEVEL_CHOICES = (
# ('danger', 'CRITICAL'),
# ('danger', 'ERROR'),
# ('warning', 'WARN'),
# ('info', 'INFO'),
# ('success', 'DEBUG'),
# )
user = models.ForeignKey(User, on_delete=models.CASCADE)
level = models.CharField(choices=LEVEL_CHOICES, max_length=10)
level = models.CharField(choices=Level.choices, max_length=10, default=Level.INFO)
title = models.CharField(max_length=254)
message = models.TextField()
timestamp = models.DateTimeField(auto_now_add=True, db_index=True)
@ -45,22 +72,15 @@ class Notification(models.Model):
Notification.objects.invalidate_user_notification_cache(self.user.pk)
def mark_viewed(self) -> None:
"""mark notification as viewed"""
"""Mark notification as viewed."""
logger.info("Marking notification as viewed: %s" % self)
self.viewed = True
self.save()
def set_level(self, level_name: str) -> None:
"""set notification level according to level name, e.g. 'CRITICAL'
"""Set notification level according to old level name, e.g. 'CRITICAL'.
raised exception on invalid level names
Raises ValueError on invalid level names.
"""
try:
new_level = [
item[0] for item in self.LEVEL_CHOICES if item[1] == level_name
][0]
except IndexError:
raise ValueError('Invalid level name: %s' % level_name)
self.level = new_level
self.level = self.Level.from_old_name(level_name)
self.save()

View File

@ -0,0 +1,69 @@
from logging import LogRecord, DEBUG
from django.contrib.auth.models import Permission, Group, User
from django.test import TestCase
from allianceauth.tests.auth_utils import AuthUtils
from ..handlers import NotificationHandler
from ..models import Notification
MODULE_PATH = 'allianceauth.notifications.handlers'
class TestHandler(TestCase):
def test_do_nothing_if_permission_does_not_exist(self):
# given
Permission.objects.get(codename="logging_notifications").delete()
handler = NotificationHandler()
record = LogRecord(
name="name",
level=DEBUG,
pathname="pathname",
lineno=42,
msg="msg",
args=[],
exc_info=None,
func="func"
)
# when
handler.emit(record)
# then
self.assertEqual(Notification.objects.count(), 0)
def test_should_emit_message_to_users_with_permission_only(self):
# given
AuthUtils.create_user('Lex Luthor')
user_permission = AuthUtils.create_user('Bruce Wayne')
user_permission = AuthUtils.add_permission_to_user_by_name(
"auth.logging_notifications", user_permission
)
group = Group.objects.create(name="Dummy Group")
perm = Permission.objects.get(codename="logging_notifications")
group.permissions.add(perm)
user_group = AuthUtils.create_user('Peter Parker')
user_group.groups.add(group)
user_superuser = User.objects.create_superuser("Clark Kent")
handler = NotificationHandler()
record = LogRecord(
name="name",
level=DEBUG,
pathname="pathname",
lineno=42,
msg="msg",
args=[],
exc_info=None,
func="func"
)
# when
handler.emit(record)
# then
self.assertEqual(Notification.objects.count(), 3)
users = set(Notification.objects.values_list("user__pk", flat=True))
self.assertSetEqual(
users, {user_permission.pk, user_group.pk, user_superuser.pk}
)
notif = Notification.objects.first()
self.assertEqual(notif.user, user_permission)
self.assertEqual(notif.title, "DEBUG [func:42]")
self.assertEqual(notif.level, "success")
self.assertEqual(notif.message, "msg")

View File

@ -65,6 +65,35 @@ class TestUserNotify(TestCase):
self.assertEqual(obj.title, title)
self.assertEqual(obj.message, title)
def test_should_use_default_level_when_not_specified(self):
# given
title = 'dummy_title'
message = 'dummy message'
# when
Notification.objects.notify_user(self.user, title, message)
# then
self.assertEqual(Notification.objects.filter(user=self.user).count(), 1)
obj = Notification.objects.first()
self.assertEqual(obj.user, self.user)
self.assertEqual(obj.title, title)
self.assertEqual(obj.message, message)
self.assertEqual(obj.level, Notification.Level.INFO)
def test_should_use_default_level_when_invalid_level_given(self):
# given
title = 'dummy_title'
message = 'dummy message'
level = "invalid"
# when
Notification.objects.notify_user(self.user, title, message, level)
# then
self.assertEqual(Notification.objects.filter(user=self.user).count(), 1)
obj = Notification.objects.first()
self.assertEqual(obj.user, self.user)
self.assertEqual(obj.title, title)
self.assertEqual(obj.message, message)
self.assertEqual(obj.level, Notification.Level.INFO)
@override_settings(NOTIFICATIONS_MAX_PER_USER=3)
def test_remove_when_too_many_notifications(self):
Notification.objects.notify_user(self.user, 'dummy')

View File

@ -7,19 +7,16 @@ The notifications package has an API for sending notifications.
Location: ``allianceauth.notifications``
.. automodule:: allianceauth.notifications.__init__
:members: notify
:undoc-members:
:members: notify
models
===========
.. autoclass:: allianceauth.notifications.models.Notification
:members: LEVEL_CHOICES, mark_viewed, set_level
:undoc-members:
:members: Level, mark_viewed, set_level
managers
===========
.. autoclass:: allianceauth.notifications.managers.NotificationManager
:members: notify_user, user_unread_count
:undoc-members: