Merge branch 'issue_1250' into 'master'

Fix Discord service issues and improve dashboard

Closes #1250

See merge request allianceauth/allianceauth!1229
This commit is contained in:
colcrunch 2020-07-13 18:54:08 +00:00
commit bb6a7e8327
23 changed files with 781 additions and 344 deletions

View File

@ -1,7 +1,7 @@
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
__version__ = '2.7.2'
__version__ = '2.7.3a2'
__title__ = 'Alliance Auth'
__url__ = 'https://gitlab.com/allianceauth/allianceauth'
NAME = '%s v%s' % (__title__, __version__)

View File

@ -1,7 +1,8 @@
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import Permission
from django.contrib.auth.models import User
import logging
from django.contrib.auth.backends import ModelBackend
from django.contrib.auth.models import User, Permission
from .models import UserProfile, CharacterOwnership, OwnershipRecord
@ -11,9 +12,11 @@ logger = logging.getLogger(__name__)
class StateBackend(ModelBackend):
@staticmethod
def _get_state_permissions(user_obj):
profile_state_field = UserProfile._meta.get_field('state')
user_state_query = 'state__%s__user' % profile_state_field.related_query_name()
return Permission.objects.filter(**{user_state_query: user_obj})
"""returns permissions for state of given user object"""
if hasattr(user_obj, "profile") and user_obj.profile:
return Permission.objects.filter(state=user_obj.profile.state)
else:
return Permission.objects.none()
def get_state_permissions(self, user_obj, obj=None):
return self._get_permissions(user_obj, obj, 'state')

View File

@ -73,11 +73,17 @@ class UserProfile(models.Model):
if commit:
logger.info('Updating {} state to {}'.format(self.user, self.state))
self.save(update_fields=['state'])
notify(self.user, _('State Changed'),
_('Your user state has been changed to %(state)s') % ({'state': state}),
'info')
notify(
self.user,
_('State changed to: %s' % state),
_('Your user\'s state is now: %(state)s')
% ({'state': state}),
'info'
)
from allianceauth.authentication.signals import state_changed
state_changed.send(sender=self.__class__, user=self.user, state=self.state)
state_changed.send(
sender=self.__class__, user=self.user, state=self.state
)
def __str__(self):
return str(self.user)

View File

@ -23,9 +23,7 @@ def trigger_state_check(state):
check_states = State.objects.filter(priority__lt=state.priority)
for profile in UserProfile.objects.filter(state__in=check_states):
if state.available_to_user(profile.user):
profile.state = state
profile.save(update_fields=['state'])
state_changed.send(sender=state.__class__, user=profile.user, state=state)
profile.assign_state(state)
@receiver(m2m_changed, sender=State.member_characters.through)

View File

@ -14,7 +14,11 @@
<div class="col-sm-6 text-center">
<div class="panel panel-primary" style="height:100%">
<div class="panel-heading">
<h3 class="panel-title">{% trans "Main Character" %}</h3>
<h3 class="panel-title">
{% blocktrans with state=request.user.profile.state %}
Main Character (State: {{ state }})
{% endblocktrans %}
</h3>
</div>
<div class="panel-body">
{% if request.user.profile.main_character %}

View File

