SeAT Manager (#704)

* SeAT service in modular service app format

* Replace string concatenation with formatters

* Fix incorrect references to user

* Fix exception when user doesn't have seat active

* Prevent deletion of seat API keys by default

* Improve api response error handling

* Corrected notification message

* Added missing view returns

* Update SeAT to use permissions based access

* Update password generator to new style

* Correct logging message

* Fix seat role update tasks

* Correct validate user logic

* Add seat test settings

* Added basic seat unit tests

* Added add permissions function from other branch

* Remove obsolete settings references
This commit is contained in:
Basraah 2017-02-12 13:04:47 +10:00 committed by Adarnof
parent 918ecf812c
commit 914c204a40
17 changed files with 878 additions and 1 deletions

View File

@ -24,7 +24,6 @@ BROKER_URL = 'redis://localhost:6379/0'
CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler" CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERYBEAT_SCHEDULE = dict() CELERYBEAT_SCHEDULE = dict()
# Build paths inside the project like this: os.path.join(BASE_DIR, ...) # Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@ -77,6 +76,7 @@ INSTALLED_APPS = [
'services.modules.ips4', 'services.modules.ips4',
'services.modules.market', 'services.modules.market',
'services.modules.openfire', 'services.modules.openfire',
'services.modules.seat',
'services.modules.smf', 'services.modules.smf',
'services.modules.phpbb3', 'services.modules.phpbb3',
'services.modules.xenforo', 'services.modules.xenforo',
@ -549,6 +549,14 @@ IPS4_DB = {
'PORT': os.environ.get('AA_DB_IPS4_PORT', '3306'), 'PORT': os.environ.get('AA_DB_IPS4_PORT', '3306'),
} }
#####################################
# SEAT Configuration
#####################################
# SEAT_URL - base url of the seat install (no trailing slash)
# SEAT_XTOKEN - API key X-Token provided by SeAT
#####################################
SEAT_URL = os.environ.get('AA_SEAT_URL', 'http://example.com/seat')
SEAT_XTOKEN = os.environ.get('AA_SEAT_XTOKEN', '')
###################################### ######################################
# SMF Configuration # SMF Configuration
@ -708,3 +716,9 @@ if 'services.modules.teamspeak3' in INSTALLED_APPS:
'task': 'services.modules.teamspeak3.tasks.Teamspeak3Tasks.run_ts3_group_update', 'task': 'services.modules.teamspeak3.tasks.Teamspeak3Tasks.run_ts3_group_update',
'schedule': crontab(minute='*/30'), 'schedule': crontab(minute='*/30'),
} }
if 'services.modules.seat' in INSTALLED_APPS:
CELERYBEAT_SCHEDULE['run_seat_api_sync'] = {
'task': 'services.modules.seat.tasks.SeatTasks.run_api_sync',
'schedule': crontab(minute='*/30'),
}

View File

@ -79,3 +79,15 @@ class AuthUtils:
user=user user=user
) )
AuthServicesInfo.objects.update_or_create(user=user, defaults={'main_char_id': character_id}) AuthServicesInfo.objects.update_or_create(user=user, defaults={'main_char_id': character_id})
@classmethod
def add_permissions_to_groups(cls, perms, groups, disconnect_signals=True):
if disconnect_signals:
cls.disconnect_signals()
for group in groups:
for perm in perms:
group.permissions.add(perm)
if disconnect_signals:
cls.connect_signals()

View File

