Source code for deepdataspace.model.user

"""
deepdataspace.model.user

The user related models.
"""

import secrets
import time
from typing import Union

from deepdataspace.constants import UserStatus
from deepdataspace.model._base import BaseModel
from deepdataspace.server.settings import SECRET_KEY
from deepdataspace.server.settings import TOKEN_AGE
from deepdataspace.utils.string import decrypt
from deepdataspace.utils.string import encrypt
from deepdataspace.utils.string import gen_password
from deepdataspace.utils.string import get_str_md5

IntegrationError = type("IntegrationError", (Exception,), {})


[docs]class UserToken(BaseModel): """ The session token for a logged-in user. Attributes: ----------- id: str The token id. user_id: str The user id this token bond to. expire: int The token expire timestamp, in second. """
[docs] @classmethod def get_collection(cls, *args, **kwargs): """ User tokens are stored in the "user_tokens" collection. """ return cls.db["user_tokens"]
id: str # the token id user_id: str # the user id expire: int # the token expire timestamp, in second
[docs]class User(BaseModel): """ The user model. Attributes: ----------- id: str The user id. name: str The username. password: str The password, encrypted. status: str The user status, active or inactive, see :class:`deepdataspace.constants.UserStatus`. is_staff: bool Is this user a staff? """
[docs] @classmethod def get_collection(cls, *args, **kwargs): """ Users are stored in the "users" collection. """ return cls.db["users"]
id: str # the user id name: str # the username password: str # the password, encrypted. status: str # the user status is_staff: bool = False @property def is_active(self): return self.status == UserStatus.Active
[docs] @classmethod def get_user(cls, username: str) -> Union[None, "User"]: """ Get a user by username. """ user_id = get_str_md5(username) user = User.find_one({"id": user_id}) if user is not None and user.status != UserStatus.Active: user = None return user
[docs] @classmethod def create_user(cls, username: str, is_staff: bool = False) -> "User": """ Create a user by username, set a random password for the user. """ username = username.lower() user_id = get_str_md5(username) user = User.find_one({"id": user_id}) if user is not None: raise IntegrationError(f"user already exists with username: {username}") password = gen_password(10) password = encrypt(password, SECRET_KEY) user = User(id=user_id, name=username, password=password, status=UserStatus.Active, is_staff=is_staff) user.save() return user
[docs] @classmethod def delete_user(cls, username: str): """ Delete users by username. """ user_id = get_str_md5(username) User.delete_many({"id": user_id})
[docs] def reset_password(self): """ Reset user password to a random one. """ password = gen_password(10) self.set_password(password) self.delete_all_token()
[docs] def ban_user(self): """ Ban a user, set user status to inactive. """ self.status = UserStatus.InActive self.delete_all_token() self.save()
[docs] def unban_user(self): """ Unban a user, set user status to active. """ self.status = UserStatus.Active self.save()
[docs] @classmethod def authenticate(cls, username: str, password: str) -> Union[None, "User"]: """ Authenticate a user by username and password. """ user = cls.get_user(username) if user is None: return None decrypted_pass = decrypt(user.password, SECRET_KEY) if password == decrypted_pass: return user return None
[docs] def get_password(self): """ Get the decrypted password. """ return decrypt(self.password, SECRET_KEY)
[docs] def set_password(self, password: str): """ Set the password for a user. store it in encrypted format. """ password = encrypt(password, SECRET_KEY) self.password = password self.save()
[docs] def login_user(self): """ Login a user, create a token for the user. """ cur_time = int(time.time()) expire = cur_time + TOKEN_AGE token_value = secrets.token_hex(16) token = UserToken(id=token_value, user_id=self.id, expire=expire) token.save() return token
[docs] def logout_user(self, token_value: str): """ Logout a user, delete the token of current session. """ UserToken.delete_many({"id": token_value, "user_id": self.id})
[docs] def refresh_token(self, token_value: str): """ Extend the expiry time for current session. """ cur_time = int(time.time()) expire = cur_time + TOKEN_AGE UserToken.update_one({"id": token_value, "user_id": self.id}, {"expire": expire})
[docs] def delete_all_token(self): """ Delete all session for current user. This method must be called after banning the user. """ UserToken.delete_many({"user_id": self.id})