企业数据重构

This commit is contained in:
王思川 2022-04-08 16:11:56 +08:00
parent 176c24b5e2
commit 409e51765f
12 changed files with 625 additions and 18 deletions

View File

View File

@ -0,0 +1,109 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr
class BasicInfo(SpecObject):
"""基本工商信息"""
status = ValidateAttr(field="status", type=str)
legal_person = ValidateAttr(field="legal_person", type=str)
company_type = ValidateAttr(field="company_type", type=str)
taxpayer_id = ValidateAttr(field="taxpayer_id", type=str)
business_scope = ValidateAttr(field="business_scope", type=str)
registered_capital = ValidateAttr(field="registered_capital", type=str)
paid_capital = ValidateAttr(field="paid_capital", type=str)
registered_address = ValidateAttr(field="registered_address", type=str)
registration_authority = ValidateAttr(field="registration_authority", type=str)
industry = ValidateAttr(field="industry", type=str)
staff_size = ValidateAttr(field="staff_size", type=str)
people_insured_num = ValidateAttr(field="people_insured_num", type=int)
micro_company = ValidateAttr(field="micro_company", type=str)
fields_map = {
"status": "企业状态",
"legal_person": "法定代表人",
"company_type": "企业类型",
"taxpayer_id": "纳税人识别号",
"business_scope": "经营范围",
"registered_capital": "注册资本",
"paid_capital": "实缴资本",
"registered_address": "注册地址",
"registration_authority": "登记机关",
"industry": "行业",
"staff_size": "人员规模",
"people_insured_num": "参保人数",
"micro_company": "小微企业"
}
class ShareHolder(SpecObject):
"""股东信息"""
name = ValidateAttr(field='name', type=str)
share_holder_type = ValidateAttr(field='share_holder_type', type=str)
share_holding_ratio = ValidateAttr(field="share_holding_ratio", type=str)
subscription_amount = ValidateAttr(field="subscription_amount", type=str)
subscription_date = ValidateAttr(field="subscription_date", type=str)
paid_amount = ValidateAttr(field="paid_amount", type=list)
payment_method = ValidateAttr(field="payment_method", type=list)
payment_time = ValidateAttr(field="payment_time", type=list)
fields_map = {
"name": "股东",
"share_holder_type": "股东类型",
"share_holding_ratio": "持股比例",
"subscription_amount": "认缴金额",
"subscription_date": "认缴日期",
"paid_amount": "实缴金额",
"payment_method": "实缴方式",
"payment_time": "实缴时间"
}
class CompanyData(SpecObject):
"""企业数据"""
cid = ValidateAttr(field='cid', type=str, length=8)
name = ValidateAttr(field='name', type=str)
industry_l1 = ValidateAttr(field='industry_l1', type=str)
industry_l2 = ValidateAttr(field='industry_l2', type=str)
basic_info = ValidateAttr(field='basic_info', type=BasicInfo)
share_holder = ValidateAttr(field='share_holders', instance_list=ShareHolder)
main_members = []
balance_sheet = {}
profit_sheet = {}
income_sheet = {}
appendix_sheet = {}
fin_index = {}
cc_rating_result = {}
esg_rating_result = {}
update_time = {}
fields_map = {
"cid": "企业ID",
"name": "企业名称",
"industry_l1": "一级行业",
"industry_l2": "二级行业",
"basic_info": "工商信息",
"share_holders": "股东信息",
"main_members": "主要成员",
"balance_sheet": "资产负债表",
"profit_sheet": "利润表",
"income_sheet": "现金流量表",
"appendix_sheet": "财务补充数据",
"fin_index": "财务指标",
"cc_rating_result": "综信评价结果",
"esg_rating_result": "ESG评价结果",
"update_time": "更新时间"
}
if __name__ == '__main__':
company_data = CompanyData()
basic_info = BasicInfo()
share_holder = ShareHolder()
basic_info.status = "存续"
share_holder.name = '123'
company_data.basic_info = basic_info
company_data.share_holders = [share_holder, share_holder]
print(company_data.dict_to_save())

View File

@ -11,15 +11,10 @@ company_route = Blueprint('company', __name__)
def basic_info(): def basic_info():
""" """
初始化企业数据 初始化企业数据
包括 天眼查数据库企业基本信息
Parameters:
cid 企业ID
company_name 企业名称
Returns:
res: desc
""" """
cid = json.loads(request.data)['cid'] req = request.json
company_name = json.loads(request.data)['company_name'] cid = req['cid']
company_name = req['company_name']
# 拉取企业数据 # 拉取企业数据
info1 = drag_company_data_request(company_name) info1 = drag_company_data_request(company_name)

