Merge branch 'main' into ps
This commit is contained in:
commit
132d9af22b
|
@ -0,0 +1,40 @@
|
|||
from werkzeug.security import generate_password_hash
|
||||
|
||||
from DBHelper.MongoHelperInstance import DB_GUA
|
||||
from Modules.AdminUser.UserObject1 import User
|
||||
from Modules.AdminUser.UserUtils1 import UserUtils
|
||||
from Modules.Common.CommonUtils import CommonUtils
|
||||
|
||||
|
||||
class UserImpl(object):
|
||||
|
||||
@staticmethod
|
||||
def create(email, name, pwd, role):
|
||||
"""创建"""
|
||||
|
||||
UserUtils.check_mail_fmt(email)
|
||||
UserUtils.check_email_registered(email)
|
||||
|
||||
user = User()
|
||||
user.uid = UserUtils.generate_new_uid()
|
||||
user.email = email
|
||||
user.name = name
|
||||
user.pwd = generate_password_hash(pwd)
|
||||
user.status = "normal"
|
||||
user.role = role
|
||||
user.create_time = CommonUtils.get_current_time()
|
||||
|
||||
DB_GUA.insert_single_data(
|
||||
"管理端",
|
||||
"用户",
|
||||
user.dict_keys_toggle()
|
||||
)
|
||||
|
||||
def login(self):
|
||||
"""登录"""
|
||||
|
||||
def disable(self):
|
||||
"""禁用"""
|
||||
|
||||
def active(self):
|
||||
"""激活"""
|
|
@ -65,7 +65,7 @@ class User:
|
|||
def check_role(self):
|
||||
"""用户角色校验"""
|
||||
|
||||
if self.role not in ['admin', 'analysts', 'developer', 'operator', 'guest']:
|
||||
if self.role not in ['admin', 'analysts', 'guest']:
|
||||
raise ReturnConditionCheckFailed("用户角色格式错误", 200)
|
||||
|
||||
def check_obj(self, **kwargs):
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
from Modules.AdminUser.UserUtils1 import UserUtils
|
||||
|
||||
from Utils.ObjUtil import SpecObject
|
||||
from Utils.ValidateUtil import ValidateAttr, Validate
|
||||
|
||||
|
||||
class User(SpecObject):
|
||||
"""管理端用户"""
|
||||
|
||||
uid = ValidateAttr(field="uid", type=str)
|
||||
email = ValidateAttr(field="email", func=Validate.email)
|
||||
name = ValidateAttr(field="name", type=str)
|
||||
pwd = ValidateAttr(field="pwd", func=Validate.password)
|
||||
status = ValidateAttr(field="status", in_list=UserUtils.user_enum("User", "status"))
|
||||
role = ValidateAttr(field="role", in_list=UserUtils.user_enum("User", "role"))
|
||||
create_time = ValidateAttr(field="create_time", func=Validate.time_format)
|
||||
|
||||
fields_map = {
|
||||
"uid": "用户ID",
|
||||
"email": "邮箱",
|
||||
"name": "用户名",
|
||||
"pwd": "密码",
|
||||
"status": "用户状态",
|
||||
"role": "角色",
|
||||
"create_time": "创建时间"
|
||||
}
|
|
@ -1,7 +1,9 @@
|
|||
from flask import Blueprint, request
|
||||
|
||||
from Utils.ErrorUtil import ReturnConditionCheckFailed
|
||||
from Modules.AdminUser.UserImpl import UserManageImpl, ListUserImpl, UserLoginImpl, SendLoginVcodeEmailImpl
|
||||
from Modules.AdminUser.UserImpl1 import UserImpl
|
||||
from Modules.AdminUser.UserUtils1 import UserUtils
|
||||
from Utils.ErrorUtil import ReturnConditionCheckFailed, AttrCheckError
|
||||
from Modules.AdminUser.UserImpl import UserLoginImpl, SendLoginVcodeEmailImpl
|
||||
from Modules.AdminUser.UserAuthUtils import verify_token, authority_scope
|
||||
|
||||
user_route = Blueprint('user', __name__)
|
||||
|
@ -17,43 +19,27 @@ def online_check_route(**kwargs):
|
|||
|
||||
|
||||
@user_route.route('/create', methods=['POST'])
|
||||
@verify_token
|
||||
@authority_scope(['admin'])
|
||||
# @verify_token
|
||||
# @authority_scope(['admin'])
|
||||
def create_user_route(**kwargs):
|
||||
"""新建用户"""
|
||||
try:
|
||||
req = request.json
|
||||
user_manage = UserManageImpl()
|
||||
user_manage.email, user_manage.name, user_manage.pwd, user_manage.role = req['email'], req['name'], req['pwd'], req['role']
|
||||
user_manage.check_obj(columns=["email", "name", "pwd", "role"])
|
||||
user_manage.create()
|
||||
return {"info": "用户创建成功"}
|
||||
except ReturnConditionCheckFailed as e:
|
||||
e.log_error()
|
||||
return {"info": e.__str__()}, e.status_code
|
||||
|
||||
impl = UserImpl()
|
||||
|
||||
@user_route.route('/list', methods=['POST'])
|
||||
@verify_token
|
||||
@authority_scope(['admin'])
|
||||
def list_user_route(**kwargs):
|
||||
"""
|
||||
用户信息列表接口
|
||||
"""
|
||||
try:
|
||||
req = request.json
|
||||
list_user = ListUserImpl(
|
||||
req['search'],
|
||||
req['sort'],
|
||||
req['page_size'],
|
||||
req['page_no']
|
||||
impl.create(
|
||||
req['email'],
|
||||
req['name'],
|
||||
UserUtils.decrypt_data(encrypt_msg=req['pwd']),
|
||||
req['role']
|
||||
)
|
||||
list_user.check_obj()
|
||||
result = list_user.list()
|
||||
return {"info": '查询结果', "result": result}, 200
|
||||
except ReturnConditionCheckFailed as e:
|
||||
e.log_error()
|
||||
return {"info": e.__str__()}, e.status_code
|
||||
|
||||
return {"info": "用户创建成功"}, 200
|
||||
except AssertionError as e:
|
||||
return {"info": e.__str__()}, 202
|
||||
except AttrCheckError as e:
|
||||
return {"info": e.__str__()}, 202
|
||||
|
||||
|
||||
@user_route.route('/login', methods=['POST'])
|
||||
|
@ -87,51 +73,74 @@ def send_vcode_to_user_route():
|
|||
return {"info": e.__str__()}, e.status_code
|
||||
|
||||
|
||||
@user_route.route('/disable', methods=['GET'])
|
||||
@verify_token
|
||||
@authority_scope(['admin'])
|
||||
def disable_user_route(**kwargs):
|
||||
"""停用用户接口"""
|
||||
try:
|
||||
user = UserManageImpl()
|
||||
user.uid = request.args['UID']
|
||||
user.status = "disable"
|
||||
user.check_obj(columns=["uid", "status"])
|
||||
user.disable()
|
||||
return {"info": "已停用该用户"}, 200
|
||||
except ReturnConditionCheckFailed as e:
|
||||
e.log_error()
|
||||
return {"info": e.__str__()}, e.status_code
|
||||
# @user_route.route('/list', methods=['POST'])
|
||||
# @verify_token
|
||||
# @authority_scope(['admin'])
|
||||
# def list_user_route(**kwargs):
|
||||
# """
|
||||
# 用户信息列表接口
|
||||
# """
|
||||
# try:
|
||||
# req = request.json
|
||||
# list_user = ListUserImpl(
|
||||
# req['search'],
|
||||
# req['sort'],
|
||||
# req['page_size'],
|
||||
# req['page_no']
|
||||
# )
|
||||
# list_user.check_obj()
|
||||
# result = list_user.list()
|
||||
# return {"info": '查询结果', "result": result}, 200
|
||||
# except ReturnConditionCheckFailed as e:
|
||||
# e.log_error()
|
||||
# return {"info": e.__str__()}, e.status_code
|
||||
|
||||
|
||||
@user_route.route('/active', methods=['GET'])
|
||||
@verify_token
|
||||
@authority_scope(['admin'])
|
||||
def active_user_route(**kwargs):
|
||||
"""激活用户接口"""
|
||||
try:
|
||||
user = UserManageImpl()
|
||||
user.uid = request.args['UID']
|
||||
user.status = "normal"
|
||||
user.check_obj(columns=["uid", "status"])
|
||||
user.active()
|
||||
return {"info": "已激活该用户"}, 200
|
||||
except ReturnConditionCheckFailed as e:
|
||||
e.log_error()
|
||||
return {"info": e.__str__()}, e.status_code
|
||||
|
||||
|
||||
@user_route.route('/delete', methods=['GET'])
|
||||
@verify_token
|
||||
@authority_scope(['admin'])
|
||||
def delete_user_route(**kwargs):
|
||||
"""删除用户接口"""
|
||||
return {"info": "调整中"}, 200
|
||||
|
||||
|
||||
@user_route.route('/manage_role', methods=['POST'])
|
||||
@verify_token
|
||||
@authority_scope(['admin'])
|
||||
def manage_role_of_user_route(**kwargs):
|
||||
"""管理用户角色接口"""
|
||||
return {"info": "调整中"}, 200
|
||||
# @user_route.route('/disable', methods=['GET'])
|
||||
# @verify_token
|
||||
# @authority_scope(['admin'])
|
||||
# def disable_user_route(**kwargs):
|
||||
# """停用用户接口"""
|
||||
# try:
|
||||
# user = UserManageImpl()
|
||||
# user.uid = request.args['UID']
|
||||
# user.status = "disable"
|
||||
# user.check_obj(columns=["uid", "status"])
|
||||
# user.disable()
|
||||
# return {"info": "已停用该用户"}, 200
|
||||
# except ReturnConditionCheckFailed as e:
|
||||
# e.log_error()
|
||||
# return {"info": e.__str__()}, e.status_code
|
||||
#
|
||||
#
|
||||
# @user_route.route('/active', methods=['GET'])
|
||||
# @verify_token
|
||||
# @authority_scope(['admin'])
|
||||
# def active_user_route(**kwargs):
|
||||
# """激活用户接口"""
|
||||
# try:
|
||||
# user = UserManageImpl()
|
||||
# user.uid = request.args['UID']
|
||||
# user.status = "normal"
|
||||
# user.check_obj(columns=["uid", "status"])
|
||||
# user.active()
|
||||
# return {"info": "已激活该用户"}, 200
|
||||
# except ReturnConditionCheckFailed as e:
|
||||
# e.log_error()
|
||||
# return {"info": e.__str__()}, e.status_code
|
||||
#
|
||||
#
|
||||
# @user_route.route('/delete', methods=['GET'])
|
||||
# @verify_token
|
||||
# @authority_scope(['admin'])
|
||||
# def delete_user_route(**kwargs):
|
||||
# """删除用户接口"""
|
||||
# return {"info": "调整中"}, 200
|
||||
#
|
||||
#
|
||||
# @user_route.route('/manage_role', methods=['POST'])
|
||||
# @verify_token
|
||||
# @authority_scope(['admin'])
|
||||
# def manage_role_of_user_route(**kwargs):
|
||||
# """管理用户角色接口"""
|
||||
# return {"info": "调整中"}, 200
|
||||
|
|
|
@ -78,3 +78,7 @@ def check_pwd_fmt(pwd):
|
|||
case = (len(password) >= 8) and (re.match(regex, password) is not None)
|
||||
result = True if case else False
|
||||
return result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
print(encrypt_data(msg="Fecr1988."))
|
|
@ -0,0 +1,131 @@
|
|||
import json
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
import base64
|
||||
|
||||
from Crypto.PublicKey import RSA
|
||||
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
|
||||
|
||||
from DBHelper.MongoHelperInstance import DB_GUA
|
||||
|
||||
|
||||
class UserUtils(object):
|
||||
|
||||
@staticmethod
|
||||
def encrypt_data(**kwargs):
|
||||
"""
|
||||
用公钥加密
|
||||
Parameters: msg str 待加密信息
|
||||
Returns: 加密后结果
|
||||
"""
|
||||
msg = kwargs['msg'] # 待加密信息
|
||||
|
||||
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_public_key.pem')) as f:
|
||||
data = f.read()
|
||||
public_key = RSA.importKey(data)
|
||||
|
||||
cipher = PKCS1_cipher.new(public_key)
|
||||
encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8"))))
|
||||
return encrypt_text.decode('utf-8')
|
||||
|
||||
@staticmethod
|
||||
def decrypt_data(**kwargs):
|
||||
"""
|
||||
用私钥解密
|
||||
Parameters: encrypt_msg str 加密信息
|
||||
Returns: 执行正确 解密后结果 执行错误 False
|
||||
"""
|
||||
try:
|
||||
encrypt_msg = kwargs['encrypt_msg'] # 加密信息
|
||||
|
||||
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_private_key.pem')) as f:
|
||||
data = f.read()
|
||||
private_key = RSA.importKey(data)
|
||||
|
||||
cipher = PKCS1_cipher.new(private_key)
|
||||
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
|
||||
return back_text.decode('utf-8')
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def user_enum(obj, field):
|
||||
"""获取枚举属性范围"""
|
||||
|
||||
with open(os.path.abspath(os.path.dirname(__file__)+'/static/EnumDataSetting.json')) as f:
|
||||
return json.loads(f.read())[obj][field]
|
||||
|
||||
@staticmethod
|
||||
def check_mail_fmt(email):
|
||||
"""
|
||||
邮箱地址格式校验,仅允许@fecr.com.cn
|
||||
Parameters: email: 邮箱
|
||||
Returns: result: 邮箱校验结果,正确返回True,不正确返回False
|
||||
"""
|
||||
regex = "^.+\\@fecr.com.cn"
|
||||
case = (len(email) > 7) and (re.match(regex, email) is not None)
|
||||
result = True if case else False
|
||||
assert result, "未满足@fecr.com.cn邮箱格式"
|
||||
|
||||
@staticmethod
|
||||
def check_pwd_fmt(pwd):
|
||||
"""
|
||||
密码强度校验
|
||||
Parameters: pwd: 密码(已加密)
|
||||
Returns: result: 密码强度校验结果,正确返回True,不正确返回False
|
||||
"""
|
||||
password = UserUtils.decrypt_data(encrypt_msg=pwd)
|
||||
assert not password, "密码强度校验失败"
|
||||
|
||||
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
|
||||
case = (len(password) >= 8) and (re.match(regex, password) is not None)
|
||||
assert not case, "密码强度不够"
|
||||
|
||||
@staticmethod
|
||||
def check_email_registered(email):
|
||||
"""检查邮箱是否已注册"""
|
||||
|
||||
email_is_existed = DB_GUA.find_single_column(
|
||||
"管理端",
|
||||
"用户",
|
||||
{"邮箱": email},
|
||||
"邮箱"
|
||||
)
|
||||
|
||||
assert not email_is_existed, "邮箱已被注册"
|
||||
|
||||
@staticmethod
|
||||
def generate_new_uid():
|
||||
"""生成新的用户ID"""
|
||||
|
||||
def uid_maker(num):
|
||||
"""
|
||||
用户ID生成器
|
||||
num: ID长度 int
|
||||
"""
|
||||
|
||||
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
||||
salt = ''
|
||||
for i in range(num):
|
||||
salt += random.choice(choices)
|
||||
return salt
|
||||
|
||||
def uid_checker(__uid):
|
||||
"""
|
||||
用户ID重复检查
|
||||
__uid: 用户ID str
|
||||
return: True 用户ID可用 False 用户ID不可用
|
||||
"""
|
||||
is_uid_existed = DB_GUA.find_single_column(
|
||||
"管理端",
|
||||
"用户",
|
||||
{"UID": __uid},
|
||||
"UID"
|
||||
)
|
||||
return True if is_uid_existed is None else False
|
||||
|
||||
uid = uid_maker(8)
|
||||
while not uid_checker(uid):
|
||||
uid = uid_maker(8)
|
||||
return uid
|
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"User":{
|
||||
"status": ["normal", "disable"],
|
||||
"role": ["admin", "analysts", "guest"]
|
||||
}
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
import time
|
||||
|
||||
|
||||
class CommonUtils(object):
|
||||
|
||||
@staticmethod
|
||||
def get_current_time():
|
||||
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
|
@ -1,6 +1,89 @@
|
|||
import xlrd
|
||||
|
||||
from Modules.Company.CompanyObject import BasicBusinessInfo
|
||||
from Modules.Company.CompanyUtils import CommonUtils, ExcelSheetParser
|
||||
|
||||
|
||||
class BasicBusinessInfoImpl(object):
|
||||
class BasicBusinessInfoImpl(BasicBusinessInfo):
|
||||
|
||||
def parse_excel(self):
|
||||
""""""
|
||||
def create_company_and_parse_excel(self, file):
|
||||
""""""
|
||||
|
||||
work_book = xlrd.open_workbook(file_contents=file.read())
|
||||
|
||||
for sheet in work_book.sheets():
|
||||
|
||||
if sheet.name == '企业信息':
|
||||
data = ExcelSheetParser(sheet=sheet).parse_sheet1()
|
||||
|
||||
data
|
||||
|
||||
# elif sheet.name == '高管股东信息':
|
||||
# data1, data2 = parse_gua_sheet2(sheet)
|
||||
# for i in data1:
|
||||
# i['企业名称'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_2.update_one({"高管姓名": i['姓名'], "企业名称": i['企业名称']}, {"$set": i}, upsert=True)
|
||||
# for i in data2:
|
||||
# i['企业名称'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_3.update_one({"股东名称": i['股东名称'], "企业名称": i['企业名称']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '经营情况':
|
||||
# data = parse_gua_sheet3(sheet)
|
||||
# for i in data:
|
||||
# i['企业名称'] = company
|
||||
# i['所在省份'] = province
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_4.update_one({"年度": i['年度'], "企业名称": i['企业名称']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '客户信息':
|
||||
# data = parse_gua_sheet4(sheet)
|
||||
# for i in data:
|
||||
# i['担保企业'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_5.update_one({"年度": i['年度'], "客户名称": i['客户名称'], "担保企业": i['担保企业']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '区域分布':
|
||||
# data = parse_gua_sheet5(sheet)
|
||||
# for i in data:
|
||||
# i['担保企业'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_6.update_one({"年度": i['年度'], "地区": i['地区'], "担保企业": i['担保企业']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '行业分布':
|
||||
# data = parse_gua_sheet5(sheet)
|
||||
# for i in data:
|
||||
# i['担保企业'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_7.update_one({"年度": i['年度'], "所属行业": i['所属行业'], "担保企业": i['担保企业']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '担保金额分布':
|
||||
# data = parse_gua_sheet6(sheet)
|
||||
# for i in data:
|
||||
# i['担保企业'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_8.update_one({"年度": i['年度'], "担保企业": i['担保企业']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '资产负债表':
|
||||
# data = parse_gua_sheet7(sheet)
|
||||
# for i in data:
|
||||
# i['企业名称'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_9.update_one({"年度": i['年度'], "企业名称": i['企业名称']}, {"$set": i}, upsert=True)
|
||||
#
|
||||
# elif sheet.name == '利润表':
|
||||
# data = parse_gua_sheet8(sheet)
|
||||
# for i in data:
|
||||
# i['企业名称'] = company
|
||||
# if cid:
|
||||
# i['cid'] = cid
|
||||
# col_10.update_one({"年度": i['年度'], "企业名称": i['企业名称']}, {"$set": i}, upsert=True)
|
||||
|
|
|
@ -13,4 +13,4 @@ def create_route(**kwargs):
|
|||
company_name = request.form['company_name']
|
||||
file = request.files['file']
|
||||
|
||||
BasicBusinessInfoImpl
|
||||
BasicBusinessInfoImpl.parse_excel(file)
|
|
@ -6,6 +6,33 @@ from datetime import datetime, timedelta
|
|||
from DBHelper.MongoHelperInstance import DB_GUA
|
||||
|
||||
|
||||
class CommonUtils(object):
|
||||
|
||||
# 生成新的企业ID,如果该ID存在,则重新生成
|
||||
@staticmethod
|
||||
def make_new_cid():
|
||||
|
||||
def random_cid(num):
|
||||
"""随机企业ID"""
|
||||
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
||||
salt = ''
|
||||
for i in range(num):
|
||||
salt += random.choice(choices)
|
||||
return salt
|
||||
|
||||
new_cid = random_cid(8)
|
||||
|
||||
case = DB_GUA.find_single_column(
|
||||
"应用端",
|
||||
"企业用户",
|
||||
{"企业ID": new_cid},
|
||||
"企业ID"
|
||||
) is not None
|
||||
while case:
|
||||
new_cid = random_cid(8)
|
||||
return new_cid
|
||||
|
||||
|
||||
class ExcelParserUtil(object):
|
||||
|
||||
# 转换excel日期
|
||||
|
@ -33,30 +60,6 @@ class ExcelParserUtil(object):
|
|||
def list_decimal(_list):
|
||||
return list(map(lambda _v: round(_v, 2) if isinstance(_v, float) else _v, _list))
|
||||
|
||||
# 生成新的企业ID,如果该ID存在,则重新生成
|
||||
@staticmethod
|
||||
def make_new_cid():
|
||||
|
||||
def random_cid(num):
|
||||
"""随机企业ID"""
|
||||
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
||||
salt = ''
|
||||
for i in range(num):
|
||||
salt += random.choice(choices)
|
||||
return salt
|
||||
|
||||
new_cid = random_cid(8)
|
||||
|
||||
case = DB_GUA.find_single_column(
|
||||
"应用端",
|
||||
"企业用户",
|
||||
{"企业ID": new_cid},
|
||||
"企业ID"
|
||||
) is not None
|
||||
while case:
|
||||
new_cid = random_cid(8)
|
||||
return new_cid
|
||||
|
||||
|
||||
class ExcelSheetParser(object):
|
||||
|
||||
|
|
|
@ -43,6 +43,16 @@ class APIReturnError(RuntimeError):
|
|||
return self.error_info
|
||||
|
||||
|
||||
class AttrCheckError(RuntimeError):
|
||||
"""属性检查异常"""
|
||||
|
||||
def __init__(self, error_info):
|
||||
self.error_info = error_info
|
||||
|
||||
def __str__(self):
|
||||
return self.error_info
|
||||
|
||||
|
||||
class CheckFailed(RuntimeError):
|
||||
"""检查异常"""
|
||||
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
|
||||
|
||||
class SpecObject(object):
|
||||
"""自定义类"""
|
||||
"""自定义父类"""
|
||||
|
||||
fields_map = {}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import re
|
||||
|
||||
from Utils.ErrorUtil import ReturnConditionCheckFailed
|
||||
from Utils.ErrorUtil import AttrCheckError
|
||||
|
||||
|
||||
class Validate(object):
|
||||
|
@ -59,71 +59,62 @@ class Validate(object):
|
|||
class ValidateAttr(object):
|
||||
"""对象属性值检查"""
|
||||
|
||||
FIELD_ERROR_INFO = '字段【{}】异常'
|
||||
FILED_MAP_ERROR = '字段未映射完整'
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.kwargs = kwargs
|
||||
|
||||
self.code = kwargs['error_code'] if 'error_code' in kwargs else 202
|
||||
|
||||
if 'error_info' in kwargs:
|
||||
self.info = kwargs['error_info']
|
||||
elif 'mark' in kwargs:
|
||||
self.info = self.FIELD_ERROR_INFO.format(kwargs['mark'])
|
||||
else:
|
||||
self.info = None
|
||||
self.error_info = kwargs['error_info'] if 'error_info' in kwargs else None
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
if self.kwargs['field'] in instance.__dict__:
|
||||
return instance.__dict__[self.kwargs['field']]
|
||||
|
||||
def __set__(self, instance, value):
|
||||
try:
|
||||
if not self.info:
|
||||
if instance.fields_map.__contains__(self.kwargs['field']):
|
||||
self.info = self.FIELD_ERROR_INFO.format(instance.fields_map[self.kwargs['field']])
|
||||
except AttributeError:
|
||||
raise ReturnConditionCheckFailed(self.FILED_MAP_ERROR, self.code)
|
||||
|
||||
if not self.error_info:
|
||||
if instance.fields_map.__contains__(self.kwargs['field']):
|
||||
info_template = '赋值异常: {}; 请检查: {};'
|
||||
self.error_info = info_template.format(
|
||||
self.kwargs['field'],
|
||||
instance.fields_map[self.kwargs['field']]
|
||||
)
|
||||
|
||||
if 'type' in self.kwargs:
|
||||
"""检查实例的属性类型"""
|
||||
if type(self.kwargs['type']) is list:
|
||||
if type(value) not in self.kwargs['type']:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
else:
|
||||
if not isinstance(value, self.kwargs['type']):
|
||||
if value is not None and 'default' not in self.kwargs:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
|
||||
if 'length' in self.kwargs:
|
||||
"""检查实例的属性值长度(一般是str类型)"""
|
||||
if len(value) != self.kwargs['length']:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
|
||||
if 'in_list' in self.kwargs:
|
||||
"""检查实例属性是否包含于列表中(属性有列表和非列表两种情况)"""
|
||||
if type(value) is not list:
|
||||
if value not in self.kwargs['in_list']:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
else:
|
||||
for item in value:
|
||||
if item not in self.kwargs['in_list']:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
|
||||
if 'instance_list' in self.kwargs:
|
||||
"""检查实例列表"""
|
||||
if type(value) is not list:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
else:
|
||||
for item in value:
|
||||
if not isinstance(item, self.kwargs['instance_list']):
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
|
||||
if 'func' in self.kwargs:
|
||||
"""属性检查函数"""
|
||||
if not list(map(self.kwargs['func'], [value]))[0]:
|
||||
raise ReturnConditionCheckFailed(self.info, self.code)
|
||||
raise AttrCheckError(self.error_info)
|
||||
|
||||
if 'default' in self.kwargs:
|
||||
"""实例属性默认值"""
|
||||
|
|
|
@ -7,4 +7,5 @@ gevent
|
|||
pymongo~=3.11.0
|
||||
requests~=2.25.1
|
||||
pandas~=1.3.5
|
||||
pycryptodome
|
||||
pycryptodome
|
||||
xlrd~=1.2.0
|
Loading…
Reference in New Issue