import json
import os
import random
import secrets
import time

import requests
from werkzeug.security import check_password_hash

from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_auth import create_token
from user.user_obj import UserManage, ListUser, User, UserLogin, SendLoginVcodeEmail
from user.user_utils import decrypt_data

# Mongo database used by every implementation class in this module.
_DB_NAME = "tfse_v0.21"

# Alphabets for generated identifiers and codes.
_ALPHANUMERIC = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
_DIGITS = '0123456789'

_UID_LENGTH = 8
_VCODE_LENGTH = 6
# NOTE(review): a 4-character session id has very little entropy; the length
# is kept as-is because its consumers are not visible here -- consider raising it.
_SESSION_ID_LENGTH = 4
# Verification codes expire after five minutes (the e-mail text says the same).
_VCODE_TTL_SECONDS = 300

# Upper bound applied to the requested page size.
_MAX_PAGE_SIZE = 10

# Mail service endpoint and the maximum time to wait for it.
_EMAIL_API = 'http://116.63.130.34:30001'
_EMAIL_TIMEOUT_SECONDS = 10

# Display label -> stored value, for the role filter.
_ROLE_MAP = {
    "管理员": "admin",
    "分析师": "analysts",
    "开发者": "developer",
    "运营人员": "operator",
    "访客": "guest",
}

# Display label -> stored field name, for sorting.
_SORT_COLUMNS = {
    "姓名": "name",
    "邮箱": "email",
    "状态": "status",
    "角色": "role",
    "创建时间": "create_time",
}


def _make_search_body(param):
    """Translate the UI search parameters into a Mongo query dict.

    ``param`` maps display labels ("uid", "姓名", "邮箱", "状态", "角色") to
    the requested values. ``None`` is treated as "no filters". Role labels
    that are not in ``_ROLE_MAP`` are ignored rather than raising KeyError.
    """
    param = param or {}
    body = {}
    if "uid" in param:
        body['UID'] = param['uid']
    # NOTE(review): the name/e-mail values are passed to ``$regex`` unescaped,
    # so regex metacharacters typed by the user are interpreted by Mongo.
    if "姓名" in param:
        body['name'] = {"$regex": param['姓名']}
    if "邮箱" in param:
        body['email'] = {"$regex": param['邮箱']}
    if "状态" in param:
        status_label = param['状态']
        if status_label == "正常":
            body['status'] = "normal"
        elif status_label == "停用":
            body['status'] = "disable"
        # Any other label adds no status filter.
    if "角色" in param:
        # De-duplicate and sort so the generated query is deterministic.
        roles = sorted({_ROLE_MAP[label] for label in param['角色'] if label in _ROLE_MAP})
        body['role'] = {"$in": roles}
    return body


def _make_sort_body(param):
    """Translate the UI sort parameter into a ``{field: direction}`` dict.

    Only the first entry of ``param`` is used. An empty/``None`` parameter or
    an unknown column label falls back to newest-first by ``create_time``.
    """
    default_sort = {"create_time": -1}
    if not param:
        return default_sort
    column_label, direction = next(iter(param.items()))
    sort_column = _SORT_COLUMNS.get(column_label)
    if sort_column is None:
        return default_sort
    return {sort_column: 1 if direction == "asc" else -1}


class ListUserImpl(ListUser):
    """User list implementation."""

    db = MongoHelper(_DB_NAME)

    def list(self):
        """Return one page of users matching the current search and sort.

        Reads ``self.search``, ``self.sort``, ``self.page_size`` (capped at
        ``_MAX_PAGE_SIZE``) and ``self.page_no``.

        Returns:
            dict with ``"records"`` (list of ``User.dict_to_show()`` results)
            and ``"total"`` (the count returned by the DB helper for the same
            query).
        """
        search_body = _make_search_body(self.search)
        sort = _make_sort_body(self.sort)
        # Coerce like page_no so a numeric string does not break the comparison.
        page_size = min(int(self.page_size), _MAX_PAGE_SIZE)
        page_no = int(self.page_no)

        total = self.db.find_all_data_with_count(
            "管理端",
            "用户",
            search_body
        )
        records = self.db.find_data_by_page_with_sort(
            "管理端",
            "用户",
            search_body,
            ["UID", "email", "name", "status", "role", "create_time"],
            sort,
            page_size,
            page_no
        )

        table_data = []
        for record in records or []:
            user = User()
            user.uid = record['UID']
            user.email = record['email']
            user.name = record['name']
            user.status = record['status']
            user.role = record['role']
            user.create_time = record['create_time']
            table_data.append(user.dict_to_show())

        return {
            "records": table_data,
            "total": total
        }