11
DBHelper/DBConfig.json Normal file
View File

@ -0,0 +1,11 @@
{
"MongoDB": {
"tfse_v0.21": "root:UTlC9cCoglD1cI1*@116.63.130.34:27021"
},
"Mysql": {
},
"Redis": {
}
}

226
DBHelper/MongoHelper.py Normal file
View File

@ -0,0 +1,226 @@
import os
import json
import gridfs
import pymongo
from bson import ObjectId
from gridfs import GridFS
class MongoHelper:
def __init__(self, param):
"""
param:
typestr
desc: 选择连接哪个MongoDB数据库
"""
with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f:
db_configs = json.load(f)
this_mongo_cfg = db_configs['MongoDB'][param]
self.client = pymongo.MongoClient('mongodb://{}'.format(this_mongo_cfg))
def find_single_column(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据的某个指定字段值
param2: str 数据表
param3: dict 查询条件
param4: str 一个指定查询字段
return:
type: None or dict
desc: 查询结果为空返回None; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
column = {**{'_id': False}, **{param4: 1}}
record = list(collection.find(param3, column))
return None if record == [] else record[0][param4]
def find_single_data(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns))
return False if record == [] else record[0]
def find_single_data_with_single_sort(self, param1, param2, param3, param4, param5):
"""
查询符合条件的第一条数据,按单个排序条件返回
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(1))
return False if record == [] else record[0]
def find_all_data(self, param1, param2, param3, param4):
"""
查询符合条件的所有数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: list
desc: 查询结果
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns))
return record
def find_all_data_with_count(self, param1, param2, param3):
"""
查询所有符合条件的数据并返回统计数量
param1: str 数据库
param2: str 数据表
param3: str 查询条件
return: int 符合条件的数据数量
"""
collection = self.client[param1][param2]
num = collection.find(param3).count()
return num
def find_all_data_with_single_sort(self, param1, param2, param3, param4, param5):
"""
查询符合条件的数据,按单个排序条件返回
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]))
return False if record == [] else record
def find_data_with_aggregate(self, param1, param2, param3):
"""
根据聚合条件查询
param1: str 数据库
param2: str 数据集
param3:
type: list
desc: 聚合条件
demo: [{'$match':{'price':{'$gte':50}}}, {'$group': {'_id': "$fName", 'count': {'$sum': 1}}}]
"""
collection = self.client[param1][param2]
data = list(collection.aggregate(param3))
return data
def find_data_by_page_with_sort(self, param1, param2, param3, param4, param5, param6, param7):
"""
根据聚合翻页查询,且按照需求字段排序返回
param1: str 数据库
param2: str 数据集
param3: dict 查询条件
param4: list 显示字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
param6: int page_size 每页数据条数
param7: int page_no 当前页码
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
page_size = int(param6)
page_no = int(param7)
skip_num = page_size * (page_no - 1)
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(page_size).skip(skip_num))
return False if record == [] else record
def insert_single_data(self, param1, param2, param3):
"""
插入一条数据
param1: str 数据库
param2: str 数据集
param3: obj 插入数据
return: None
"""
collection = self.client[param1][param2]
collection.insert_one(param3)
def upsert_single_data(self, param1, param2, param3, param4):
"""
插入单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$set": param4}, upsert=True)
def update_single_data(self, param1, param2, param3, param4):
"""
插入单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$set": param4})
def delete_single_data(self, param1, param2, param3):
"""
根据查询条件删除一条文档
param1: str 数据库
param2: str 数据集
param3: obj 查询条件
return: None
"""
collection = self.client[param1][param2]
collection.delete_one(param3)
return True
def find_file(self, param1, param2, param3):
"""
读取一个文件
param1: str 数据库
param2: str 存储桶
param3: str 文件id
return:
type: binary?
desc: 二进制文件流
"""
try:
# 实例化一个文件存储器
gfs = GridFS(self.client[param1], collection=param2)
# 二进制读取文件
data_stream = gfs.get(ObjectId(param3)).read()
# 返回文件二进制流
return data_stream
except gridfs.errors.NoFile:
return False
def delete_file(self, param1, param2, param3):
"""
根据id删除文件
param1: str 数据库
param2: str 存储桶
param3: str 文件fid
"""
fs = GridFS(self.client[param1], param2)
fs.delete(ObjectId(param3))