@ -64,6 +64,7 @@ INSTALLED_APPS = [
'services.modules.ips4', 'services.modules.ips4',
'services.modules.market', 'services.modules.market',
'services.modules.openfire', 'services.modules.openfire',
'services.modules.seat',
'services.modules.smf', 'services.modules.smf',
'services.modules.phpbb3', 'services.modules.phpbb3',
'services.modules.xenforo', 'services.modules.xenforo',
@ -486,6 +487,14 @@ DISCOURSE_SSO_SECRET = 'd836444a9e4084d5b224a60c208dce14'
IPS4_URL = os.environ.get('AA_IPS4_URL', 'http://example.com/ips4') IPS4_URL = os.environ.get('AA_IPS4_URL', 'http://example.com/ips4')
IPS4_API_KEY = os.environ.get('AA_IPS4_API_KEY', '') IPS4_API_KEY = os.environ.get('AA_IPS4_API_KEY', '')
#####################################
# SEAT Configuration
#####################################
# SEAT_URL - base url of the seat install (no trailing slash)
# SEAT_XTOKEN - API key X-Token provided by SeAT
#####################################
SEAT_URL = os.environ.get('AA_SEAT_URL', 'http://example.com/seat')
SEAT_XTOKEN = os.environ.get('AA_SEAT_XTOKEN', 'tokentokentoken')
###################################### ######################################
# SMF Configuration # SMF Configuration

View File

@ -165,6 +165,14 @@ class EveManager(object):
else: else:
logger.debug("No api keypairs found for user %s" % user) logger.debug("No api keypairs found for user %s" % user)
@staticmethod
def get_all_api_key_pairs():
if EveApiKeyPair.objects.exists():
logger.debug("Returning all api keypairs.")
return EveApiKeyPair.objects.all()
else:
logger.debug("No api keypairs found.")
@staticmethod @staticmethod
def check_if_api_key_pair_exist(api_id): def check_if_api_key_pair_exist(api_id):
if EveApiKeyPair.objects.filter(api_id=api_id).exists(): if EveApiKeyPair.objects.filter(api_id=api_id).exists():

View File

View File

@ -0,0 +1,10 @@
from __future__ import unicode_literals
from django.contrib import admin
from .models import SeatUser
class SeatUserAdmin(admin.ModelAdmin):
list_display = ('user', 'username')
search_fields = ('user__username', 'username')
admin.site.register(SeatUser, SeatUserAdmin)

View File

@ -0,0 +1,7 @@
from __future__ import unicode_literals
from django.apps import AppConfig
class SeatServiceConfig(AppConfig):
name = 'seat'

View File

@ -0,0 +1,66 @@
from __future__ import unicode_literals
from django.template.loader import render_to_string
from django.conf import settings
from services.hooks import ServicesHook
from alliance_auth import hooks
from .urls import urlpatterns
from .tasks import SeatTasks
import logging
logger = logging.getLogger(__name__)
class SeatService(ServicesHook):
def __init__(self):
ServicesHook.__init__(self)
self.urlpatterns = urlpatterns
self.name = 'seat'
self.service_url = settings.SEAT_URL
self.access_perm = 'seat.access_seat'
@property
def title(self):
return "SeAT"
def delete_user(self, user, notify_user=False):
logger.debug('Deleting user %s %s account' % (user, self.name))
return SeatTasks.delete_user(user, notify_user=notify_user)
def validate_user(self, user):
logger.debug('Validating user %s %s account' % (user, self.name))
if SeatTasks.has_account(user) and not self.service_active_for_user(user):
self.delete_user(user)
def update_groups(self, user):
logger.debug("Updating %s groups for %s" % (self.name, user))
if SeatTasks.has_account(user):
SeatTasks.update_roles.delay(user.pk)
def update_all_groups(self):
logger.debug('Update all %s groups called' % self.name)
SeatTasks.update_all_roles.delay()
def service_active_for_user(self, user):
return user.has_perm(self.access_perm)
def render_services_ctrl(self, request):
urls = self.Urls()
urls.auth_activate = 'auth_activate_seat'
urls.auth_deactivate = 'auth_deactivate_seat'
urls.auth_reset_password = 'auth_reset_seat_password'
urls.auth_set_password = 'auth_set_seat_password'
return render_to_string(self.service_ctrl_template, {
'service_name': self.title,
'urls': urls,
'service_url': self.service_url,
'username': request.user.seat.username if SeatTasks.has_account(request.user) else ''
}, request=request)
@hooks.register('services_hook')
def register_service():
return SeatService()

View File

@ -0,0 +1,261 @@
from __future__ import unicode_literals
import random
import string
import requests
from eveonline.managers import EveManager
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from six import iteritems
import logging
logger = logging.getLogger(__name__)
class SeatManager:
def __init__(self):
pass
RESPONSE_OK = 'ok'
@staticmethod
def __santatize_username(username):
sanatized = username.replace(" ", "_")
return sanatized.lower()
@staticmethod
def __generate_random_pass():
return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)])
@classmethod
def _response_ok(cls, response):
return cls.RESPONSE_OK in response
@staticmethod
def exec_request(endpoint, func, **kwargs):
""" Send an https api request """
try:
endpoint = '{0}/api/v1/{1}'.format(settings.SEAT_URL, endpoint)
headers = {'X-Token': settings.SEAT_XTOKEN, 'Accept': 'application/json'}
logger.debug(headers)
logger.debug(endpoint)
ret = getattr(requests, func)(endpoint, headers=headers, data=kwargs)
ret.raise_for_status()
return ret.json()
except:
logger.exception("Error encountered while performing API request to SeAT with url {}".format(endpoint))
return {}
@classmethod
def add_user(cls, username, email):
""" Add user to service """
sanatized = str(SeatManager.__santatize_username(username))
logger.debug("Adding user to SeAT with username %s" % sanatized)
password = SeatManager.__generate_random_pass()
ret = SeatManager.exec_request('user', 'post', username=sanatized, email=str(email), password=password)
logger.debug(ret)
if cls._response_ok(ret):
logger.info("Added SeAT user with username %s" % sanatized)
return sanatized, password
logger.info("Failed to add SeAT user with username %s" % sanatized)
return None
@classmethod
def delete_user(cls, username):
""" Delete user """
ret = cls.exec_request('user/{}'.format(username), 'delete')
logger.debug(ret)
if cls._response_ok(ret):
logger.info("Deleted SeAT user with username %s" % username)
return username
return None
@classmethod
def disable_user(cls, username):
""" Disable user """
ret = cls.exec_request('user/{}'.format(username), 'put', active=0)
logger.debug(ret)
ret = cls.exec_request('user/{}'.format(username), 'put', email="")
logger.debug(ret)
if cls._response_ok(ret):
try:
cls.update_roles(username, [])
logger.info("Disabled SeAT user with username %s" % username)
return username
except KeyError:
# if something goes wrong, delete user from seat instead of disabling
if cls.delete_user(username):
return username
logger.info("Failed to disabled SeAT user with username %s" % username)
return None
@classmethod
def enable_user(cls, username):
""" Enable user """
ret = SeatManager.exec_request('user/{}'.format(username), 'put', active=1)
logger.debug(ret)
if cls._response_ok(ret):
logger.info("Enabled SeAT user with username %s" % username)
return username
logger.info("Failed to enabled SeAT user with username %s" % username)
return None
@classmethod
def update_user(cls, username, email, password):
""" Edit user info """
logger.debug("Updating SeAT username %s with email %s and password hash starting with %s" % (username, email,
password[0:5]))
ret = SeatManager.exec_request('user/{}'.format(username), 'put', email=email)
logger.debug(ret)
if not cls._response_ok(ret):
logger.warn("Failed to update email for username {}".format(username))
ret = SeatManager.exec_request('user/{}'.format(username), 'put', password=password)
logger.debug(ret)
if not cls._response_ok(ret):
logger.warn("Failed to update password for username {}".format(username))
return None
logger.info("Updated SeAT user with username %s" % username)
return username
@staticmethod
def update_user_password(username, email, plain_password=None):
logger.debug("Settings new SeAT password for user %s" % username)
if not plain_password:
plain_password = SeatManager.__generate_random_pass()
if SeatManager.update_user(username, email, plain_password):
return plain_password
@staticmethod
def check_user_status(username):
sanatized = str(SeatManager.__santatize_username(username))
logger.debug("Checking SeAT status for user %s" % sanatized)
ret = SeatManager.exec_request('user/{}'.format(sanatized), 'get')
logger.debug(ret)
return ret
@staticmethod
def get_all_seat_eveapis():
seat_all_keys = SeatManager.exec_request('key', 'get')
seat_keys = {}
for key in seat_all_keys:
try:
seat_keys[key["key_id"]] = key["user_id"]
except KeyError:
seat_keys[key["key_id"]] = None
return seat_keys
@staticmethod
def synchronize_eveapis(user=None):
seat_all_keys = SeatManager.get_all_seat_eveapis()
userinfo = None
# retrieve only user-specific api keys if user is specified
if user:
keypars = EveManager.get_api_key_pairs(user)
try:
userinfo = SeatManager.check_user_status(user.seat.username)
except ObjectDoesNotExist:
pass
else:
# retrieve all api keys instead
keypars = EveManager.get_all_api_key_pairs()
if keypars:
for keypar in keypars:
if keypar.api_id not in seat_all_keys.keys():
#Add new keys
logger.debug("Adding Api Key with ID %s" % keypar.api_id)
ret = SeatManager.exec_request('key', 'post', key_id=keypar.api_id, v_code=keypar.api_key)
logger.debug(ret)
else:
# remove it from the list so it doesn't get deleted in the last step
seat_all_keys.pop(keypar.api_id)
if not userinfo: # TODO: should the following be done only for new keys?
# Check the key's user status
logger.debug("Retrieving user name from Auth's SeAT users database")
try:
if keypar.user.seat.username:
logger.debug("Retrieving user %s info from SeAT users database" % keypar.user.seat.username)
userinfo = SeatManager.check_user_status(keypar.user.seat.username)
except ObjectDoesNotExist:
pass
if userinfo:
try:
# If the user has activated seat, assign the key to him.
logger.debug("Transferring Api Key with ID %s to user %s with ID %s " % (
keypar.api_id,
keypar.user.seat.username,
userinfo['id']))
ret = SeatManager.exec_request('key/transfer/{}/{}'.format(keypar.api_id, userinfo['id']),
'get')
logger.debug(ret)
except ObjectDoesNotExist:
logger.debug("User does not have SeAT activated, could not assign key to user")
if bool(seat_all_keys) and not user and hasattr(settings, 'SEAT_PURGE_DELETED') and settings.SEAT_PURGE_DELETED:
# remove from SeAT keys that were removed from Auth
for key, key_user in iteritems(seat_all_keys):
# Remove the key only if it is an account or character key
ret = SeatManager.exec_request('key/{}'.format(key), 'get')
logger.debug(ret)
try:
if (ret['info']['type'] == "Account") or (ret['info']['type'] == "Character"):
logger.debug("Removing api key %s from SeAT database" % key)
ret = SeatManager.exec_request('key/{}'.format(key), 'delete')
logger.debug(ret)
except KeyError:
pass
@staticmethod
def get_all_roles():
groups = {}
ret = SeatManager.exec_request('role', 'get')
logger.debug(ret)
for group in ret:
groups[group["title"]] = group["id"]
logger.debug("Retrieved role list from SeAT: %s" % str(groups))
return groups
@staticmethod
def add_role(role):
ret = SeatManager.exec_request('role/new', 'post', name=role)
logger.debug(ret)
logger.info("Added Seat group %s" % role)
role_info = SeatManager.exec_request('role/detail/{}'.format(role), 'get')
logger.debug(role_info)
return role_info["id"]
@staticmethod
def add_role_to_user(user_id, role_id):
ret = SeatManager.exec_request('role/grant-user-role/{}/{}'.format(user_id, role_id), 'get')
logger.info("Added role %s to user %s" % (role_id, user_id))
return ret
@staticmethod
def revoke_role_from_user(user_id, role_id):
ret = SeatManager.exec_request('role/revoke-user-role/{}/{}'.format(user_id, role_id), 'get')
logger.info("Revoked role %s from user %s" % (role_id, user_id))
return ret
@staticmethod
def update_roles(seat_user, roles):
logger.debug("Updating SeAT user %s with roles %s" % (seat_user, roles))
user_info = SeatManager.check_user_status(seat_user)
user_roles = {}
if type(user_info["roles"]) is list:
for role in user_info["roles"]:
user_roles[role["title"]] = role["id"]
logger.debug("Got user %s SeAT roles %s" % (seat_user, user_roles))
seat_roles = SeatManager.get_all_roles()
addroles = set(roles) - set(user_roles.keys())
remroles = set(user_roles.keys()) - set(roles)
logger.info("Updating SeAT roles for user %s - adding %s, removing %s" % (seat_user, addroles, remroles))
for r in addroles:
if r not in seat_roles:
seat_roles[r] = SeatManager.add_role(r)
logger.debug("Adding role %s to SeAT user %s" % (r, seat_user))
SeatManager.add_role_to_user(user_info["id"], seat_roles[r])
for r in remroles:
logger.debug("Removing role %s from user %s" % (r, seat_user))
SeatManager.revoke_role_from_user(user_info["id"], seat_roles[r])

