Restructure Alliance Auth package (#867)

* Refactor allianceauth into its own package

* Add setup

* Add missing default_app_config declarations

* Fix timerboard namespacing

* Remove obsolete future imports

* Remove py2 mock support

* Remove six

* Add experimental 3.7 support and multiple Dj versions

* Remove python_2_unicode_compatible

* Add navhelper as local package

* Update requirements
This commit is contained in:
Basraah
2017-09-19 09:46:40 +10:00
committed by GitHub
parent d10580b56b
commit 786859294d
538 changed files with 1197 additions and 1523 deletions

View File

@@ -0,0 +1 @@
default_app_config = 'allianceauth.services.modules.openfire.apps.OpenfireServiceConfig'

View File

@@ -0,0 +1,9 @@
from django.contrib import admin
from .models import OpenfireUser
class OpenfireUserAdmin(admin.ModelAdmin):
list_display = ('user', 'username')
search_fields = ('user__username', 'username')
admin.site.register(OpenfireUser, OpenfireUserAdmin)

View File

@@ -0,0 +1,6 @@
from django.apps import AppConfig
class OpenfireServiceConfig(AppConfig):
name = 'allianceauth.services.modules.openfire'
label = 'openfire'

View File

@@ -0,0 +1,106 @@
import logging
from django.conf import settings
from django.template.loader import render_to_string
from allianceauth import hooks
from allianceauth.services.hooks import ServicesHook, MenuItemHook
from .tasks import OpenfireTasks
from .urls import urlpatterns
logger = logging.getLogger(__name__)
class OpenfireService(ServicesHook):
def __init__(self):
ServicesHook.__init__(self)
self.name = 'openfire'
self.urlpatterns = urlpatterns
self.service_url = settings.JABBER_URL
self.access_perm = 'openfire.access_openfire'
@property
def title(self):
return "Jabber"
def delete_user(self, user, notify_user=False):
logger.debug('Deleting user %s %s account' % (user, self.name))
return OpenfireTasks.delete_user(user, notify_user=notify_user)
def validate_user(self, user):
logger.debug('Validating user %s %s account' % (user, self.name))
if OpenfireTasks.has_account(user) and not self.service_active_for_user(user):
self.delete_user(user, notify_user=True)
def update_groups(self, user):
logger.debug('Updating %s groups for %s' % (self.name, user))
if OpenfireTasks.has_account(user):
OpenfireTasks.update_groups.delay(user.pk)
def update_all_groups(self):
logger.debug('Update all %s groups called' % self.name)
OpenfireTasks.update_all_groups.delay()
def service_active_for_user(self, user):
return user.has_perm(self.access_perm)
def render_services_ctrl(self, request):
"""
Example for rendering the service control panel row
You can override the default template and create a
custom one if you wish.
:param request:
:return:
"""
urls = self.Urls()
urls.auth_activate = 'auth_activate_openfire'
urls.auth_deactivate = 'auth_deactivate_openfire'
urls.auth_set_password = 'auth_set_openfire_password'
urls.auth_reset_password = 'auth_reset_openfire_password'
return render_to_string(self.service_ctrl_template, {
'service_name': self.title,
'urls': urls,
'service_url': self.service_url,
'username': request.user.openfire.username if OpenfireTasks.has_account(request.user) else ''
}, request=request)
@hooks.register('services_hook')
def register_service():
return OpenfireService()
class JabberBroadcast(MenuItemHook):
def __init__(self):
MenuItemHook.__init__(self,
'Jabber Broadcast',
'fa fa-lock fa-fw fa-bullhorn grayiconecolor',
'auth_jabber_broadcast_view')
def render(self, request):
if request.user.has_perm('auth.jabber_broadcast') or request.user.has_perm('auth.jabber_broadcast_all'):
return MenuItemHook.render(self, request)
return ''
class FleetBroadcastFormatter(MenuItemHook):
def __init__(self):
MenuItemHook.__init__(self,
'Fleet Broadcast Formatter',
'fa fa-lock fa-fw fa-space-shuttle grayiconecolor',
'auth_fleet_format_tool_view')
def render(self, request):
if request.user.has_perm('auth.jabber_broadcast') or request.user.has_perm('auth.jabber_broadcast_all'):
return MenuItemHook.render(self, request)
return ''
@hooks.register('menu_item_hook')
def register_formatter():
return FleetBroadcastFormatter()
@hooks.register('menu_item_hook')
def register_menu():
return JabberBroadcast()

View File

@@ -0,0 +1,7 @@
from django import forms
from django.utils.translation import ugettext_lazy as _
class JabberBroadcastForm(forms.Form):
group = forms.ChoiceField(label=_('Group'), widget=forms.Select)
message = forms.CharField(label=_('Message'), widget=forms.Textarea)

View File

@@ -0,0 +1,220 @@
import re
import random
import string
from urllib.parse import urlparse
import sleekxmpp
from django.conf import settings
from ofrestapi.users import Users as ofUsers
from ofrestapi import exception
import logging
logger = logging.getLogger(__name__)
class OpenfireManager:
def __init__(self):
pass
@staticmethod
def __add_address_to_username(username):
address = urlparse(settings.OPENFIRE_ADDRESS).netloc.split(":")[0]
completed_username = username + "@" + address
return completed_username
@staticmethod
def __sanitize_username(username):
# https://xmpp.org/extensions/xep-0106.html#escaping
replace = [
("\\", "\\5c"), # Escape backslashes first to double escape existing escape sequences
("\"", "\\22"),
("&", "\\26"),
("'", "\\27"),
("/", "\\2f"),
(":", "\\3a"),
("<", "\\3c"),
(">", "\\3e"),
("@", "\\40"),
("\u007F", ""),
("\uFFFE", ""),
("\uFFFF", ""),
(" ", "\\20"),
]
sanitized = username.strip(' ')
for find, rep in replace:
sanitized = sanitized.replace(find, rep)
return sanitized
@staticmethod
def __generate_random_pass():
return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)])
@staticmethod
def _sanitize_groupname(name):
name = name.strip(' _').lower()
return re.sub('[^\w.-]', '', name)
@staticmethod
def add_user(username):
logger.debug("Adding username %s to openfire." % username)
try:
sanitized_username = OpenfireManager.__sanitize_username(username)
password = OpenfireManager.__generate_random_pass()
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
api.add_user(sanitized_username, password)
logger.info("Added openfire user %s" % username)
except exception.UserAlreadyExistsException:
# User exist
logger.error("Attempting to add a user %s to openfire which already exists on server." % username)
return "", ""
return sanitized_username, password
@staticmethod
def delete_user(username):
logger.debug("Deleting user %s from openfire." % username)
try:
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
api.delete_user(username)
logger.info("Deleted user %s from openfire." % username)
return True
except exception.UserNotFoundException:
logger.error("Attempting to delete a user %s from openfire which was not found on server." % username)
return False
@staticmethod
def lock_user(username):
logger.debug("Locking openfire user %s" % username)
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
api.lock_user(username)
logger.info("Locked openfire user %s" % username)
@staticmethod
def unlock_user(username):
logger.debug("Unlocking openfire user %s" % username)
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
api.unlock_user(username)
logger.info("Unlocked openfire user %s" % username)
@staticmethod
def update_user_pass(username, password=None):
logger.debug("Updating openfire user %s password." % username)
try:
if not password:
password = OpenfireManager.__generate_random_pass()
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
api.update_user(username, password=password)
logger.info("Updated openfire user %s password." % username)
return password
except exception.UserNotFoundException:
logger.error("Unable to update openfire user %s password - user not found on server." % username)
return ""
@classmethod
def update_user_groups(cls, username, groups):
logger.debug("Updating openfire user %s groups %s" % (username, groups))
s_groups = list(map(cls._sanitize_groupname, groups)) # Sanitized group names
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
response = api.get_user_groups(username)
remote_groups = []
if response:
remote_groups = response['groupname']
if isinstance(remote_groups, str):
remote_groups = [remote_groups]
remote_groups = list(map(cls._sanitize_groupname, remote_groups))
logger.debug("Openfire user %s has groups %s" % (username, remote_groups))
add_groups = []
del_groups = []
for g in s_groups:
if g not in remote_groups:
add_groups.append(g)
for g in remote_groups:
if g not in s_groups:
del_groups.append(g)
logger.info(
"Updating openfire groups for user %s - adding %s, removing %s" % (username, add_groups, del_groups))
if add_groups:
api.add_user_groups(username, add_groups)
if del_groups:
api.delete_user_groups(username, del_groups)
@staticmethod
def delete_user_groups(username, groups):
logger.debug("Deleting openfire groups %s from user %s" % (groups, username))
api = ofUsers(settings.OPENFIRE_ADDRESS, settings.OPENFIRE_SECRET_KEY)
api.delete_user_groups(username, groups)
logger.info("Deleted groups %s from openfire user %s" % (groups, username))
@classmethod
def send_broadcast_message(cls, group_name, broadcast_message):
s_group_name = cls._sanitize_groupname(group_name)
logger.debug("Sending jabber ping to group %s with message %s" % (s_group_name, broadcast_message))
to_address = s_group_name + '@' + settings.BROADCAST_SERVICE_NAME + '.' + settings.JABBER_URL
xmpp = PingBot(settings.BROADCAST_USER, settings.BROADCAST_USER_PASSWORD, to_address, broadcast_message)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0199') # XMPP Ping
if xmpp.connect(reattempt=False):
xmpp.process(block=True)
message = None
if xmpp.message_sent:
logger.debug("Sent jabber ping to group %s" % group_name)
return
else:
message = "Failed to send Openfire broadcast message."
logger.error(message)
raise PingBotException(message)
else:
logger.error("Unable to connect to jabber server")
raise PingBotException("Unable to connect to jabber server.")
class PingBot(sleekxmpp.ClientXMPP):
"""
A copy-paste of the example client bot from
http://sleekxmpp.com/getting_started/sendlogout.html
"""
def __init__(self, jid, password, recipient, message):
sleekxmpp.ClientXMPP.__init__(self, jid, password)
self.reconnect_max_attempts = 5
self.auto_reconnect = False
# The message we wish to send, and the JID that
# will receive it.
self.recipient = recipient
self.msg = message
# Success checking
self.message_sent = False
# The session_start event will be triggered when
# the bot establishes its connection with the server
# and the XML streams are ready for use. We want to
# listen for this event so that we we can initialize
# our roster.
self.add_event_handler("session_start", self.start)
if getattr(settings, 'BROADCAST_IGNORE_INVALID_CERT', False):
self.add_event_handler("ssl_invalid_cert", self.discard)
def discard(self, *args, **kwargs):
# Discard the event
return
def start(self, event):
self.send_presence()
self.get_roster()
self.send_message(mto=self.recipient,
mbody=self.msg,
mtype='chat')
self.message_sent = True
# Using wait=True ensures that the send queue will be
# emptied before ending the session.
self.disconnect(wait=True)
class PingBotException(Exception):
pass

