Initial commit

This commit is contained in:
王思川 2022-05-24 02:25:59 +08:00
commit 97c66eeeaf
26 changed files with 1611 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.idea
/venv/
*/__pycache__/
*.cpython-38.pyc

14
.gitlab-ci.yml Normal file
View File

@ -0,0 +1,14 @@
stages:
- deploy
job:
stage: deploy
script:
- docker stop guarantee_admin_api_v0.2
- docker rm guarantee_admin_api_v0.2
- docker build -t guarantee_admin_api_v0.2 .
- docker run -d -p 51011:51011 --name guarantee_admin_api_v0.2 -v /etc/timezone:/etc/timezone:ro -v /etc/localtime:/etc/localtime:ro guarantee_admin_api_v0.2
only:
- master
tags:
- guarantee_admin_api_v0.2

12
DBHelper/DBConfig.json Normal file
View File

@ -0,0 +1,12 @@
{
"MongoDB": {
"guarantee": "root:RYIHrqml#LSW6#!*@116.63.130.34:27020",
"tyc": "root:gP@DwMSVd5Sh6EiH@116.63.130.34:27019"
},
"Mysql": {
},
"Redis": {
}
}

232
DBHelper/MongoHelper.py Normal file
View File

@ -0,0 +1,232 @@
import re
import os
import json
import gridfs
import pymongo
from urllib import parse
from bson import ObjectId
from gridfs import GridFS
class MongoHelper:
def __init__(self, param):
"""
param:
typestr
desc: 选择连接哪个MongoDB数据库
"""
with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f:
db_configs = json.load(f)
this_mongo_cfg = db_configs['MongoDB'][param]
m = re.match('([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg)
parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)), m.group(3))
self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config))
def find_single_column(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据的某个指定字段值
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: str 一个指定查询字段
return:
type: None or dict
desc: 查询结果为空返回None; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
column = {**{'_id': False}, **{param4: 1}}
record = list(collection.find(param3, column))
return None if record == [] else record[0][param4]
def find_single_data(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns))
return False if record == [] else record[0]
def find_single_data_with_single_sort(self, param1, param2, param3, param4, param5):
"""
查询符合条件的第一条数据,按单个排序条件返回
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(1))
return False if record == [] else record[0]
def find_all_data(self, param1, param2, param3, param4):
"""
查询符合条件的所有数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: list
desc: 查询结果
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns))
return record
def find_all_data_with_count(self, param1, param2, param3):
"""
查询所有符合条件的数据并返回统计数量
param1: str 数据库
param2: str 数据表
param3: str 查询条件
return: int 符合条件的数据数量
"""
collection = self.client[param1][param2]
num = collection.find(param3).count()
return num
def find_all_data_with_single_sort(self, param1, param2, param3, param4, param5):
"""
查询符合条件的数据,按单个排序条件返回
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]))
return False if record == [] else record
def find_data_with_aggregate(self, param1, param2, param3):
"""
根据聚合条件查询
param1: str 数据库
param2: str 数据集
param3:
type: list
desc: 聚合条件
demo: [{'$match':{'price':{'$gte':50}}}, {'$group': {'_id': "$fName", 'count': {'$sum': 1}}}]
"""
collection = self.client[param1][param2]
data = list(collection.aggregate(param3))
return data
def find_data_by_page_with_sort(self, param1, param2, param3, param4, param5, param6, param7):
"""
根据聚合翻页查询,且按照需求字段排序返回
param1: str 数据库
param2: str 数据集
param3: dict 查询条件
param4: list 显示字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
param6: int page_size 每页数据条数
param7: int page_no 当前页码
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
page_size = int(param6)
page_no = int(param7)
skip_num = page_size * (page_no - 1)
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(page_size).skip(skip_num))
return False if record == [] else record
def insert_single_data(self, param1, param2, param3):
"""
插入一条数据
param1: str 数据库
param2: str 数据集
param3: obj 插入数据
return: None
"""
collection = self.client[param1][param2]
collection.insert_one(param3)
def upsert_single_data(self, param1, param2, param3, param4):
"""
插入单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$set": param4}, upsert=True)
def update_single_data(self, param1, param2, param3, param4):
"""
插入单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$set": param4})
def delete_single_data(self, param1, param2, param3):
"""
根据查询条件删除一条文档
param1: str 数据库
param2: str 数据集
param3: obj 查询条件
return: None
"""
collection = self.client[param1][param2]
collection.delete_one(param3)
return True
def find_file(self, param1, param2, param3):
"""
读取一个文件
param1: str 数据库
param2: str 存储桶
param3: str 文件id
return:
type: binary?
desc: 二进制文件流
"""
try:
# 实例化一个文件存储器
gfs = GridFS(self.client[param1], collection=param2)
# 二进制读取文件
data_stream = gfs.get(ObjectId(param3)).read()
# 返回文件二进制流
return data_stream
except gridfs.errors.NoFile:
return False
def delete_file(self, param1, param2, param3):
"""
根据id删除文件
param1: str 数据库
param2: str 存储桶
param3: str 文件fid
"""
fs = GridFS(self.client[param1], param2)
fs.delete(ObjectId(param3))

