Refactor mumble service (#914)

* Added in_organisation check to EveCharacter model

* Basic name formatter

* Switch mumble service to use name formatter

* Squash services migrations

* Add name to example service to allow it to be used in tests

* Add name formatter to services

* Add abstract views, model, form for services modules

* Refactor mumble service to new style

* Don't set credentials if setting a provided password

* Add success message to set password view
This commit is contained in:
Basraah
2017-11-04 06:52:45 +10:00
committed by Adarnof
parent c4979a22dd
commit 86362bb0dd
13 changed files with 342 additions and 239 deletions

View File

@@ -6,8 +6,8 @@ from allianceauth.notifications import notify
from allianceauth import hooks
from allianceauth.services.hooks import ServicesHook
from .manager import MumbleManager
from .tasks import MumbleTasks
from .models import MumbleUser
from .urls import urlpatterns
logger = logging.getLogger(__name__)
@@ -25,11 +25,14 @@ class MumbleService(ServicesHook):
def delete_user(self, user, notify_user=False):
logging.debug("Deleting user %s %s account" % (user, self.name))
if MumbleManager.delete_user(user):
if notify_user:
notify(user, 'Mumble Account Disabled', level='danger')
return True
return False
try:
if user.mumble.delete():
if notify_user:
notify(user, 'Mumble Account Disabled', level='danger')
return True
return False
except MumbleUser.DoesNotExist:
logging.debug("User does not have a mumble account")
def update_groups(self, user):
logger.debug("Updating %s groups for %s" % (self.name, user))

View File

@@ -1,96 +0,0 @@
import random
import string
from passlib.hash import bcrypt_sha256
from django.core.exceptions import ObjectDoesNotExist
from .models import MumbleUser
import logging
logger = logging.getLogger(__name__)
class MumbleManager:
def __init__(self):
pass
HASH_FN = 'bcrypt-sha256'
@staticmethod
def __santatize_username(username):
sanatized = username.replace(" ", "_")
return sanatized
@staticmethod
def __generate_random_pass():
return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)])
@classmethod
def _gen_pwhash(cls, password):
return bcrypt_sha256.encrypt(password.encode('utf-8'))
@classmethod
def create_user(cls, user, username):
logger.debug("Creating mumble user with username %s" % (username))
username_clean = cls.__santatize_username(username)
password = cls.__generate_random_pass()
pwhash = cls._gen_pwhash(password)
logger.debug("Proceeding with mumble user creation: clean username %s, pwhash starts with %s" % (
username_clean, pwhash[0:5]))
if not MumbleUser.objects.filter(username=username_clean).exists():
logger.info("Creating mumble user %s" % username_clean)
MumbleUser.objects.create(user=user, username=username_clean, pwhash=pwhash, hashfn=cls.HASH_FN)
return username_clean, password
else:
logger.warn("Mumble user %s already exists.")
return False
@staticmethod
def delete_user(user):
logger.debug("Deleting user %s from mumble." % user)
if MumbleUser.objects.filter(user=user).exists():
MumbleUser.objects.filter(user=user).delete()
logger.info("Deleted user %s from mumble" % user)
return True
logger.error("Unable to delete user %s from mumble: MumbleUser model not found" % user)
return False
@classmethod
def update_user_password(cls, user, password=None):
logger.debug("Updating mumble user %s password." % user)
if not password:
password = cls.__generate_random_pass()
pwhash = cls._gen_pwhash(password)
logger.debug("Proceeding with mumble user %s password update - pwhash starts with %s" % (user, pwhash[0:5]))
try:
model = MumbleUser.objects.get(user=user)
model.pwhash = pwhash
model.hashfn = cls.HASH_FN
model.save()
return password
except ObjectDoesNotExist:
logger.error("User %s not found on mumble. Unable to update password." % user)
return False
@staticmethod
def update_groups(user, groups):
logger.debug("Updating mumble user %s groups %s" % (user, groups))
safe_groups = list(set([g.replace(' ', '-') for g in groups]))
groups = ''
for g in safe_groups:
groups = groups + g + ','
groups = groups.strip(',')
if MumbleUser.objects.filter(user=user).exists():
logger.info("Updating mumble user %s groups to %s" % (user, safe_groups))
model = MumbleUser.objects.get(user=user)
model.groups = groups
model.save()
return True
else:
logger.error("User %s not found on mumble. Unable to update groups." % user)
return False
@staticmethod
def user_exists(username):
return MumbleUser.objects.filter(username=username).exists()

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.6 on 2017-10-09 09:19
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('mumble', '0006_service_permissions'),
]
operations = [
migrations.RemoveField(
model_name='mumbleuser',
name='id',
),
migrations.AlterField(
model_name='mumbleuser',
name='user',
field=models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, primary_key=True, related_name='mumble', serialize=False, to=settings.AUTH_USER_MODEL),
),
]

