2024-12-30 13:28:36 +10:00

632 lines
25 KiB
Python

#!/usr/bin/env python
# Copyright (C) 2010 Stefan Hacker <dd0t@users.sourceforge.net>
# All rights reserved.
# Adapted by Adarnof for AllianceAuth
# Further modified by the Alliance Auth team and contributers
# Rewritten for Django Context by the Alliance Auth Team
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# - Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# - Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# - Neither the name of the Mumble Developers nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# `AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE FOUNDATION OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import importlib.util
import logging
import Ice
from urllib.request import urlopen
from threading import Timer
from passlib.hash import bcrypt_sha256
from hashlib import sha1
import django
from django.utils.datetime_safe import datetime
import os
import sys
logger = logging.getLogger(__name__)
sys.path.append(os.getcwd())
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myauth.settings.local")
os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"
django.setup()
from allianceauth import __version__ # noqa
from allianceauth.services.modules.mumble.models import MumbleServerServer, MumbleUser, TempUser # noqa
def main(server_id: int = 1) -> None:
"""_summary_
Args:
server_id (int, optional): _description_. Defaults to 1.
Raises:
e: _description_
Murmur.InvalidSecretException: _description_
e: _description_
Returns:
_type_: _description_
"""
slicedir = Ice.getSliceDir()
if not slicedir:
slicedir = ["-I/usr/share/Ice/slice", "-I/usr/share/slice"]
else:
slicedir = ['-I' + slicedir]
package_dir = next(iter(importlib.util.find_spec('allianceauth.services.modules.mumble').submodule_search_locations))
package_ice = os.path.join(package_dir, server_config_obj.slice)
logger.info(f"Using slice file: {package_ice}")
slicedir.append("-I" + os.path.dirname(package_ice))
Ice.loadSlice("", slicedir + [package_ice])
try:
import MumbleServer as Murmur # Mumble >=1.5.17
except ImportError:
import Murmur # Mumble <=1.5.17
server_config_obj = MumbleServerServer.objects.get(id=server_id)
class AllianceAuthAuthenticatorApp(Ice.Application):
def run(self, args) -> int:
self.shutdownOnInterrupt()
if not self.initializeIceConnection():
return 1
if server_config_obj.watchdog > 0:
self.failedWatch = True
self.checkConnection()
# Serve till we are stopped
self.communicator().waitForShutdown()
self.watchdog.cancel()
if self.interrupted():
logger.warning("Caught interrupt, shutting down")
return 0
def initializeIceConnection(self) -> bool:
"""
Establishes the two-way Ice connection and adds the authenticator to the
configured servers
"""
ice = self.communicator()
logger.debug("Using shared ice secret")
ice.getImplicitContext().put("secret", server_config_obj.secret)
logger.info(f"Connecting to Ice server ({server_config_obj.ip}:{server_config_obj.port})")
base = ice.stringToProxy(f"Meta:tcp -h {server_config_obj.ip} -p {server_config_obj.port}")
self.meta = Murmur.MetaPrx.uncheckedCast(base)
adapter = ice.createObjectAdapterWithEndpoints("Callback.Client", f"tcp -h {server_config_obj.endpoint}")
adapter.activate()
metacbprx = adapter.addWithUUID(metaCallback(self))
self.metacb = Murmur.MetaCallbackPrx.uncheckedCast(metacbprx)
servercbprx = adapter.addWithUUID(serverCallback(self))
self.servercb = Murmur.ServerCallbackPrx.uncheckedCast(servercbprx)
authprx = adapter.addWithUUID(AllianceAuthAuthenticator())
self.auth = Murmur.ServerUpdatingAuthenticatorPrx.uncheckedCast(authprx)
return self.attachCallbacks()
def attachCallbacks(self, quiet=False) -> bool:
"""
Attaches all callbacks for meta and authenticators
"""
# Ice.ConnectionRefusedException
# logger.debug('Attaching callbacks')
try:
if not quiet:
logger.info("Attaching meta callback")
self.meta.addCallback(self.metacb)
for server in self.meta.getBootedServers():
if server.id() in server_config_obj.virtual_servers_list():
logger.info("Setting authenticator for virtual server %d", server.id())
server.setAuthenticator(self.auth)
server.addCallback(self.servercb)
if server_config_obj.idler_handler:
idler_handler(server)
except (Murmur.InvalidSecretException, Ice.UnknownUserException, Ice.ConnectionRefusedException) as e:
logger.exception(e)
if isinstance(e, Ice.ConnectionRefusedException):
logger.error("Server refused connection")
elif (isinstance(e, Murmur.InvalidSecretException) or isinstance(e, Ice.UnknownUserException) and (e.unknown == "Murmur::InvalidSecretException")):
logger.error("Invalid ice secret")
else:
# We do not actually want to handle this one, re-raise it
raise e
self.connected = False
return False
self.connected = True
return True
def checkConnection(self) -> None:
"""
Tries reapplies all callbacks to make sure the authenticator
survives server restarts and disconnects.
"""
try:
if not self.attachCallbacks(quiet=not self.failedWatch):
self.failedWatch = True
else:
self.failedWatch = False
except Ice.Exception as e:
logger.error(f"Failed connection check, will retry in next watchdog run ({server_config_obj.watchdog}s)")
logger.exception(e)
self.failedWatch = True
# Renew the timer
self.watchdog = Timer(server_config_obj.watchdog, self.checkConnection)
self.watchdog.start()
def checkSecret(func):
"""
Decorator that checks whether the server transmitted the right secret
if a secret is supposed to be used.
"""
if not server_config_obj.secret:
return func
def newfunc(*args, **kws):
if "current" in kws:
current = kws["current"]
else:
current = args[-1]
if not current or "secret" not in current.ctx or current.ctx["secret"] != server_config_obj.secret:
logger.error("Server transmitted invalid secret. Possible injection attempt.")
raise Murmur.InvalidSecretException()
return func(*args, **kws)
return newfunc
def fortifyIceFu(retval=None, exceptions=(Ice.Exception,)):
"""
Decorator that catches exceptions,logs them and returns a safe retval
value. This helps preventing the authenticator getting stuck in
critical code paths. Only exceptions that are instances of classes
given in the exceptions list are not caught.
The default is to catch all non-Ice exceptions.
"""
def newdec(func):
def newfunc(*args, **kws):
try:
return func(*args, **kws)
except Exception as e:
catch = True
for ex in exceptions:
if isinstance(e, ex):
catch = False
break
if catch:
logger.critical("Unexpected exception caught")
logger.exception(e)
return retval
raise
return newfunc
return newdec
class metaCallback(Murmur.MetaCallback):
def __init__(self, app) -> None:
Murmur.MetaCallback.__init__(self)
self.app = app
@fortifyIceFu()
@checkSecret
def started(self, server, current=None) -> None:
"""
This function is called when a virtual server is started
and makes sure an authenticator gets attached if needed.
"""
if server.id() in server_config_obj.virtual_servers_list():
logger.info("Setting authenticator for virtual server %d", server.id())
try:
server.setAuthenticator(self.app.auth)
# Apparently this server was restarted without us noticing
except (Murmur.InvalidSecretException, Ice.UnknownUserException) as e:
if (hasattr(e, "unknown") and e.unknown != "Murmur::InvalidSecretException"):
# Special handling for Murmur 1.2.2 servers with invalid slice files
raise e
logger.error("Invalid ice secret")
return
else:
logger.debug("Virtual server %d got started", server.id())
@fortifyIceFu()
@checkSecret
def stopped(self, server, current=None) -> None:
"""
This function is called when a virtual server is stopped
"""
if self.app.connected:
# Only try to output the server id if we think we are still connected to prevent
# flooding of our thread pool
try:
if server.id() in server_config_obj.virtual_servers_list():
logger.info(f"Authenticated virtual server {server.id()} got stopped")
else:
logger.debug(f"Virtual server {server.id()} got stopped")
return
except Ice.ConnectionRefusedException:
self.app.connected = False
logger.debug("Server shutdown stopped a virtual server")
if server_config_obj.reject_on_error: # Python 2.4 compat
authenticateFortifyResult = (-1, None, None)
else:
authenticateFortifyResult = (-2, None, None)
class serverCallback(Murmur.ServerCallback):
def __init__(self, app) -> None:
Murmur.ServerCallback.__init__(self)
self.app = app
def userConnected(self, user, current=None) -> None:
try:
mumble_user = MumbleUser.objects.get(username=user)
mumble_user.release = user.release
mumble_user.version = user.version
mumble_user.last_connect = datetime.now()
mumble_user.save()
except MumbleUser.DoesNotExist as a:
try:
mumble_user = TempUser.objects.get(username=user)
mumble_user.release = user.release
mumble_user.version = user.version
mumble_user.last_connect = datetime.now()
mumble_user.save()
except Exception as b:
logger.exception(a)
logger.exception(b)
def userDisconnected(self, user, current=None) -> None:
try:
mumble_user = MumbleUser.objects.get(username=user)
mumble_user.last_disconnect = datetime.now()
mumble_user.save()
except MumbleUser.DoesNotExist as a:
try:
mumble_user = TempUser.objects.get(username=user)
mumble_user.last_disconnect = datetime.now()
mumble_user.save()
except Exception as b:
logger.exception(a)
logger.exception(b)
def userStateChanged(self, user, current=None) -> None:
pass
def userTextMessage(self, user, text_message=None) -> None:
if text_message.text == "!kicktemps":
if self.server.hasPermission(user.session, 0, 0x10000):
self.server.sendMessage(user.session, "Kicking all templink clients!")
users = self.server.getUsers()
for (userid, auser) in users.items():
if auser.userid > (server_config_obj.offset * 2):
self.server.kickUser(auser.session, "Kicking all temp users! :-)")
self.server.sendMessage(user.session, "All templink clients kicked!")
else:
self.server.sendMessage(user.session, "You do not have kick permissions!")
def channelCreated(self, channel, current=None) -> None:
pass
def channelRemoved(self, channel, current=None) -> None:
pass
def channelStateChanged(self, channel, current=None) -> None:
pass
class AllianceAuthAuthenticator(Murmur.ServerUpdatingAuthenticator):
texture_cache = {}
def __init__(self):
Murmur.ServerUpdatingAuthenticator.__init__(self)
@fortifyIceFu(authenticateFortifyResult)
@checkSecret
def authenticate(self, name, pw, certlist, certhash, strong, current=None) -> tuple[int, str | None, str | None]:
"""
This function is called to authenticate a user
"""
FALL_THROUGH = -2
AUTH_REFUSED = -1
if name == "SuperUser":
logger.debug("Forced fall through for SuperUser")
return (FALL_THROUGH, None, None)
try:
mumble_user = MumbleUser.objects.get(username=name)
except MumbleUser.DoesNotExist:
try:
mumble_user = TempUser.objects.get(username=name)
except TempUser.DoesNotExist:
return (-2, None, None) # No Standard or Temp User
logger.debug("checking password with hash function: %s" % mumble_user.hashfn)
if allianceauth_check_hash(pw, mumble_user.pwhash, mumble_user.hashfn):
logger.info(f'User authenticated: {mumble_user.display_name} {mumble_user.user_id + server_config_obj.offset}')
logger.debug("Group memberships: %s", mumble_user.group_string())
return (mumble_user.user_id + server_config_obj.offset, mumble_user.display_name, mumble_user.group_string())
logger.info(
f'Failed authentication attempt for user: {name} {mumble_user.user_id + server_config_obj.offset}')
return (AUTH_REFUSED, None, None)
@fortifyIceFu((False, None))
@checkSecret
def getInfo(self, id, current=None) -> tuple[bool, None]:
"""
Gets called to fetch user specific information
"""
# We do not expose any additional information so always fall through
logger.debug("getInfo for %d -> denied", id)
return (False, None)
@fortifyIceFu(-2)
@checkSecret
def nameToId(self, name, current=None) -> int:
"""
Gets called to get the id for a given username
"""
if name == "SuperUser":
logger.debug("nameToId SuperUser -> forced fall through")
return -2 # FALL_THROUGH
try:
return (MumbleUser.objects.get(username=name).pk + server_config_obj.offset)
except MumbleUser.DoesNotExist:
try:
return (TempUser.objects.get(username=name).pk + server_config_obj.offset * 2)
except TempUser.DoesNotExist:
return -2 # FALL_THROUGH
@fortifyIceFu("")
@checkSecret
def idToName(self, id, current=None) -> str:
"""
Gets called to get the username for a given id
"""
if id < server_config_obj.offset:
return "" # FALL_THROUGH
try:
mumble_user = MumbleUser.objects.get(user_id=id - server_config_obj.offset)
mumble_user.username
except MumbleUser.DoesNotExist:
try:
mumble_user = TempUser.objects.get(user_id=id - server_config_obj.offset * 2)
mumble_user.username
except TempUser.DoesNotExist:
return "" # FALL_THROUGH
# I dont quite rightly know why we have this
# SuperUser shouldnt be in our Authenticator?
# But Maybe it can be?
if MumbleUser.objects.get(user_id=id - server_config_obj.offset).username == "SuperUser":
logger.debug('idToName %d -> "SuperUser" caught')
return "" # FALL_THROUGH
else:
return mumble_user.username
@fortifyIceFu("")
@checkSecret
def idToTexture(self, id, current=None):
"""
Gets called to get the corresponding texture for a user
"""
if server_config_obj.avatar_enable is False:
logger.debug(f"idToTexture {id} -> avatar display disabled, fall through")
return "" # FALL_THROUGH
if id < server_config_obj.offset:
return "" # FALL_THROUGH
try:
avatar_url = MumbleUser.objects.get(user_id=id - server_config_obj.offset).user.profile.main_character.portrait_url()
except MumbleUser.DoesNotExist:
logger.debug(f"idToTexture {id} -> MumbleUser.DoesNotExist, Fall Through")
return "" # FALL_THROUGH
if avatar_url:
if avatar_url in self.texture_cache:
logger.debug('idToTexture {id} -> cached avatar returned: {avatar_url}')
return self.texture_cache[avatar_url]
# Not cached? Try to retrieve from CCP image server.
try:
logger.debug('idToTexture %d -> try file "%s"', id, avatar_url)
handle = urlopen(avatar_url)
except (OSError, Exception) as e:
logger.exception(e)
logger.debug(f'idToTexture {id} -> image download for {avatar_url} failed, fall through')
return "" # FALL_THROUGH
else:
file = handle.read()
handle.close()
# Cache resulting avatar by file address and return image.
self.texture_cache[avatar_url] = file
logger.debug(f'idToTexture {id} -> avatar from {avatar_url} retrieved and returned')
return self.texture_cache[avatar_url]
else:
logger.debug(f"idToTexture {id} -> empty avatar_url, final fall through")
return "" # FALL_THROUGH
@fortifyIceFu(-2)
@checkSecret
def registerUser(self, name, current=None) -> int:
"""
Gets called when the server is asked to register a user.
"""
logger.debug(f'registerUser {name} -> fall through')
return -2 # FALL_THROUGH
@fortifyIceFu(-1)
@checkSecret
def unregisterUser(self, id, current=None) -> int:
"""
Gets called when the server is asked to unregister a user.
"""
# Return -1 to fall through to internal server database, so as to not modify Alliance Auth
# but we can make murmur delete all additional logger.information it got this way.
logger.debug(f"unregisterUser {id} -> fall through", )
return -1 # FALL_THROUGH
@fortifyIceFu({})
@checkSecret
def getRegisteredUsers(self, filter, current=None) -> dict:
"""
Returns a list of usernames in the AllianceAuth database which contain
filter as a substring.
"""
if not filter:
mumble_users = MumbleUser.objects.all()
else:
mumble_users = MumbleUser.objects.filter(username__icontains=filter)
if not mumble_users.exists():
logger.debug(f'getRegisteredUsers -> empty list for filter {filter}', )
return {}
logger.debug(f'getRegisteredUsers -> {len(mumble_users)} results for filter {filter}')
return {mumble_user.user_id + server_config_obj.offset: mumble_user.username for mumble_user in mumble_users}
@fortifyIceFu(-1)
@checkSecret
def setInfo(self, id, info, current=None) -> int:
"""
Gets called when the server is supposed to save additional information
about a user to his database
"""
# Return -1 to fall through to the internal server handler.
# Store this in Murmur, Not Alliance Auth
logger.debug(f"setInfo {id} -> fall through")
return -1 # FALL_THROUGH
@fortifyIceFu(-1)
@checkSecret
def setTexture(self, id, texture, current=None) -> int:
"""
Gets called when the server is asked to update the user texture of a user
"""
# Return -1 to fall through to the internal server handler.
# Store this in Murmur, Not Alliance Auth
logger.debug(f"setTexture {id} -> fall through")
return -1 # FALL_THROUGH
#
# --- Start of authenticator
#
logger.info(f"Starting AllianceAuth Mumble Authenticator V:{__version__}")
initdata = Ice.InitializationData()
initdata.properties = Ice.createProperties([], initdata.properties)
initdata.properties.setProperty('Ice.ImplicitContext', 'Shared')
initdata.properties.setProperty('Ice.Default.EncodingVersion', '1.0')
initdata.logger = logger
app = AllianceAuthAuthenticatorApp()
state = app.main(sys.argv, initData=initdata)
logger.info('Shutdown complete')
def allianceauth_check_hash(password, hash, hash_type) -> bool:
"""
:param password: Password to be verified
:param hash: Hash for the password to be checked against
:param hash_type: Hashing function originally used to generate the hash
"""
if hash_type == "sha1":
return sha1(password).hexdigest() == hash
elif hash_type == "bcrypt-sha256":
return bcrypt_sha256.verify(password, hash)
else:
logger.warning("No valid hash function found for %s" % hash_type)
return False
def idler_handler(server) -> None:
logger.debug("IdlerHandler: Starting")
users = server.getUsers().values()
logger.debug("IdleHandler: Fetched All Users")
for user in users:
logger.debug(f"IdleHandler: Checking user {user.name}")
if isinstance(user, int):
logger.debug(f"IdleHandler: Skipping User {user.name}, This happens occasionally")
continue
if user_idlesecs > server_config_obj.idler_handler.seconds:
logger.debug(
f"IdleHandler: User {user.name} is AFK, for {user_idlesecs}/{server_config_obj.idler_handler.seconds}"
)
state = server.getState(user.session)
if state:
# AllowList > DenyList
if server_config_obj.idler_handler.denylist is False:
if state.channel not in server_config_obj.idler_handler.list:
return
elif server_config_obj.idler_handler.denylist is True:
if state.channel in server_config_obj.idler_handler.list:
return
if state.channel == server_config_obj.idler_handler.channel:
return
state.channel = server_config_obj.idler_handler.channel
state.selfMute = True
state.selfDeaf = True
server.setState(state)
logger.debug(f"IdleHandler: Moved AFK User {user.name}")
Timer(server_config_obj.idler_handler.interval, idler_handler, (server,)).start()
if __name__ == "__main__":
main()