mirror of
https://gitlab.com/allianceauth/allianceauth.git
synced 2025-07-14 23:10:15 +02:00
Improve state backend fix, add tests for state backend and service signal changes
This commit is contained in:
parent
5c7478fa39
commit
eef6126ef8
@ -1,7 +1,8 @@
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.auth.models import User
|
||||
import logging
|
||||
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
from django.contrib.auth.models import User, Permission
|
||||
|
||||
from .models import UserProfile, CharacterOwnership, OwnershipRecord
|
||||
|
||||
|
||||
@ -11,16 +12,11 @@ logger = logging.getLogger(__name__)
|
||||
class StateBackend(ModelBackend):
|
||||
@staticmethod
|
||||
def _get_state_permissions(user_obj):
|
||||
"""returns permissions for state of given user object"""
|
||||
"""
|
||||
profile_state_field = UserProfile._meta.get_field('state')
|
||||
user_state_query = 'state__%s__user' % profile_state_field.related_query_name()
|
||||
return Permission.objects.filter(**{user_state_query: user_obj})
|
||||
"""
|
||||
"""returns permissions for state of given user object"""
|
||||
if hasattr(user_obj, "profile") and user_obj.profile:
|
||||
return Permission.objects.filter(state=user_obj.profile.state)
|
||||
return Permission.objects.filter(state=user_obj.profile.state)
|
||||
else:
|
||||
return []
|
||||
return Permission.objects.none()
|
||||
|
||||
def get_state_permissions(self, user_obj, obj=None):
|
||||
return self._get_permissions(user_obj, obj, 'state')
|
||||
|
85
allianceauth/authentication/tests/test_backend.py
Normal file
85
allianceauth/authentication/tests/test_backend.py
Normal file
@ -0,0 +1,85 @@
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from django.contrib.auth.models import User, Group
|
||||
from django.test import TestCase
|
||||
from allianceauth.tests.auth_utils import AuthUtils
|
||||
|
||||
from ..backends import StateBackend
|
||||
|
||||
MODULE_PATH = 'allianceauth.authentication'
|
||||
|
||||
PERMISSION_1 = "authentication.add_user"
|
||||
PERMISSION_2 = "authentication.change_user"
|
||||
|
||||
|
||||
class TestStatePermissions(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# permissions
|
||||
self.permission_1 = AuthUtils.get_permission_by_name(PERMISSION_1)
|
||||
self.permission_2 = AuthUtils.get_permission_by_name(PERMISSION_2)
|
||||
|
||||
# group
|
||||
self.group_1 = Group.objects.create(name="Group 1")
|
||||
self.group_2 = Group.objects.create(name="Group 2")
|
||||
|
||||
# state
|
||||
self.state_1 = AuthUtils.get_member_state()
|
||||
self.state_2 = AuthUtils.create_state("Blue", 50)
|
||||
|
||||
# user
|
||||
self.user = AuthUtils.create_user("Bruce Wayne")
|
||||
self.main = AuthUtils.add_main_character_2(self.user, self.user.username, 123)
|
||||
|
||||
def test_user_has_user_permissions(self):
|
||||
self.user.user_permissions.add(self.permission_1)
|
||||
|
||||
user = User.objects.get(pk=self.user.pk)
|
||||
self.assertTrue(user.has_perm(PERMISSION_1))
|
||||
|
||||
def test_user_has_group_permissions(self):
|
||||
self.group_1.permissions.add(self.permission_1)
|
||||
self.user.groups.add(self.group_1)
|
||||
|
||||
user = User.objects.get(pk=self.user.pk)
|
||||
self.assertTrue(user.has_perm(PERMISSION_1))
|
||||
|
||||
def test_user_has_state_permissions(self):
|
||||
self.state_1.permissions.add(self.permission_1)
|
||||
self.state_1.member_characters.add(self.main)
|
||||
user = User.objects.get(pk=self.user.pk)
|
||||
|
||||
self.assertTrue(user.has_perm(PERMISSION_1))
|
||||
|
||||
def test_when_user_changes_state_perms_change_accordingly(self):
|
||||
self.state_1.permissions.add(self.permission_1)
|
||||
self.state_1.member_characters.add(self.main)
|
||||
user = User.objects.get(pk=self.user.pk)
|
||||
self.assertTrue(user.has_perm(PERMISSION_1))
|
||||
|
||||
self.state_2.permissions.add(self.permission_2)
|
||||
self.state_2.member_characters.add(self.main)
|
||||
self.state_1.member_characters.remove(self.main)
|
||||
user = User.objects.get(pk=self.user.pk)
|
||||
self.assertFalse(user.has_perm(PERMISSION_1))
|
||||
self.assertTrue(user.has_perm(PERMISSION_2))
|
||||
|
||||
def test_state_permissions_are_returned_for_current_user_object(self):
|
||||
# verify state permissions are returns for the current user object
|
||||
# and not for it's instance in the database, which might be outdated
|
||||
self.state_1.permissions.add(self.permission_1)
|
||||
self.state_2.permissions.add(self.permission_2)
|
||||
self.state_1.member_characters.add(self.main)
|
||||
user = User.objects.get(pk=self.user.pk)
|
||||
user.profile.state = self.state_2
|
||||
self.assertFalse(user.has_perm(PERMISSION_1))
|
||||
self.assertTrue(user.has_perm(PERMISSION_2))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -154,27 +154,37 @@ def disable_services_on_inactive(sender, instance, *args, **kwargs):
|
||||
|
||||
|
||||
@receiver(pre_save, sender=UserProfile)
|
||||
def process_main_character_change(sender, instance, *args, **kwargs):
|
||||
|
||||
if not instance.pk: # ignore
|
||||
# new model being created
|
||||
def process_main_character_change(sender, instance, *args, **kwargs):
|
||||
if not instance.pk:
|
||||
# ignore new model being created
|
||||
return
|
||||
try:
|
||||
logger.debug(
|
||||
"Received pre_save from %s for process_main_character_change", instance
|
||||
)
|
||||
old_instance = UserProfile.objects.get(pk=instance.pk)
|
||||
if old_instance.main_character and not instance.main_character: # lost main char disable services
|
||||
logger.info("Disabling services due to loss of main character for user {0}".format(instance.user))
|
||||
if old_instance.main_character and not instance.main_character:
|
||||
logger.info(
|
||||
"Disabling services due to loss of main character for user %s",
|
||||
instance.user
|
||||
)
|
||||
disable_user(instance.user)
|
||||
elif old_instance.main_character != instance.main_character: # swapping/changing main character
|
||||
logger.info("Updating Names due to change of main character for user {0}".format(instance.user))
|
||||
elif old_instance.main_character != instance.main_character:
|
||||
logger.info(
|
||||
"Updating Names due to change of main character for user %s",
|
||||
instance.user
|
||||
)
|
||||
for svc in ServicesHook.get_services():
|
||||
try:
|
||||
svc.validate_user(instance.user)
|
||||
svc.sync_nickname(instance.user)
|
||||
except:
|
||||
logger.exception('Exception running sync_nickname for services module %s on user %s' % (svc, instance))
|
||||
logger.exception(
|
||||
"Exception running sync_nickname for services module %s "
|
||||
"on user %s",
|
||||
svc,
|
||||
instance
|
||||
)
|
||||
|
||||
except UserProfile.DoesNotExist:
|
||||
pass
|
||||
|
@ -1,15 +1,20 @@
|
||||
from copy import deepcopy
|
||||
from unittest import mock
|
||||
|
||||
from django.test import TestCase
|
||||
from django.contrib.auth.models import Group, Permission
|
||||
from allianceauth.tests.auth_utils import AuthUtils
|
||||
|
||||
from allianceauth.authentication.models import State
|
||||
from allianceauth.eveonline.models import EveCharacter
|
||||
from allianceauth.tests.auth_utils import AuthUtils
|
||||
|
||||
|
||||
class ServicesSignalsTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.member = AuthUtils.create_user('auth_member', disconnect_signals=True)
|
||||
AuthUtils.add_main_character(self.member, 'Test', '1', '2', 'Test Corp', 'TEST')
|
||||
AuthUtils.add_main_character_2(
|
||||
self.member, 'Test', 1, 2, 'Test Corp', 'TEST'
|
||||
)
|
||||
self.none_user = AuthUtils.create_user('none_user', disconnect_signals=True)
|
||||
|
||||
@mock.patch('allianceauth.services.signals.transaction')
|
||||
@ -46,7 +51,6 @@ class ServicesSignalsTestCase(TestCase):
|
||||
|
||||
@mock.patch('allianceauth.services.signals.disable_user')
|
||||
def test_pre_delete_user(self, disable_user):
|
||||
|
||||
"""
|
||||
Test that disable_member is called when a user is deleted
|
||||
"""
|
||||
@ -126,7 +130,9 @@ class ServicesSignalsTestCase(TestCase):
|
||||
transaction.on_commit = lambda fn: fn()
|
||||
|
||||
ct = ContentType.objects.get(app_label='auth', model='permission')
|
||||
perm = Permission.objects.create(name="Test perm", codename="access_testsvc", content_type=ct)
|
||||
perm = Permission.objects.create(
|
||||
name="Test perm", codename="access_testsvc", content_type=ct
|
||||
)
|
||||
self.member.user_permissions.add(perm)
|
||||
|
||||
# Act, should trigger m2m change
|
||||
@ -159,7 +165,9 @@ class ServicesSignalsTestCase(TestCase):
|
||||
AuthUtils.connect_signals()
|
||||
|
||||
ct = ContentType.objects.get(app_label='auth', model='permission')
|
||||
perm = Permission.objects.create(name="Test perm", codename="access_testsvc", content_type=ct)
|
||||
perm = Permission.objects.create(
|
||||
name="Test perm", codename="access_testsvc", content_type=ct
|
||||
)
|
||||
test_state.permissions.add(perm)
|
||||
|
||||
# Act, should trigger m2m change
|
||||
@ -173,12 +181,12 @@ class ServicesSignalsTestCase(TestCase):
|
||||
self.assertEqual(self.member, args[0])
|
||||
|
||||
@mock.patch('allianceauth.services.signals.ServicesHook')
|
||||
def test_state_changed_services_validation(self, services_hook):
|
||||
"""
|
||||
Test a user changing state has service accounts validated
|
||||
def test_state_changed_services_validation_and_groups_update(self, services_hook):
|
||||
"""Test a user changing state has service accounts validated and groups updated
|
||||
"""
|
||||
svc = mock.Mock()
|
||||
svc.validate_user.return_value = None
|
||||
svc.update_groups.return_value = None
|
||||
svc.access_perm = 'auth.access_testsvc'
|
||||
|
||||
services_hook.get_services.return_value = [svc]
|
||||
@ -190,7 +198,65 @@ class ServicesSignalsTestCase(TestCase):
|
||||
# Assert
|
||||
self.assertTrue(services_hook.get_services.called)
|
||||
|
||||
self.assertTrue(svc.validate_user.called)
|
||||
self.assertTrue(svc.validate_user.called)
|
||||
args, kwargs = svc.validate_user.call_args
|
||||
self.assertEqual(self.member, args[0])
|
||||
|
||||
|
||||
self.assertTrue(svc.update_groups.called)
|
||||
args, kwargs = svc.update_groups.call_args
|
||||
self.assertEqual(self.member, args[0])
|
||||
|
||||
|
||||
@mock.patch('allianceauth.services.signals.ServicesHook')
|
||||
def test_state_changed_services_validation_and_groups_update_1(self, services_hook):
|
||||
"""Test a user changing main has service accounts validated and sync updated
|
||||
"""
|
||||
svc = mock.Mock()
|
||||
svc.validate_user.return_value = None
|
||||
svc.sync_nickname.return_value = None
|
||||
svc.access_perm = 'auth.access_testsvc'
|
||||
|
||||
services_hook.get_services.return_value = [svc]
|
||||
|
||||
new_main = EveCharacter.objects.create(
|
||||
character_id=123,
|
||||
character_name="Alter Ego",
|
||||
corporation_id=987,
|
||||
corporation_name="ABC"
|
||||
)
|
||||
self.member.profile.main_character = new_main
|
||||
self.member.profile.save()
|
||||
|
||||
# Assert
|
||||
self.assertTrue(services_hook.get_services.called)
|
||||
|
||||
self.assertTrue(svc.validate_user.called)
|
||||
args, kwargs = svc.validate_user.call_args
|
||||
self.assertEqual(self.member, args[0])
|
||||
|
||||
self.assertTrue(svc.sync_nickname.called)
|
||||
args, kwargs = svc.sync_nickname.call_args
|
||||
self.assertEqual(self.member, args[0])
|
||||
|
||||
@mock.patch('allianceauth.services.signals.ServicesHook')
|
||||
def test_state_changed_services_validation_and_groups_update_2(self, services_hook):
|
||||
"""Test a user changing main has service does not have accounts validated
|
||||
and sync updated if the new main is equal to the old main
|
||||
"""
|
||||
svc = mock.Mock()
|
||||
svc.validate_user.return_value = None
|
||||
svc.sync_nickname.return_value = None
|
||||
svc.access_perm = 'auth.access_testsvc'
|
||||
|
||||
services_hook.get_services.return_value = [svc]
|
||||
|
||||
# this creates a clone of the Django object
|
||||
new_main = deepcopy(self.member.profile.main_character)
|
||||
self.assertIsNot(new_main, self.member.profile.main_character)
|
||||
self.member.profile.main_character = new_main
|
||||
self.member.profile.save()
|
||||
|
||||
# Assert
|
||||
self.assertFalse(services_hook.get_services.called)
|
||||
self.assertFalse(svc.validate_user.called)
|
||||
self.assertFalse(svc.sync_nickname.called)
|
Loading…
x
Reference in New Issue
Block a user