View File

@@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.2 on 2016-12-12 03:27
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0008_alter_user_username_max_length'),
]
operations = [
migrations.CreateModel(
name='OpenfireUser',
fields=[
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, primary_key=True, related_name='openfire', serialize=False, to=settings.AUTH_USER_MODEL)),
('username', models.CharField(max_length=254)),
],
),
]

View File

@@ -0,0 +1,61 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-02-02 05:59
from __future__ import unicode_literals
from django.db import migrations
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.contrib.auth.management import create_permissions
import logging
logger = logging.getLogger(__name__)
def migrate_service_enabled(apps, schema_editor):
for app_config in apps.get_app_configs():
app_config.models_module = True
create_permissions(app_config, apps=apps, verbosity=0)
app_config.models_module = None
Group = apps.get_model("auth", "Group")
Permission = apps.get_model("auth", "Permission")
OpenfireUser = apps.get_model("openfire", "OpenfireUser")
perm = Permission.objects.get(codename='access_openfire')
member_group_name = getattr(settings, str('DEFAULT_AUTH_GROUP'), 'Member')
blue_group_name = getattr(settings, str('DEFAULT_BLUE_GROUP'), 'Blue')
# Migrate members
if OpenfireUser.objects.filter(user__groups__name=member_group_name).exists() or \
getattr(settings, str('ENABLE_AUTH_JABBER'), False):
try:
group = Group.objects.get(name=member_group_name)
group.permissions.add(perm)
except ObjectDoesNotExist:
logger.warning('Failed to migrate ENABLE_AUTH_JABBER setting')
# Migrate blues
if OpenfireUser.objects.filter(user__groups__name=blue_group_name).exists() or \
getattr(settings, str('ENABLE_BLUE_JABBER'), False):
try:
group = Group.objects.get(name=blue_group_name)
group.permissions.add(perm)
except ObjectDoesNotExist:
logger.warning('Failed to migrate ENABLE_BLUE_JABBER setting')
class Migration(migrations.Migration):
dependencies = [
('openfire', '0001_initial'),
]
operations = [
migrations.AlterModelOptions(
name='openfireuser',
options={'permissions': (('access_openfire', 'Can access the Openfire service'),)},
),
migrations.RunPython(migrate_service_enabled),
]

