Basraah 2d6c641648 IPS4 and Market PHP hash fix (#727)
Force bcrypt 2y for PHP apps

2b isn't supported by older versions of PHP supplied by e.g. Ubuntu
14.04. 2a is insecure.

Remove plaintext warning

No services store plaintext passwords anymore.

Switch form to password field
2017-02-20 23:20:12 -05:00

150 lines
6.5 KiB
Python

from __future__ import unicode_literals
import logging
import random
import string
import re
from django.db import connections
from passlib.hash import bcrypt
# requires yum install libffi-devel and pip install bcrypt
logger = logging.getLogger(__name__)
class MarketManager:
def __init__(self):
pass
SQL_ADD_USER = r"INSERT INTO fos_user (username, username_canonical, email, email_canonical, enabled, salt," \
r"password, locked, expired, roles, credentials_expired, characterid, characterName)" \
r"VALUES (%s, %s, %s, %s, 1,%s, %s, 0, 0, 'a:0:{}', 0, %s, %s) "
SQL_GET_USER_ID = r"SELECT id FROM fos_user WHERE username = %s"
SQL_DISABLE_USER = r"UPDATE fos_user SET enabled = '0' WHERE username = %s"
SQL_ENABLE_USER = r"UPDATE fos_user SET enabled = '1' WHERE username = %s"
SQL_UPDATE_PASSWORD = r"UPDATE fos_user SET password = %s, salt = %s WHERE username = %s"
SQL_CHECK_EMAIL = r"SELECT email FROM fos_user WHERE email = %s"
SQL_CHECK_USERNAME = r"SELECT username FROM fos_user WHERE username = %s"
SQL_UPDATE_USER = r"UPDATE fos_user SET password = %s, salt = %s, enabled = '1' WHERE username = %s"
@staticmethod
def __santatize_username(username):
sanatized = username.replace(" ", "_")
return sanatized.lower()
@staticmethod
def __generate_random_pass():
return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(16)])
@staticmethod
def _gen_pwhash(password):
return bcrypt.using(ident='2y').encrypt(password.encode('utf-8'), rounds=13)
@staticmethod
def _get_salt(pw_hash):
search = re.compile(r"^\$2[a-z]?\$([0-9]+)\$(.{22})(.{31})$")
match = re.match(search, pw_hash)
return match.group(2)
@classmethod
def check_username(cls, username):
logger.debug("Checking alliance market username %s" % username)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_CHECK_USERNAME, [cls.__santatize_username(username)])
row = cursor.fetchone()
if row:
logger.debug("Found user %s on alliance market" % username)
return True
logger.debug("User %s not found on alliance market" % username)
return False
@classmethod
def check_user_email(cls, username, email):
logger.debug("Checking if alliance market email exists for user %s" % username)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_CHECK_EMAIL, [email])
row = cursor.fetchone()
if row:
logger.debug("Found user %s email address on alliance market" % username)
return True
logger.debug("User %s email address not found on alliance market" % username)
return False
@classmethod
def add_user(cls, username, email, characterid, charactername):
logger.debug("Adding new market user %s" % username)
plain_password = cls.__generate_random_pass()
hash = cls._gen_pwhash(plain_password)
salt = cls._get_salt(hash)
username_clean = cls.__santatize_username(username)
if not cls.check_username(username):
if not cls.check_user_email(username, email):
try:
logger.debug("Adding user %s to alliance market" % username)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_ADD_USER, [username_clean, username_clean, email, email, salt,
hash, characterid, charactername])
return username_clean, plain_password
except:
logger.debug("Unsuccessful attempt to add market user %s" % username)
return "", ""
else:
logger.debug("Alliance market email %s already exists Updating instead" % email)
username_clean, password = cls.update_user_info(username)
return username_clean, password
else:
logger.debug("Alliance market username %s already exists Updating instead" % username)
username_clean, password = cls.update_user_info(username)
return username_clean, password
@classmethod
def disable_user(cls, username):
logger.debug("Disabling alliance market user %s " % username)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_DISABLE_USER, [username])
return True
@classmethod
def update_custom_password(cls, username, plain_password):
logger.debug("Updating alliance market user %s password" % username)
if cls.check_username(username):
username_clean = cls.__santatize_username(username)
hash = cls._gen_pwhash(plain_password)
salt = cls._get_salt(hash)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_UPDATE_PASSWORD, [hash, salt, username_clean])
return plain_password
else:
logger.error("Unable to update alliance market user %s password" % username)
return ""
@classmethod
def update_user_password(cls, username):
logger.debug("Updating alliance market user %s password" % username)
if cls.check_username(username):
username_clean = cls.__santatize_username(username)
plain_password = cls.__generate_random_pass()
hash = cls._gen_pwhash(plain_password)
salt = cls._get_salt(hash)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_UPDATE_PASSWORD, [hash, salt, username_clean])
return plain_password
else:
logger.error("Unable to update alliance market user %s password" % username)
return ""
@classmethod
def update_user_info(cls, username):
logger.debug("Updating alliance market user %s" % username)
try:
username_clean = cls.__santatize_username(username)
plain_password = cls.__generate_random_pass()
hash = cls._gen_pwhash(plain_password)
salt = cls._get_salt(hash)
cursor = connections['market'].cursor()
cursor.execute(cls.SQL_UPDATE_USER, [hash, salt, username_clean])
return username_clean, plain_password
except:
logger.debug("Alliance market update user failed for %s" % username)
return "", ""