diff --git a/allianceauth/services/modules/mumble/app_settings.py b/allianceauth/services/modules/mumble/app_settings.py new file mode 100644 index 00000000..df03155e --- /dev/null +++ b/allianceauth/services/modules/mumble/app_settings.py @@ -0,0 +1,5 @@ +from django.conf import settings + +MUMBLE_TEMPS_FORCE_SSO = getattr(settings, "MUMBLE_TEMPS_FORCE_SSO", True) +MUMBLE_TEMPS_SSO_PREFIX = getattr(settings, "MUMBLE_TEMPS_SSO_PREFIX", "[TEMP]") +MUMBLE_TEMPS_LOGIN_PREFIX = getattr(settings, "MUMBLE_TEMPS_LOGIN_PREFIX", "[*TEMP]") diff --git a/allianceauth/services/modules/mumble/auth_hooks.py b/allianceauth/services/modules/mumble/auth_hooks.py index f1522e0a..f395531d 100644 --- a/allianceauth/services/modules/mumble/auth_hooks.py +++ b/allianceauth/services/modules/mumble/auth_hooks.py @@ -7,11 +7,10 @@ from django.template.loader import render_to_string from allianceauth.notifications import notify from allianceauth import hooks -from allianceauth.services.hooks import ServicesHook, UrlHook +from allianceauth.services.hooks import ServicesHook from .tasks import MumbleTasks from .models import MumbleUser from .urls import urlpatterns -from allianceauth.services.modules.mumble import urls from django.utils.translation import gettext_lazy as _ logger = logging.getLogger(__name__) @@ -43,11 +42,6 @@ class MumbleService(ServicesHook): if MumbleTasks.has_account(user): MumbleTasks.update_groups.delay(user.pk) - def sync_nickname(self, user): - logger.debug(f"Updating {self.name} nickname for {user}") - if MumbleTasks.has_account(user): - MumbleTasks.update_display_name.apply_async(args=[user.pk], countdown=5) # cooldown on this task to ensure DB clean when syncing - def validate_user(self, user): if MumbleTasks.has_account(user) and not self.service_active_for_user(user): self.delete_user(user, notify_user=True) @@ -74,14 +68,14 @@ class MumbleService(ServicesHook): 'username': request.user.mumble.username if MumbleTasks.has_account(request.user) else '', }, request=request) + @hooks.register('services_hook') def register_mumble_service() -> ServicesHook: return MumbleService() class MumbleMenuItem(MenuItemHook): - def __init__(self): - # setup menu entry for sidebar + def __init__(self) -> None: MenuItemHook.__init__( self=self, text=_("Mumble Temp Links"), @@ -90,12 +84,12 @@ class MumbleMenuItem(MenuItemHook): navactive=["mumble:index"], ) - def render(self, request): + def render(self, request) -> str: if request.user.has_perm("mumble.create_new_templinks"): return MenuItemHook.render(self, request) return "" @hooks.register("menu_item_hook") -def register_menu(): +def register_menu() -> MumbleMenuItem: return MumbleMenuItem() diff --git a/allianceauth/services/modules/mumble/managers.py b/allianceauth/services/modules/mumble/managers.py deleted file mode 100644 index 5ee02357..00000000 --- a/allianceauth/services/modules/mumble/managers.py +++ /dev/null @@ -1,60 +0,0 @@ -import random -import string -from passlib.hash import bcrypt_sha256 - -from django.db import models -from allianceauth.services.hooks import NameFormatter -import logging - -logger = logging.getLogger(__name__) - - -class MumbleManager(models.Manager): - - @staticmethod - def get_display_name(user): - from .auth_hooks import MumbleService - return NameFormatter(MumbleService(), user).format_name() - - @staticmethod - def get_username(user): - return user.profile.main_character.character_name # main character as the user.username may be incorect - - @staticmethod - def sanitise_username(username): - return username.replace(" ", "_") - - @staticmethod - def generate_random_pass(): - return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)]) - - @staticmethod - def gen_pwhash(password): - return bcrypt_sha256.encrypt(password.encode('utf-8')) - - def create(self, user): - try: - username = self.get_username(user) - logger.debug(f"Creating mumble user with username {username}") - username_clean = self.sanitise_username(username) - display_name = self.get_display_name(user) - password = self.generate_random_pass() - pwhash = self.gen_pwhash(password) - logger.debug("Proceeding with mumble user creation: clean username {}, pwhash starts with {}".format( - username_clean, pwhash[0:5])) - logger.info(f"Creating mumble user {username_clean}") - - result = super().create( - user=user, - username=username_clean, - pwhash=pwhash, - display_name=display_name) - result.update_groups() - result.credentials.update({'username': result.username, 'password': password}) - return result - except AttributeError: # No Main or similar errors - return False - return False - - def user_exists(self, username) -> bool: - return self.filter(username=username).exists() diff --git a/allianceauth/services/modules/mumble/models.py b/allianceauth/services/modules/mumble/models.py index 8e04f5b2..28217c8a 100644 --- a/allianceauth/services/modules/mumble/models.py +++ b/allianceauth/services/modules/mumble/models.py @@ -1,7 +1,9 @@ +import random +import string from typing import LiteralString from allianceauth.eveonline.models import EveCharacter from allianceauth.services.hooks import NameFormatter -from allianceauth.services.modules.mumble.managers import MumbleManager +from passlib.hash import bcrypt_sha256 from django.db import models from allianceauth.services.abstract import AbstractServiceModel import logging @@ -48,7 +50,21 @@ class MumbleUser(AbstractServiceModel): blank=True, null=True, editable=False, help_text="Timestamp of the users Last Disconnection from Mumble") - objects = MumbleManager() + @staticmethod + def get_username(user) -> str: + return user.profile.main_character.character_name # main character as the user.username may be incorrect + + @staticmethod + def sanitise_username(username) -> str: + return username.replace(" ", "_") + + @staticmethod + def generate_random_pass() -> str: + return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)]) + + @staticmethod + def gen_pwhash(password) -> str: + return bcrypt_sha256.encrypt(password.encode('utf-8')) def __str__(self) -> str: return f"{self.username}" @@ -57,8 +73,8 @@ class MumbleUser(AbstractServiceModel): init_password = password logger.debug(f"Updating mumble user {self.user} password.") if not password: - password = MumbleManager.generate_random_pass() - pwhash = MumbleManager.gen_pwhash(password) + password = self.generate_random_pass() + pwhash = self.gen_pwhash(password) logger.debug(f"Proceeding with mumble user {self.user} password update - pwhash starts with {pwhash[0:5]}") self.pwhash = pwhash self.hashfn = self.HashFunction.SHA256 @@ -66,10 +82,10 @@ class MumbleUser(AbstractServiceModel): if init_password is None: self.credentials.update({'username': self.username, 'password': password}) - def reset_password(self): + def reset_password(self) -> None: self.update_password() - def group_string(self) -> LiteralString: + def group_string(self) -> str: """Return a Mumble Safe Formatted List of Groups This used to be a ModelField, generated on the fly now with DjangoAuthenticatorTM @@ -81,10 +97,33 @@ class MumbleUser(AbstractServiceModel): groups_str.append(str(group.name)) return ','.join({g.replace(' ', '-') for g in groups_str}) - def get_display_name(self): + def get_display_name(self) -> str: from .auth_hooks import MumbleService return NameFormatter(MumbleService(), self.user).format_name() + def display_name(self) -> str: + return self.get_display_name() + + def create(self, user): + try: + username = self.get_username(user) + logger.debug(f"Creating mumble user with username {username}") + username_clean = self.sanitise_username(username) + password = self.generate_random_pass() + pwhash = self.gen_pwhash(password) + logger.debug("Proceeding with mumble user creation: clean username {}, pwhash starts with {}".format( + username_clean, pwhash[0:5])) + logger.info(f"Creating mumble user {username_clean}") + + result = self.objects.create(user=user, username=username_clean, pwhash=pwhash) + result.credentials.update({'username': result.username, 'password': password}) + return result + except AttributeError: # No Main or similar errors + return False + + def user_exists(self, username) -> bool: + return self.objects.filter(username=username).exists() + class Meta: verbose_name = _("User") verbose_name_plural = _("Users") diff --git a/allianceauth/services/modules/mumble/tasks.py b/allianceauth/services/modules/mumble/tasks.py index e02a1c86..c1cffedb 100644 --- a/allianceauth/services/modules/mumble/tasks.py +++ b/allianceauth/services/modules/mumble/tasks.py @@ -26,63 +26,9 @@ class MumbleTasks: logger.info("Deleting all MumbleUser models") MumbleUser.objects.all().delete() - @staticmethod - @shared_task(bind=True, name="mumble.update_groups", base=QueueOnce) - def update_groups(self, pk): - user = User.objects.get(pk=pk) - logger.debug("Updating mumble groups for user %s" % user) - if MumbleTasks.has_account(user): - try: - if not user.mumble.update_groups(): - raise Exception("Group sync failed") - logger.debug("Updated user %s mumble groups." % user) - return True - except MumbleUser.DoesNotExist: - logger.info(f"Mumble group sync failed for {user}, user does not have a mumble account") - except: - logger.exception("Mumble group sync failed for %s, retrying in 10 mins" % user) - raise self.retry(countdown=60 * 10) - else: - logger.debug("User %s does not have a mumble account, skipping" % user) - return False - - @staticmethod - @shared_task(bind=True, name="mumble.update_display_name", base=QueueOnce) - def update_display_name(self, pk): - user = User.objects.get(pk=pk) - logger.debug("Updating mumble groups for user %s" % user) - if MumbleTasks.has_account(user): - try: - if not user.mumble.update_display_name(): - raise Exception("Display Name Sync failed") - logger.debug("Updated user %s mumble display name." % user) - return True - except MumbleUser.DoesNotExist: - logger.info(f"Mumble display name sync failed for {user}, user does not have a mumble account") - except: - logger.exception("Mumble display name sync failed for %s, retrying in 10 mins" % user) - raise self.retry(countdown=60 * 10) - else: - logger.debug("User %s does not have a mumble account, skipping" % user) - return False - - @staticmethod - @shared_task(name="mumble.update_all_groups") - def update_all_groups(): - logger.debug("Updating ALL mumble groups") - for mumble_user in MumbleUser.objects.exclude(username__exact=''): - MumbleTasks.update_groups.delay(mumble_user.user.pk) - - @staticmethod - @shared_task(name="mumble.update_all_display_names") - def update_all_display_names(): - logger.debug("Updating ALL mumble display names") - for mumble_user in MumbleUser.objects.exclude(username__exact=''): - MumbleTasks.update_display_name.delay(mumble_user.user.pk) - @shared_task -def tidy_up_temp_links(): - TempLink.objects.filter(expires__lt=datetime.now(timezone.utc).replace(tzinfo=timezone.utc).timestamp()).delete() +def tidy_up_temp_links() -> None: + TempLink.objects.filter(expires__lt=datetime.now(timezone.utc).timestamp()).delete() TempUser.objects.filter(templink__isnull=True).delete() - TempUser.objects.filter(expires__lt=datetime.now(timezone.utc).replace(tzinfo=timezone.utc).timestamp()).delete() + TempUser.objects.filter(expires__lt=datetime.now(timezone.utc).timestamp()).delete() diff --git a/allianceauth/services/modules/mumble/tests.py b/allianceauth/services/modules/mumble/tests.py index e17d0267..5c4f3acb 100644 --- a/allianceauth/services/modules/mumble/tests.py +++ b/allianceauth/services/modules/mumble/tests.py @@ -47,32 +47,6 @@ class MumbleHooksTestCase(TestCase): self.assertTrue(service.service_active_for_user(member)) self.assertFalse(service.service_active_for_user(none_user)) - @mock.patch(MODULE_PATH + '.tasks.User.mumble') - def test_update_all_groups(self, mumble): - service = self.service() - service.update_all_groups() - # Check member and blue user have groups updated - self.assertTrue(mumble.update_groups.called) - self.assertEqual(mumble.update_groups.call_count, 1) - - def test_update_groups(self): - # Check member has Member group updated - service = self.service() - member = User.objects.get(username=self.member) - member.mumble.groups = '' # Remove the group set in setUp - member.mumble.save() - - service.update_groups(member) - - mumble_user = MumbleUser.objects.get(user=member) - self.assertIn(DEFAULT_AUTH_GROUP, mumble_user.groups) - - # Check none user does not have groups updated - service = self.service() - none_user = User.objects.get(username=self.none_user) - result = service.update_groups(none_user) - self.assertFalse(result) - def test_validate_user(self): service = self.service() # Test member is not deleted diff --git a/allianceauth/services/modules/mumble/views.py b/allianceauth/services/modules/mumble/views.py index f975ff86..337c0dbe 100644 --- a/allianceauth/services/modules/mumble/views.py +++ b/allianceauth/services/modules/mumble/views.py @@ -6,8 +6,7 @@ from allianceauth.eveonline.models import EveCharacter from allianceauth.services.forms import ServicePasswordModelForm from allianceauth.services.abstract import BaseCreatePasswordServiceAccountView, BaseDeactivateServiceAccountView, \ BaseResetPasswordServiceAccountView, BaseSetPasswordServiceAccountView -from allianceauth.services.hooks import NameFormatter -from allianceauth.services.modules.mumble.auth_hooks import MumbleService +from allianceauth.services.modules.mumble import app_settings from django.conf import settings from django.contrib import messages from django.contrib.auth.decorators import login_required, permission_required @@ -16,6 +15,7 @@ from django.db.models import Count from django.http import Http404, HttpResponse, JsonResponse from django.shortcuts import redirect, render from django.utils.crypto import get_random_string +from esi.views import sso_redirect from .models import MumbleUser, TempLink, TempUser @@ -126,16 +126,8 @@ def index(request): ) tl.save() - tl_list = TempLink.objects.prefetch_related("creator").filter( - expires__gte=datetime.datetime.utcnow() - .replace(tzinfo=datetime.timezone.utc) - .timestamp() - ) - ex_tl_list = TempLink.objects.prefetch_related("creator").filter( - expires__lt=datetime.datetime.utcnow() - .replace(tzinfo=datetime.timezone.utc) - .timestamp() - ) + tl_list = TempLink.objects.prefetch_related("creator").filter(expires__gte=datetime.now(timezone.utc)) + ex_tl_list = TempLink.objects.prefetch_related("creator").filter(expires__lt=datetime.now(timezone.utc)) context = { "tl": tl, @@ -236,17 +228,9 @@ def link_sso(request, token, link): password = get_random_string(length=15) - display_name = "{}{}".format( - app_settings.MUMBLE_TEMPS_SSO_PREFIX, - NameFormatter( - service=MumbleService(), user=PseudoUser(main=char, username=username) - ).format_name(), - ) - temp_user = TempUser.objects.create( username=username, password=password, - name=display_name, expires=link.expires, templink=link, character_id=char.character_id,