View File

@ -0,0 +1,26 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.4 on 2017-01-15 07:06
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0008_alter_user_username_max_length'),
]
operations = [
migrations.CreateModel(
name='SeatUser',
fields=[
('user', models.OneToOneField(on_delete=django.db.models.deletion.CASCADE, primary_key=True, related_name='seat', serialize=False, to=settings.AUTH_USER_MODEL)),
('username', models.CharField(max_length=254)),
],
),
]

View File

@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.10.5 on 2017-02-02 10:49
from __future__ import unicode_literals
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('seat', '0001_initial'),
]
operations = [
migrations.AlterModelOptions(
name='seatuser',
options={'permissions': (('access_seat', 'Can access the SeAT service'),)},
),
]

View File

@ -0,0 +1,21 @@
from __future__ import unicode_literals
from django.utils.encoding import python_2_unicode_compatible
from django.contrib.auth.models import User
from django.db import models
@python_2_unicode_compatible
class SeatUser(models.Model):
user = models.OneToOneField(User,
primary_key=True,
on_delete=models.CASCADE,
related_name='seat')
username = models.CharField(max_length=254)
def __str__(self):
return self.username
class Meta:
permissions = (
("access_seat", u"Can access the SeAT service"),
)

View File

@ -0,0 +1,76 @@
from __future__ import unicode_literals
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
from django.conf import settings
from notifications import notify
from alliance_auth.celeryapp import app
from .models import SeatUser
from .manager import SeatManager
import logging
logger = logging.getLogger(__name__)
class SeatTasks:
def __init__(self):
pass
@staticmethod
def has_account(user):
try:
return user.seat.username != ''
except ObjectDoesNotExist:
return False
@classmethod
def delete_user(cls, user, notify_user=False):
if cls.has_account(user) and SeatManager.disable_user(user.seat.username):
user.seat.delete()
logger.info("Successfully deactivated SeAT for user %s" % user)
if notify_user:
notify(user, 'SeAT Account Disabled', level='danger')
return True
return False
@staticmethod
@app.task(bind=True)
def update_roles(self, pk):
user = User.objects.get(pk=pk)
logger.debug("Updating SeAT roles for user %s" % user)
groups = []
if SeatTasks.has_account(user):
for group in user.groups.all():
groups.append(str(group.name))
if len(groups) == 0:
logger.debug("No syncgroups found for user. Adding empty group.")
groups.append('empty')
logger.debug("Updating user %s SeAT roles to %s" % (user, groups))
try:
SeatManager.update_roles(user.seat.username, groups)
except:
logger.warn("SeAT group sync failed for %s, retrying in 10 mins" % user, exc_info=True)
raise self.retry(countdown=60 * 10)
logger.debug("Updated user %s SeAT roles." % user)
else:
logger.debug("User %s does not have a SeAT account")
@staticmethod
@app.task
def update_all_roles():
logger.debug("Updating ALL SeAT roles")
for user in SeatUser.objects.all():
SeatTasks.update_roles.delay(user.user_id)
@staticmethod
def deactivate():
SeatUser.objects.all().delete()
@staticmethod
@app.task
def run_api_sync():
logger.debug("Running EVE API synchronization with SeAT")
SeatManager.synchronize_eveapis()