View File

@ -0,0 +1,7 @@
from DBHelper.MongoHelper import MongoHelper
# 天眼查数据库
DB_TYC = MongoHelper("tyc")
# 股交项目数据库
DB_GUA = MongoHelper("guarantee")

0
DBHelper/__init__.py Normal file
View File

6
Dockerfile Normal file
View File

@ -0,0 +1,6 @@
FROM python:3.8
WORKDIR /usr/src/app/guarantee_admin_api_v0_2
COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.mirrors.ustc.edu.cn/simple
COPY . .
CMD ["gunicorn", "app:app", "-c", "./gunicorn.conf.py"]

View File

@ -0,0 +1,95 @@
import functools
from flask import request
from itsdangerous import Serializer
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
from DBHelper.MongoHelperInstance import DB_GUA
TOKEN_KEY = "P0eAym@&CbaQWWkq"
TOKEN_EXPIRE = 60*60*8
def create_token(param):
"""
创建token
Parameters:
param: 传入参数用于创建token
Returns:
token: 用户访问令牌
"""
s = Serializer(TOKEN_KEY, expires_in=TOKEN_EXPIRE)
token = '' + s.dumps(param).decode('ascii')
return token
def verify_token(func):
"""
校验token
return:
type:str
desc: token被解析后的值
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
# step 1.1
# 解析请求头传送的token
s = Serializer(TOKEN_KEY)
session_id = s.loads(request.headers.get('token'))
# step 1.2
# 请求头中没有token参数 返回错误提示
if session_id is None:
return {"info": "缺少token"}, 401
# step 2.1
# token解析成功 从token记录中查询session_id对应的uid
uid = DB_GUA.find_single_column(
"管理端",
"token记录",
{"session_id": session_id},
"UID"
)
# step 2.2
# 根据session_id没有找到对应的uid 返回错误提示
if not uid:
return {"info": "提示: 账号已在别处登录"}, 401
except TypeError:
return {"info": "异常token"}, 401
except KeyError:
return {"info": "异常token"}, 401
except BadSignature:
return {"info": "错误token"}, 401
except SignatureExpired:
return {"info": "过期token"}, 401
return func(*args, **kwargs, uid=uid)
return internal
def authority_scope(scope):
def decorate(func):
@functools.wraps(func)
def internal(*args, ** kwargs):
records = DB_GUA.find_single_data(
"管理端",
"用户",
{"UID": kwargs['uid']},
['status', 'role']
)
if not records:
return {"info": "提示: 账户不存在"}, 401
if records['status'] != "normal":
return {"info": "提示: 账户已被禁用"}, 401
if records['role'] not in scope:
return {"info": "提示: 没有此项操作权限"}, 401
return func(*args, ** kwargs)
return internal
return decorate

View File

@ -0,0 +1,318 @@
import json
import os
import time
import random
import requests
from werkzeug.security import check_password_hash
from DBHelper.MongoHelperInstance import DB_GUA
from Utils.ErrorUtil import ReturnConditionCheckFailed
from Modules.AdminUser.UserUtils import decrypt_data
from Modules.AdminUser.UserAuthUtils import create_token
from Modules.AdminUser.UserObject import UserManage, ListUser, User, UserLogin, SendLoginVcodeEmail
class ListUserImpl(ListUser):
"""用户列表实现"""
def list(self):
# 查询体构造方法
def make_search_body(param):
body = dict()
search_keys = list(param.keys())
if "uid" in search_keys:
body['UID'] = param['uid']
if "姓名" in search_keys:
body['name'] = {"$regex": param['姓名']}
if "邮箱" in search_keys:
body['email'] = {"$regex": param['邮箱']}
if "状态" in search_keys:
if param['状态'] == "正常":
body['status'] = "normal"
elif param['状态'] == "停用":
body['status'] = "disable"
else:
pass
if "角色" in search_keys:
roles = list()
role_map = {"管理员": "admin", "分析师": "analysts", "访客": "guest"}
for role in list(set(param['角色'])):
roles.append(role_map[role])
body['role'] = {"$in": roles}
return body
# 顺序条件构造方法
def make_sort_body(param):
if param != {}:
columns_map = {"姓名": "name", "邮箱": "email", "状态": "status", "角色": "role", "创建时间": "create_time"}
asc_or_desc = 1 if list(param.values())[0] == "asc" else -1
sort_column = columns_map[list(param.keys())[0]]
body = {sort_column: asc_or_desc}
else:
body = {"create_time": -1}
return body
search_body = make_search_body(self.search)
sort = make_sort_body(self.sort)
page_size = 10 if self.page_size > 10 else self.page_size
page_no = int(self.page_no)
total = DB_GUA.find_all_data_with_count(
"管理端",
"用户",
search_body
)
records = DB_GUA.find_data_by_page_with_sort(
"管理端",
"用户",
search_body,
["UID", "email", "name", "status", "role", "create_time"],
sort,
page_size,
page_no
)
table_data = list()
if records:
for record in records:
user = User()
user.uid = record['UID']
user.email = record['email']
user.name = record['name']
user.status = record['status']
user.role = record['role']
user.create_time = record['create_time']
table_data.append(user.dict_to_show())
result = {
"records": table_data,
"total": total
}
return result
class UserLoginImpl(UserLogin):
"""用户登录实现"""
def login(self):
""""""
user_info = DB_GUA.find_single_data(
"管理端",
"用户",
{"email": self.email},
["UID", "name", "pwd", "status", "role"]
)
def check_email_existed():
if not user_info:
raise ReturnConditionCheckFailed("邮箱不存在", 200)
def check_user_is_disable():
if user_info['status'] != 'normal':
raise ReturnConditionCheckFailed("账户已禁用", 200)
def check_vcode_is_correct():
record = DB_GUA.find_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email},
["vcode", "timestamp"]
)
if not record:
raise ReturnConditionCheckFailed("验证码不存在", 200)
if record['vcode'] != self.vcode:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - record['timestamp'] > 300:
raise ReturnConditionCheckFailed("验证码过期", 200)
DB_GUA.delete_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email}
)
def check_pwd_is_correct():
try:
if not check_password_hash(user_info['pwd'], decrypt_data(encrypt_msg=self.pwd)):
raise ReturnConditionCheckFailed("密码错误", 200)
except Exception:
raise ReturnConditionCheckFailed("密码错误", 200)
def make_menus():
role = user_info['role']
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
self.menus = duties[role]
def make_token():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
DB_GUA.update_single_data(
"管理端",
"token记录",
{"UID": user_info['UID']},
{"session_id": session_id}
)
self.token = create_token(session_id)
def __main__():
check_email_existed()
check_user_is_disable()
check_pwd_is_correct()
check_vcode_is_correct()
make_menus()
make_token()
self.name = user_info['name']
return self.dict_to_show()
return __main__()
class SendLoginVcodeEmailImpl(SendLoginVcodeEmail):
"""发送登录验证码邮件 实现"""
def send_vcode_email(self):
def gen_vcode():
choices = '0123456789'
salt = ''
for i in range(6):
salt += random.choice(choices)
self.vcode = salt
def save_vcode():
DB_GUA.upsert_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email},
{"vcode": self.vcode, "timestamp": round(time.time())}
)
def send_email():
email_api = 'http://116.63.130.34:30001'
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format("登录验证码"),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format("登录", self.vcode)
}
requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
def __main__():
gen_vcode()
save_vcode()
send_email()
return __main__()
class UserManageImpl(UserManage):
def create(self):
def check_email_registered():
"""检查邮箱是否已注册"""
email_is_existed = DB_GUA.find_single_column(
"管理端",
"用户",
{"email": self.email},
"email"
)
if email_is_existed:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def generate_new_uid():
"""生成新的用户ID"""
def uid_maker(num):
"""
用户ID生成器
num: ID长度 int
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
def uid_checker(__uid):
"""
用户ID重复检查
__uid: 用户ID str
return:
True 用户ID可用
False 用户ID不可用
"""
is_uid_existed = DB_GUA.find_single_column(
"管理端",
"用户",
{"UID": __uid},
"UID"
)
return True if is_uid_existed is None else False
uid = uid_maker(8)
while not uid_checker(uid):
uid = uid_maker(8)
self.uid = uid
def __main__():
check_email_registered()
generate_new_uid()
self.status = "normal"
self.create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
DB_GUA.upsert_single_data(
"管理端",
"用户",
{"UID": self.uid},
self.dict_to_save()
)
__main__()
def disable(self):
"""禁用用户 实现"""
DB_GUA.update_single_data(
"管理端",
"用户",
{"UID": self.uid},
{"status": self.status}
)
def active(self):
"""激活用户 实现"""
DB_GUA.update_single_data(
"管理端",
"用户",
{"UID": self.uid},
{"status": self.status}
)

