Merge branch 'v0.21' into 'master'

V0.21

See merge request root/tfse_admin!27
This commit is contained in:
FECR-IBD 2022-04-13 19:12:23 +00:00
commit 086ee98567
13 changed files with 422 additions and 397 deletions

View File

@ -1,8 +1,11 @@
import re
import os
import json
import gridfs
import pymongo
from urllib import parse
from bson import ObjectId
from gridfs import GridFS
@ -18,7 +21,9 @@ class MongoHelper:
with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f:
db_configs = json.load(f)
this_mongo_cfg = db_configs['MongoDB'][param]
self.client = pymongo.MongoClient('mongodb://{}'.format(this_mongo_cfg))
m = re.match('([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg)
parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)), m.group(3))
self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config))
def find_single_column(self, param1, param2, param3, param4):
"""

View File

@ -0,0 +1,64 @@
from DBHelper.MongoHelper import MongoHelper
from Manage.Company.CompanyObj import SearchCompanyBody, SearchCompanyResult
class SearchCompanyBodyImpl(SearchCompanyBody):
""""""
db = MongoHelper("tfse_v0.21")
def conditions_search(self):
""""""
search_body = self.search.make_search_body()
sort = self.sort.make_sort_body()
page_size = 10 if self.page_size > 10 else self.page_size
page_no = self.page_no
total = self.db.find_all_data_with_count(
"企业数据",
"企业数据_更新汇总",
search_body
)
records = self.db.find_data_by_page_with_sort(
"企业数据",
"企业数据_更新汇总",
search_body,
["企业ID", "企业名称", "一级行业", "综信评价结果.信用等级", "ESG评价结果.评价等级"],
sort,
page_size,
page_no
)
table_data = list()
if records:
for record in records:
company = SearchCompanyResult()
company.cid = record['企业ID']
company.name = record['企业名称']
company.industry = record['一级行业']
# if record.__contains__('综信评价结果'):
try:
company.credit_level = record['综信评价结果']['信用等级']
# else:
except KeyError:
company.credit_level = None
# if record.__contains__('ESG评价结果'):
try:
company.esg_level = record['ESG评价结果']['评价等级']
# else:
except KeyError:
company.esg_level = None
data = company.dict_to_show(columns=["企业ID", "企业名称", "一级行业", "信用等级", "ESG等级"])
table_data.append(data)
result = {
"records": table_data,
"total": total
}
return result

View File

@ -0,0 +1,104 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr
class SearchCompanyBody(SpecObject):
""""""
class SearchBody(SpecObject):
""""""
cid = ValidateAttr(field='cid', type=str)
name = ValidateAttr(field='name', type=str)
industry = ValidateAttr(field='industry', type=str)
credit_level = ValidateAttr(field='credit_level', in_list=["AAA", "AA+", "AA", "AA-", "A+", "A", "A-", "BBB+", "BBB", "BBB-", "BB+", "BB", "BB-", "B+", "B", "B-", "CCC+", "CCC", "CCC-", "CC+", "CC", "CC-", "C+", "C", "C-"])
esg_level = ValidateAttr(field='esg_level', in_list=["A", "B", "C", "D", "E"])
fields_map = {
"cid": "企业ID",
"name": "企业名称",
"industry": "一级行业",
"credit_level": "综信评价",
"esg_level": "ESG评价"
}
def make_search_body(self):
""""""
search_keys = list(self.__dict__.keys())
body = dict()
if "cid" in search_keys:
body['企业ID'] = self.cid
if "name" in search_keys:
body['企业名称'] = {"$regex": self.name}
if "industry" in search_keys:
body['一级行业'] = {"$in": self.industry}
if "credit_level" in search_keys:
body['综信评价结果.信用等级'] = {"$in": self.credit_level}
if "esg_level" in search_keys:
body['ESG评价结果.评价等级'] = {"$in": self.esg_level}
return body
class SortBody(SpecObject):
""""""
field = ValidateAttr(field='field', type=str)
sort = ValidateAttr(field='sort', in_list=["asc", "desc"])
fields_map = {
"field": "排序字段",
"sort": "排序方式"
}
def make_sort_body(self):
""""""
if self.__dict__ != {}:
columns_map = {
"企业ID": "企业ID",
"企业名称": "企业名称",
"一级行业": "一级行业",
"信用等级": "综信评价结果.信用等级",
"ESG等级": "ESG评价结果.评价等级"
}
asc_or_desc = 1 if self.sort == "asc" else -1
sort_column = columns_map[self.field]
body = {sort_column: asc_or_desc}
else:
body = {"更新时间.工商信息": -1}
return body
search = ValidateAttr(field='search', type=SearchBody)
sort = ValidateAttr(field='sort', type=SortBody)
page_size = ValidateAttr(field='page_size', type=int)
page_no = ValidateAttr(field='page_no', type=int)
fields_map = {
"search": "搜索体",
"sort": "排序",
"page_size": "显示数量",
"page_no": "页码"
}
def condition_search(self):
"""条件查询"""
class SearchCompanyResult(SpecObject):
""""""
cid = ValidateAttr(field='cid', type=str)
name = ValidateAttr(field='name', type=str, default=None)
industry = ValidateAttr(field='industry', type=str, default=None)
credit_level = ValidateAttr(field='credit_level', type=str, default=None)
esg_level = ValidateAttr(field='esg_level', type=str, default=None)
fields_map = {
"cid": "企业ID",
"name": "企业名称",
"industry": "一级行业",
"credit_level": "信用等级",
"esg_level": "ESG等级"
}

View File

@ -0,0 +1,80 @@
from flask import Blueprint, request
from Manage.Company.CompanyImpl import SearchCompanyBodyImpl
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_auth import verify_token
company_route = Blueprint('company', __name__)
@company_route.route('/search', methods=['POST'])
@verify_token
def search_company(**kwargs):
"""企业搜索"""
try:
req = request.json
search_body_impl = SearchCompanyBodyImpl()
search_body = SearchCompanyBodyImpl().SearchBody()
sort_body = SearchCompanyBodyImpl().SortBody()
search = req['search'] if req.__contains__('search') else {}
sort = req['sort'] if req.__contains__('sort') else {}
if search.__contains__('企业ID'):
search_body.cid = search['企业ID']
if search.__contains__('企业名称'):
search_body.name = search['企业名称']
if search.__contains__('一级行业'):
search_body.industry = search['一级行业']
if search.__contains__('综信评价'):
search_body.credit_level = search['综信评价']
if search.__contains__('ESG评价'):
search_body.esg_level = search['ESG评价']
if sort != {}:
sort_body.field = list(sort.keys())[0]
sort_body.sort = list(sort.values())[0]
search_body_impl.search = search_body
search_body_impl.sort = sort_body
search_body_impl.page_size = req['page_size']
search_body_impl.page_no = req['page_no']
result = search_body_impl.conditions_search()
return {"info": '查询结果', "result": result}, 200
except ReturnConditionCheckFailed as e:
return {"info": e.failed_info}, e.status_code
@company_route.route('/index', methods=['POST'])
@verify_token
def company_index(**kwargs):
"""
企业主页
Parameters:
-
Returns:
info: 返回信息
result: 返回数据
"""
return {"info": '调整中'}, 200
@company_route.route('/edit', methods=['POST'])
@verify_token
def company_edit(**kwargs):
"""
企业信息编辑
Parameters:
-
Returns:
result: 编辑结果
"""
return {"info": '调整中'}, 200

View File

@ -1,86 +0,0 @@
import re
import pymongo
client = pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format('root', 'sromitdTW569kC#M', '116.63.130.34', 27018))
def find_company_records(query, skip, limit):
"""
查询企业信息
Parameters:
query: 查询条件
skip: 默认0
limit: 查询条数
Returns:
record: 查询结果
"""
collection = client['用户']['用户信息']
if query:
data = collection.find({"企业ID": query}, {'企业ID': True, '企业名称': True, '_id': False}).skip(skip).limit(limit)
record = list(data)
if len(record) == 0:
data = collection.find({"企业名称": re.compile(query)}, {'企业ID': True, '企业名称': True, '_id': False}).skip(skip).limit(limit)
record = list(data)
return record
else:
data = collection.find({}, {'企业ID': True, '企业名称': True, '_id': False}).skip(skip).limit(limit)
record = list(data)
return record
def find_user_info(cid):
"""
查询用户信息
Parameters:
cid: 企业ID
Returns:
record: 查询结果
"""
collection = client['用户']['用户信息']
data = collection.find({"企业ID": cid}, {'企业ID': True, '企业名称': True, '已认证': True, '_id': False})
record = list(data)
return record
def find_company_basic_info(cid):
"""
查询公司基本信息
Parameters:
cid: 企业ID
Returns:
record: 查询结果
"""
collection = client['企业']['公司基本信息']
data = collection.find({"企业ID": cid}, {'_id': False})
record = list(data)
return record
def find_company_evaluate_info(cid):
"""
查询评价记录
Parameters:
cid: 企业ID
Returns:
record: 查询结果
"""
collection = client['评价']['评价记录']
data = collection.find({"企业ID": cid}, {'_id': False, '企业ID': False})
record = list(data)
return record
def update_company_info(cid, data):
"""
编辑企业信息
Parameters:
cid: 企业ID
data: 编辑后数据
Returns:
record: 编辑结果
"""
collection = client['企业']['公司基本信息']
res = collection.update_one({"企业ID": cid}, {"$set": data})
info = res.raw_result['updatedExisting']
return info

View File

@ -1,148 +0,0 @@
from DBHelper.MongoHelper import MongoHelper
from Manage.company.company_db import *
from Manage.company.company_obj import SearchCompany, Company
class SearchCompanyImpl(SearchCompany):
""""""
db = MongoHelper("tfse_v0.21")
def conditions_search(self):
""""""
def make_search_body(param):
search_keys = list(param.keys())
body = dict()
if "企业ID" in search_keys:
body['企业ID'] = param['企业ID']
if "企业名称" in search_keys:
body['企业名称'] = {"$regex": param['企业名称']}
if "一级行业" in search_keys:
body['一级行业'] = {"$in": param['一级行业']}
if "信用等级" in search_keys:
body['综信评价结果.信用等级'] = {"$in": param['信用等级']}
if "ESG等级" in search_keys:
body['ESG评价结果.评价等级'] = {"$in": param['ESG等级']}
return body
# 顺序条件构造方法
def make_sort_body(param):
if param != {}:
columns_map = {
"企业ID": "企业ID",
"企业名称": "企业名称",
"一级行业": "一级行业",
"信用等级": "综信评价结果.信用等级",
"ESG等级": "ESG评价结果.评价等级"
}
asc_or_desc = 1 if list(param.values())[0] == "asc" else -1
sort_column = columns_map[list(param.keys())[0]]
body = {sort_column: asc_or_desc}
else:
body = {"更新时间.工商信息": -1}
return body
search_body = make_search_body(self.search)
sort = make_sort_body(self.sort)
page_size = 10 if self.page_size > 10 else self.page_size
page_no = int(self.page_no)
total = self.db.find_all_data_with_count(
"企业数据",
"企业数据_更新汇总",
search_body
)
records = self.db.find_data_by_page_with_sort(
"企业数据",
"企业数据_更新汇总",
search_body,
["企业ID", "企业名称", "一级行业", "综信评价结果.信用等级", "ESG评价结果.评价等级"],
sort,
page_size,
page_no
)
table_data = list()
if records:
for record in records:
company = Company()
company.cid = record['企业ID']
company.name = record['企业名称']
company.industry_l1 = record['一级行业']
if record['综信评价结果'] != {}:
company.cc_rating_result = record['综信评价结果']['信用等级']
else:
company.cc_rating_result = None
if record['ESG评价结果'] != {}:
company.esg_rating_result = record['ESG评价结果']['评价等级']
else:
company.esg_rating_result = None
data = company.dict_to_show(columns=["企业ID", "企业名称", "一级行业", "综信评价结果", "ESG评价结果"])
data['信用等级'] = data.pop('综信评价结果')
data['ESG等级'] = data.pop('ESG评价结果')
table_data.append(data)
result = {
"records": table_data,
"total": total
}
return result
def company_index_script(cid):
"""
查询企业相关信息
Parameters:
cid: 企业ID
Returns:
result: 返回数据
"""
user_info = find_user_info(cid)
basic_info = find_company_basic_info(cid)
evaluate_info = find_company_evaluate_info(cid)
result = dict()
if len(user_info) > 0:
result['企业名称'] = user_info[0]['企业名称']
result['企业ID'] = cid
result['已认证'] = user_info[0]['已认证']
if len(basic_info) > 0:
result['基本信息'] = dict()
result['基本信息']['工商信息'] = basic_info[0]['工商信息']
result['基本信息']['股东信息'] = basic_info[0]['股东信息']
result['基本信息']['主要成员'] = basic_info[0]['主要成员']
if len(evaluate_info) > 0:
result['评价记录'] = list()
result['评价记录'] = evaluate_info
for item in evaluate_info:
item['报告'] = '/file/get_company_report?file_id={}'.format(item['报告fid'])
item['证书'] = '/file/get_certification?file_id={}'.format(item['证书fid'])
item.pop('报告fid')
item.pop('证书fid')
return result
def company_edit_script(cid, data):
"""
编辑企业信息
Parameters:
cid: 企业ID
data: 编辑后数据
Returns:
result: 编辑数据
"""
result = update_company_info(cid, data)
return result

View File

@ -1,82 +0,0 @@
class Company:
def __init__(self):
self.cid = None
self.name = None
self.industry_l1 = None
self.industry_l2 = None
self.basic_info = None
self.share_holders = None
self.main_members = None
self.balance_sheet = None
self.profit_sheet = None
self.income_sheet = None
self.appendix_sheet = None
self.fin_index = None
self.cc_rating_result = None
self.esg_rating_result = None
self.update_time = None
def check_obj(self, **kwargs):
"""检查对象字段"""
def dict_to_save(self, **kwargs):
"""存储对象"""
def dict_to_show(self, **kwargs):
"""显示对象"""
_dict_ = {
"企业ID": self.cid,
"企业名称": self.name,
"一级行业": self.industry_l1,
"二级行业": self.industry_l2,
"工商信息": self.basic_info,
"股东信息": self.share_holders,
"主要成员": self.main_members,
"资产负债表": self.balance_sheet,
"利润表": self.profit_sheet,
"现金流量表": self.income_sheet,
"财务补充数据": self.appendix_sheet,
"财务指标": self.fin_index,
"综信评价结果": self.cc_rating_result,
"ESG评价结果": self.esg_rating_result,
}
if 'columns' in list(kwargs.keys()):
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
class SearchCompany(Company):
""""""
def __init__(self, search, sort, page_size, page_no):
super().__init__()
self.search = search
self.sort = sort
self.page_size = page_size
self.page_no = page_no
def conditions_search(self):
"""条件查询"""
def check_search(self):
""""""
def check_sort(self):
""""""
def check_page_size(self):
""""""
def check_page_no(self):
""""""
def check_obj(self, **kwargs):
""""""
def dict_to_show(self, **kwargs):
"""显示对象"""

View File

@ -1,57 +0,0 @@
from flask import Blueprint, request
from Manage.company.company_impl import SearchCompanyImpl
from user.user_auth import verify_token
company_route = Blueprint('company', __name__)
@company_route.route('/search', methods=['POST'])
@verify_token
def search_company(**kwargs):
"""
企业搜索
Parameters:
-
Returns:
info: 返回信息
result: 返回数据
"""
req = request.json
search_impl = SearchCompanyImpl(
req['search'],
req['sort'],
req['page_size'],
req['page_no']
)
search_impl.check_obj()
result = search_impl.conditions_search()
return {"info": '查询结果', "result": result}, 200
@company_route.route('/index', methods=['POST'])
@verify_token
def company_index(**kwargs):
"""
企业主页
Parameters:
-
Returns:
info: 返回信息
result: 返回数据
"""
return {"info": '调整中'}, 200
@company_route.route('/edit', methods=['POST'])
@verify_token
def company_edit(**kwargs):
"""
企业信息编辑
Parameters:
-
Returns:
result: 编辑结果
"""
return {"info": '调整中'}, 200

View File

@ -6,18 +6,8 @@ from flask import request
from DBHelper.MongoHelper import MongoHelper
class ConditionCheckFailed(RuntimeError):
"""条件检查失败 抛出异常"""
def __init__(self, error_info):
self.error_info = error_info
def __str__(self):
return self.error_info
class ReturnConditionCheckFailed(RuntimeError):
"""条件检查失败 抛出异常 接口返回失败原因和状态码"""
class CheckFailed(RuntimeError):
"""检查异常"""
def __init__(self, failed_info, status_code):
self.failed_info = failed_info # 失败信息
@ -49,3 +39,16 @@ class ReturnConditionCheckFailed(RuntimeError):
"异常日志",
info
)
class LogConditionCheckFailed(CheckFailed):
"""直接记录检查异"""
def __init__(self, failed_info, status_code):
self.failed_info = failed_info # 失败信息
self.status_code = status_code # 状态码
self.log_error()
class ReturnConditionCheckFailed(CheckFailed):
"""条件检查失败抛出异常 接口返回失败原因和状态码"""

View File

@ -1,13 +1,59 @@
class SpecObject(object):
"""自定义类"""
class ObjUtil:
"""对象实用程序"""
fields_map = {}
@staticmethod
def sub_dict(base_dict, sub_keys):
"""
字典子集
base_dict: 原字典
sub_keys: 子集键值
"""
return {key: base_dict[key] for key in sub_keys}
def dict_to_show(self, **kwargs):
"""显示对象"""
_dict_ = dict()
for key in self.__dict__.keys():
if type(self.__dict__[key]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key]).__name__ == 'list':
if len(self.__dict__[key]) == 0:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key][0]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = [item.dict_to_save() for item in self.__dict__[key]]
elif self.__dict__[key] is None:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = self.__dict__[key].dict_to_show()
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
def dict_to_save(self, **kwargs):
"""存储对象"""
_dict_ = dict()
for key in self.__dict__.keys():
if type(self.__dict__[key]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key]).__name__ == 'list':
if len(self.__dict__[key]) == 0:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key][0]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = [item.dict_to_save() for item in self.__dict__[key]]
elif self.__dict__[key] is None:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = self.__dict__[key].dict_to_save()
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
# def dict_to_post(self, **kwargs):
# if kwargs.__contains__('data'):
# fields_map = dict([v, k] for k, v in self.fields_map.items())
# for field in list(kwargs['data'].keys()):
# self.__setattr__(fields_map[field], kwargs['data'][field])

96
Utils/ValidateUtil.py Normal file
View File

@ -0,0 +1,96 @@
import re
from Utils.ErrorUtil import ReturnConditionCheckFailed
class Validate(object):
@staticmethod
def email(param):
""""""
regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
case = (len(param) > 7) and (re.match(regex, param) is not None)
result = True if case else False
return result
@staticmethod
def password(param):
""""""
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(param) >= 8) and (re.match(regex, param) is not None)
return True if case else False
@staticmethod
def image(param):
""""""
mimetype = param.mimetype
return True if mimetype in ['image/jpeg', 'image/png'] else False
class ValidateAttr(object):
def __init__(self, **kwargs):
""""""
self.kwargs = kwargs
self.error_info = kwargs['error_info'] if 'error_info' in kwargs else ('{}异常'.format(kwargs['mark']) if 'mark' in kwargs else None)
self.error_code = kwargs['error_code'] if 'error_code' in kwargs else 200
def __get__(self, instance, owner):
if self.kwargs['field'] in instance.__dict__:
return instance.__dict__[self.kwargs['field']]
def __set__(self, instance, value):
""""""
try:
self.error_info = '{}异常'.format(instance.fields_map[self.kwargs['field']])
except AttributeError:
pass
def not_default():
"""无默认值时属性设值检查"""
if 'type' in self.kwargs:
"""检查实例的属性类型"""
if not isinstance(value, self.kwargs['type']):
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'length' in self.kwargs:
"""检查实例的属性值长度(一般是str类型)"""
if len(value) != self.kwargs['length']:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'in_list' in self.kwargs:
"""检查实例属性是否包含于列表中(属性有列表和非列表两种情况)"""
if type(value) is not list:
if value not in self.kwargs['in_list']:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
else:
for item in value:
if item not in self.kwargs['in_list']:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'instance_list' in self.kwargs:
"""检查实例列表"""
if type(value) is not list:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
else:
for item in value:
if not isinstance(item, self.kwargs['instance_list']):
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'func' in self.kwargs:
"""属性检查函数"""
if not list(map(self.kwargs['func'], [value]))[0]:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
instance.__dict__[self.kwargs['field']] = value
if 'default' in self.kwargs:
"""实例属性默认值"""
if value is None:
value = self.kwargs['default']
instance.__dict__[self.kwargs['field']] = value
else:
not_default()
else:
not_default()

2
app.py
View File

@ -5,7 +5,7 @@ from Board.board_routes import board_route
from Manage.industry.industry_routes import industry_route
from Model.model_routes import model_route
from TestCenter.tc_routes import test_route
from Manage.company.company_routes import company_route
from Manage.Company.CompanyRoutes import company_route
from Manage.rating.routes import rating_route
from user.user_routes import user_route
from Manage.macro.routes import macro_route