View File

@ -0,0 +1,220 @@
from __future__ import unicode_literals
try:
# Py3
from unittest import mock
except ImportError:
# Py2
import mock
from django.test import TestCase, RequestFactory
from django.conf import settings
from django import urls
from django.contrib.auth.models import User, Group, Permission
from django.core.exceptions import ObjectDoesNotExist
from alliance_auth.tests.auth_utils import AuthUtils
from .auth_hooks import SeatService
from .models import SeatUser
from .tasks import SeatTasks
MODULE_PATH = 'services.modules.seat'
def add_permissions():
permission = Permission.objects.get(codename='access_seat')
members = Group.objects.get(name=settings.DEFAULT_AUTH_GROUP)
blues = Group.objects.get(name=settings.DEFAULT_BLUE_GROUP)
AuthUtils.add_permissions_to_groups([permission], [members, blues])
class SeatHooksTestCase(TestCase):
def setUp(self):
self.member = 'member_user'
member = AuthUtils.create_member(self.member)
SeatUser.objects.create(user=member, username=self.member)
self.blue = 'blue_user'
blue = AuthUtils.create_blue(self.blue)
SeatUser.objects.create(user=blue, username=self.blue)
self.none_user = 'none_user'
none_user = AuthUtils.create_user(self.none_user, disconnect_signals=True)
self.service = SeatService
add_permissions()
def test_has_account(self):
member = User.objects.get(username=self.member)
blue = User.objects.get(username=self.blue)
none_user = User.objects.get(username=self.none_user)
self.assertTrue(SeatTasks.has_account(member))
self.assertTrue(SeatTasks.has_account(blue))
self.assertFalse(SeatTasks.has_account(none_user))
def test_service_enabled(self):
service = self.service()
member = User.objects.get(username=self.member)
blue = User.objects.get(username=self.blue)
none_user = User.objects.get(username=self.none_user)
self.assertTrue(service.service_active_for_user(member))
self.assertTrue(service.service_active_for_user(blue))
self.assertFalse(service.service_active_for_user(none_user))
@mock.patch(MODULE_PATH + '.tasks.SeatManager')
def test_update_all_groups(self, manager):
service = self.service()
service.update_all_groups()
# Check member and blue user have groups updated
self.assertTrue(manager.update_roles.called)
self.assertEqual(manager.update_roles.call_count, 2)
def test_update_groups(self):
# Check member has Member group updated
with mock.patch(MODULE_PATH + '.tasks.SeatManager') as manager:
service = self.service()
member = User.objects.get(username=self.member)
service.update_groups(member)
self.assertTrue(manager.update_roles.called)
args, kwargs = manager.update_roles.call_args
user_id, groups = args
self.assertIn(settings.DEFAULT_AUTH_GROUP, groups)
self.assertEqual(user_id, member.seat.username)
# Check none user does not have groups updated
with mock.patch(MODULE_PATH + '.tasks.SeatManager') as manager:
service = self.service()
none_user = User.objects.get(username=self.none_user)
service.update_groups(none_user)
self.assertFalse(manager.update_roles.called)
@mock.patch(MODULE_PATH + '.tasks.SeatManager')
def test_validate_user(self, manager):
service = self.service()
# Test member is not deleted
member = User.objects.get(username=self.member)
# Pre assertion
self.assertTrue(member.has_perm('seat.access_seat'))
service.validate_user(member)
self.assertTrue(User.objects.get(username=self.member).seat)
# Test none user is deleted
none_user = User.objects.get(username=self.none_user)
manager.disable_user.return_value = 'abc123'
SeatUser.objects.create(user=none_user, username='abc123')
service.validate_user(none_user)
self.assertTrue(manager.disable_user.called)
with self.assertRaises(ObjectDoesNotExist):
none_seat = User.objects.get(username=self.none_user).seat
@mock.patch(MODULE_PATH + '.tasks.SeatManager')
def test_delete_user(self, manager):
member = User.objects.get(username=self.member)
service = self.service()
result = service.delete_user(member)
self.assertTrue(result)
self.assertTrue(manager.disable_user.called)
with self.assertRaises(ObjectDoesNotExist):
seat_user = User.objects.get(username=self.member).seat
def test_render_services_ctrl(self):
service = self.service()
member = User.objects.get(username=self.member)
request = RequestFactory().get('/en/services/')
request.user = member
response = service.render_services_ctrl(request)
self.assertTemplateUsed(service.service_ctrl_template)
self.assertIn(urls.reverse('auth_deactivate_seat'), response)
self.assertIn(urls.reverse('auth_reset_seat_password'), response)
self.assertIn(urls.reverse('auth_set_seat_password'), response)
# Test register becomes available
member.seat.delete()
member = User.objects.get(username=self.member)
request.user = member
response = service.render_services_ctrl(request)
self.assertIn(urls.reverse('auth_activate_seat'), response)
class SeatViewsTestCase(TestCase):
def setUp(self):
self.member = AuthUtils.create_member('auth_member')
self.member.set_password('password')
self.member.email = 'auth_member@example.com'
self.member.save()
AuthUtils.add_main_character(self.member, 'auth_member', '12345', corp_id='111', corp_name='Test Corporation')
add_permissions()
def login(self):
self.client.login(username=self.member.username, password='password')
@mock.patch(MODULE_PATH + '.tasks.SeatManager')
@mock.patch(MODULE_PATH + '.views.SeatManager')
def test_activate(self, manager, tasks_manager):
self.login()
expected_username = 'auth_member'
manager.check_user_status.return_value = {}
manager.add_user.return_value = (expected_username, 'abc123')
response = self.client.get(urls.reverse('auth_activate_seat'))
self.assertTrue(manager.add_user.called)
self.assertTrue(tasks_manager.update_roles.called)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed('registered/service_credentials.html')
self.assertContains(response, expected_username)
seat_user = SeatUser.objects.get(user=self.member)
self.assertEqual(seat_user.username, expected_username)
self.assertTrue(manager.synchronize_eveapis.called)
@mock.patch(MODULE_PATH + '.tasks.SeatManager')
def test_deactivate(self, manager):
self.login()
SeatUser.objects.create(user=self.member, username='some member')
response = self.client.get(urls.reverse('auth_deactivate_seat'))
self.assertTrue(manager.disable_user.called)
self.assertRedirects(response, expected_url=urls.reverse('auth_services'), target_status_code=200)
with self.assertRaises(ObjectDoesNotExist):
seat_user = User.objects.get(pk=self.member.pk).seat
@mock.patch(MODULE_PATH + '.views.SeatManager')
def test_set_password(self, manager):
self.login()
SeatUser.objects.create(user=self.member, username='some member')
response = self.client.post(urls.reverse('auth_set_seat_password'), data={'password': '1234asdf'})
self.assertTrue(manager.update_user_password.called)
args, kwargs = manager.update_user_password.call_args
self.assertEqual(kwargs['plain_password'], '1234asdf')
self.assertRedirects(response, expected_url=urls.reverse('auth_services'), target_status_code=200)
@mock.patch(MODULE_PATH + '.views.SeatManager')
def test_reset_password(self, manager):
self.login()
SeatUser.objects.create(user=self.member, username='some member')
manager.update_user_password.return_value = 'hunter2'
response = self.client.get(urls.reverse('auth_reset_seat_password'))
self.assertTemplateUsed(response, 'registered/service_credentials.html')
self.assertContains(response, 'some member')
self.assertContains(response, 'hunter2')
class SeatManagerTestCase(TestCase):
def setUp(self):
from .manager import SeatManager
self.manager = SeatManager
def test_generate_random_password(self):
password = self.manager._SeatManager__generate_random_pass()
self.assertEqual(len(password), 16)
self.assertIsInstance(password, type(''))