@ -0,0 +1,149 @@
from django.contrib.auth.models import User, Group
from django.test import TestCase
from allianceauth.eveonline.models import EveCharacter
from allianceauth.tests.auth_utils import AuthUtils
from esi.models import Token
from ..backends import StateBackend
from ..models import CharacterOwnership, UserProfile, OwnershipRecord
MODULE_PATH = 'allianceauth.authentication'
PERMISSION_1 = "authentication.add_user"
PERMISSION_2 = "authentication.change_user"
class TestStatePermissions(TestCase):
def setUp(self):
# permissions
self.permission_1 = AuthUtils.get_permission_by_name(PERMISSION_1)
self.permission_2 = AuthUtils.get_permission_by_name(PERMISSION_2)
# group
self.group_1 = Group.objects.create(name="Group 1")
self.group_2 = Group.objects.create(name="Group 2")
# state
self.state_1 = AuthUtils.get_member_state()
self.state_2 = AuthUtils.create_state("Other State", 75)
# user
self.user = AuthUtils.create_user("Bruce Wayne")
self.main = AuthUtils.add_main_character_2(self.user, self.user.username, 123)
def test_user_has_user_permissions(self):
self.user.user_permissions.add(self.permission_1)
user = User.objects.get(pk=self.user.pk)
self.assertTrue(user.has_perm(PERMISSION_1))
def test_user_has_group_permissions(self):
self.group_1.permissions.add(self.permission_1)
self.user.groups.add(self.group_1)
user = User.objects.get(pk=self.user.pk)
self.assertTrue(user.has_perm(PERMISSION_1))
def test_user_has_state_permissions(self):
self.state_1.permissions.add(self.permission_1)
self.state_1.member_characters.add(self.main)
user = User.objects.get(pk=self.user.pk)
self.assertTrue(user.has_perm(PERMISSION_1))
def test_when_user_changes_state_perms_change_accordingly(self):
self.state_1.permissions.add(self.permission_1)
self.state_1.member_characters.add(self.main)
user = User.objects.get(pk=self.user.pk)
self.assertTrue(user.has_perm(PERMISSION_1))
self.state_2.permissions.add(self.permission_2)
self.state_2.member_characters.add(self.main)
self.state_1.member_characters.remove(self.main)
user = User.objects.get(pk=self.user.pk)
self.assertFalse(user.has_perm(PERMISSION_1))
self.assertTrue(user.has_perm(PERMISSION_2))
def test_state_permissions_are_returned_for_current_user_object(self):
# verify state permissions are returns for the current user object
# and not for it's instance in the database, which might be outdated
self.state_1.permissions.add(self.permission_1)
self.state_2.permissions.add(self.permission_2)
self.state_1.member_characters.add(self.main)
user = User.objects.get(pk=self.user.pk)
user.profile.state = self.state_2
self.assertFalse(user.has_perm(PERMISSION_1))
self.assertTrue(user.has_perm(PERMISSION_2))
class TestAuthenticate(TestCase):
@classmethod
def setUpTestData(cls):
cls.main_character = EveCharacter.objects.create(
character_id=1,
character_name='Main Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
cls.alt_character = EveCharacter.objects.create(
character_id=2,
character_name='Alt Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
cls.unclaimed_character = EveCharacter.objects.create(
character_id=3,
character_name='Unclaimed Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
cls.user = AuthUtils.create_user('test_user', disconnect_signals=True)
cls.old_user = AuthUtils.create_user('old_user', disconnect_signals=True)
AuthUtils.disconnect_signals()
CharacterOwnership.objects.create(user=cls.user, character=cls.main_character, owner_hash='1')
CharacterOwnership.objects.create(user=cls.user, character=cls.alt_character, owner_hash='2')
UserProfile.objects.update_or_create(user=cls.user, defaults={'main_character': cls.main_character})
AuthUtils.connect_signals()
def test_authenticate_main_character(self):
t = Token(character_id=self.main_character.character_id, character_owner_hash='1')
user = StateBackend().authenticate(token=t)
self.assertEquals(user, self.user)
def test_authenticate_alt_character(self):
t = Token(character_id=self.alt_character.character_id, character_owner_hash='2')
user = StateBackend().authenticate(token=t)
self.assertEquals(user, self.user)
def test_authenticate_unclaimed_character(self):
t = Token(character_id=self.unclaimed_character.character_id, character_name=self.unclaimed_character.character_name, character_owner_hash='3')
user = StateBackend().authenticate(token=t)
self.assertNotEqual(user, self.user)
self.assertEqual(user.username, 'Unclaimed_Character')
self.assertEqual(user.profile.main_character, self.unclaimed_character)
def test_authenticate_character_record(self):
t = Token(character_id=self.unclaimed_character.character_id, character_name=self.unclaimed_character.character_name, character_owner_hash='4')
OwnershipRecord.objects.create(user=self.old_user, character=self.unclaimed_character, owner_hash='4')
user = StateBackend().authenticate(token=t)
self.assertEqual(user, self.old_user)
self.assertTrue(CharacterOwnership.objects.filter(owner_hash='4', user=self.old_user).exists())
self.assertTrue(user.profile.main_character)
def test_iterate_username(self):
t = Token(character_id=self.unclaimed_character.character_id,
character_name=self.unclaimed_character.character_name, character_owner_hash='3')
username = StateBackend().authenticate(token=t).username
t.character_owner_hash = '4'
username_1 = StateBackend().authenticate(token=t).username
t.character_owner_hash = '5'
username_2 = StateBackend().authenticate(token=t).username
self.assertNotEqual(username, username_1, username_2)
self.assertTrue(username_1.endswith('_1'))
self.assertTrue(username_2.endswith('_2'))

View File

@ -0,0 +1,35 @@
from io import StringIO
from django.core.management import call_command
from django.test import TestCase
from allianceauth.tests.auth_utils import AuthUtils
from ..models import CharacterOwnership, UserProfile
class ManagementCommandTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.user = AuthUtils.create_user('test user', disconnect_signals=True)
AuthUtils.add_main_character(cls.user, 'test character', '1', '2', 'test corp', 'test')
character = UserProfile.objects.get(user=cls.user).main_character
CharacterOwnership.objects.create(user=cls.user, character=character, owner_hash='test')
def setUp(self):
self.stdout = StringIO()
def test_ownership(self):
call_command('checkmains', stdout=self.stdout)
self.assertFalse(UserProfile.objects.filter(main_character__isnull=True).count())
self.assertNotIn(self.user.username, self.stdout.getvalue())
self.assertIn('All main characters', self.stdout.getvalue())
def test_no_ownership(self):
user = AuthUtils.create_user('v1 user', disconnect_signals=True)
AuthUtils.add_main_character(user, 'v1 character', '10', '20', 'test corp', 'test')
self.assertFalse(UserProfile.objects.filter(main_character__isnull=True).count())
call_command('checkmains', stdout=self.stdout)
self.assertEqual(UserProfile.objects.filter(main_character__isnull=True).count(), 1)
self.assertIn(user.username, self.stdout.getvalue())

View File

@ -0,0 +1,68 @@
from unittest import mock
from urllib import parse
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.http.response import HttpResponse
from django.shortcuts import reverse
from django.test import TestCase
from django.test.client import RequestFactory
from allianceauth.eveonline.models import EveCharacter
from allianceauth.tests.auth_utils import AuthUtils
from ..decorators import main_character_required
from ..models import CharacterOwnership
MODULE_PATH = 'allianceauth.authentication'
class DecoratorTestCase(TestCase):
@staticmethod
@main_character_required
def dummy_view(*args, **kwargs):
return HttpResponse(status=200)
@classmethod
def setUpTestData(cls):
cls.main_user = AuthUtils.create_user('main_user', disconnect_signals=True)
cls.no_main_user = AuthUtils.create_user(
'no_main_user', disconnect_signals=True
)
main_character = EveCharacter.objects.create(
character_id=1,
character_name='Main Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
CharacterOwnership.objects.create(
user=cls.main_user, character=main_character, owner_hash='1'
)
cls.main_user.profile.main_character = main_character
def setUp(self):
self.request = RequestFactory().get('/test/')
@mock.patch(MODULE_PATH + '.decorators.messages')
def test_login_redirect(self, m):
setattr(self.request, 'user', AnonymousUser())
response = self.dummy_view(self.request)
self.assertEqual(response.status_code, 302)
url = getattr(response, 'url', None)
self.assertEqual(parse.urlparse(url).path, reverse(settings.LOGIN_URL))
@mock.patch(MODULE_PATH + '.decorators.messages')
def test_main_character_redirect(self, m):
setattr(self.request, 'user', self.no_main_user)
response = self.dummy_view(self.request)
self.assertEqual(response.status_code, 302)
url = getattr(response, 'url', None)
self.assertEqual(url, reverse('authentication:dashboard'))
@mock.patch(MODULE_PATH + '.decorators.messages')
def test_successful_request(self, m):
setattr(self.request, 'user', self.main_user)
response = self.dummy_view(self.request)
self.assertEqual(response.status_code, 200)

View File

@ -1,147 +1,20 @@
from unittest import mock
from io import StringIO
from urllib import parse
from django.conf import settings
from django.contrib.auth.models import AnonymousUser, User
from django.core.management import call_command
from django.http.response import HttpResponse
from django.shortcuts import reverse
from django.contrib.auth.models import User
from django.test import TestCase
from django.test.client import RequestFactory
from allianceauth.authentication.decorators import main_character_required
from allianceauth.eveonline.models import EveCharacter, EveCorporationInfo,\
EveAllianceInfo
from allianceauth.tests.auth_utils import AuthUtils
from esi.errors import IncompleteResponseError
from esi.models import Token
from ..backends import StateBackend
from ..models import CharacterOwnership, UserProfile, State, get_guest_state,\
OwnershipRecord
from ..models import CharacterOwnership, State, get_guest_state
from ..tasks import check_character_ownership
MODULE_PATH = 'allianceauth.authentication'
class DecoratorTestCase(TestCase):
@staticmethod
@main_character_required
def dummy_view(*args, **kwargs):
return HttpResponse(status=200)
@classmethod
def setUpTestData(cls):
cls.main_user = AuthUtils.create_user('main_user', disconnect_signals=True)
cls.no_main_user = AuthUtils.create_user('no_main_user', disconnect_signals=True)
main_character = EveCharacter.objects.create(
character_id=1,
character_name='Main Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
CharacterOwnership.objects.create(user=cls.main_user, character=main_character, owner_hash='1')
cls.main_user.profile.main_character = main_character
def setUp(self):
self.request = RequestFactory().get('/test/')
@mock.patch(MODULE_PATH + '.decorators.messages')
def test_login_redirect(self, m):
setattr(self.request, 'user', AnonymousUser())
response = self.dummy_view(self.request)
self.assertEqual(response.status_code, 302)
url = getattr(response, 'url', None)
self.assertEqual(parse.urlparse(url).path, reverse(settings.LOGIN_URL))
@mock.patch(MODULE_PATH + '.decorators.messages')
def test_main_character_redirect(self, m):
setattr(self.request, 'user', self.no_main_user)
response = self.dummy_view(self.request)
self.assertEqual(response.status_code, 302)
url = getattr(response, 'url', None)
self.assertEqual(url, reverse('authentication:dashboard'))
@mock.patch(MODULE_PATH + '.decorators.messages')
def test_successful_request(self, m):
setattr(self.request, 'user', self.main_user)
response = self.dummy_view(self.request)
self.assertEqual(response.status_code, 200)
class BackendTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.main_character = EveCharacter.objects.create(
character_id=1,
character_name='Main Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
cls.alt_character = EveCharacter.objects.create(
character_id=2,
character_name='Alt Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
cls.unclaimed_character = EveCharacter.objects.create(
character_id=3,
character_name='Unclaimed Character',
corporation_id=1,
corporation_name='Corp',
corporation_ticker='CORP',
)
cls.user = AuthUtils.create_user('test_user', disconnect_signals=True)
cls.old_user = AuthUtils.create_user('old_user', disconnect_signals=True)
AuthUtils.disconnect_signals()
CharacterOwnership.objects.create(user=cls.user, character=cls.main_character, owner_hash='1')
CharacterOwnership.objects.create(user=cls.user, character=cls.alt_character, owner_hash='2')
UserProfile.objects.update_or_create(user=cls.user, defaults={'main_character': cls.main_character})
AuthUtils.connect_signals()
def test_authenticate_main_character(self):
t = Token(character_id=self.main_character.character_id, character_owner_hash='1')
user = StateBackend().authenticate(token=t)
self.assertEquals(user, self.user)
def test_authenticate_alt_character(self):
t = Token(character_id=self.alt_character.character_id, character_owner_hash='2')
user = StateBackend().authenticate(token=t)
self.assertEquals(user, self.user)
def test_authenticate_unclaimed_character(self):
t = Token(character_id=self.unclaimed_character.character_id, character_name=self.unclaimed_character.character_name, character_owner_hash='3')
user = StateBackend().authenticate(token=t)
self.assertNotEqual(user, self.user)
self.assertEqual(user.username, 'Unclaimed_Character')
self.assertEqual(user.profile.main_character, self.unclaimed_character)
def test_authenticate_character_record(self):
t = Token(character_id=self.unclaimed_character.character_id, character_name=self.unclaimed_character.character_name, character_owner_hash='4')
record = OwnershipRecord.objects.create(user=self.old_user, character=self.unclaimed_character, owner_hash='4')
user = StateBackend().authenticate(token=t)
self.assertEqual(user, self.old_user)
self.assertTrue(CharacterOwnership.objects.filter(owner_hash='4', user=self.old_user).exists())
self.assertTrue(user.profile.main_character)
def test_iterate_username(self):
t = Token(character_id=self.unclaimed_character.character_id,
character_name=self.unclaimed_character.character_name, character_owner_hash='3')
username = StateBackend().authenticate(token=t).username
t.character_owner_hash = '4'
username_1 = StateBackend().authenticate(token=t).username
t.character_owner_hash = '5'
username_2 = StateBackend().authenticate(token=t).username
self.assertNotEqual(username, username_1, username_2)
self.assertTrue(username_1.endswith('_1'))
self.assertTrue(username_2.endswith('_2'))
class CharacterOwnershipTestCase(TestCase):
@classmethod
def setUpTestData(cls):
@ -378,30 +251,3 @@ class CharacterOwnershipCheckTestCase(TestCase):
filter.return_value.exists.return_value = False
check_character_ownership(self.ownership)
self.assertTrue(filter.return_value.delete.called)
class ManagementCommandTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.user = AuthUtils.create_user('test user', disconnect_signals=True)
AuthUtils.add_main_character(cls.user, 'test character', '1', '2', 'test corp', 'test')
character = UserProfile.objects.get(user=cls.user).main_character
CharacterOwnership.objects.create(user=cls.user, character=character, owner_hash='test')
def setUp(self):
self.stdout = StringIO()
def test_ownership(self):
call_command('checkmains', stdout=self.stdout)
self.assertFalse(UserProfile.objects.filter(main_character__isnull=True).count())
self.assertNotIn(self.user.username, self.stdout.getvalue())
self.assertIn('All main characters', self.stdout.getvalue())
def test_no_ownership(self):
user = AuthUtils.create_user('v1 user', disconnect_signals=True)
AuthUtils.add_main_character(user, 'v1 character', '10', '20', 'test corp', 'test')
self.assertFalse(UserProfile.objects.filter(main_character__isnull=True).count())
call_command('checkmains', stdout=self.stdout)
self.assertEqual(UserProfile.objects.filter(main_character__isnull=True).count(), 1)
self.assertIn(user.username, self.stdout.getvalue())

View File

@ -33,7 +33,8 @@ class DiscordService(ServicesHook):
if self.user_has_account(user):
logger.debug('Deleting user %s %s account', user, self.name)
tasks.delete_user.apply_async(
kwargs={'user_pk': user.pk}, priority=SINGLE_TASK_PRIORITY
kwargs={'user_pk': user.pk, 'notify_user': notify_user},
priority=SINGLE_TASK_PRIORITY
)
def render_services_ctrl(self, request):
@ -60,13 +61,21 @@ class DiscordService(ServicesHook):
)
def service_active_for_user(self, user):
return user.has_perm(self.access_perm)
has_perms = user.has_perm(self.access_perm)
logger.debug("User %s has service permission: %s", user, has_perms)
return has_perms
def sync_nickname(self, user):
logger.debug('Syncing %s nickname for user %s', self.name, user)
if self.user_has_account(user):
tasks.update_nickname.apply_async(
kwargs={'user_pk': user.pk}, priority=SINGLE_TASK_PRIORITY
kwargs={
'user_pk': user.pk,
# since the new nickname is not yet in the DB we need to
# provide it manually to the task
'nickname': DiscordUser.objects.user_formatted_nick(user)
},
priority=SINGLE_TASK_PRIORITY
)
def sync_nicknames_bulk(self, users: list):
@ -87,7 +96,13 @@ class DiscordService(ServicesHook):
logger.debug('Processing %s groups for %s', self.name, user)
if self.user_has_account(user):
tasks.update_groups.apply_async(
kwargs={'user_pk': user.pk}, priority=SINGLE_TASK_PRIORITY
kwargs={
'user_pk': user.pk,
# since state changes may not yet be in the DB we need to
# provide the new state name manually to the task
'state_name': user.profile.state.name
},
priority=SINGLE_TASK_PRIORITY
)
def update_groups_bulk(self, users: list):

View File

@ -6,9 +6,14 @@ DISCORD_API_BASE_URL = clean_setting(
'DISCORD_API_BASE_URL', 'https://discordapp.com/api/'
)
# Low level timeout for requests to the Discord API in ms
DISCORD_API_TIMEOUT = clean_setting(
'DISCORD_API_TIMEOUT', 5000
# Low level connecttimeout for requests to the Discord API in seconds
DISCORD_API_TIMEOUT_CONNECT = clean_setting(
'DISCORD_API_TIMEOUT', 5
)
# Low level read timeout for requests to the Discord API in seconds
DISCORD_API_TIMEOUT_READ = clean_setting(
'DISCORD_API_TIMEOUT', 30
)
# Base authorization URL for Discord Oauth

View File

@ -15,7 +15,8 @@ from allianceauth import __title__ as AUTH_TITLE, __url__, __version__
from .. import __title__
from .app_settings import (
DISCORD_API_BASE_URL,
DISCORD_API_TIMEOUT,
DISCORD_API_TIMEOUT_CONNECT,
DISCORD_API_TIMEOUT_READ,
DISCORD_DISABLE_ROLE_CREATION,
DISCORD_GUILD_NAME_CACHE_MAX_AGE,
DISCORD_OAUTH_BASE_URL,
@ -540,7 +541,7 @@ class DiscordClient:
args = {
'url': url,
'headers': headers,
'timeout': DISCORD_API_TIMEOUT / 1000
'timeout': (DISCORD_API_TIMEOUT_CONNECT, DISCORD_API_TIMEOUT_READ)
}
if data:
args['json'] = data

View File

@ -127,9 +127,17 @@ class DiscordUserManager(models.Manager):
return None
@staticmethod
def user_group_names(user: User) -> list:
def user_group_names(user: User, state_name: str = None) -> list:
"""returns list of group names plus state the given user is a member of"""
return [group.name for group in user.groups.all()] + [user.profile.state.name]
if not state_name:
state_name = user.profile.state.name
group_names = (
[group.name for group in user.groups.all()] + [state_name]
)
logger.debug(
"Group names for roles updates of user %s are: %s", user, group_names
)
return group_names
def user_has_account(self, user: User) -> bool:
"""Returns True if the user has an Discord account, else False
@ -171,8 +179,15 @@ class DiscordUserManager(models.Manager):
@classmethod
def server_name(cls):
"""returns the name of the Discord server"""
return cls._bot_client().guild_name(DISCORD_GUILD_ID)
"""returns the name of the current Discord server
or an empty string if the name could not be retrieved
"""
try:
server_name = cls._bot_client().guild_name(DISCORD_GUILD_ID)
except HTTPError:
server_name = ""
return server_name
@staticmethod
def _bot_client(is_rate_limited: bool = True):

View File

@ -67,21 +67,25 @@ class DiscordUser(models.Model):
def __repr__(self):
return f'{type(self).__name__}(user=\'{self.user}\', uid={self.uid})'
def update_nickname(self) -> bool:
def update_nickname(self, nickname: str = None) -> bool:
"""Update nickname with formatted name of main character
Params:
- nickname: optional nickname to be used instead of user's main
Returns:
- True on success
- None if user is no longer a member of the Discord server
- False on error or raises exception
"""
requested_nick = DiscordUser.objects.user_formatted_nick(self.user)
if requested_nick:
if not nickname:
nickname = DiscordUser.objects.user_formatted_nick(self.user)
if nickname:
client = DiscordUser.objects._bot_client()
success = client.modify_guild_member(
guild_id=DISCORD_GUILD_ID,
user_id=self.uid,
nick=requested_nick
nick=nickname
)
if success:
logger.info('Nickname for %s has been updated', self.user)
@ -92,10 +96,13 @@ class DiscordUser(models.Model):
else:
return False
def update_groups(self) -> bool:
def update_groups(self, state_name: str = None) -> bool:
"""update groups for a user based on his current group memberships.
Will add or remove roles of a user as needed.
Params:
- state_name: optional state name to be used
Returns:
- True on success
- None if user is no longer a member of the Discord server
@ -128,7 +135,9 @@ class DiscordUser(models.Model):
requested_roles = match_or_create_roles_from_names(
client=client,
guild_id=DISCORD_GUILD_ID,
role_names=DiscordUser.objects.user_group_names(self.user)
role_names=DiscordUser.objects.user_group_names(
user=self.user, state_name=state_name
)
)
logger.debug(
'Requested roles for user %s: %s', self.user, requested_roles.ids()
@ -144,13 +153,13 @@ class DiscordUser(models.Model):
role_ids=list(new_roles.ids())
)
if success:
logger.info('Groups for %s have been updated', self.user)
logger.info('Roles for %s have been updated', self.user)
else:
logger.warning('Failed to update groups for %s', self.user)
logger.warning('Failed to update roles for %s', self.user)
return success
else:
logger.info('No need to update groups for user %s', self.user)
logger.info('No need to update roles for user %s', self.user)
return True
def update_username(self) -> bool:

View File

@ -26,25 +26,27 @@ BULK_TASK_PRIORITY = 6
@shared_task(
bind=True, name='discord.update_groups', base=QueueOnce, max_retries=None
)
def update_groups(self, user_pk: int) -> None:
def update_groups(self, user_pk: int, state_name: str = None) -> None:
"""Update roles on Discord for given user according to his current groups
Params:
- user_pk: PK of given user
- state_name: optional state name to be used
"""
_task_perform_user_action(self, user_pk, 'update_groups')
_task_perform_user_action(self, user_pk, 'update_groups', state_name=state_name)
@shared_task(
bind=True, name='discord.update_nickname', base=QueueOnce, max_retries=None
)
def update_nickname(self, user_pk: int) -> None:
def update_nickname(self, user_pk: int, nickname: str = None) -> None:
"""Set nickname on Discord for given user to his main character name
Params:
- user_pk: PK of given user
- nickname: optional nickname to be used instead of user's main
"""
_task_perform_user_action(self, user_pk, 'update_nickname')
_task_perform_user_action(self, user_pk, 'update_nickname', nickname=nickname)
@shared_task(
@ -75,6 +77,7 @@ def _task_perform_user_action(self, user_pk: int, method: str, **kwargs) -> None
"""perform a user related action incl. managing all exceptions"""
logger.debug("Starting %s for user with pk %s", method, user_pk)
user = User.objects.get(pk=user_pk)
# logger.debug("user %s has state %s", user, user.profile.state)
if DiscordUser.objects.user_has_account(user):
logger.info("Running %s for user %s", method, user)
try:

View File

@ -28,7 +28,7 @@
{% endif %}
{% if request.user.is_superuser %}
<div class="text-center" style="padding-top:5px;">
<a type="button" class="btn btn-default" href="{% url 'discord:add_bot' %}">
<a type="button" id="btnLinkDiscordServer" class="btn btn-default" href="{% url 'discord:add_bot' %}">
{% trans "Link Discord Server" %}
</a>
</div>

View File

@ -3,6 +3,7 @@ from unittest.mock import patch
from django.test import TestCase, RequestFactory
from django.test.utils import override_settings
from allianceauth.notifications.models import Notification
from allianceauth.tests.auth_utils import AuthUtils
from . import TEST_USER_NAME, TEST_USER_ID, add_permissions_to_members, MODULE_PATH
@ -30,6 +31,7 @@ class TestDiscordService(TestCase):
self.service = DiscordService
add_permissions_to_members()
self.factory = RequestFactory()
Notification.objects.all().delete()
def test_service_enabled(self):
service = self.service()
@ -95,10 +97,11 @@ class TestDiscordService(TestCase):
mock_DiscordClient.return_value.remove_guild_member.return_value = True
service = self.service()
service.delete_user(self.member)
service.delete_user(self.member, notify_user=True)
self.assertTrue(mock_DiscordClient.return_value.remove_guild_member.called)
self.assertFalse(DiscordUser.objects.filter(user=self.member).exists())
self.assertTrue(Notification.objects.filter(user=self.member).exists())
@patch(MODULE_PATH + '.managers.DiscordClient', spec=DiscordClient)
def test_delete_user_is_not_member(self, mock_DiscordClient):

View File

@ -16,9 +16,12 @@ import requests_mock
from django.contrib.auth.models import Group, User
from django.core.cache import caches
from django.shortcuts import reverse
from django.test import TransactionTestCase
from django.test import TransactionTestCase, TestCase
from django.test.utils import override_settings
from allianceauth.authentication.models import State
from allianceauth.eveonline.models import EveCharacter
from allianceauth.notifications.models import Notification
from allianceauth.tests.auth_utils import AuthUtils
from . import (
@ -43,6 +46,7 @@ from ..models import DiscordUser
logger = logging.getLogger('allianceauth')
ROLE_MEMBER = create_role(99, 'Member')
ROLE_BLUE = create_role(98, 'Blue')
# Putting all requests to Discord into objects so we can compare them better
DiscordRequest = namedtuple('DiscordRequest', ['method', 'url'])
@ -87,6 +91,16 @@ def clear_cache():
logger.info('Cache flushed')
def reset_testdata():
AuthUtils.disconnect_signals()
Group.objects.all().delete()
User.objects.all().delete()
State.objects.all().delete()
EveCharacter.objects.all().delete()
AuthUtils.connect_signals()
Notification.objects.all().delete()
@patch(MODULE_PATH + '.models.DISCORD_GUILD_ID', TEST_GUILD_ID)
@override_settings(CELERY_ALWAYS_EAGER=True)
@requests_mock.Mocker()
@ -98,14 +112,24 @@ class TestServiceFeatures(TransactionTestCase):
cls.maxDiff = None
def setUp(self):
"""All tests: Given a user with member state,
service permission and active Discord account
"""
clear_cache()
AuthUtils.disconnect_signals()
Group.objects.all().delete()
User.objects.all().delete()
AuthUtils.connect_signals()
self.group_3 = Group.objects.create(name='charlie')
self.user = AuthUtils.create_member(TEST_USER_NAME)
AuthUtils.add_main_character_2(
reset_testdata()
self.group_charlie = Group.objects.create(name='charlie')
# States
self.member_state = AuthUtils.get_member_state()
self.guest_state = AuthUtils.get_guest_state()
self.blue_state = AuthUtils.create_state("Blue", 50)
permission = AuthUtils.get_permission_by_name('discord.access_discord')
self.member_state.permissions.add(permission)
self.blue_state.permissions.add(permission)
# Test user
self.user = AuthUtils.create_user(TEST_USER_NAME)
self.main = AuthUtils.add_main_character_2(
self.user,
TEST_MAIN_NAME,
TEST_MAIN_ID,
@ -114,11 +138,18 @@ class TestServiceFeatures(TransactionTestCase):
corp_ticker='TEST',
disconnect_signals=True
)
self.discord_user = DiscordUser.objects.create(user=self.user, uid=TEST_USER_ID)
add_permissions_to_members()
self.member_state.member_characters.add(self.main)
def test_name_of_main_changes(self, requests_mocker):
# modify_guild_member()
# verify user is a member and has an account
self.user = User.objects.get(pk=self.user.pk)
self.assertEqual(self.user.profile.state, self.member_state)
self.discord_user = DiscordUser.objects.create(user=self.user, uid=TEST_USER_ID)
self.assertTrue(DiscordUser.objects.user_has_account(self.user))
def test_when_name_of_main_changes_then_discord_nick_is_updated(
self, requests_mocker
):
requests_mocker.patch(modify_guild_member_request.url, status_code=204)
# changing nick to trigger signals
@ -126,22 +157,23 @@ class TestServiceFeatures(TransactionTestCase):
self.user.profile.main_character.character_name = new_nick
self.user.profile.main_character.save()
# Need to have called modify_guild_member two times only
# Once for sync nickname
# Once for change of main character
requests_made = list()
# verify Discord nick was updates
nick_updated = False
for r in requests_mocker.request_history:
requests_made.append(DiscordRequest(r.method, r.url))
my_request = DiscordRequest(r.method, r.url)
if my_request == modify_guild_member_request and "nick" in r.json():
nick_updated = True
self.assertEqual(r.json()["nick"], new_nick)
expected = [modify_guild_member_request, modify_guild_member_request]
self.assertListEqual(requests_made, expected)
self.assertTrue(nick_updated)
self.assertTrue(DiscordUser.objects.user_has_account(self.user))
def test_name_of_main_changes_but_user_deleted(self, requests_mocker):
# modify_guild_member()
def test_when_name_of_main_changes_and_user_deleted_then_account_is_deleted(
self, requests_mocker
):
requests_mocker.patch(
modify_guild_member_request.url, status_code=404, json={'code': 10007}
)
# remove_guild_member()
requests_mocker.delete(remove_guild_member_request.url, status_code=204)
# changing nick to trigger signals
@ -149,24 +181,11 @@ class TestServiceFeatures(TransactionTestCase):
self.user.profile.main_character.character_name = new_nick
self.user.profile.main_character.save()
# Need to have called modify_guild_member two times only
# Once for sync nickname
# Once for change of main character
requests_made = list()
for r in requests_mocker.request_history:
requests_made.append(DiscordRequest(r.method, r.url))
self.assertFalse(DiscordUser.objects.user_has_account(self.user))
expected = [
modify_guild_member_request,
remove_guild_member_request,
]
self.assertListEqual(requests_made, expected)
# self.assertFalse(DiscordUser.objects.user_has_account(self.user))
def test_name_of_main_changes_but_user_rate_limited(
def test_when_name_of_main_changes_and_and_rate_limited_then_dont_call_api(
self, requests_mocker
):
# modify_guild_member()
requests_mocker.patch(modify_guild_member_request.url, status_code=204)
# exhausting rate limit
@ -183,96 +202,230 @@ class TestServiceFeatures(TransactionTestCase):
self.user.profile.main_character.save()
# should not have called the API
requests_made = list()
for r in requests_mocker.request_history:
requests_made.append(DiscordRequest(r.method, r.url))
requests_made = [
DiscordRequest(r.method, r.url) for r in requests_mocker.request_history
]
expected = list()
self.assertListEqual(requests_made, expected)
self.assertListEqual(requests_made, list())
def test_user_demoted_to_guest(self, requests_mocker):
# remove_guild_member()
def test_when_member_is_demoted_to_guest_then_his_account_is_deleted(
self, requests_mocker
):
requests_mocker.patch(modify_guild_member_request.url, status_code=204)
requests_mocker.delete(remove_guild_member_request.url, status_code=204)
self.user.groups.clear()
requests_made = list()
# our user is a member and has an account
self.assertTrue(self.user.has_perm('discord.access_discord'))
# now we demote him to guest
self.member_state.member_characters.remove(self.main)
# verify user is now guest
self.user = User.objects.get(pk=self.user.pk)
self.assertEqual(self.user.profile.state, AuthUtils.get_guest_state())
# verify user has no longer access to Discord and no account
self.assertFalse(self.user.has_perm('discord.access_discord'))
self.assertFalse(DiscordUser.objects.user_has_account(self.user))
# verify account was actually deleted from Discord server
requests_made = [
DiscordRequest(r.method, r.url) for r in requests_mocker.request_history
]
self.assertIn(remove_guild_member_request, requests_made)
# verify user has been notified
self.assertTrue(Notification.objects.filter(user=self.user).exists())
def test_when_member_changes_to_blue_state_then_roles_are_updated_accordingly(
self, requests_mocker
):
# request mocks
requests_mocker.get(
guild_member_request.url,
json={'user': create_user_info(), 'roles': ['3', '13', '99']}
)
requests_mocker.get(
guild_roles_request.url,
json=[
ROLE_ALPHA, ROLE_BRAVO, ROLE_CHARLIE, ROLE_MIKE, ROLE_MEMBER, ROLE_BLUE
]
)
requests_mocker.post(create_guild_role_request.url, json=ROLE_CHARLIE)
requests_mocker.patch(modify_guild_member_request.url, status_code=204)
AuthUtils.disconnect_signals()
self.user.groups.add(self.group_charlie)
AuthUtils.connect_signals()
# demote user to blue state
self.blue_state.member_characters.add(self.main)
self.member_state.member_characters.remove(self.main)
# verify roles for user where updated
roles_updated = False
for r in requests_mocker.request_history:
requests_made.append(DiscordRequest(r.method, r.url))
my_request = DiscordRequest(r.method, r.url)
if my_request == modify_guild_member_request and "roles" in r.json():
roles_updated = True
self.assertSetEqual(set(r.json()["roles"]), {3, 13, 98})
break
# compare the list of made requests with expected
expected = [remove_guild_member_request]
self.assertListEqual(requests_made, expected)
self.assertTrue(roles_updated)
self.assertTrue(DiscordUser.objects.user_has_account(self.user))
def test_adding_group_to_user_role_exists(self, requests_mocker):
# guild_member()
def test_when_group_added_to_member_and_role_known_then_his_roles_are_updated(
self, requests_mocker
):
requests_mocker.get(
guild_member_request.url,
json={
'user': create_user_info(),
'roles': ['1', '13', '99']
'roles': ['13', '99']
}
)
# guild_roles()
requests_mocker.get(
guild_roles_request.url,
json=[ROLE_ALPHA, ROLE_BRAVO, ROLE_CHARLIE, ROLE_MIKE, ROLE_MEMBER]
)
# create_guild_role()
requests_mocker.post(create_guild_role_request.url, json=ROLE_CHARLIE)
# modify_guild_member()
requests_mocker.patch(modify_guild_member_request.url, status_code=204)
# adding new group to trigger signals
self.user.groups.add(self.group_3)
self.user.refresh_from_db()
self.user.groups.add(self.group_charlie)
# compare the list of made requests with expected
requests_made = list()
# verify roles for user where updated
roles_updated = False
for r in requests_mocker.request_history:
requests_made.append(DiscordRequest(r.method, r.url))
my_request = DiscordRequest(r.method, r.url)
if my_request == modify_guild_member_request and "roles" in r.json():
roles_updated = True
self.assertSetEqual(set(r.json()["roles"]), {3, 13, 99})
break
expected = [
guild_member_request,
guild_roles_request,
modify_guild_member_request
]
self.assertListEqual(requests_made, expected)
self.assertTrue(roles_updated)
self.assertTrue(DiscordUser.objects.user_has_account(self.user))
def test_adding_group_to_user_role_does_not_exist(self, requests_mocker):
# guild_member()
def test_when_group_added_to_member_and_role_unknown_then_his_roles_are_updated(
self, requests_mocker
):
requests_mocker.get(
guild_member_request.url,
json={
'user': {'id': str(TEST_USER_ID), 'username': TEST_MAIN_NAME},
'roles': ['1', '13', '99']
'roles': ['13', '99']
}
)
# guild_roles()
requests_mocker.get(
guild_roles_request.url,
json=[ROLE_ALPHA, ROLE_BRAVO, ROLE_MIKE, ROLE_MEMBER]
)
# create_guild_role()
requests_mocker.post(create_guild_role_request.url, json=ROLE_CHARLIE)
# modify_guild_member()
requests_mocker.patch(modify_guild_member_request.url, status_code=204)
# adding new group to trigger signals
self.user.groups.add(self.group_3)
self.user.groups.add(self.group_charlie)
self.user.refresh_from_db()
# compare the list of made requests with expected
requests_made = list()
# verify roles for user where updated
roles_updated = False
for r in requests_mocker.request_history:
requests_made.append(DiscordRequest(r.method, r.url))
my_request = DiscordRequest(r.method, r.url)
if my_request == modify_guild_member_request and "roles" in r.json():
roles_updated = True
self.assertSetEqual(set(r.json()["roles"]), {3, 13, 99})
break
expected = [
guild_member_request,
guild_roles_request,
create_guild_role_request,
modify_guild_member_request
]
self.assertListEqual(requests_made, expected)
self.assertTrue(roles_updated)
self.assertTrue(DiscordUser.objects.user_has_account(self.user))
@override_settings(CELERY_ALWAYS_EAGER=True)
@patch(MODULE_PATH + '.managers.DISCORD_GUILD_ID', TEST_GUILD_ID)
@patch(MODULE_PATH + '.models.DISCORD_GUILD_ID', TEST_GUILD_ID)
@requests_mock.Mocker()
class StateTestCase(TestCase):
def setUp(self):
clear_cache()
reset_testdata()
self.user = AuthUtils.create_user('test_user', disconnect_signals=True)
AuthUtils.add_main_character(
self.user,
'Perm Test Character', '99',
corp_id='100',
alliance_id='200',
corp_name='Perm Test Corp',
alliance_name='Perm Test Alliance'
)
self.test_character = EveCharacter.objects.get(character_id='99')
self.member_state = State.objects.create(
name='Test Member',
priority=150,
)
self.access_discord = AuthUtils.get_permission_by_name('discord.access_discord')
self.member_state.permissions.add(self.access_discord)
self.member_state.member_characters.add(self.test_character)
def _add_discord_user(self):
self.discord_user = DiscordUser.objects.create(
user=self.user, uid="12345678910"
)
def _refresh_user(self):
self.user = User.objects.get(pk=self.user.pk)
def test_perm_changes_to_higher_priority_state_creation(self, requests_mocker):
mock_url = DiscordRequest(
method='DELETE',
url=f'{DISCORD_API_BASE_URL}guilds/{TEST_GUILD_ID}/members/12345678910'
)
requests_mocker.delete(mock_url.url, status_code=204)
self._add_discord_user()
self._refresh_user()
higher_state = State.objects.create(
name='Higher State',
priority=200,
)
self.assertIsNotNone(self.user.discord)
higher_state.member_characters.add(self.test_character)
self._refresh_user()
self.assertEquals(higher_state, self.user.profile.state)
with self.assertRaises(DiscordUser.DoesNotExist):
self.user.discord
higher_state.member_characters.clear()
self._refresh_user()
self.assertEquals(self.member_state, self.user.profile.state)
with self.assertRaises(DiscordUser.DoesNotExist):
self.user.discord
def test_perm_changes_to_lower_priority_state_creation(self, requests_mocker):
mock_url = DiscordRequest(
method='DELETE',
url=f'{DISCORD_API_BASE_URL}guilds/{TEST_GUILD_ID}/members/12345678910'
)
requests_mocker.delete(mock_url.url, status_code=204)
self._add_discord_user()
self._refresh_user()
lower_state = State.objects.create(
name='Lower State',
priority=125,
)
self.assertIsNotNone(self.user.discord)
lower_state.member_characters.add(self.test_character)
self._refresh_user()
self.assertEquals(self.member_state, self.user.profile.state)
self.member_state.member_characters.clear()
self._refresh_user()
self.assertEquals(lower_state, self.user.profile.state)
with self.assertRaises(DiscordUser.DoesNotExist):
self.user.discord
self.member_state.member_characters.add(self.test_character)
self._refresh_user()
self.assertEquals(self.member_state, self.user.profile.state)
with self.assertRaises(DiscordUser.DoesNotExist):
self.user.discord
@patch(MODULE_PATH + '.managers.DISCORD_GUILD_ID', TEST_GUILD_ID)
@ -282,6 +435,7 @@ class TestUserFeatures(WebTest):
def setUp(self):
clear_cache()
reset_testdata()
self.member = AuthUtils.create_member(TEST_USER_NAME)
AuthUtils.add_main_character_2(
self.member,
@ -474,3 +628,24 @@ class TestUserFeatures(WebTest):
expected = [remove_guild_member_request, guild_infos_request]
self.assertListEqual(requests_made, expected)
@patch(MODULE_PATH + '.views.messages')
def test_user_add_new_server(self, requests_mocker, mock_messages):
# guild_infos()
mock_exception = HTTPError('can not get guild info from Discord API')
mock_exception.response = Mock()
mock_exception.response.status_code = 440
requests_mocker.get(guild_infos_request.url, exc=mock_exception)
# login
self.member.is_superuser = True
self.member.is_staff = True
self.member.save()
self.app.set_user(self.member)
# click deactivate on the service page
response = self.app.get(reverse('services:services'))
# check we got can see the page and the "link server" button
self.assertEqual(response.status_int, 200)
self.assertIsNotNone(response.html.find(id='btnLinkDiscordServer'))

View File

@ -361,3 +361,26 @@ class TestUserHasAccount(TestCase):
def test_return_false_if_not_called_with_user_object(self):
self.assertFalse(DiscordUser.objects.user_has_account('abc'))
@patch(MODULE_PATH + '.managers.DiscordClient', spec=DiscordClient)
class TestServerName(TestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.user = AuthUtils.create_user(TEST_USER_NAME)
def test_returns_name_when_api_returns_it(self, mock_DiscordClient):
server_name = "El Dorado"
mock_DiscordClient.return_value.guild_name.return_value = server_name
self.assertEqual(DiscordUser.objects.server_name(), server_name)
def test_returns_empty_string_when_api_throws_http_error(self, mock_DiscordClient):
mock_exception = HTTPError('Test exception')
mock_exception.response = Mock(**{"status_code": 440})
mock_DiscordClient.return_value.guild_name.side_effect = mock_exception
self.assertEqual(DiscordUser.objects.server_name(), "")

View File

@ -25,10 +25,10 @@ class ExampleService(ServicesHook):
:return:
"""
urls = self.Urls()
urls.auth_activate = 'auth_example_activate'
urls.auth_deactivate = 'auth_example_deactivate'
urls.auth_reset_password = 'auth_example_reset_password'
urls.auth_set_password = 'auth_example_set_password'
# urls.auth_activate = 'auth_example_activate'
# urls.auth_deactivate = 'auth_example_deactivate'
# urls.auth_reset_password = 'auth_example_reset_password'
# urls.auth_set_password = 'auth_example_set_password'
return render_to_string(self.service_ctrl_template, {
'service_name': self.title,
'urls': urls,

View File

@ -123,17 +123,12 @@ def m2m_changed_state_permissions(sender, instance, action, pk_set, *args, **kwa
logger.debug("Permission change for state {} was not service permission, ignoring".format(instance))
@receiver(state_changed, sender=UserProfile)
@receiver(state_changed)
def check_service_accounts_state_changed(sender, user, state, **kwargs):
logger.debug("Received state_changed from %s to state %s" % (user, state))
service_perms = [svc.access_perm for svc in ServicesHook.get_services()]
state_perms = ["{}.{}".format(perm.natural_key()[1], perm.natural_key()[0]) for perm in state.permissions.all()]
for perm in service_perms:
if perm not in state_perms:
for svc in ServicesHook.get_services():
if svc.access_perm == perm:
logger.debug("User %s new state %s does not have service %s permission. Checking account." % (user, state, svc))
svc.validate_user(user)
svc.update_groups(user)
@receiver(pre_delete, sender=User)
@ -160,23 +155,36 @@ def disable_services_on_inactive(sender, instance, *args, **kwargs):
@receiver(pre_save, sender=UserProfile)
def process_main_character_change(sender, instance, *args, **kwargs):
if not instance.pk: # ignore
# new model being created
if not instance.pk:
# ignore new model being created
return
try:
logger.debug(
"Received pre_save from %s for process_main_character_change", instance
)
old_instance = UserProfile.objects.get(pk=instance.pk)
if old_instance.main_character and not instance.main_character: # lost main char disable services
logger.info("Disabling services due to loss of main character for user {0}".format(instance.user))
if old_instance.main_character and not instance.main_character:
logger.info(
"Disabling services due to loss of main character for user %s",
instance.user
)
disable_user(instance.user)
elif old_instance.main_character is not instance.main_character: # swapping/changing main character
logger.info("Updating Names due to change of main character for user {0}".format(instance.user))
elif old_instance.main_character != instance.main_character:
logger.info(
"Updating Names due to change of main character for user %s",
instance.user
)
for svc in ServicesHook.get_services():
try:
svc.validate_user(instance.user)
svc.sync_nickname(instance.user)
except:
logger.exception('Exception running sync_nickname for services module %s on user %s' % (svc, instance))
logger.exception(
"Exception running sync_nickname for services module %s "
"on user %s",
svc,
instance
)
except UserProfile.DoesNotExist:
pass
@ -186,6 +194,10 @@ def process_main_character_change(sender, instance, *args, **kwargs):
def process_main_character_update(sender, instance, *args, **kwargs):
try:
if instance.userprofile:
logger.debug(
"Received pre_save from %s for process_main_character_update",
instance
)
old_instance = EveCharacter.objects.get(pk=instance.pk)
if not instance.character_name == old_instance.character_name or \
not instance.corporation_name == old_instance.corporation_name or \

View File

@ -1,15 +1,20 @@
from copy import deepcopy
from unittest import mock
from django.test import TestCase
from django.contrib.auth.models import Group, Permission
from allianceauth.tests.auth_utils import AuthUtils
from allianceauth.authentication.models import State
from allianceauth.eveonline.models import EveCharacter
from allianceauth.tests.auth_utils import AuthUtils
class ServicesSignalsTestCase(TestCase):
def setUp(self):
self.member = AuthUtils.create_user('auth_member', disconnect_signals=True)
AuthUtils.add_main_character(self.member, 'Test', '1', '2', 'Test Corp', 'TEST')
AuthUtils.add_main_character_2(
self.member, 'Test', 1, 2, 'Test Corp', 'TEST'
)
self.none_user = AuthUtils.create_user('none_user', disconnect_signals=True)
@mock.patch('allianceauth.services.signals.transaction')
@ -46,7 +51,6 @@ class ServicesSignalsTestCase(TestCase):
@mock.patch('allianceauth.services.signals.disable_user')
def test_pre_delete_user(self, disable_user):
"""
Test that disable_member is called when a user is deleted
"""
@ -126,7 +130,9 @@ class ServicesSignalsTestCase(TestCase):
transaction.on_commit = lambda fn: fn()
ct = ContentType.objects.get(app_label='auth', model='permission')
perm = Permission.objects.create(name="Test perm", codename="access_testsvc", content_type=ct)
perm = Permission.objects.create(
name="Test perm", codename="access_testsvc", content_type=ct
)
self.member.user_permissions.add(perm)
# Act, should trigger m2m change
@ -159,7 +165,9 @@ class ServicesSignalsTestCase(TestCase):
AuthUtils.connect_signals()
ct = ContentType.objects.get(app_label='auth', model='permission')
perm = Permission.objects.create(name="Test perm", codename="access_testsvc", content_type=ct)
perm = Permission.objects.create(
name="Test perm", codename="access_testsvc", content_type=ct
)
test_state.permissions.add(perm)
# Act, should trigger m2m change
@ -173,12 +181,12 @@ class ServicesSignalsTestCase(TestCase):
self.assertEqual(self.member, args[0])
@mock.patch('allianceauth.services.signals.ServicesHook')
def test_state_changed_services_valudation(self, services_hook):
"""
Test a user changing state has service accounts validated
def test_state_changed_services_validation_and_groups_update(self, services_hook):
"""Test a user changing state has service accounts validated and groups updated
"""
svc = mock.Mock()
svc.validate_user.return_value = None
svc.update_groups.return_value = None
svc.access_perm = 'auth.access_testsvc'
services_hook.get_services.return_value = [svc]
@ -193,3 +201,62 @@ class ServicesSignalsTestCase(TestCase):
self.assertTrue(svc.validate_user.called)
args, kwargs = svc.validate_user.call_args
self.assertEqual(self.member, args[0])
self.assertTrue(svc.update_groups.called)
args, kwargs = svc.update_groups.call_args
self.assertEqual(self.member, args[0])
@mock.patch('allianceauth.services.signals.ServicesHook')
def test_state_changed_services_validation_and_groups_update_1(self, services_hook):
"""Test a user changing main has service accounts validated and sync updated
"""
svc = mock.Mock()
svc.validate_user.return_value = None
svc.sync_nickname.return_value = None
svc.access_perm = 'auth.access_testsvc'
services_hook.get_services.return_value = [svc]
new_main = EveCharacter.objects.create(
character_id=123,
character_name="Alter Ego",
corporation_id=987,
corporation_name="ABC"
)
self.member.profile.main_character = new_main
self.member.profile.save()
# Assert
self.assertTrue(services_hook.get_services.called)
self.assertTrue(svc.validate_user.called)
args, kwargs = svc.validate_user.call_args
self.assertEqual(self.member, args[0])
self.assertTrue(svc.sync_nickname.called)
args, kwargs = svc.sync_nickname.call_args
self.assertEqual(self.member, args[0])
@mock.patch('allianceauth.services.signals.ServicesHook')
def test_state_changed_services_validation_and_groups_update_2(self, services_hook):
"""Test a user changing main has service does not have accounts validated
and sync updated if the new main is equal to the old main
"""
svc = mock.Mock()
svc.validate_user.return_value = None
svc.sync_nickname.return_value = None
svc.access_perm = 'auth.access_testsvc'
services_hook.get_services.return_value = [svc]
# this creates a clone of the Django object
new_main = deepcopy(self.member.profile.main_character)
self.assertIsNot(new_main, self.member.profile.main_character)
self.member.profile.main_character = new_main
self.member.profile.save()
# Assert
self.assertFalse(services_hook.get_services.called)
self.assertFalse(svc.validate_user.called)
self.assertFalse(svc.sync_nickname.called)

BIN
allianceauth_model.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 274 KiB