View File

@ -0,0 +1,287 @@
import re
from Utils.ErrorUtil import ReturnConditionCheckFailed
from Modules.AdminUser.UserUtils import decrypt_data
class User:
"""管理端用户"""
def __init__(self):
self.uid = None # 用户ID str
self.email = None # 邮箱 str
self.name = None # 用户名 str
self.pwd = None # 密码 str
self.status = None # 用户状态 str
self.role = None # 角色 str
self.create_time = None # 创建时间 str
def check_uid(self):
"""用户ID校验"""
if type(self.uid) is not str:
raise ReturnConditionCheckFailed("用户ID格式错误", 200)
if len(self.uid) != 8:
raise ReturnConditionCheckFailed("用户ID格式错误", 200)
def check_email(self):
"""邮箱格式仅允许@fecr.com.cn"""
regex = "^.+\\@fecr.com.cn"
case = (len(self.email) > 7) and (re.match(regex, self.email) is not None)
result = True if case else False
if not result:
raise ReturnConditionCheckFailed("邮箱格式错误", 200)
def check_name(self):
"""用户名格式校验"""
if type(self.name) is not str:
raise ReturnConditionCheckFailed("用户名格式错误", 200)
def check_pwd(self):
"""密码格式校验"""
if type(self.pwd) is not str:
raise ReturnConditionCheckFailed("密码格式错误", 200)
password = decrypt_data(encrypt_msg=self.pwd)
if not password:
raise ReturnConditionCheckFailed("密码格式错误", 200)
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(password) >= 8) and (re.match(regex, password) is not None)
result = True if case else False
if not result:
raise ReturnConditionCheckFailed("密码格式错误", 200)
def check_status(self):
"""用户状态校验"""
if self.status not in ['normal', 'disable']:
raise ReturnConditionCheckFailed("用户状态格式错误", 200)
def check_role(self):
"""用户角色校验"""
if self.role not in ['admin', 'analysts', 'developer', 'operator', 'guest']:
raise ReturnConditionCheckFailed("用户角色格式错误", 200)
def check_obj(self, **kwargs):
"""对象字段校验"""
columns = kwargs['columns']
for column in columns:
if column == "uid":
self.check_uid()
elif column == "email":
self.check_email()
elif column == "name":
self.check_name()
elif column == "pwd":
self.check_pwd()
elif column == "status":
self.check_status()
elif column == "role":
self.check_role()
else:
pass
def trans_status(self):
"""用户状态转换成前端显示格式"""
return "正常" if self.status == "normal" else "停用"
def trans_role(self):
"""用户角色转换成前端显示格式"""
role_map = {"admin": "管理员", "analysts": "分析师", "developer": "开发者", "operator": "运营人员", "guest": "访客"}
return role_map[self.role]
def dict_to_save(self, **kwargs):
"""存储对象"""
user_dict = {
"UID": self.uid,
"email": self.email,
"name": self.name,
"pwd": self.pwd,
"status": self.status,
"role": self.role,
"create_time": self.create_time
}
if 'columns' in list(kwargs.keys()):
user_dict = {key: user_dict[key] for key in kwargs['columns']}
return user_dict
def dict_to_show(self, **kwargs):
"""显示对象"""
user_dict = {
"uid": self.uid,
"邮箱": self.email,
"姓名": self.name,
"状态": self.trans_status(),
"角色": self.trans_role(),
"创建时间": self.create_time
}
if 'columns' in list(kwargs.keys()):
user_dict = {key: user_dict[key] for key in kwargs['columns']}
return user_dict
class ListUser:
"""用户列表"""
def __init__(self, search, sort, page_size, page_no):
self.search = search
self.sort = sort
self.page_size = page_size
self.page_no = page_no
def list(self):
"""查询用户列表"""
def check_search(self):
"""
type: dict
desc: 用户搜索条件 支持模糊查询条件
demo: {"姓名": "xxx", "邮箱": "xxx"}
"""
if type(self.search) is not dict:
raise ReturnConditionCheckFailed("参数 search 格式错误", 400)
search_keys = list(self.search.keys())
if "uid" in search_keys:
if type(self.search['UID']) is not str:
raise ReturnConditionCheckFailed("参数 search.uid 格式错误", 400)
if "姓名" in search_keys:
if type(self.search['姓名']) is not str:
raise ReturnConditionCheckFailed("参数 search.姓名 格式错误", 400)
if "邮箱" in search_keys:
if type(self.search['邮箱']) is not str:
raise ReturnConditionCheckFailed("参数 search.邮箱 格式错误", 400)
if "状态" in search_keys:
if type(self.search['状态']) is not str:
raise ReturnConditionCheckFailed("参数 search.状态 格式错误", 400)
if self.search['状态'] not in ["正常", "停用"]:
raise ReturnConditionCheckFailed("参数 search.状态 格式错误", 400)
if "角色" in search_keys:
if type(self.search['角色']) is not list:
raise ReturnConditionCheckFailed("参数 search.角色 格式错误", 400)
for role in self.search['角色']:
if role not in ["管理员", "分析师", "开发者", "运营人员", "访客"]:
raise ReturnConditionCheckFailed("参数 search.角色 格式错误", 400)
def check_sort(self):
"""
type: dict
desc: 排序条件 asc正序 desc倒序
demo: {"姓名": asc}
"""
if type(self.sort) is not dict:
raise ReturnConditionCheckFailed("参数 sort 格式错误", 400)
if len(list(self.sort.values())) > 1:
raise ReturnConditionCheckFailed("参数 sort 格式错误", 400)
if len(list(self.sort.values())) == 1 and list(self.sort.values())[0] not in ["asc", "desc"]:
raise ReturnConditionCheckFailed("参数 sort 格式错误", 400)
def check_page_size(self):
"""
type: int
desc: 每页数据数量
"""
if type(self.page_size) is not int:
raise ReturnConditionCheckFailed("参数 page_size 格式错误", 400)
def check_page_no(self):
"""
type: int
desc: 当前页数
"""
if type(self.page_no) is not int:
raise ReturnConditionCheckFailed("参数 page_no 格式错误", 400)
def check_obj(self):
""" """
self.check_search()
self.check_sort()
self.check_page_size()
self.check_page_no()
class UserLogin(User):
"""用户登录"""
def __init__(self):
super().__init__()
self.vcode = None
self.token = None
self.menus = None
def login(self):
"""登录"""
def check_vcode(self):
"""检查验证码格式"""
if type(self.vcode) is not str:
raise ReturnConditionCheckFailed("验证码格式错误", 200)
def check_obj(self):
""""""
self.check_email()
self.check_vcode()
def dict_to_show(self):
"""显示对象"""
show_dict = {
"name": self.name,
"token": self.token,
"menus": self.menus
}
return show_dict
class SendLoginVcodeEmail(User):
"""发送登录验证码邮件"""
def __init__(self):
super().__init__()
self.vcode = None
def check_obj(self):
self.check_email()
class UserManage(User):
"""用户管理"""
def create(self):
"""创建新用户"""
def disable(self):
"""禁用用户"""
def active(self):
"""激活用户"""
def delete_user(self):
"""删除用户"""
def manage_role_of_user(self):
"""管理用户角色"""

