import json
import os
import random
import secrets
import time

import requests
from werkzeug.security import generate_password_hash, check_password_hash

from user.user_db import INSERT_DATA, FIND_DATA, UPSERT_DATA, DELETE_DATA
from user.user_utils import check_mail_fmt, check_pwd_fmt, create_token, decrypt_data

# Alphabets used for generated identifiers and verification codes.
_ALPHANUMERIC = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
_DIGITS = '0123456789'

# A verification code is accepted for this many seconds after it was issued.
_VCODE_TTL_SECONDS = 300

# Upper bound (seconds) for the call to the mail gateway so a dead gateway
# cannot block the request forever.
_EMAIL_API_TIMEOUT_SECONDS = 10


def _random_string(length, alphabet=_ALPHANUMERIC):
    """Return a random string of ``length`` characters drawn from ``alphabet``.

    Uses ``secrets`` instead of ``random`` because the results are used as
    session ids and login verification codes.
    """
    return ''.join(secrets.choice(alphabet) for _ in range(length))


def create_user_impl(email, name, pwd, role):
    """
    Create a new user.

    Parameters:
        email   str     e-mail address
        name    str     display name
        pwd     str     password, passed to ``decrypt_data`` before hashing
        role    str     one of admin / developer / analysts / manager / guest
    Returns:
        success     bool    True
        failure     str     error message
    """

    def check_params():
        """Validate the arguments; return True or an error message."""
        roles = ['admin', 'developer', 'analysts', 'manager', 'guest']
        if role not in roles:
            return "用户角色异常"
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        if not check_pwd_fmt(pwd):
            return "密码格式错误"
        if len(FIND_DATA("用户", "用户信息", {"email": email})) > 0:
            return "邮箱已被注册"
        return True

    def gen_new_uid():
        """Generate an 8-character user id that is not yet present in the store.

        The original compared the query result with ``is []`` (always False),
        so the collision check never ran. It also queried the key "企业ID"
        although user records are stored under "UID" (see ``start_impl``).
        The lookup is now re-evaluated on every iteration against "UID".
        """
        new_id = _random_string(8)
        while len(FIND_DATA("用户", "用户信息", {"UID": new_id})) > 0:
            new_id = _random_string(8)
        return new_id

    def start_impl():
        """Run validation, build the user record and insert it."""
        check_param_result = check_params()
        if check_param_result is not True:
            return check_param_result
        user = {
            'UID': gen_new_uid(),
            'email': email,
            'name': name,
            # Only the hash of the decrypted password is stored.
            'pwd': generate_password_hash(decrypt_data(encrypt_msg=pwd)),
            'status': 'normal',
            'role': role,
            'create_time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        }
        INSERT_DATA("用户", "用户信息", user)
        return True

    return start_impl()


def login_impl(email, pwd, vcode):
    """
    Log a user in with e-mail, password and e-mail verification code.

    Parameters:
        email   str     e-mail address
        pwd     str     password, passed to ``decrypt_data`` before checking
        vcode   str     verification code previously sent to ``email``
    Returns:
        success     dict    {"token": ..., "menus": ...}
        failure     str     error message
    """
    user_info = FIND_DATA('用户', '用户信息', {"email": email})

    def check_email():
        """Return True if a user with this e-mail exists, else an error message."""
        # ``user_info is []`` is always False (identity comparison with a new
        # list); an unknown e-mail used to fall through and crash with an
        # IndexError in check_pwd. Test for emptiness instead.
        if len(user_info) == 0:
            return "不存在该邮箱"
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        return True

    def check_vcode():
        """Validate the verification code and consume it on success."""
        records = FIND_DATA("用户", "验证记录", {"email": email})
        if len(records) == 0:
            return "无验证信息"
        if records[0]['vcode'] != vcode:
            return "验证码错误"
        if time.time() - records[0]['timestamp'] > _VCODE_TTL_SECONDS:
            return "验证码过期"
        # The code is single-use: remove it once it has been accepted.
        DELETE_DATA("用户", "验证记录", {"email": email})
        return True

    def check_pwd():
        """Compare the supplied password with the stored hash."""
        hash_pwd = user_info[0]['pwd']
        try:
            if not check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd)):
                return "密码错误"
        except Exception:
            # Any failure while decrypting or checking is reported to the
            # caller as a wrong password rather than as an internal error.
            return "密码错误"
        return True

    def make_menus():
        """Load the menu definition for the user's role from static/menus.json."""
        role = user_info[0]['role']
        menus_path = os.path.abspath(os.path.dirname(__file__) + '/static/menus.json')
        with open(menus_path, "r", encoding='utf-8') as f:
            duties = json.load(f)
        if role == 'member':
            # 'duty' is only needed for members. Records written by
            # create_user_impl carry no 'duty' field, so reading it
            # unconditionally raised KeyError for every other role.
            return duties[role][user_info[0]['duty']]
        return duties[role]

    def make_session_id():
        """Create a session id, persist it for the user and return it."""
        session_id = _random_string(4)
        UPSERT_DATA("用户", "token记录", {"UID": user_info[0]['UID']}, {"session_id": session_id})
        return session_id

    def make_result_data():
        """Build the login response payload."""
        res_data = dict()
        res_data['token'] = create_token(make_session_id())
        res_data['menus'] = make_menus()
        return res_data

    def start_impl():
        """Run the checks in order; stop at the first failure."""
        for check in (check_email, check_pwd, check_vcode):
            result = check()
            if result is not True:
                return result
        return make_result_data()

    return start_impl()


def send_vcode_to_user_impl(email):
    """
    Generate a 6-digit login verification code, mail it to ``email`` and
    record it together with its issue timestamp.

    Parameters:
        email   str     recipient e-mail address
    Returns:
        success     bool    True
        failure     str     error message
    """

    def check_param():
        """Validate the e-mail format."""
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        return True

    def gen_vcode():
        """Return a random 6-digit code."""
        return _random_string(6, _DIGITS)

    def send_email():
        """Send the code through the mail gateway, then store it."""
        email_api = 'http://116.63.130.34:30001'
        vcode = gen_vcode()
        timestamp = round(time.time())
        headers = {"Content-Type": "application/json;charset=UTF-8"}
        data = {"title": "【远东资信】{}".format("登录验证码"),
                "sender": 'fecribd@fecr.com.cn',
                "recipients": [email],
                "msg_body": "您{}的验证码为 【{}】,5分钟内有效。".format("登录", vcode)}
        response = requests.post(url=email_api + '/send_mail',
                                 headers=headers,
                                 data=json.dumps(data),
                                 timeout=_EMAIL_API_TIMEOUT_SECONDS)
        # NOTE(review): assumes the gateway signals failure with a non-2xx
        # status - confirm against the mail service. Do not record a code the
        # user never received.
        if not response.ok:
            return "验证码发送失败"
        UPSERT_DATA('用户', '验证记录', {"email": email}, {"vcode": vcode, "timestamp": timestamp})
        return True

    def start_impl():
        """Validate the argument, then send the code."""
        res = check_param()
        if res is not True:
            return res
        res = send_email()
        if res is not True:
            return res
        return True

    return start_impl()


def list_user_impl():
    """ List user information. Not implemented yet. """
    # TODO


def disable_user_impl():
    """ Disable a user. Not implemented yet. """
    # TODO


def delete_user_impl():
    """ Delete a user. Not implemented yet. """
    # TODO


def manage_role_of_user_impl():
    """ Manage the role of a user. Not implemented yet. """
    # TODO