mirror of
https://gitlab.com/allianceauth/allianceauth.git
synced 2026-02-09 00:26:20 +01:00
Added Teamspeak3 support
This commit is contained in:
221
services/managers/teamspeak3_manager.py
Executable file
221
services/managers/teamspeak3_manager.py
Executable file
@@ -0,0 +1,221 @@
|
||||
from django.conf import settings
|
||||
|
||||
from services.managers.util.ts3 import TS3Server
|
||||
|
||||
|
||||
class Teamspeak3Manager:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __get_created_server():
|
||||
server = TS3Server(settings.TEAMSPEAK3_SERVER_IP, settings.TEAMSPEAK3_SERVER_PORT)
|
||||
server.login(settings.TEAMSPEAK3_SERVERQUERY_USER, settings.TEAMSPEAK3_SERVERQUERY_PASSWORD)
|
||||
server.use(settings.TEAMSPEAK3_VIRTUAL_SERVER)
|
||||
return server
|
||||
|
||||
@staticmethod
|
||||
def __santatize_username(username):
|
||||
sanatized = username.replace(" ", "_")
|
||||
sanatized = sanatized.replace("'", "")
|
||||
return sanatized.lower()
|
||||
|
||||
@staticmethod
|
||||
def __generate_username(username, corp_ticker):
|
||||
return "[" + corp_ticker + "]" + username
|
||||
|
||||
@staticmethod
|
||||
def __generate_username_blue(username, corp_ticker):
|
||||
return "[BLUE][" + corp_ticker + "]" + username
|
||||
|
||||
@staticmethod
|
||||
def _get_userid(uid):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
ret = server.send_command('customsearch', {'ident': 'sso_uid', 'pattern': uid})
|
||||
if ret and 'keys' in ret and 'cldbid' in ret['keys']:
|
||||
return ret['keys']['cldbid']
|
||||
|
||||
@staticmethod
|
||||
def _group_id_by_name(groupname):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
group_cache = server.send_command('servergrouplist')
|
||||
for group in group_cache:
|
||||
if group['keys']['name'] == groupname:
|
||||
return group['keys']['sgid']
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _create_group(groupname):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
sgid = Teamspeak3Manager._group_id_by_name(groupname)
|
||||
if not sgid:
|
||||
ret = server.send_command('servergroupadd', {'name': groupname})
|
||||
Teamspeak3Manager.__group_cache = None
|
||||
sgid = ret['keys']['sgid']
|
||||
server.send_command('servergroupaddperm',
|
||||
{'sgid': sgid, 'permsid': 'i_group_needed_modify_power', 'permvalue': 75,
|
||||
'permnegated': 0, 'permskip': 0})
|
||||
server.send_command('servergroupaddperm',
|
||||
{'sgid': sgid, 'permsid': 'i_group_needed_member_add_power', 'permvalue': 100,
|
||||
'permnegated': 0, 'permskip': 0})
|
||||
server.send_command('servergroupaddperm',
|
||||
{'sgid': sgid, 'permsid': 'i_group_needed_member_remove_power', 'permvalue': 100,
|
||||
'permnegated': 0, 'permskip': 0})
|
||||
return sgid
|
||||
|
||||
@staticmethod
|
||||
def _user_group_list(cldbid):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
groups = server.send_command('servergroupsbyclientid', {'cldbid': cldbid})
|
||||
outlist = {}
|
||||
|
||||
if type(groups) == list:
|
||||
for group in groups:
|
||||
outlist[group['keys']['name']] = group['keys']['sgid']
|
||||
elif type(groups) == dict:
|
||||
outlist[groups['keys']['name']] = groups['keys']['sgid']
|
||||
|
||||
return outlist
|
||||
|
||||
@staticmethod
|
||||
def _group_list():
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
group_cache = server.send_command('servergrouplist')
|
||||
outlist = {}
|
||||
for group in group_cache:
|
||||
outlist[group['keys']['name']] = group['keys']['sgid']
|
||||
return outlist
|
||||
|
||||
@staticmethod
|
||||
def _add_user_to_group(uid, groupname):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
server_groups = Teamspeak3Manager._group_list()
|
||||
user_groups = Teamspeak3Manager._user_group_list(uid)
|
||||
|
||||
if not groupname in server_groups:
|
||||
Teamspeak3Manager._create_group(groupname)
|
||||
if not groupname in user_groups:
|
||||
server.send_command('servergroupaddclient',
|
||||
{'sgid': Teamspeak3Manager._group_id_by_name(groupname), 'cldbid': uid})
|
||||
|
||||
@staticmethod
|
||||
def _remove_user_from_group(uid, groupname):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
server_groups = Teamspeak3Manager._group_list()
|
||||
user_groups = Teamspeak3Manager._user_group_list(uid)
|
||||
|
||||
if groupname in server_groups:
|
||||
Teamspeak3Manager._create_group(groupname)
|
||||
if groupname in user_groups:
|
||||
server.send_command('servergroupdelclient',
|
||||
{'sgid': Teamspeak3Manager._group_id_by_name(groupname), 'cldbid': uid})
|
||||
|
||||
@staticmethod
|
||||
def add_user(username, corp_ticker):
|
||||
username_clean = Teamspeak3Manager.__generate_username(Teamspeak3Manager.__santatize_username(username),
|
||||
corp_ticker)
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
token = ""
|
||||
|
||||
server_groups = Teamspeak3Manager._group_list()
|
||||
if not settings.DEFAULT_ALLIANCE_GROUP in server_groups:
|
||||
Teamspeak3Manager._create_group(settings.DEFAULT_ALLIANCE_GROUP)
|
||||
|
||||
alliange_group_id = Teamspeak3Manager._group_id_by_name(settings.DEFAULT_ALLIANCE_GROUP)
|
||||
|
||||
ret = server.send_command('tokenadd', {'tokentype': 0, 'tokenid1': alliange_group_id, 'tokenid2': 0,
|
||||
'tokendescription': username_clean,
|
||||
'tokencustomset': "ident=sso_uid value=%s" % username_clean})
|
||||
|
||||
try:
|
||||
if 'keys' in ret:
|
||||
if 'token' in ret['keys']:
|
||||
token = ret['keys']['token']
|
||||
except:
|
||||
pass
|
||||
|
||||
return username_clean, token
|
||||
|
||||
@staticmethod
|
||||
def add_blue_user(username, corp_ticker):
|
||||
username_clean = Teamspeak3Manager.__generate_username_blue(Teamspeak3Manager.__santatize_username(username),
|
||||
corp_ticker)
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
token = ""
|
||||
|
||||
server_groups = Teamspeak3Manager._group_list()
|
||||
if not settings.DEFAULT_BLUE_GROUP in server_groups:
|
||||
Teamspeak3Manager._create_group(settings.DEFAULT_BLUE_GROUP)
|
||||
|
||||
blue_group_id = Teamspeak3Manager._group_id_by_name(settings.DEFAULT_BLUE_GROUP)
|
||||
|
||||
ret = server.send_command('tokenadd', {'tokentype': 0, 'tokenid1': blue_group_id, 'tokenid2': 0,
|
||||
'tokendescription': username_clean,
|
||||
'tokencustomset': "ident=sso_uid value=%s" % username_clean})
|
||||
|
||||
try:
|
||||
if 'keys' in ret:
|
||||
if 'token' in ret['keys']:
|
||||
token = ret['keys']['token']
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
return username_clean, token
|
||||
|
||||
@staticmethod
|
||||
def delete_user(uid):
|
||||
server = Teamspeak3Manager.__get_created_server()
|
||||
user = Teamspeak3Manager._get_userid(uid)
|
||||
if user:
|
||||
for client in server.send_command('clientlist'):
|
||||
if client['keys']['client_database_id'] == user:
|
||||
server.send_command('clientkick', {'clid': client['keys']['clid'], 'reasonid': 5,
|
||||
'reasonmsg': 'Auth service deleted'})
|
||||
|
||||
ret = server.send_command('clientdbdelete', {'cldbid': user})
|
||||
if ret == '0':
|
||||
return True
|
||||
else:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def check_user_exists(uid):
|
||||
if Teamspeak3Manager._get_userid(uid):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def generate_new_permissionkey(uid, username, corpticker):
|
||||
Teamspeak3Manager.delete_user(uid)
|
||||
return Teamspeak3Manager.add_user(username, corpticker)
|
||||
|
||||
@staticmethod
|
||||
def generate_new_blue_permissionkey(uid, username, corpticker):
|
||||
Teamspeak3Manager.delete_user(uid)
|
||||
return Teamspeak3Manager.add_blue_user(username, corpticker)
|
||||
|
||||
@staticmethod
|
||||
def update_groups(uid, groups):
|
||||
userid = Teamspeak3Manager._get_userid(uid)
|
||||
if userid:
|
||||
server_groups = Teamspeak3Manager._group_list()
|
||||
user_groups = set(Teamspeak3Manager._user_group_list(userid))
|
||||
act_groups = set([g.replace(' ', '-') for g in groups])
|
||||
addgroups = act_groups - user_groups
|
||||
remgroups = user_groups - act_groups
|
||||
|
||||
print userid
|
||||
print addgroups
|
||||
print remgroups
|
||||
|
||||
for g in addgroups:
|
||||
if not g in server_groups.keys():
|
||||
Teamspeak3Manager._create_group(g)
|
||||
Teamspeak3Manager._add_user_to_group(userid, g)
|
||||
|
||||
for g in remgroups:
|
||||
Teamspeak3Manager._remove_user_from_group(userid, g)
|
||||
|
||||
1
services/managers/util/__init__.py
Executable file
1
services/managers/util/__init__.py
Executable file
@@ -0,0 +1 @@
|
||||
__author__ = 'r4stl1n'
|
||||
244
services/managers/util/ts3.py
Executable file
244
services/managers/util/ts3.py
Executable file
@@ -0,0 +1,244 @@
|
||||
import socket
|
||||
import logging
|
||||
|
||||
|
||||
class ConnectionError():
|
||||
def __init__(self, ip, port):
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
|
||||
def __str__(self):
|
||||
return 'Error connecting to host %s port %s' % (self.ip, self.port)
|
||||
|
||||
|
||||
ts3_escape = {'/': r"\/",
|
||||
' ': r'\s',
|
||||
'|': r'\p',
|
||||
"\a": r'\a',
|
||||
"\b": r'\b',
|
||||
"\f": r'\f',
|
||||
"\n": r'\n',
|
||||
"\r": r'\r',
|
||||
"\t": r'\t',
|
||||
"\v": r'\v'}
|
||||
|
||||
|
||||
class TS3Proto():
|
||||
bytesin = 0
|
||||
bytesout = 0
|
||||
|
||||
_connected = False
|
||||
|
||||
def __init__(self):
|
||||
self._log = logging.getLogger('%s.%s' % (__name__, self.__class__.__name__))
|
||||
pass
|
||||
|
||||
def connect(self, ip, port):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
try:
|
||||
s.connect((ip, port))
|
||||
except:
|
||||
# raise ConnectionError(ip, port)
|
||||
raise
|
||||
else:
|
||||
self._sock = s
|
||||
self._sockfile = s.makefile('r', 0)
|
||||
|
||||
data = self._sockfile.readline()
|
||||
if data.strip() == "TS3":
|
||||
self._sockfile.readline()
|
||||
self._connected = True
|
||||
return True
|
||||
|
||||
def disconnect(self):
|
||||
self.send_command("quit")
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
self._connected = False
|
||||
self._log.info('Disconnected')
|
||||
|
||||
def send_command(self, command, keys=None, opts=None):
|
||||
cmd = self.construct_command(command, keys=keys, opts=opts)
|
||||
self.send('%s\n' % cmd)
|
||||
|
||||
data = []
|
||||
|
||||
while True:
|
||||
resp = self._sockfile.readline()
|
||||
resp = self.parse_command(resp)
|
||||
if not 'command' in resp:
|
||||
data.append(resp)
|
||||
else:
|
||||
break
|
||||
|
||||
if resp['command'] == 'error':
|
||||
if data and resp['keys']['id'] == '0':
|
||||
if len(data) > 1:
|
||||
return data
|
||||
else:
|
||||
return data[0]
|
||||
else:
|
||||
return resp['keys']['id']
|
||||
|
||||
def construct_command(self, command, keys=None, opts=None):
|
||||
"""
|
||||
Constructs a TS3 formatted command string
|
||||
Keys can have a single nested list to construct a nested parameter
|
||||
@param command: Command list
|
||||
@type command: string
|
||||
@param keys: Key/Value pairs
|
||||
@type keys: dict
|
||||
@param opts: Options
|
||||
@type opts: list
|
||||
"""
|
||||
|
||||
cstr = [command]
|
||||
|
||||
# Add the keys and values, escape as needed
|
||||
if keys:
|
||||
for key in keys:
|
||||
if isinstance(keys[key], list):
|
||||
ncstr = []
|
||||
for nest in keys[key]:
|
||||
ncstr.append("%s=%s" % (key, self._escape_str(nest)))
|
||||
cstr.append("|".join(ncstr))
|
||||
else:
|
||||
cstr.append("%s=%s" % (key, self._escape_str(keys[key])))
|
||||
|
||||
# Add in options
|
||||
if opts:
|
||||
for opt in opts:
|
||||
cstr.append("-%s" % opt)
|
||||
|
||||
return " ".join(cstr)
|
||||
|
||||
def parse_command(self, commandstr):
|
||||
"""
|
||||
Parses a TS3 command string into command/keys/opts tuple
|
||||
@param commandstr: Command string
|
||||
@type commandstr: string
|
||||
"""
|
||||
|
||||
if len(commandstr.split('|')) > 1:
|
||||
vals = []
|
||||
for cmd in commandstr.split('|'):
|
||||
vals.append(self.parse_command(cmd))
|
||||
return vals
|
||||
|
||||
cmdlist = commandstr.strip().split(' ')
|
||||
command = None
|
||||
keys = {}
|
||||
opts = []
|
||||
|
||||
for key in cmdlist:
|
||||
v = key.strip().split('=')
|
||||
if len(v) > 1:
|
||||
# Key
|
||||
if len > 2:
|
||||
# Fix the stupidities in TS3 escaping
|
||||
v = [v[0], '='.join(v[1:])]
|
||||
key, value = v
|
||||
keys[key] = self._unescape_str(value)
|
||||
elif v[0][0] == '-':
|
||||
# Option
|
||||
opts.append(v[0][1:])
|
||||
else:
|
||||
command = v[0]
|
||||
|
||||
d = {'keys': keys, 'opts': opts}
|
||||
if command:
|
||||
d['command'] = command
|
||||
return d
|
||||
|
||||
@staticmethod
|
||||
def _escape_str(value):
|
||||
"""
|
||||
Escape a value into a TS3 compatible string
|
||||
@param value: Value
|
||||
@type value: string/int
|
||||
"""
|
||||
|
||||
if isinstance(value, int): return "%d" % value
|
||||
value = value.replace("\\", r'\\')
|
||||
for i, j in ts3_escape.iteritems():
|
||||
value = value.replace(i, j)
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def _unescape_str(value):
|
||||
"""
|
||||
Unescape a TS3 compatible string into a normal string
|
||||
@param value: Value
|
||||
@type value: string/int
|
||||
"""
|
||||
|
||||
if isinstance(value, int): return "%d" % value
|
||||
value = value.replace(r"\\", "\\")
|
||||
for i, j in ts3_escape.iteritems():
|
||||
value = value.replace(j, i)
|
||||
return value
|
||||
|
||||
|
||||
def send(self, payload):
|
||||
if self._connected:
|
||||
self._log.debug('Sent: %s' % payload)
|
||||
self._sockfile.write(payload)
|
||||
|
||||
|
||||
class TS3Server(TS3Proto):
|
||||
def __init__(self, ip, port, id=0, sock=None):
|
||||
"""
|
||||
Abstraction class for TS3 Servers
|
||||
@param ip: IP Address
|
||||
@type ip: str
|
||||
@param port: Port Number
|
||||
@type port: int
|
||||
"""
|
||||
TS3Proto.__init__(self)
|
||||
|
||||
if not sock:
|
||||
if self.connect(ip, port) and id > 0:
|
||||
self.use(id)
|
||||
else:
|
||||
self._sock = sock
|
||||
self._sockfile = sock.makefile('r', 0)
|
||||
self._connected = True
|
||||
|
||||
def login(self, username, password):
|
||||
"""
|
||||
Login to the TS3 Server
|
||||
@param username: Username
|
||||
@type username: str
|
||||
@param password: Password
|
||||
@type password: str
|
||||
"""
|
||||
d = self.send_command('login', keys={'client_login_name': username, 'client_login_password': password})
|
||||
if d == 0:
|
||||
self._log.info('Login Successful')
|
||||
return True
|
||||
return False
|
||||
|
||||
def serverlist(self):
|
||||
"""
|
||||
Get a list of all Virtual Servers on the connected TS3 instance
|
||||
"""
|
||||
if self._connected:
|
||||
return self.send_command('serverlist')
|
||||
|
||||
def gm(self, msg):
|
||||
"""
|
||||
Send a global message to the current Virtual Server
|
||||
@param msg: Message
|
||||
@type ip: str
|
||||
"""
|
||||
if self._connected:
|
||||
return self.send_command('gm', keys={'msg': msg})
|
||||
|
||||
def use(self, id):
|
||||
"""
|
||||
Use a particular Virtual Server instance
|
||||
@param id: Virtual Server ID
|
||||
@type id: int
|
||||
"""
|
||||
if self._connected and id > 0:
|
||||
self.send_command('use', keys={'sid': id})
|
||||
@@ -3,23 +3,25 @@ from django.shortcuts import HttpResponseRedirect
|
||||
from django.shortcuts import render_to_response
|
||||
from django.contrib.auth.decorators import login_required
|
||||
from django.contrib.auth.decorators import permission_required
|
||||
from django.contrib.auth.decorators import user_passes_test
|
||||
|
||||
from managers.openfire_manager import OpenfireManager
|
||||
from managers.phpbb3_manager import Phpbb3Manager
|
||||
from managers.mumble_manager import MumbleManager
|
||||
from managers.ipboard_manager import IPBoardManager
|
||||
from managers.teamspeak3_manager import Teamspeak3Manager
|
||||
from authentication.managers import AuthServicesInfoManager
|
||||
from eveonline.managers import EveManager
|
||||
from celerytask.tasks import update_jabber_groups
|
||||
from celerytask.tasks import update_mumble_groups
|
||||
from celerytask.tasks import update_forum_groups
|
||||
from celerytask.tasks import update_ipboard_groups
|
||||
from celerytask.tasks import update_teamspeak3_groups
|
||||
from forms import JabberBroadcastForm
|
||||
from forms import FleetFormatterForm
|
||||
from django.contrib.auth.decorators import user_passes_test
|
||||
|
||||
from util import check_if_user_has_permission
|
||||
|
||||
|
||||
@login_required
|
||||
def fleet_formatter_view(request):
|
||||
if request.method == 'POST':
|
||||
@@ -150,7 +152,6 @@ def reset_ipboard_password(request):
|
||||
return HttpResponseRedirect("/dashboard")
|
||||
|
||||
|
||||
|
||||
@login_required
|
||||
@user_passes_test(service_blue_alliance_test)
|
||||
def activate_jabber(request):
|
||||
@@ -228,3 +229,55 @@ def reset_mumble_password(request):
|
||||
AuthServicesInfoManager.update_user_mumble_info(authinfo.mumble_username, result, request.user)
|
||||
return HttpResponseRedirect("/services/")
|
||||
return HttpResponseRedirect("/")
|
||||
|
||||
|
||||
@login_required
|
||||
@user_passes_test(service_blue_alliance_test)
|
||||
def activate_teamspeak3(request):
|
||||
authinfo = AuthServicesInfoManager.get_auth_service_info(request.user)
|
||||
character = EveManager.get_character_by_id(authinfo.main_char_id)
|
||||
if check_if_user_has_permission(request.user, "blue_member"):
|
||||
result = Teamspeak3Manager.add_blue_user(character.character_name, character.corporation_ticker)
|
||||
else:
|
||||
result = Teamspeak3Manager.add_user(character.character_name, character.corporation_ticker)
|
||||
|
||||
# if its empty we failed
|
||||
if result[0] is not "":
|
||||
AuthServicesInfoManager.update_user_teamspeak3_info(result[0], result[1], request.user)
|
||||
update_teamspeak3_groups(request.user)
|
||||
return HttpResponseRedirect("/services/")
|
||||
return HttpResponseRedirect("/dashboard")
|
||||
|
||||
|
||||
@login_required
|
||||
@user_passes_test(service_blue_alliance_test)
|
||||
def deactivate_teamspeak3(request):
|
||||
authinfo = AuthServicesInfoManager.get_auth_service_info(request.user)
|
||||
result = Teamspeak3Manager.delete_user(authinfo.teamspeak3_uid)
|
||||
# if false we failed
|
||||
if result:
|
||||
AuthServicesInfoManager.update_user_teamspeak3_info("", "", request.user)
|
||||
return HttpResponseRedirect("/services/")
|
||||
return HttpResponseRedirect("/")
|
||||
|
||||
|
||||
@login_required
|
||||
@user_passes_test(service_blue_alliance_test)
|
||||
def reset_teamspeak3_perm(request):
|
||||
authinfo = AuthServicesInfoManager.get_auth_service_info(request.user)
|
||||
character = EveManager.get_character_by_id(authinfo.main_char_id)
|
||||
|
||||
Teamspeak3Manager.delete_user(authinfo.teamspeak3_uid)
|
||||
|
||||
if check_if_user_has_permission(request.user, "blue_member"):
|
||||
result = Teamspeak3Manager.generate_new_blue_permissionkey(authinfo.teamspeak3_uid, character.character_name,
|
||||
character.corporation_ticker)
|
||||
else:
|
||||
result = Teamspeak3Manager.generate_new_permissionkey(authinfo.teamspeak3_uid, character.character_name,
|
||||
character.corporation_ticker)
|
||||
|
||||
# if blank we failed
|
||||
if result != "":
|
||||
AuthServicesInfoManager.update_user_teamspeak3_info(result[0], result[1], request.user)
|
||||
return HttpResponseRedirect("/services/")
|
||||
return HttpResponseRedirect("/")
|
||||
Reference in New Issue
Block a user