0
DBHelper/__init__.py Normal file
View File

View File

@ -1,3 +1,5 @@
import time
import pymongo import pymongo
HOST = "116.63.130.34" HOST = "116.63.130.34"
@ -8,18 +10,118 @@ PORT = "27021"
mongo_client = pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format(USER, PASS, HOST, PORT)) mongo_client = pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format(USER, PASS, HOST, PORT))
class CompanyData:
def __init__(self):
self.cid = None
self.name = None
self.industry_l1 = None
self.basic_info = {}
self.share_holders = []
self.main_members = []
self.balance_sheet = {}
self.profit_sheet = {}
self.income_sheet = {}
self.appendix_sheet = {}
self.fin_index = {}
self.cc_rating_result = {}
self.esg_rating_result = {}
self.update_time = {
"工商信息": None,
"财务信息": None,
"综信评价": None,
"ESG评价": None
}
def dict_to_save(self):
_dict_ = {
"企业ID": self.cid,
"企业名称": self.name,
"一级行业": self.industry_l1,
"工商信息": self.basic_info,
"股东信息": self.share_holders,
"主要成员": self.main_members,
"资产负债表": self.balance_sheet,
"利润表": self.profit_sheet,
"现金流量表": self.income_sheet,
"财务补充数据": self.appendix_sheet,
"财务指标": self.fin_index,
"综信评价结果": self.cc_rating_result,
"ESG评价结果": self.esg_rating_result,
"更新时间": {
"工商信息": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"财务信息": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"综信评价": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"ESG评价": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
}
return _dict_
def all_data(): def all_data():
collection = mongo_client['模型端']['天眼查公司行业分类'] collection = mongo_client['应用端']['企业用户']
data = list(collection.find({}, {'_id': False}))
col_basic_info = mongo_client['企业数据']['基本工商信息']
col_fin_data = mongo_client['企业数据']['基本财务数据']
col_cc_result = mongo_client['综信评价数据']['评价结果']
col_esg_result = mongo_client['ESG评价数据']['评价结果']
records = list(collection.find({}, {'_id': False}))
### ###
pass
for d in data: for data in records:
if d['一级行业'] == '制造业': company_data = CompanyData()
d['远东_一级行业'] = '制造业' cid = data['企业ID']
if d['一级行业'] == '信息传输、软件和信息技术服务业': basic_info = list(col_basic_info.find({"企业ID": cid}, {'_id': False}))
d['远东_一级行业'] = '信息技术业' fin_info = list(col_fin_data.find({"企业ID": cid}, {'_id': False}).sort('报告期', -1).limit(1))
collection.update_one({"三级行业": d['三级行业']}, {"$set": d}) cc_result = list(col_cc_result.find({"企业ID": cid}, {'_id': False}))
esg_result = list(col_esg_result.find({"企业ID": cid}, {'_id': False}))
company_data.cid = cid
company_data.name = data['企业名称']
company_data.industry_l1 = data['一级行业']
if basic_info:
company_data.basic_info = basic_info[0]['工商信息']
company_data.share_holders = basic_info[0]['股东信息']
company_data.main_members = basic_info[0]['主要成员']
if fin_info:
company_data.balance_sheet = fin_info[0]['资产负债表']
company_data.profit_sheet = fin_info[0]['利润表']
company_data.income_sheet = {}
company_data.appendix_sheet = fin_info[0]['补充数据表']
company_data.fin_index = fin_info[0]['财务指标']
if cc_result:
company_data.cc_rating_result = {
"评价ID": cc_result[0]["评价ID"],
"信用等级": cc_result[0]['信用等级'],
"评价年度": cc_result[0]["评价年度"],
"信用评分": cc_result[0]['信用评分'],
"经营评分": cc_result[0]["经营评分"],
"财务评分": cc_result[0]["财务评分"],
"风险评分": cc_result[0]["风险评分"],
"评价时间": cc_result[0]["评价时间"],
}
if esg_result:
company_data.esg_rating_result = {
"评价ID": esg_result[0]["评价ID"],
"评价等级": esg_result[0]['评价等级'],
"评价年度": esg_result[0]["评价年度"],
"ESG得分": esg_result[0]['ESG得分'],
"环境得分": esg_result[0]["环境得分"],
"社会得分": esg_result[0]["社会得分"],
"公司治理得分": esg_result[0]["公司治理得分"],
"评价时间": esg_result[0]["评价时间"],
}
col = mongo_client['企业数据']['企业数据_更新汇总']
col.update_one({"企业ID": company_data.cid}, {"$set": company_data.dict_to_save()}, upsert=True)
### ###

41
Utils/ErrorUtil.py Normal file
View File

@ -0,0 +1,41 @@
import time
import traceback
from flask import request
from DBHelper.MongoHelper import MongoHelper
class ReturnConditionCheckFailed(RuntimeError):
"""条件检查失败 抛出异常 接口返回失败原因和状态码"""
def __init__(self, failed_info, status_code):
self.failed_info = failed_info # 失败信息
self.status_code = status_code # 状态码
def __str__(self):
return self.failed_info
def log_error(self):
db = MongoHelper("tfse_v0.21")
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": request.args.__str__(),
"json": request.json.__str__()
},
"traceback": traceback.format_exc(),
"exception": type(self).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.insert_single_data(
"日志",
"异常日志",
info
)