View File

@ -0,0 +1,137 @@
from flask import Blueprint, request
from Utils.ErrorUtil import ReturnConditionCheckFailed
from Modules.AdminUser.UserImpl import UserManageImpl, ListUserImpl, UserLoginImpl, SendLoginVcodeEmailImpl
from Modules.AdminUser.UserAuthUtils import verify_token, authority_scope
user_route = Blueprint('user', __name__)
@user_route.route('/online_check', methods=['GET'])
@verify_token
def online_check_route(**kwargs):
"""
在线检查检查token是否有效
"""
return {"info": "正常"}, 200
@user_route.route('/create', methods=['POST'])
@verify_token
@authority_scope(['admin'])
def create_user_route(**kwargs):
"""新建用户"""
try:
req = request.json
user_manage = UserManageImpl()
user_manage.email, user_manage.name, user_manage.pwd, user_manage.role = req['email'], req['name'], req['pwd'], req['role']
user_manage.check_obj(columns=["email", "name", "pwd", "role"])
user_manage.create()
return {"info": "用户创建成功"}
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/list', methods=['POST'])
@verify_token
@authority_scope(['admin'])
def list_user_route(**kwargs):
"""
用户信息列表接口
"""
try:
req = request.json
list_user = ListUserImpl(
req['search'],
req['sort'],
req['page_size'],
req['page_no']
)
list_user.check_obj()
result = list_user.list()
return {"info": '查询结果', "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/login', methods=['POST'])
def login_route():
"""登录"""
try:
req = request.json
user_login = UserLoginImpl()
user_login.email, user_login.pwd, user_login.vcode = req['email'], req['pwd'], req['vcode']
user_login.check_obj()
user_login.login()
result = user_login.dict_to_show()
return {"info": "登录成功", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/send_vcode', methods=['POST'])
def send_vcode_to_user_route():
"""发送验证码"""
try:
req = request.json
obj = SendLoginVcodeEmailImpl()
obj.email = req['email']
obj.check_obj()
obj.send_vcode_email()
return {"info": "发送成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/disable', methods=['GET'])
@verify_token
@authority_scope(['admin'])
def disable_user_route(**kwargs):
"""停用用户接口"""
try:
user = UserManageImpl()
user.uid = request.args['UID']
user.status = "disable"
user.check_obj(columns=["uid", "status"])
user.disable()
return {"info": "已停用该用户"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/active', methods=['GET'])
@verify_token
@authority_scope(['admin'])
def active_user_route(**kwargs):
"""激活用户接口"""
try:
user = UserManageImpl()
user.uid = request.args['UID']
user.status = "normal"
user.check_obj(columns=["uid", "status"])
user.active()
return {"info": "已激活该用户"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/delete', methods=['GET'])
@verify_token
@authority_scope(['admin'])
def delete_user_route(**kwargs):
"""删除用户接口"""
return {"info": "调整中"}, 200
@user_route.route('/manage_role', methods=['POST'])
@verify_token
@authority_scope(['admin'])
def manage_role_of_user_route(**kwargs):
"""管理用户角色接口"""
return {"info": "调整中"}, 200

View File

@ -0,0 +1,80 @@
import os
import re
import base64
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
def encrypt_data(**kwargs):
"""
用公钥加密
Parameters:
msg str 待加密信息
Returns:
加密后结果
"""
msg = kwargs['msg'] # 待加密信息
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_public_key.pem')) as f:
data = f.read()
public_key = RSA.importKey(data)
cipher = PKCS1_cipher.new(public_key)
encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8"))))
return encrypt_text.decode('utf-8')
def decrypt_data(**kwargs):
"""
用私钥解密
Parameters:
encrypt_msg str 加密信息
Returns:
执行正确 解密后结果
执行错误 False
"""
try:
encrypt_msg = kwargs['encrypt_msg'] # 加密信息
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_private_key.pem')) as f:
data = f.read()
private_key = RSA.importKey(data)
cipher = PKCS1_cipher.new(private_key)
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
return back_text.decode('utf-8')
except Exception:
return False
def check_mail_fmt(email):
"""
邮箱地址格式校验仅允许@fecr.com.cn
Parameters:
email: 邮箱
Returns:
result: 邮箱校验结果正确返回True不正确返回False
"""
regex = "^.+\\@fecr.com.cn"
case = (len(email) > 7) and (re.match(regex, email) is not None)
result = True if case else False
return result
def check_pwd_fmt(pwd):
"""
密码强度校验
Parameters:
pwd: 密码已加密
Returns:
result: 密码强度校验结果正确返回True不正确返回False
"""
password = decrypt_data(encrypt_msg=pwd)
if not password:
return False
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(password) >= 8) and (re.match(regex, password) is not None)
result = True if case else False
return result

View File

View File

@ -0,0 +1,6 @@
{
"admin": ["Board", "Company", "Rating", "Setting"],
"Analysts": ["Board", "Company", "Rating"],
"guest": ["Board"]
}

View File

@ -0,0 +1,15 @@
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQDLoijMi4Ng7TkN+Dqc4nHgnnSq7y4AjPGd3C7qLej1mXBvh2wp
jNlpMIlIfhBIsOHW3+H/VmuCpWBtWk73P7VhkAdqiMZOC9OWBEwVuzNOPid+/Yjo
Guppz4YAB/sIhWUdiVoB1866/HnYHbf/+5sVx1Nvh8Vp85sgOZchIdmS/wIDAQAB
AoGBAKr+89W3vc4Rxlxluwps2QWu6fd0O6P4txhBgh/iB4Ldo130US6e+R5sxItc
WN5BspOWkxewgT1HFC8fq7nSBMN0ijG1j/+XIBRyWX6TwHppDPmN6r0MTAjwVNRz
MEVJIyHxlLOpC8B+n+knG5xKMhmks+BtPlYbLmqSf5a4pLZZAkEA+asJMyaJx0lG
yqFClprAWA65UtUlpjbdHI6xQer8NB0/PuSz3FL8b0EfVrAxqM1CpDxcHR3mFoVs
Yyk/4DAGSwJBANDMPpPjoWI3IPiLAdJ5POjIn86CkUSe1ZEucis2NEnC9UDUq0sG
xgG0SsI8UolZQdcnCRDfN92Q7bl1FG3qxZ0CQFKo//jBb6hdaGS6E1PNlJUS+uSj
0T1AuOA1lhZe+HVAoanvCmWNzCoBg6Ct4SMkIkZB/bVeKsmWDxadl5pgDBUCQQDA
EyN96GkNcKlj+nwyolTlz6kyz+nStkrAw3lDRxnSwQXcHcd1vUVpS/F5vQQwVWu8
AKzWWIGL0Ube1FV2yAIBAkBfc2UyKnrJ9zG9l1E199/5l6m9U2Vhexl/b0DNSz4L
v/PNIsO3aeKXvnnRxCBCdiS7/ve4XRh+mBdfyvM93dz9
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,6 @@
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDLoijMi4Ng7TkN+Dqc4nHgnnSq
7y4AjPGd3C7qLej1mXBvh2wpjNlpMIlIfhBIsOHW3+H/VmuCpWBtWk73P7VhkAdq
iMZOC9OWBEwVuzNOPid+/YjoGuppz4YAB/sIhWUdiVoB1866/HnYHbf/+5sVx1Nv
h8Vp85sgOZchIdmS/wIDAQAB
-----END PUBLIC KEY-----

0
Modules/__init__.py Normal file
View File

27
Utils/CommonUtil.py Normal file
View File

@ -0,0 +1,27 @@
def sub_dict(param1, param2):
"""
获取字典的子集
Parameters:
param1: 原字典
param2: 子集字段
Returns:
子集
"""
return dict((key, value) for key, value in param1.items() if key in param2)
def df_iterrows(param):
"""
按行以数组形式返回DataFrame的indexdata
Parameters:
param: DataFrame 某个df对象
Returns:
result: list 遍历df对象每行数据包括index
"""
result = []
for row in param.iterrows():
index, data = row
result.append([index] + data.tolist())
return result

90
Utils/ErrorUtil.py Normal file
View File

@ -0,0 +1,90 @@
import time
import traceback
from flask import request
from werkzeug.exceptions import BadRequest
from DBHelper.MongoHelperInstance import DB_GUA
def get_req_detail(req, param_type):
"""
获取请求参数信息
若异常请求则返回空字符串
"""
try:
if param_type == 'args':
return req.args.__str__()
elif param_type == 'json':
return req.json.__str__()
else:
return ''
except BadRequest:
return ''
class JustThrowError(RuntimeError):
"""自定义抛出异常信息"""
def __init__(self, error_info):
self.error_info = error_info
def __str__(self):
return self.error_info
class APIReturnError(RuntimeError):
"""接口返回异常信息"""
def __init__(self, error_info, status_code):
self.error_info = error_info # 异常信息
self.status_code = status_code # 状态码
def __str__(self):
return self.error_info
class CheckFailed(RuntimeError):
"""检查异常"""
def __init__(self, failed_info, status_code):
self.failed_info = failed_info # 失败信息
self.status_code = status_code # 状态码
def __str__(self):
return self.failed_info
def log_error(self):
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": get_req_detail(request, 'args'),
"json": get_req_detail(request, 'json')
},
"traceback": traceback.format_exc(),
"exception": type(self).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
DB_GUA.insert_single_data(
"日志",
"异常日志",
info
)
class LogConditionCheckFailed(CheckFailed):
"""直接记录检查异常"""
def __init__(self, failed_info, status_code):
self.failed_info = failed_info # 失败信息
self.status_code = status_code # 状态码
self.log_error()
class ReturnConditionCheckFailed(CheckFailed):
"""条件检查失败 抛出异常 接口返回失败原因和状态码"""

83
Utils/ObjUtil.py Normal file
View File

@ -0,0 +1,83 @@
class SpecObject(object):
"""自定义类"""
fields_map = {}
def dict_keys_toggle(self, **kwargs):
"""字典键值切换"""
_dict_ = dict()
default_types = ['str', 'int', 'float', 'dict', 'bool', 'tuple']
for key in self.__dict__.keys():
if key in self.fields_map.keys():
# 常规类型
if type(self.__dict__[key]).__name__ in default_types:
_dict_[self.fields_map[key]] = self.__dict__[key]
# 列表类型
elif type(self.__dict__[key]).__name__ == 'list':
if len(self.__dict__[key]) == 0:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key][0]).__name__ in default_types:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = [item.dict_keys_toggle() for item in self.__dict__[key]]
# 空值类型
elif self.__dict__[key] is None:
_dict_[self.fields_map[key]] = self.__dict__[key]
# 对象类型
else:
_dict_[self.fields_map[key]] = self.__dict__[key].dict_keys_toggle()
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
def dict_to_show(self, **kwargs):
"""字典显示格式"""
return self.dict_keys_toggle(**kwargs)
def dict_to_save(self, **kwargs):
"""字典存储格式"""
return self.dict_keys_toggle(**kwargs)
def dict_to_return(self, **kwargs):
"""字典返回格式"""
@staticmethod
def dict_to_set(**kwargs):
"""实例设值"""
instance = kwargs['instance']()
fields_map = dict([v, k] for k, v in instance.fields_map.items())
for field in list(kwargs['data'].keys()):
instance.__setattr__(fields_map[field], kwargs['data'][field])
return instance
@staticmethod
def instance_list_to_set(**kwargs):
"""实例数组设值"""
list_ = list()
for item in list(kwargs['data']):
instance = kwargs['instance']()
fields_map = dict([v, k] for k, v in instance.fields_map.items())
for field in list(item.keys()):
instance.__setattr__(fields_map[field], item[field])
list_.append(instance)
return list_
@staticmethod
def get_attr(_dict_, _key_, **kwargs):
"""获取字典属性值"""
value = kwargs['default'] if kwargs.__contains__('default') else None
try:
value = _dict_[_key_]
except KeyError:
pass
except TypeError:
pass
return value

21
Utils/RouteUtil.py Normal file
View File

@ -0,0 +1,21 @@
from Utils.ErrorUtil import APIReturnError
class RouteUtil(object):
"""路由工具"""
@staticmethod
def require_params_check(req_body, req_params):
"""必需参数检查"""
if not isinstance(req_body, dict):
raise Exception
if not isinstance(req_params, list):
raise Exception
params_in_req_body = list(req_body.keys())
for param in req_params:
if param not in params_in_req_body:
error_info = "缺失必需参数: {}".format(param)
raise APIReturnError(error_info=error_info, status_code=200)

133
Utils/ValidateUtil.py Normal file
View File

@ -0,0 +1,133 @@
import re
from Utils.ErrorUtil import ReturnConditionCheckFailed
class Validate(object):
"""常用格式检查"""
@staticmethod
def email(param):
"""邮箱格式"""
regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
case = (len(param) > 7) and (re.match(regex, param) is not None)
result = True if case else False
return result
@staticmethod
def password(param):
"""密码格式"""
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(param) >= 8) and (re.match(regex, param) is not None)
return True if case else False
@staticmethod
def telephone(param):
"""手机号格式"""
regex = "(^(13[0-9]|14[01456879]|15[0-3,5-9]|16[2567]|17[0-8]|18[0-9]|19[0-3,5-9])d{8}$)"
case = re.match(regex, param) is not None
return True if case else False
@staticmethod
def year_format(param):
"""yyyy年"""
regex = "\d{4}"
case = re.match(regex, param) is not None
return True if case else False
@staticmethod
def date_format(param):
"""yyyy-mm-dd"""
regex = "\d{4}-\d{1,2}-\d{1,2}"
case = re.match(regex, param) is not None
return True if case else False
@staticmethod
def time_format(param):
"""yyyy-mm-dd hh:mm:ss"""
regex = "\d{4}-\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2}"
case = re.match(regex, param) is not None
return True if case else False
@staticmethod
def image(param):
"""图片格式"""
mimetype = param.mimetype
return True if mimetype in ['image/jpeg', 'image/png'] else False
class ValidateAttr(object):
"""对象属性值检查"""
FIELD_ERROR_INFO = '字段【{}】异常'
FILED_MAP_ERROR = '字段未映射完整'
def __init__(self, **kwargs):
self.kwargs = kwargs
self.code = kwargs['error_code'] if 'error_code' in kwargs else 202
if 'error_info' in kwargs:
self.info = kwargs['error_info']
elif 'mark' in kwargs:
self.info = self.FIELD_ERROR_INFO.format(kwargs['mark'])
else:
self.info = None
def __get__(self, instance, owner):
if self.kwargs['field'] in instance.__dict__:
return instance.__dict__[self.kwargs['field']]
def __set__(self, instance, value):
try:
if not self.info:
if instance.fields_map.__contains__(self.kwargs['field']):
self.info = self.FIELD_ERROR_INFO.format(instance.fields_map[self.kwargs['field']])
except AttributeError:
raise ReturnConditionCheckFailed(self.FILED_MAP_ERROR, self.code)
if 'type' in self.kwargs:
"""检查实例的属性类型"""
if type(self.kwargs['type']) is list:
if type(value) not in self.kwargs['type']:
raise ReturnConditionCheckFailed(self.info, self.code)
else:
if not isinstance(value, self.kwargs['type']):
if value is not None and 'default' not in self.kwargs:
raise ReturnConditionCheckFailed(self.info, self.code)
if 'length' in self.kwargs:
"""检查实例的属性值长度(一般是str类型)"""
if len(value) != self.kwargs['length']:
raise ReturnConditionCheckFailed(self.info, self.code)
if 'in_list' in self.kwargs:
"""检查实例属性是否包含于列表中(属性有列表和非列表两种情况)"""
if type(value) is not list:
if value not in self.kwargs['in_list']:
raise ReturnConditionCheckFailed(self.info, self.code)
else:
for item in value:
if item not in self.kwargs['in_list']:
raise ReturnConditionCheckFailed(self.info, self.code)
if 'instance_list' in self.kwargs:
"""检查实例列表"""
if type(value) is not list:
raise ReturnConditionCheckFailed(self.info, self.code)
else:
for item in value:
if not isinstance(item, self.kwargs['instance_list']):
raise ReturnConditionCheckFailed(self.info, self.code)
if 'func' in self.kwargs:
"""属性检查函数"""
if not list(map(self.kwargs['func'], [value]))[0]:
raise ReturnConditionCheckFailed(self.info, self.code)
if 'default' in self.kwargs:
"""实例属性默认值"""
if value is None:
value = self.kwargs['default']
instance.__dict__[self.kwargs['field']] = value

0
Utils/__init__.py Normal file
View File

20
app.py Normal file
View File

@ -0,0 +1,20 @@
from flask import Flask
from flask_cors import *
from Modules.AdminUser.UserRoutes import user_route
app = Flask(__name__)
CORS(app, supports_credentials=True)
app.config['JSON_SORT_KEYS'] = False
app.register_blueprint(user_route, url_prefix='/admin/user')
@app.route('/admin/version')
def hello_world():
return 'WideRating Guarantee Admin API v0.2'
if __name__ == '__main__':
app.run()

8
gunicorn.conf.py Normal file
View File

@ -0,0 +1,8 @@
APP_PORT = 51014
# 并行工作进程数
workers = 10
# 监听内网端口
bind = '0.0.0.0:{}'.format(APP_PORT)
# 工作模式协程
worker_class = 'gevent'

10
requirements.txt Normal file
View File

@ -0,0 +1,10 @@
markupsafe==2.0.1
itsdangerous==2.0.1
flask
flask_cors
gunicorn
gevent
pymongo~=3.11.0
requests~=2.25.1
pandas~=1.3.5
pycryptodome