View File

@@ -0,0 +1,17 @@
from django.db import models
class OpenfireUser(models.Model):
user = models.OneToOneField('auth.User',
primary_key=True,
on_delete=models.CASCADE,
related_name='openfire')
username = models.CharField(max_length=254)
def __str__(self):
return self.username
class Meta:
permissions = (
("access_openfire", u"Can access the Openfire service"),
)

View File

@@ -0,0 +1,67 @@
import logging
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from allianceauth.notifications import notify
from allianceauth.celeryapp import app
from allianceauth.services.modules.openfire.manager import OpenfireManager
from .models import OpenfireUser
logger = logging.getLogger(__name__)
class OpenfireTasks:
def __init__(self):
pass
@classmethod
def delete_user(cls, user, notify_user=False):
if cls.has_account(user):
logger.debug("User %s has jabber account %s. Deleting." % (user, user.openfire.username))
OpenfireManager.delete_user(user.openfire.username)
user.openfire.delete()
if notify_user:
notify(user, 'Jabber Account Disabled', level='danger')
return True
return False
@staticmethod
def has_account(user):
try:
return user.openfire.username != ''
except ObjectDoesNotExist:
return False
@staticmethod
def disable_jabber():
logging.debug("Deleting all Openfire users")
OpenfireUser.objects.all().delete()
@staticmethod
@app.task(bind=True, name="openfire.update_groups")
def update_groups(self, pk):
user = User.objects.get(pk=pk)
logger.debug("Updating jabber groups for user %s" % user)
if OpenfireTasks.has_account(user):
groups = []
for group in user.groups.all():
groups.append(str(group.name))
if len(groups) == 0:
groups.append('empty')
logger.debug("Updating user %s jabber groups to %s" % (user, groups))
try:
OpenfireManager.update_user_groups(user.openfire.username, groups)
except:
logger.exception("Jabber group sync failed for %s, retrying in 10 mins" % user)
raise self.retry(countdown=60 * 10)
logger.debug("Updated user %s jabber groups." % user)
else:
logger.debug("User does not have an openfire account")
@staticmethod
@app.task(name="openfire.update_all_groups")
def update_all_groups():
logger.debug("Updating ALL jabber groups")
for openfire_user in OpenfireUser.objects.exclude(username__exact=''):
OpenfireTasks.update_groups.delay(openfire_user.user.pk)