31
Utils/ObjUtil.py Normal file
View File

@ -0,0 +1,31 @@
class SpecObject(object):
"""自定义类"""
fields_map = {}
def dict_to_show(self, **kwargs):
"""显示对象"""
def dict_to_save(self, **kwargs):
"""存储对象"""
_dict_ = dict()
for key in self.__dict__.keys():
if type(self.__dict__[key]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key]).__name__ == 'list':
if len(self.__dict__[key]) == 0:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key][0]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = [item.dict_to_save() for item in self.__dict__[key]]
else:
_dict_[self.fields_map[key]] = self.__dict__[key].dict_to_save()
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_

92
Utils/ValidateUtil.py Normal file
View File

@ -0,0 +1,92 @@
import re
from Utils.ErrorUtil import ReturnConditionCheckFailed
class Validate(object):
@staticmethod
def email(param):
""""""
regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
case = (len(param) > 7) and (re.match(regex, param) is not None)
result = True if case else False
return result
@staticmethod
def password(param):
""""""
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(param) >= 8) and (re.match(regex, param) is not None)
return True if case else False
@staticmethod
def image(param):
""""""
mimetype = param.mimetype
return True if mimetype in ['image/jpeg', 'image/png'] else False
class ValidateAttr(object):
def __init__(self, **kwargs):
""""""
self.kwargs = kwargs
self.error_info = kwargs['error_info'] if 'error_info' in kwargs else ('{}异常'.format(kwargs['mark']) if 'mark' in kwargs else ('{}异常'.format(self.__dict__['fields_map'][self.kwargs['field']]) if 'fields_map' in self.__dict__ else None))
self.error_code = kwargs['error_code'] if 'error_code' in kwargs else 200
def __get__(self, instance, owner):
if self.kwargs['field'] in instance.__dict__:
return instance.__dict__[self.kwargs['field']]
def __set__(self, instance, value):
""""""
def not_default():
"""无默认值时属性设值检查"""
if 'type' in self.kwargs:
"""检查实例的属性类型"""
if not isinstance(value, self.kwargs['type']):
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'length' in self.kwargs:
"""检查实例的属性值长度(一般是str类型)"""
if len(value) != self.kwargs['length']:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'in_list' in self.kwargs:
"""检查实例属性是否包含于列表中(属性有列表和非列表两种情况)"""
if type(value) is not list:
if value not in self.kwargs['in_list']:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
else:
for item in value:
if item not in self.kwargs['in_list']:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'instance_list' in self.kwargs:
"""检查实例列表"""
if type(value) is not list:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
else:
for item in value:
if not isinstance(item, self.kwargs['instance_list']):
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
if 'func' in self.kwargs:
"""属性检查函数"""
if not list(map(self.kwargs['func'], [value]))[0]:
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
instance.__dict__[self.kwargs['field']] = value
if 'default' in self.kwargs:
"""实例属性默认值"""
if value is None:
value = self.kwargs['default']
instance.__dict__[self.kwargs['field']] = value
else:
not_default()
else:
not_default()

0
Utils/__init__.py Normal file
View File

2
app.py
View File

@ -1,7 +1,7 @@
from flask import Flask from flask import Flask
from flask_cors import * from flask_cors import *
from CompanyData.routes import company_route from CompanyData.CompanyDataRoutes import company_route
from RatingData.routes import rating_route from RatingData.routes import rating_route
app = Flask(__name__) app = Flask(__name__)