197 lines
5.4 KiB
Python
197 lines
5.4 KiB
Python
import json
|
||
import time
|
||
import random
|
||
|
||
import requests
|
||
from werkzeug.security import generate_password_hash, check_password_hash
|
||
|
||
from common.db import insert_data, find_data, update_data_upsert
|
||
from common.scripts import read_json_file
|
||
from common.security.APIAuth import create_token
|
||
from user.scripts import check_mail_fmt, check_pwd_fmt
|
||
from common.security.rsa import decrypt_data
|
||
|
||
|
||
def create_user_process(email, name, pwd, role, duty):
    """
    Validate the supplied fields and insert a new user record.

    Parameters:
        email str  e-mail address; must pass check_mail_fmt and not be registered yet
        name  str  user name
        pwd   str  password as received from the caller; it is validated with
                   check_pwd_fmt and passed through decrypt_data before hashing
        role  str  one of 'admin', 'member', 'guest'
        duty  str  one of 'developer', 'analysts', 'manager', or None
    Returns:
        success  bool  True
        failure  str   message naming the validation that failed
    """
    def check_params():
        """
        Validate role, duty, e-mail format, password format and e-mail uniqueness.

        Returns True when everything is acceptable, otherwise an error message.
        """
        if role not in ('admin', 'member', 'guest'):
            return "用户角色异常"

        # None is accepted: a user may have no duty assigned.
        if duty not in ('developer', 'analysts', 'manager', None):
            return "成员职责异常"

        if not check_mail_fmt(email):
            return "邮箱格式错误"

        if not check_pwd_fmt(pwd):
            return "密码格式错误"

        if find_data("tfse_admin", "用户", "用户信息", {"email": email}):
            return "邮箱已被注册"

        return True

    def gen_new_uid():
        """
        Generate an 8-character alphanumeric user ID that is not already stored.

        Returns:
            the generated user ID
        """
        alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'

        def make_id(num):
            """
            Build a random string of `num` characters drawn from `alphabet`.
            """
            return ''.join(random.choice(alphabet) for _ in range(num))

        new_id = make_id(8)
        # Keep drawing until no stored user carries this ID. The previous code
        # compared the query result with `is []` (always False) and never
        # re-queried, so collisions were never detected. The lookup uses the
        # "UID" field because that is the key user records are stored under.
        while find_data("tfse_admin", "用户", "用户信息", {"UID": new_id}):
            new_id = make_id(8)
        return new_id

    def process_main():
        """
        Run validation, then build and store the user record.
        """
        check_param_result = check_params()
        if check_param_result is not True:
            return check_param_result

        user = {
            'UID': gen_new_uid(),
            'email': email,
            'name': name,
            # Only the hash of the decrypted password is stored.
            'pwd': generate_password_hash(decrypt_data(encrypt_msg=pwd)),
            'status': 'normal',
            'role': role,
            'duty': duty,
            'create_time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        }

        insert_data("tfse_admin", "用户", "用户信息", user)

        return True

    return process_main()
|
||
|
||
|
||
def login_process(email, pwd):
    """
    Authenticate a user by e-mail and password and build the login response.

    Parameters:
        email str  e-mail address used to look up the user
        pwd   str  password as received from the caller; it is passed through
                   decrypt_data before being checked against the stored hash
    Returns:
        success  dict  {"token": <token from create_token>, "menus": <menu config>}
        failure  str   error message
    """
    # Validate the format before touching the database, so a malformed address
    # is reported as a format error rather than as an unknown account.
    if not check_mail_fmt(email):
        return "邮箱格式错误"

    user_info = find_data('tfse_admin', '用户', '用户信息', {"email": email})
    # The previous check was `user_info is []`, which is always False, so an
    # unknown e-mail fell through and crashed on user_info[0].
    if not user_info:
        return "不存在该邮箱"

    user = user_info[0]

    def check_pwd():
        """
        Verify the supplied password against the stored hash.

        Returns True on a match, otherwise an error message.
        """
        hash_pwd = user['pwd']
        try:
            if not check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd)):
                return "密码错误"
        except Exception:
            # Deliberately broad: any failure while decrypting or verifying the
            # submitted value is reported as a wrong password.
            return "密码错误"

        return True

    def check_menus():
        """
        Select the menu configuration for the user's role (and duty for members).
        """
        role = user['role']
        duties = read_json_file('/user/static/menus.json')
        # Only members are further split by duty; every other role maps
        # directly to its own entry.
        if role == 'member':
            return duties[role][user['duty']]
        return duties[role]

    res = check_pwd()
    if res is not True:
        return res

    return {
        'token': create_token({"UID": user['UID']}),
        'menus': check_menus(),
    }
|
||
|
||
|
||
def send_vcode_to_user_process(email):
    """
    Generate a 6-digit login verification code, e-mail it, and record it.

    Parameters:
        email str  recipient e-mail address
    Returns:
        success  bool  True
        failure  str   "邮箱格式错误" when the address fails check_mail_fmt
    Raises:
        Exceptions from requests.post (including a timeout) and from
        update_data_upsert are not caught here and propagate to the caller.
    """
    def check_param():
        """
        Validate the e-mail format.
        """
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        return True

    def gen_vcode():
        """
        Build a 6-digit numeric code.
        """
        # The code acts as a login credential, so draw it from the
        # cryptographically secure generator instead of `random`.
        import secrets

        return ''.join(secrets.choice('0123456789') for _ in range(6))

    def send_email():
        """
        Post the code to the mail service, then upsert it with a timestamp.
        """
        email_api = 'http://116.63.130.34:30001'
        vcode = gen_vcode()
        timestamp = round(time.time())
        headers = {"Content-Type": "application/json;charset=UTF-8"}
        data = {
            "title": "【远东资信】{}".format("登录验证码"),
            "sender": 'fecribd@fecr.com.cn',
            "recipients": [email],
            "msg_body": "您{}的验证码为 【{}】,5分钟内有效。".format("登录", vcode),
        }
        # Without a timeout an unresponsive mail service would block this
        # call indefinitely.
        requests.post(
            url=email_api + '/send_mail',
            headers=headers,
            data=json.dumps(data),
            timeout=10,
        )
        # The timestamp is stored next to the code; the 5-minute validity
        # stated in the message is presumably enforced wherever the code is
        # verified, which is not part of this function.
        update_data_upsert(
            'tfse_admin', '用户', '验证记录',
            {"email": email},
            {"vcode": vcode, "timestamp": timestamp},
        )
        return True

    def main_process():
        """
        Validate the input, then send and record the code.
        """
        res = check_param()
        if res is not True:
            return res

        res = send_email()
        if res is not True:
            return res

        return True

    return main_process()
|