class UserLoginImpl(UserLogin):
    """User login implementation."""

    db = MongoHelper(_DB_NAME)

    def login(self):
        """Authenticate the user and return the login payload.

        Reads ``self.email``, ``self.pwd`` (encrypted; decrypted with
        ``decrypt_data``) and ``self.vcode``. The checks run in this order:
        e-mail exists, account status is ``normal``, password matches,
        verification code is correct and not expired. On success sets
        ``self.menus``, ``self.token`` and ``self.name``.

        Returns:
            The result of ``self.dict_to_show()``.

        Raises:
            ReturnConditionCheckFailed: when any of the checks fails.
        """
        user_info = self.db.find_single_data(
            "管理端",
            "用户",
            {"email": self.email},
            ["UID", "name", "pwd", "status", "role"]
        )

        def check_email_existed():
            if not user_info:
                raise ReturnConditionCheckFailed("邮箱不存在", 200)

        def check_user_is_disable():
            if user_info['status'] != 'normal':
                raise ReturnConditionCheckFailed("账户已禁用", 200)

        def check_pwd_is_correct():
            # Any failure while decrypting or verifying is reported to the
            # client as a wrong password, never as an internal error.
            try:
                matched = check_password_hash(
                    user_info['pwd'], decrypt_data(encrypt_msg=self.pwd)
                )
            except Exception:
                matched = False
            if not matched:
                raise ReturnConditionCheckFailed("密码错误", 200)

        def check_vcode_is_correct():
            record = self.db.find_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": self.email},
                ["vcode", "timestamp"]
            )
            if not record:
                raise ReturnConditionCheckFailed("验证码不存在", 200)
            if record['vcode'] != self.vcode:
                raise ReturnConditionCheckFailed("验证码错误", 200)
            if time.time() - record['timestamp'] > _VCODE_TTL_SECONDS:
                raise ReturnConditionCheckFailed("验证码过期", 200)
            # A code is single-use: remove it once it has been accepted.
            self.db.delete_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": self.email}
            )

        def make_menus():
            # Resolve relative to this file's absolute location; concatenating
            # onto dirname(__file__) yields "/static/menus.json" when dirname
            # is empty.
            menus_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), 'static', 'menus.json'
            )
            with open(menus_path, "r", encoding='utf-8') as f:
                duties = json.load(f)
            self.menus = duties[user_info['role']]

        def make_token():
            # Session ids are security-sensitive, so draw them from the
            # OS CSPRNG rather than the ``random`` module.
            session_id = ''.join(
                secrets.choice(_ALPHANUMERIC) for _ in range(_SESSION_ID_LENGTH)
            )
            self.db.update_single_data(
                "管理端",
                "token记录",
                {"UID": user_info['UID']},
                {"session_id": session_id}
            )
            self.token = create_token(session_id)

        check_email_existed()
        check_user_is_disable()
        check_pwd_is_correct()
        check_vcode_is_correct()
        make_menus()
        make_token()
        self.name = user_info['name']
        return self.dict_to_show()


class SendLoginVcodeEmailImpl(SendLoginVcodeEmail):
    """Implementation of sending the login verification-code e-mail."""

    db = MongoHelper(_DB_NAME)

    def send_vcode_email(self):
        """Generate a login verification code, store it and e-mail it.

        Sets ``self.vcode`` to a 6-digit code, upserts it together with the
        current timestamp keyed by ``self.email``, then posts the message to
        the mail service. Returns ``None``.

        Raises:
            requests.exceptions.RequestException: propagated unchanged if the
                mail service cannot be reached or does not answer within
                ``_EMAIL_TIMEOUT_SECONDS``.
        """
        # The code is an authentication factor: use the CSPRNG.
        self.vcode = ''.join(secrets.choice(_DIGITS) for _ in range(_VCODE_LENGTH))

        self.db.upsert_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": self.email},
            {"vcode": self.vcode, "timestamp": round(time.time())}
        )

        headers = {"Content-Type": "application/json;charset=UTF-8"}
        data = {
            "title": "【远东资信】{}".format("登录验证码"),
            "sender": 'fecribd@fecr.com.cn',
            "recipients": [self.email],
            "msg_body": "您{}的验证码为 【{}】,5分钟内有效。".format("登录", self.vcode)
        }
        # Without a timeout a stalled mail service would block this request
        # indefinitely.
        requests.post(
            url=_EMAIL_API + '/send_mail',
            headers=headers,
            data=json.dumps(data),
            timeout=_EMAIL_TIMEOUT_SECONDS
        )


class UserManageImpl(UserManage):
    """User management implementation (create / disable / activate)."""

    db = MongoHelper(_DB_NAME)

    def create(self):
        """Create a new user from the fields already set on ``self``.

        Rejects an already-registered ``self.email``, assigns a fresh unused
        8-character ``self.uid``, sets ``self.status`` to ``"normal"`` and
        ``self.create_time`` to the current local time, then upserts
        ``self.dict_to_save()``.

        Raises:
            ReturnConditionCheckFailed: if the e-mail is already registered.
        """
        email_is_existed = self.db.find_single_column(
            "管理端",
            "用户",
            {"email": self.email},
            "email"
        )
        if email_is_existed:
            raise ReturnConditionCheckFailed("邮箱已被注册", 200)

        def make_uid():
            """Return a random alphanumeric candidate user id."""
            return ''.join(random.choice(_ALPHANUMERIC) for _ in range(_UID_LENGTH))

        def uid_is_available(candidate):
            """Return True when no existing user already has ``candidate``."""
            existing = self.db.find_single_column(
                "管理端",
                "用户",
                {"UID": candidate},
                "UID"
            )
            return existing is None

        # Retry until the candidate does not collide with an existing user.
        uid = make_uid()
        while not uid_is_available(uid):
            uid = make_uid()
        self.uid = uid

        self.status = "normal"
        self.create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        self.db.upsert_single_data(
            "管理端",
            "用户",
            {"UID": self.uid},
            self.dict_to_save()
        )

    def disable(self):
        """Disable a user.

        Writes ``self.status`` to the user identified by ``self.uid``.
        NOTE(review): the stored value is whatever the caller put in
        ``self.status`` -- presumably "disable"; confirm against the caller.
        """
        self.db.update_single_data(
            "管理端",
            "用户",
            {"UID": self.uid},
            {"status": self.status}
        )

    def active(self):
        """Activate a user.

        Writes ``self.status`` to the user identified by ``self.uid``.
        NOTE(review): the stored value is whatever the caller put in
        ``self.status`` -- presumably "normal"; confirm against the caller.
        """
        self.db.update_single_data(
            "管理端",
            "用户",
            {"UID": self.uid},
            {"status": self.status}
        )