View File

@ -0,0 +1,16 @@
from __future__ import unicode_literals
from django.conf.urls import url, include
from . import views
module_urls = [
# SeAT Service Control
url(r'^activate/$', views.activate_seat, name='auth_activate_seat'),
url(r'^deactivate/$', views.deactivate_seat, name='auth_deactivate_seat'),
url(r'^reset_password/$', views.reset_seat_password, name='auth_reset_seat_password'),
url(r'^set_password/$', views.set_seat_password, name='auth_set_seat_password'),
]
urlpatterns = [
url(r'^seat/', include(module_urls)),
]

View File

@ -0,0 +1,112 @@
from __future__ import unicode_literals
from django.shortcuts import render, redirect
from django.contrib.auth.decorators import login_required, permission_required
from .manager import SeatManager
from eveonline.managers import EveManager
from .tasks import SeatTasks
from .models import SeatUser
from services.forms import ServicePasswordForm
import logging
logger = logging.getLogger(__name__)
ACCESS_PERM = 'seat.access_seat'
@login_required
@permission_required(ACCESS_PERM)
def activate_seat(request):
logger.debug("activate_seat called by user %s" % request.user)
# Valid now we get the main characters
character = EveManager.get_main_character(request.user)
logger.debug("Checking SeAT for inactive users with the same username")
stat = SeatManager.check_user_status(character.character_name)
if stat == {}:
logger.debug("User not found, adding SeAT user for user %s with main character %s" % (request.user, character))
result = SeatManager.add_user(character.character_name, request.user.email)
else:
logger.debug("User found, resetting password")
username = SeatManager.enable_user(stat["name"])
password = SeatManager.update_user_password(username, request.user.email)
result = [username, password]
# if empty we failed
if result[0] and result[1]:
SeatUser.objects.update_or_create(user=request.user, defaults={'username': result[0]})
logger.debug("Updated SeatUser for user %s with SeAT credentials. Adding eve-apis..." % request.user)
SeatTasks.update_roles.delay(request.user.pk)
logger.info("Successfully activated SeAT for user %s" % request.user)
SeatManager.synchronize_eveapis(request.user)
credentials = {
'username': request.user.seat.username,
'password': result[1],
}
return render(request, 'registered/service_credentials.html',
context={'credentials': credentials, 'service': 'SeAT'})
logger.error("Unsuccessful attempt to activate seat for user %s" % request.user)
return redirect("auth_services")
@login_required
@permission_required(ACCESS_PERM)
def deactivate_seat(request):
logger.debug("deactivate_seat called by user %s" % request.user)
# false we failed
if SeatTasks.delete_user(request.user):
logger.info("Successfully deactivated SeAT for user %s" % request.user)
return redirect("auth_services")
else:
logging.error("User does not have a SeAT account")
logger.error("Unsuccessful attempt to activate SeAT for user %s" % request.user)
return redirect("auth_services")
@login_required
@permission_required(ACCESS_PERM)
def reset_seat_password(request):
logger.debug("reset_seat_password called by user %s" % request.user)
if SeatTasks.has_account(request.user):
result = SeatManager.update_user_password(request.user.seat.username, request.user.email)
# false we failed
if result:
credentials = {
'username': request.user.seat.username,
'password': result,
}
logger.info("Succesfully reset SeAT password for user %s" % request.user)
return render(request, 'registered/service_credentials.html',
context={'credentials': credentials, 'service': 'SeAT'})
logger.error("Unsuccessful attempt to reset SeAT password for user %s" % request.user)
return redirect("auth_services")
@login_required
@permission_required(ACCESS_PERM)
def set_seat_password(request):
logger.debug("set_seat_password called by user %s" % request.user)
if request.method == 'POST':
logger.debug("Received POST request with form.")
form = ServicePasswordForm(request.POST)
logger.debug("Form is valid: %s" % form.is_valid())
if form.is_valid() and SeatTasks.has_account(request.user):
password = form.cleaned_data['password']
logger.debug("Form contains password of length %s" % len(password))
result = SeatManager.update_user_password(request.user.seat.username,
request.user.email,
plain_password=password)
if result:
logger.info("Succesfully reset SeAT password for user %s" % request.user)
return redirect("auth_services")
else:
logger.error("Failed to install custom SeAT password for user %s" % request.user)
else:
logger.error("Invalid SeAT password provided")
else:
logger.debug("Request is not type POST - providing empty form.")
form = ServicePasswordForm()
logger.debug("Rendering form for user %s" % request.user)
context = {'form': form, 'service': 'SeAT'}
return render(request, 'registered/service_password.html', context)