import re from Utils.ErrorUtil import ReturnConditionCheckFailed from user.user_utils import decrypt_data class User: """管理端用户""" def __init__(self): self.uid = None # 用户ID str self.email = None # 邮箱 str self.name = None # 用户名 str self.pwd = None # 密码 str self.status = None # 用户状态 str self.role = None # 角色 str self.create_time = None # 创建时间 str def check_uid(self): """用户ID校验""" if type(self.uid) is not str: raise ReturnConditionCheckFailed("用户ID格式错误", 200) if len(self.uid) != 8: raise ReturnConditionCheckFailed("用户ID格式错误", 200) def check_email(self): """邮箱格式仅允许@fecr.com.cn""" regex = "^.+\\@fecr.com.cn" case = (len(self.email) > 7) and (re.match(regex, self.email) is not None) result = True if case else False if not result: raise ReturnConditionCheckFailed("邮箱格式错误", 200) def check_name(self): """用户名格式校验""" if type(self.name) is not str: raise ReturnConditionCheckFailed("用户名格式错误", 200) def check_pwd(self): """密码格式校验""" if type(self.pwd) is not str: raise ReturnConditionCheckFailed("密码格式错误", 200) password = decrypt_data(encrypt_msg=self.pwd) if not password: raise ReturnConditionCheckFailed("密码格式错误", 200) regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$" case = (len(password) >= 8) and (re.match(regex, password) is not None) result = True if case else False if not result: raise ReturnConditionCheckFailed("密码格式错误", 200) def check_status(self): """用户状态校验""" if self.status not in ['normal', 'disable']: raise ReturnConditionCheckFailed("用户状态格式错误", 200) def check_role(self): """用户角色校验""" if self.role not in ['admin', 'analysts', 'developer', 'operator', 'guest']: raise ReturnConditionCheckFailed("用户角色格式错误", 200) def check_obj(self, **kwargs): """对象字段校验""" columns = kwargs['columns'] for column in columns: if column == "uid": self.check_uid() elif column == "email": self.check_email() elif column == "name": self.check_name() elif column == "pwd": self.check_pwd() elif column == "status": self.check_status() elif column == "role": self.check_role() else: pass def trans_status(self): """用户状态转换成前端显示格式""" return "正常" if self.status == "normal" else "停用" def trans_role(self): """用户角色转换成前端显示格式""" role_map = {"admin": "管理员", "analysts": "分析师", "developer": "开发者", "operator": "运营人员", "guest": "访客"} return role_map[self.role] def dict_to_save(self, **kwargs): """存储对象""" user_dict = { "UID": self.uid, "email": self.email, "name": self.name, "pwd": self.pwd, "status": self.status, "role": self.role, "create_time": self.create_time } if 'columns' in list(kwargs.keys()): user_dict = {key: user_dict[key] for key in kwargs['columns']} return user_dict def dict_to_show(self, **kwargs): """显示对象""" user_dict = { "uid": self.uid, "邮箱": self.email, "姓名": self.name, "状态": self.trans_status(), "角色": self.trans_role(), "创建时间": self.create_time } if 'columns' in list(kwargs.keys()): user_dict = {key: user_dict[key] for key in kwargs['columns']} return user_dict class ListUser: """用户列表""" def __init__(self, search, sort, page_size, page_no): self.search = search self.sort = sort self.page_size = page_size self.page_no = page_no def list(self): """查询用户列表""" def check_search(self): """ type: dict desc: 用户搜索条件 支持模糊查询条件 demo: {"姓名": "xxx", "邮箱": "xxx"} """ if type(self.search) is not dict: raise ReturnConditionCheckFailed("参数 search 格式错误", 400) search_keys = list(self.search.keys()) if "uid" in search_keys: if type(self.search['UID']) is not str: raise ReturnConditionCheckFailed("参数 search.uid 格式错误", 400) if "姓名" in search_keys: if type(self.search['姓名']) is not str: raise ReturnConditionCheckFailed("参数 search.姓名 格式错误", 400) if "邮箱" in search_keys: if type(self.search['邮箱']) is not str: raise ReturnConditionCheckFailed("参数 search.邮箱 格式错误", 400) if "状态" in search_keys: if type(self.search['状态']) is not str: raise ReturnConditionCheckFailed("参数 search.状态 格式错误", 400) if self.search['状态'] not in ["正常", "停用"]: raise ReturnConditionCheckFailed("参数 search.状态 格式错误", 400) if "角色" in search_keys: if type(self.search['角色']) is not list: raise ReturnConditionCheckFailed("参数 search.角色 格式错误", 400) for role in self.search['角色']: if role not in ["管理员", "分析师", "开发者", "运营人员", "访客"]: raise ReturnConditionCheckFailed("参数 search.角色 格式错误", 400) def check_sort(self): """ type: dict desc: 排序条件 asc正序 desc倒序 demo: {"姓名": asc} """ if type(self.sort) is not dict: raise ReturnConditionCheckFailed("参数 sort 格式错误", 400) if len(list(self.sort.values())) > 1: raise ReturnConditionCheckFailed("参数 sort 格式错误", 400) if len(list(self.sort.values())) == 1 and list(self.sort.values())[0] not in ["asc", "desc"]: raise ReturnConditionCheckFailed("参数 sort 格式错误", 400) def check_page_size(self): """ type: int desc: 每页数据数量 """ if type(self.page_size) is not int: raise ReturnConditionCheckFailed("参数 page_size 格式错误", 400) def check_page_no(self): """ type: int desc: 当前页数 """ if type(self.page_no) is not int: raise ReturnConditionCheckFailed("参数 page_no 格式错误", 400) def check_obj(self): """ """ self.check_search() self.check_sort() self.check_page_size() self.check_page_no() class UserLogin(User): """用户登录""" def __init__(self): super().__init__() self.vcode = None self.token = None self.menus = None def login(self): """登录""" def check_vcode(self): """检查验证码格式""" if type(self.vcode) is not str: raise ReturnConditionCheckFailed("验证码格式错误", 200) def check_obj(self): """""" self.check_email() self.check_vcode() def dict_to_show(self): """显示对象""" show_dict = { "name": self.name, "token": self.token, "menus": self.menus } return show_dict class SendLoginVcodeEmail(User): """发送登录验证码邮件""" def __init__(self): super().__init__() self.vcode = None def check_obj(self): self.check_email() class UserManage(User): """用户管理""" def create(self): """创建新用户""" def disable(self): """禁用用户""" def active(self): """激活用户""" def delete_user(self): """删除用户""" def manage_role_of_user(self): """管理用户角色"""