View File

@@ -0,0 +1,231 @@
from unittest import mock
from django.test import TestCase, RequestFactory
from django import urls
from django.contrib.auth.models import User, Group, Permission
from django.core.exceptions import ObjectDoesNotExist
from allianceauth.tests.auth_utils import AuthUtils
from .auth_hooks import OpenfireService
from .models import OpenfireUser
from .tasks import OpenfireTasks
MODULE_PATH = 'allianceauth.services.modules.openfire'
DEFAULT_AUTH_GROUP = 'Member'
def add_permissions():
permission = Permission.objects.get(codename='access_openfire')
members = Group.objects.get_or_create(name=DEFAULT_AUTH_GROUP)[0]
AuthUtils.add_permissions_to_groups([permission], [members])
class OpenfireHooksTestCase(TestCase):
def setUp(self):
self.member = 'member_user'
member = AuthUtils.create_member(self.member)
OpenfireUser.objects.create(user=member, username=self.member)
self.none_user = 'none_user'
none_user = AuthUtils.create_user(self.none_user)
self.service = OpenfireService
add_permissions()
def test_has_account(self):
member = User.objects.get(username=self.member)
none_user = User.objects.get(username=self.none_user)
self.assertTrue(OpenfireTasks.has_account(member))
self.assertFalse(OpenfireTasks.has_account(none_user))
def test_service_enabled(self):
service = self.service()
member = User.objects.get(username=self.member)
none_user = User.objects.get(username=self.none_user)
self.assertTrue(service.service_active_for_user(member))
self.assertFalse(service.service_active_for_user(none_user))
@mock.patch(MODULE_PATH + '.tasks.OpenfireManager')
def test_update_all_groups(self, manager):
service = self.service()
service.update_all_groups()
# Check member and blue user have groups updated
self.assertTrue(manager.update_user_groups.called)
self.assertEqual(manager.update_user_groups.call_count, 1)
def test_update_groups(self):
# Check member has Member group updated
with mock.patch(MODULE_PATH + '.tasks.OpenfireManager') as manager:
service = self.service()
member = User.objects.get(username=self.member)
service.update_groups(member)
self.assertTrue(manager.update_user_groups.called)
args, kwargs = manager.update_user_groups.call_args
user_id, groups = args
self.assertIn(DEFAULT_AUTH_GROUP, groups)
self.assertEqual(user_id, member.openfire.username)
# Check none user does not have groups updated
with mock.patch(MODULE_PATH + '.tasks.OpenfireManager') as manager:
service = self.service()
none_user = User.objects.get(username=self.none_user)
service.update_groups(none_user)
self.assertFalse(manager.update_user_groups.called)
@mock.patch(MODULE_PATH + '.tasks.OpenfireManager')
def test_validate_user(self, manager):
service = self.service()
# Test member is not deleted
member = User.objects.get(username=self.member)
service.validate_user(member)
self.assertTrue(member.openfire)
# Test none user is deleted
none_user = User.objects.get(username=self.none_user)
OpenfireUser.objects.create(user=none_user, username='abc123')
service.validate_user(none_user)
self.assertTrue(manager.delete_user.called)
with self.assertRaises(ObjectDoesNotExist):
none_openfire = User.objects.get(username=self.none_user).openfire
@mock.patch(MODULE_PATH + '.tasks.OpenfireManager')
def test_delete_user(self, manager):
member = User.objects.get(username=self.member)
service = self.service()
result = service.delete_user(member)
self.assertTrue(result)
self.assertTrue(manager.delete_user.called)
with self.assertRaises(ObjectDoesNotExist):
openfire_user = User.objects.get(username=self.member).openfire
def test_render_services_ctrl(self):
service = self.service()
member = User.objects.get(username=self.member)
request = RequestFactory().get('/en/services/')
request.user = member
response = service.render_services_ctrl(request)
self.assertTemplateUsed(service.service_ctrl_template)
self.assertIn(urls.reverse('auth_deactivate_openfire'), response)
self.assertIn(urls.reverse('auth_reset_openfire_password'), response)
self.assertIn(urls.reverse('auth_set_openfire_password'), response)
# Test register becomes available
member.openfire.delete()
member = User.objects.get(username=self.member)
request.user = member
response = service.render_services_ctrl(request)
self.assertIn(urls.reverse('auth_activate_openfire'), response)
class OpenfireViewsTestCase(TestCase):
def setUp(self):
self.member = AuthUtils.create_member('auth_member')
self.member.set_password('password')
self.member.email = 'auth_member@example.com'
self.member.save()
AuthUtils.add_main_character(self.member, 'auth_member', '12345', corp_id='111', corp_name='Test Corporation')
add_permissions()
def login(self):
self.client.login(username=self.member.username, password='password')
@mock.patch(MODULE_PATH + '.tasks.OpenfireManager')
@mock.patch(MODULE_PATH + '.views.OpenfireManager')
def test_activate(self, manager, tasks_manager):
self.login()
expected_username = 'auth_member'
manager.add_user.return_value = (expected_username, 'abc123')
response = self.client.get(urls.reverse('auth_activate_openfire'))
self.assertTrue(manager.add_user.called)
self.assertTrue(tasks_manager.update_user_groups.called)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed('registered/service_credentials.html')
self.assertContains(response, expected_username)
openfire_user = OpenfireUser.objects.get(user=self.member)
self.assertEqual(openfire_user.username, expected_username)
@mock.patch(MODULE_PATH + '.tasks.OpenfireManager')
def test_deactivate(self, manager):
self.login()
OpenfireUser.objects.create(user=self.member, username='some member')
response = self.client.get(urls.reverse('auth_deactivate_openfire'))
self.assertTrue(manager.delete_user.called)
self.assertRedirects(response, expected_url=urls.reverse('auth_services'), target_status_code=200)
with self.assertRaises(ObjectDoesNotExist):
openfire_user = User.objects.get(pk=self.member.pk).openfire
@mock.patch(MODULE_PATH + '.views.OpenfireManager')
def test_set_password(self, manager):
self.login()
OpenfireUser.objects.create(user=self.member, username='some member')
response = self.client.post(urls.reverse('auth_set_openfire_password'), data={'password': '1234asdf'})
self.assertTrue(manager.update_user_pass.called)
args, kwargs = manager.update_user_pass.call_args
self.assertEqual(kwargs['password'], '1234asdf')
self.assertRedirects(response, expected_url=urls.reverse('auth_services'), target_status_code=200)
@mock.patch(MODULE_PATH + '.views.OpenfireManager')
def test_reset_password(self, manager):
self.login()
OpenfireUser.objects.create(user=self.member, username='some member')
manager.update_user_pass.return_value = 'hunter2'
response = self.client.get(urls.reverse('auth_reset_openfire_password'))
self.assertTemplateUsed(response, 'registered/service_credentials.html')
self.assertContains(response, 'some member')
self.assertContains(response, 'hunter2')
class OpenfireManagerTestCase(TestCase):
def setUp(self):
from .manager import OpenfireManager
self.manager = OpenfireManager
def test_generate_random_password(self):
password = self.manager._OpenfireManager__generate_random_pass()
self.assertEqual(len(password), 16)
self.assertIsInstance(password, type(''))
def test__sanitize_username(self):
test_username = " My_Test User\"'&/:<>@name\\20name"
result_username = self.manager._OpenfireManager__sanitize_username(test_username)
self.assertEqual(result_username, 'My_Test\\20User\\22\\27\\26\\2f\\3a\\3c\\3e\\40name\\5c20name')
def test__sanitize_groupname(self):
test_groupname = " My_Test Groupname"
result_groupname = self.manager._sanitize_groupname(test_groupname)
self.assertEqual(result_groupname, "my_testgroupname")
@mock.patch(MODULE_PATH + '.manager.ofUsers')
def test_update_user_groups(self, api):
groups = ["AddGroup", "othergroup", "Guest Group"]
server_groups = ["othergroup", "Guest Group", "REMOVE group"]
username = "testuser"
api_instance = api.return_value
api_instance.get_user_groups.return_value = {'groupname': server_groups}
self.manager.update_user_groups(username, groups)
self.assertTrue(api_instance.add_user_groups.called)
args, kwargs = api_instance.add_user_groups.call_args
self.assertEqual(args[1], ["addgroup"])
self.assertTrue(api_instance.delete_user_groups.called)
args, kwargs = api_instance.delete_user_groups.call_args
self.assertEqual(args[1], ["removegroup"])