View File

@@ -1,16 +1,98 @@
import random
import string
from passlib.hash import bcrypt_sha256
from django.db import models
from django.contrib.auth.models import Group
from allianceauth.services.hooks import NameFormatter
from allianceauth.services.abstract import AbstractServiceModel
import logging
logger = logging.getLogger(__name__)
class MumbleUser(models.Model):
user = models.OneToOneField('auth.User', related_name='mumble', null=True, on_delete=models.CASCADE)
class MumbleManager(models.Manager):
HASH_FN = 'bcrypt-sha256'
@staticmethod
def get_username(user):
from .auth_hooks import MumbleService
return NameFormatter(MumbleService(), user).format_name()
@staticmethod
def sanitise_username(username):
return username.replace(" ", "_")
@staticmethod
def generate_random_pass():
return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)])
@staticmethod
def gen_pwhash(password):
return bcrypt_sha256.encrypt(password.encode('utf-8'))
def create(self, user):
username = self.get_username(user)
logger.debug("Creating mumble user with username {}".format(username))
username_clean = self.sanitise_username(username)
password = self.generate_random_pass()
pwhash = self.gen_pwhash(password)
logger.debug("Proceeding with mumble user creation: clean username {}, pwhash starts with {}".format(
username_clean, pwhash[0:5]))
logger.info("Creating mumble user {}".format(username_clean))
result = super(MumbleManager, self).create(user=user, username=username_clean,
pwhash=pwhash, hashfn=self.HASH_FN)
result.update_groups()
result.credentials.update({'username': result.username, 'password': password})
return result
def user_exists(self, username):
return self.filter(username=username).exists()
class MumbleUser(AbstractServiceModel):
username = models.CharField(max_length=254, unique=True)
pwhash = models.CharField(max_length=80)
hashfn = models.CharField(max_length=20, default='sha1')
groups = models.TextField(blank=True, null=True)
objects = MumbleManager()
def __str__(self):
return self.username
def update_password(self, password=None):
init_password = password
logger.debug("Updating mumble user %s password.".format(self.user))
if not password:
password = MumbleManager.generate_random_pass()
pwhash = MumbleManager.gen_pwhash(password)
logger.debug("Proceeding with mumble user {} password update - pwhash starts with {}".format(
self.user, pwhash[0:5]))
self.pwhash = pwhash
self.hashfn = MumbleManager.HASH_FN
self.save()
if init_password is None:
self.credentials.update({'username': self.username, 'password': password})
def reset_password(self):
self.update_password()
def update_groups(self, groups: Group=None):
if groups is None:
groups = self.user.groups.all()
groups_str = []
for group in groups:
groups_str.append(str(group.name))
if len(groups) == 0:
groups_str.append('empty')
safe_groups = ','.join(set([g.replace(' ', '-') for g in groups_str]))
logger.info("Updating mumble user {} groups to {}".format(self.user, safe_groups))
self.groups = safe_groups
self.save()
return True
class Meta:
permissions = (
("access_mumble", u"Can access the Mumble service"),

View File

@@ -2,10 +2,8 @@ import logging
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from allianceauth.services.hooks import NameFormatter
from allianceauth.celery import app
from .manager import MumbleManager
from .models import MumbleUser
logger = logging.getLogger(__name__)
@@ -27,32 +25,25 @@ class MumbleTasks:
logger.info("Deleting all MumbleUser models")
MumbleUser.objects.all().delete()
@staticmethod
def get_username(user):
from .auth_hooks import MumbleService
return NameFormatter(MumbleService(), user).format_name()
@staticmethod
@app.task(bind=True, name="mumble.update_groups")
def update_groups(self, pk):
user = User.objects.get(pk=pk)
logger.debug("Updating mumble groups for user %s" % user)
if MumbleTasks.has_account(user):
groups = []
for group in user.groups.all():
groups.append(str(group.name))
if len(groups) == 0:
groups.append('empty')
logger.debug("Updating user %s mumble groups to %s" % (user, groups))
try:
if not MumbleManager.update_groups(user, groups):
if not user.mumble.update_groups():
raise Exception("Group sync failed")
logger.debug("Updated user %s mumble groups." % user)
return True
except MumbleUser.DoesNotExist:
logger.info("Mumble group sync failed for {}, user does not have a mumble account".format(user))
except:
logger.exception("Mumble group sync failed for %s, retrying in 10 mins" % user)
raise self.retry(countdown=60 * 10)
logger.debug("Updated user %s mumble groups." % user)
else:
logger.debug("User %s does not have a mumble account, skipping" % user)
return False
@staticmethod
@app.task(name="mumble.update_all_groups")

View File

@@ -25,7 +25,7 @@ class MumbleHooksTestCase(TestCase):
def setUp(self):
self.member = 'member_user'
member = AuthUtils.create_member(self.member)
MumbleUser.objects.create(user=member, username=self.member, pwhash='password', groups='Member')
MumbleUser.objects.create(user=member)
self.none_user = 'none_user'
none_user = AuthUtils.create_user(self.none_user)
self.service = MumbleService
@@ -45,13 +45,13 @@ class MumbleHooksTestCase(TestCase):
self.assertTrue(service.service_active_for_user(member))
self.assertFalse(service.service_active_for_user(none_user))
@mock.patch(MODULE_PATH + '.tasks.MumbleManager')
def test_update_all_groups(self, manager):
@mock.patch(MODULE_PATH + '.tasks.User.mumble')
def test_update_all_groups(self, mumble):
service = self.service()
service.update_all_groups()
# Check member and blue user have groups updated
self.assertTrue(manager.update_groups.called)
self.assertEqual(manager.update_groups.call_count, 1)
self.assertTrue(mumble.update_groups.called)
self.assertEqual(mumble.update_groups.call_count, 1)
def test_update_groups(self):
# Check member has Member group updated
@@ -66,11 +66,10 @@ class MumbleHooksTestCase(TestCase):
self.assertIn(DEFAULT_AUTH_GROUP, mumble_user.groups)
# Check none user does not have groups updated
with mock.patch(MODULE_PATH + '.tasks.MumbleManager') as manager:
service = self.service()
none_user = User.objects.get(username=self.none_user)
service.update_groups(none_user)
self.assertFalse(manager.update_groups.called)
service = self.service()
none_user = User.objects.get(username=self.none_user)
result = service.update_groups(none_user)
self.assertFalse(result)
def test_validate_user(self):
service = self.service()
@@ -81,7 +80,7 @@ class MumbleHooksTestCase(TestCase):
# Test none user is deleted
none_user = User.objects.get(username=self.none_user)
MumbleUser.objects.create(user=none_user, username='mr no-name', pwhash='password', groups='Blue,Orange')
MumbleUser.objects.create(user=none_user)
service.validate_user(none_user)
with self.assertRaises(ObjectDoesNotExist):
none_mumble = User.objects.get(username=self.none_user).mumble
@@ -139,11 +138,11 @@ class MumbleViewsTestCase(TestCase):
self.assertTrue(mumble_user.pwhash)
self.assertEqual('Member', mumble_user.groups)
def test_deactivate(self):
def test_deactivate_post(self):
self.login()
MumbleUser.objects.create(user=self.member, username='some member')
MumbleUser.objects.create(user=self.member)
response = self.client.get(urls.reverse('mumble:deactivate'))
response = self.client.post(urls.reverse('mumble:deactivate'))
self.assertRedirects(response, expected_url=urls.reverse('services:services'), target_status_code=200)
with self.assertRaises(ObjectDoesNotExist):
@@ -151,37 +150,39 @@ class MumbleViewsTestCase(TestCase):
def test_set_password(self):
self.login()
MumbleUser.objects.create(user=self.member, username='some member', pwhash='old')
created = MumbleUser.objects.create(user=self.member)
old_pwd = created.credentials.get('password')
response = self.client.post(urls.reverse('mumble:set_password'), data={'password': '1234asdf'})
self.assertNotEqual(MumbleUser.objects.get(user=self.member).pwhash, 'old')
self.assertNotEqual(MumbleUser.objects.get(user=self.member).pwhash, old_pwd)
self.assertRedirects(response, expected_url=urls.reverse('services:services'), target_status_code=200)
def test_reset_password(self):
self.login()
MumbleUser.objects.create(user=self.member, username='some member', pwhash='old')
created = MumbleUser.objects.create(user=self.member)
old_pwd = created.credentials.get('password')
response = self.client.get(urls.reverse('mumble:reset_password'))
self.assertNotEqual(MumbleUser.objects.get(user=self.member).pwhash, 'old')
self.assertNotEqual(MumbleUser.objects.get(user=self.member).pwhash, old_pwd)
self.assertTemplateUsed(response, 'services/service_credentials.html')
self.assertContains(response, 'some member')
self.assertContains(response, 'auth_member')
class MumbleManagerTestCase(TestCase):
def setUp(self):
from .manager import MumbleManager
from .models import MumbleManager
self.manager = MumbleManager
def test_generate_random_password(self):
password = self.manager._MumbleManager__generate_random_pass()
password = self.manager.generate_random_pass()
self.assertEqual(len(password), 16)
self.assertIsInstance(password, type(''))
def test_gen_pwhash(self):
pwhash = self.manager._gen_pwhash('test')
pwhash = self.manager.gen_pwhash('test')
self.assertEqual(pwhash[:15], '$bcrypt-sha256$')
self.assertEqual(len(pwhash), 75)

View File

@@ -6,11 +6,10 @@ app_name = 'mumble'
module_urls = [
# Mumble service control
url(r'^activate/$', views.activate_mumble, name='activate'),
url(r'^deactivate/$', views.deactivate_mumble, name='deactivate'),
url(r'^reset_password/$', views.reset_mumble_password,
name='reset_password'),
url(r'^set_password/$', views.set_mumble_password, name='set_password'),
url(r'^activate/$', views.CreateAccountMumbleView.as_view(), name='activate'),
url(r'^deactivate/$', views.DeleteMumbleView.as_view(), name='deactivate'),
url(r'^reset_password/$', views.ResetPasswordMumbleView.as_view(), name='reset_password'),
url(r'^set_password/$', views.SetPasswordMumbleView.as_view(), name='set_password'),
]
urlpatterns = [

View File

@@ -1,103 +1,37 @@
import logging
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.shortcuts import render, redirect
from allianceauth.services.forms import ServicePasswordForm
from allianceauth.services.forms import ServicePasswordModelForm
from allianceauth.services.abstract import BaseCreatePasswordServiceAccountView, BaseDeactivateServiceAccountView, \
BaseResetPasswordServiceAccountView, BaseSetPasswordServiceAccountView
from .manager import MumbleManager
from .tasks import MumbleTasks
from .models import MumbleUser
logger = logging.getLogger(__name__)
ACCESS_PERM = 'mumble.access_mumble'
class MumblePasswordForm(ServicePasswordModelForm):
class Meta:
model = MumbleUser
fields = ('password',)
@login_required
@permission_required(ACCESS_PERM)
def activate_mumble(request):
logger.debug("activate_mumble called by user %s" % request.user)
character = request.user.profile.main_character
logger.debug("Adding mumble user for %s with main character %s" % (request.user, character))
result = MumbleManager.create_user(request.user, MumbleTasks.get_username(request.user))
if result:
logger.debug("Updated authserviceinfo for user %s with mumble credentials. Updating groups." % request.user)
MumbleTasks.update_groups.apply(args=(request.user.pk,)) # Run synchronously to prevent timing issues
logger.info("Successfully activated mumble for user %s" % request.user)
messages.success(request, 'Activated Mumble account.')
credentials = {
'username': result[0],
'password': result[1],
}
return render(request, 'services/service_credentials.html',
context={'credentials': credentials, 'service': 'Mumble'})
else:
logger.error("Unsuccessful attempt to activate mumble for user %s" % request.user)
messages.error(request, 'An error occurred while processing your Mumble account.')
return redirect("services:services")
class MumbleViewMixin:
service_name = 'mumble'
model = MumbleUser
permission_required = 'mumble.access_mumble'
@login_required
@permission_required(ACCESS_PERM)
def deactivate_mumble(request):
logger.debug("deactivate_mumble called by user %s" % request.user)
# if we successfully remove the user or the user is already removed
if MumbleManager.delete_user(request.user):
logger.info("Successfully deactivated mumble for user %s" % request.user)
messages.success(request, 'Deactivated Mumble account.')
else:
logger.error("Unsuccessful attempt to deactivate mumble for user %s" % request.user)
messages.error(request, 'An error occurred while processing your Mumble account.')
return redirect("services:services")
class CreateAccountMumbleView(MumbleViewMixin, BaseCreatePasswordServiceAccountView):
pass
@login_required
@permission_required(ACCESS_PERM)
def reset_mumble_password(request):
logger.debug("reset_mumble_password called by user %s" % request.user)
result = MumbleManager.update_user_password(request.user)
# if blank we failed
if result != "":
logger.info("Successfully reset mumble password for user %s" % request.user)
messages.success(request, 'Reset Mumble password.')
credentials = {
'username': request.user.mumble.username,
'password': result,
}
return render(request, 'services/service_credentials.html',
context={'credentials': credentials, 'service': 'Mumble'})
else:
logger.error("Unsuccessful attempt to reset mumble password for user %s" % request.user)
messages.error(request, 'An error occurred while processing your Mumble account.')
return redirect("services:services")
class DeleteMumbleView(MumbleViewMixin, BaseDeactivateServiceAccountView):
pass
@login_required
@permission_required(ACCESS_PERM)
def set_mumble_password(request):
logger.debug("set_mumble_password called by user %s" % request.user)
if request.method == 'POST':
logger.debug("Received POST request with form.")
form = ServicePasswordForm(request.POST)
logger.debug("Form is valid: %s" % form.is_valid())
if form.is_valid() and MumbleTasks.has_account(request.user):
password = form.cleaned_data['password']
logger.debug("Form contains password of length %s" % len(password))
result = MumbleManager.update_user_password(request.user, password=password)
if result != "":
logger.info("Successfully reset mumble password for user %s" % request.user)
messages.success(request, 'Set Mumble password.')
else:
logger.error("Failed to install custom mumble password for user %s" % request.user)
messages.error(request, 'An error occurred while processing your Mumble account.')
return redirect("services:services")
else:
logger.debug("Request is not type POST - providing empty form.")
form = ServicePasswordForm()
class ResetPasswordMumbleView(MumbleViewMixin, BaseResetPasswordServiceAccountView):
pass
logger.debug("Rendering form for user %s" % request.user)
context = {'form': form, 'service': 'Mumble'}
return render(request, 'services/service_password.html', context=context)
class SetPasswordMumbleView(MumbleViewMixin, BaseSetPasswordServiceAccountView):
form_class = MumblePasswordForm