View File

@@ -0,0 +1,27 @@
from django.conf.urls import url, include
from django.conf.urls.i18n import i18n_patterns
from django.utils.translation import ugettext_lazy as _
from . import views
module_urls = [
# Jabber Service Control
url(r'^activate/$', views.activate_jabber, name='auth_activate_openfire'),
url(r'^deactivate/$', views.deactivate_jabber, name='auth_deactivate_openfire'),
url(r'^reset_password/$', views.reset_jabber_password, name='auth_reset_openfire_password'),
]
module_i18n_urls = [
url(_(r'^set_password/$'), views.set_jabber_password, name='auth_set_openfire_password'),
]
urlpatterns = [
url(r'^openfire/', include(module_urls))
]
urlpatterns += i18n_patterns(
# Jabber Broadcast
url(_(r'^services/jabber_broadcast/$'), views.jabber_broadcast_view, name='auth_jabber_broadcast_view'),
# Jabber
url(r'openfire/', include(module_i18n_urls))
)

View File

@@ -0,0 +1,157 @@
import datetime
import logging
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.models import Group
from django.shortcuts import render, redirect
from allianceauth.services.forms import ServicePasswordForm
from .forms import JabberBroadcastForm
from .manager import OpenfireManager, PingBotException
from .models import OpenfireUser
from .tasks import OpenfireTasks
logger = logging.getLogger(__name__)
ACCESS_PERM = 'openfire.access_openfire'
@login_required
@permission_required(ACCESS_PERM)
def activate_jabber(request):
logger.debug("activate_jabber called by user %s" % request.user)
character = request.user.profile.main_character
logger.debug("Adding jabber user for user %s with main character %s" % (request.user, character))
info = OpenfireManager.add_user(character.character_name)
# If our username is blank means we already had a user
if info[0] is not "":
OpenfireUser.objects.update_or_create(user=request.user, defaults={'username': info[0]})
logger.debug("Updated authserviceinfo for user %s with jabber credentials. Updating groups." % request.user)
OpenfireTasks.update_groups.delay(request.user.pk)
logger.info("Successfully activated jabber for user %s" % request.user)
messages.success(request, 'Activated jabber account.')
credentials = {
'username': info[0],
'password': info[1],
}
return render(request, 'registered/service_credentials.html',
context={'credentials': credentials, 'service': 'Jabber'})
else:
logger.error("Unsuccessful attempt to activate jabber for user %s" % request.user)
messages.error(request, 'An error occurred while processing your jabber account.')
return redirect("auth_services")
@login_required
@permission_required(ACCESS_PERM)
def deactivate_jabber(request):
logger.debug("deactivate_jabber called by user %s" % request.user)
if OpenfireTasks.has_account(request.user) and OpenfireTasks.delete_user(request.user):
logger.info("Successfully deactivated jabber for user %s" % request.user)
messages.success(request, 'Deactivated jabber account.')
else:
logger.error("Unsuccessful attempt to deactivate jabber for user %s" % request.user)
messages.error(request, 'An error occurred while processing your jabber account.')
return redirect("auth_services")
@login_required
@permission_required(ACCESS_PERM)
def reset_jabber_password(request):
logger.debug("reset_jabber_password called by user %s" % request.user)
if OpenfireTasks.has_account(request.user):
result = OpenfireManager.update_user_pass(request.user.openfire.username)
# If our username is blank means we failed
if result != "":
logger.info("Successfully reset jabber password for user %s" % request.user)
messages.success(request, 'Reset jabber password.')
credentials = {
'username': request.user.openfire.username,
'password': result,
}
return render(request, 'registered/service_credentials.html',
context={'credentials': credentials, 'service': 'Jabber'})
logger.error("Unsuccessful attempt to reset jabber for user %s" % request.user)
messages.error(request, 'An error occurred while processing your jabber account.')
return redirect("auth_services")
@login_required
@permission_required('auth.jabber_broadcast')
def jabber_broadcast_view(request):
logger.debug("jabber_broadcast_view called by user %s" % request.user)
allchoices = []
if request.user.has_perm('auth.jabber_broadcast_all'):
allchoices.append(('all', 'all'))
for g in Group.objects.all():
allchoices.append((str(g.name), str(g.name)))
else:
for g in request.user.groups.all():
allchoices.append((str(g.name), str(g.name)))
if request.method == 'POST':
form = JabberBroadcastForm(request.POST)
form.fields['group'].choices = allchoices
logger.debug("Received POST request containing form, valid: %s" % form.is_valid())
if form.is_valid():
main_char = request.user.profile.main_character
logger.debug("Processing jabber broadcast for user %s with main character %s" % (request.user, main_char))
try:
if main_char is not None:
message_to_send = form.cleaned_data[
'message'] + "\n##### SENT BY: " + "[" + main_char.corporation_ticker + "]" + \
main_char.character_name + " TO: " + \
form.cleaned_data['group'] + " WHEN: " + datetime.datetime.utcnow().strftime(
"%Y-%m-%d %H:%M:%S") + " #####\n##### Replies are NOT monitored #####\n"
group_to_send = form.cleaned_data['group']
else:
message_to_send = form.cleaned_data[
'message'] + "\n##### SENT BY: " + "No character but can send pings?" + " TO: " + \
form.cleaned_data['group'] + " WHEN: " + datetime.datetime.utcnow().strftime(
"%Y-%m-%d %H:%M:%S") + " #####\n##### Replies are NOT monitored #####\n"
group_to_send = form.cleaned_data['group']
OpenfireManager.send_broadcast_message(group_to_send, message_to_send)
messages.success(request, 'Sent jabber broadcast to %s' % group_to_send)
logger.info("Sent jabber broadcast on behalf of user %s" % request.user)
except PingBotException as e:
messages.error(request, e)
else:
form = JabberBroadcastForm()
form.fields['group'].choices = allchoices
logger.debug("Generated broadcast form for user %s containing %s groups" % (
request.user, len(form.fields['group'].choices)))
context = {'form': form}
return render(request, 'registered/jabberbroadcast.html', context=context)
@login_required
@permission_required(ACCESS_PERM)
def set_jabber_password(request):
logger.debug("set_jabber_password called by user %s" % request.user)
if request.method == 'POST':
logger.debug("Received POST request with form.")
form = ServicePasswordForm(request.POST)
logger.debug("Form is valid: %s" % form.is_valid())
if form.is_valid() and OpenfireTasks.has_account(request.user):
password = form.cleaned_data['password']
logger.debug("Form contains password of length %s" % len(password))
result = OpenfireManager.update_user_pass(request.user.openfire.username, password=password)
if result != "":
logger.info("Successfully set jabber password for user %s" % request.user)
messages.success(request, 'Set jabber password.')
else:
logger.error("Failed to install custom jabber password for user %s" % request.user)
messages.error(request, 'An error occurred while processing your jabber account.')
return redirect("auth_services")
else:
logger.debug("Request is not type POST - providing empty form.")
form = ServicePasswordForm()
logger.debug("Rendering form for user %s" % request.user)
context = {'form': form, 'service': 'Jabber'}
return render(request, 'registered/service_password.html', context=context)