Merge branch 'v0.21' into 'master'
V0.21 See merge request root/etl_tfse!15
This commit is contained in:
commit
2993578e70
|
@ -0,0 +1,136 @@
|
|||
import json
|
||||
import requests
|
||||
|
||||
from DBHelper.MongoHelper import MongoHelper
|
||||
from Utils.ErrorUtil import LogConditionCheckFailed
|
||||
from CompanyData.CompanyDataObj import CompanyData, BasicInfo, ShareHolder, MainMember
|
||||
|
||||
|
||||
class CompanyDataImpl(CompanyData):
|
||||
"""企业数据实现类"""
|
||||
|
||||
db_tyc = MongoHelper("tyc")
|
||||
db_tfse = MongoHelper("tfse_v0.21")
|
||||
|
||||
def drag_data_from_tyc_api(self):
|
||||
"""拉取天眼查接口数据"""
|
||||
|
||||
url = "http://api.fecribd.com/api/tyc/drag_data"
|
||||
headers = {'token': "uzdq51N4!I0%HY4sCaQ!aeCSIDIVIdAM"}
|
||||
data = {"企业名称": self.name}
|
||||
res = requests.post(url=url, headers=headers, data=json.dumps(data))
|
||||
return True if res.status_code == 200 else False
|
||||
|
||||
def prepare_company_init_data(self):
|
||||
""""""
|
||||
|
||||
def prepare_business_data():
|
||||
""""""
|
||||
data = self.db_tyc.find_single_column(
|
||||
"公司背景",
|
||||
"基本信息",
|
||||
{"企业名称": self.name},
|
||||
"基本信息"
|
||||
)
|
||||
|
||||
basic_info = BasicInfo()
|
||||
basic_info.status = data['regStatus']
|
||||
basic_info.legal_person = data['legalPersonName']
|
||||
basic_info.company_type = data['companyOrgType']
|
||||
basic_info.taxpayer_id = data['taxNumber']
|
||||
basic_info.business_scope = data['businessScope']
|
||||
basic_info.registered_capital = data['regCapital']
|
||||
basic_info.paid_capital = data['actualCapital']
|
||||
basic_info.registered_address = data['regLocation']
|
||||
basic_info.registration_authority = data['regInstitute']
|
||||
basic_info.industry = data['industry']
|
||||
basic_info.staff_size = data['staffNumRange']
|
||||
basic_info.people_insured_num = data['socialStaffNum']
|
||||
basic_info.micro_company = "是" if data['regStatus'] == 1 else "否"
|
||||
|
||||
self.basic_info = basic_info
|
||||
|
||||
self.db_tfse.upsert_single_data(
|
||||
"企业数据",
|
||||
"企业数据_更新汇总",
|
||||
{"企业ID": self.cid},
|
||||
self.dict_to_save()
|
||||
)
|
||||
|
||||
def prepare_share_holders():
|
||||
""""""
|
||||
|
||||
self.share_holder = list()
|
||||
|
||||
data = self.db_tyc.find_single_column(
|
||||
"公司背景",
|
||||
"企业股东",
|
||||
{"企业名称": self.name},
|
||||
"企业股东"
|
||||
)
|
||||
|
||||
try:
|
||||
results = data['result']
|
||||
except KeyError:
|
||||
raise LogConditionCheckFailed("企业股东数据异常", 200)
|
||||
|
||||
for result in results:
|
||||
share_holder = ShareHolder()
|
||||
share_holder.name = result['name']
|
||||
share_holder.share_holder_type = "公司" if result['type'] == 1 else ("个人" if result['type'] == 2 else "其他")
|
||||
|
||||
if result['capital']:
|
||||
share_holder.share_holding_ratio = None if result['capital'][0]['percent'] == '' else result['capital'][0]['percent']
|
||||
share_holder.subscription_amount = None if result['capital'][0]['amomon'] == '' else result['capital'][0]['amomon']
|
||||
share_holder.subscription_date = None if result['capital'][0]['time'] == '' else result['capital'][0]['time']
|
||||
|
||||
if result['capitalActl']:
|
||||
share_holder.paid_amount = [capital_actl['amomon'] for capital_actl in result['capitalActl']]
|
||||
share_holder.payment_method = [capital_actl['paymet'] for capital_actl in result['capitalActl']]
|
||||
share_holder.payment_time = [capital_actl['time'] for capital_actl in result['capitalActl']]
|
||||
|
||||
self.share_holder.append(share_holder)
|
||||
|
||||
self.db_tfse.upsert_single_data(
|
||||
"企业数据",
|
||||
"企业数据_更新汇总",
|
||||
{"企业ID": self.cid},
|
||||
self.dict_to_save()
|
||||
)
|
||||
|
||||
def prepare_main_members():
|
||||
""""""
|
||||
|
||||
self.main_members = list()
|
||||
|
||||
data = self.db_tyc.find_single_column(
|
||||
"公司背景",
|
||||
"主要人员",
|
||||
{"企业名称": self.name},
|
||||
"主要人员"
|
||||
)
|
||||
|
||||
try:
|
||||
results = data['result']
|
||||
except KeyError:
|
||||
raise LogConditionCheckFailed("主要人员数据异常", 200)
|
||||
|
||||
for result in results:
|
||||
main_member = MainMember()
|
||||
main_member.name = result['name']
|
||||
main_member.job_title = result['typeJoin']
|
||||
self.main_members.append(main_member)
|
||||
|
||||
self.db_tfse.upsert_single_data(
|
||||
"企业数据",
|
||||
"企业数据_更新汇总",
|
||||
{"企业ID": self.cid},
|
||||
self.dict_to_save()
|
||||
)
|
||||
|
||||
def __main__():
|
||||
prepare_business_data()
|
||||
prepare_share_holders()
|
||||
prepare_main_members()
|
||||
|
||||
__main__()
|
|
@ -0,0 +1,200 @@
|
|||
from Utils.ObjUtil import SpecObject
|
||||
from Utils.ValidateUtil import ValidateAttr
|
||||
|
||||
|
||||
class BasicInfo(SpecObject):
|
||||
"""基本工商信息"""
|
||||
|
||||
status = ValidateAttr(field="status", type=str, default=None)
|
||||
legal_person = ValidateAttr(field="legal_person", type=str, default=None)
|
||||
company_type = ValidateAttr(field="company_type", type=str, default=None)
|
||||
taxpayer_id = ValidateAttr(field="taxpayer_id", type=str, default=None)
|
||||
business_scope = ValidateAttr(field="business_scope", type=str, default=None)
|
||||
registered_capital = ValidateAttr(field="registered_capital", type=str, default=None)
|
||||
paid_capital = ValidateAttr(field="paid_capital", type=str, default=None)
|
||||
registered_address = ValidateAttr(field="registered_address", type=str, default=None)
|
||||
registration_authority = ValidateAttr(field="registration_authority", type=str, default=None)
|
||||
industry = ValidateAttr(field="industry", type=str, default=None)
|
||||
staff_size = ValidateAttr(field="staff_size", type=str, default=None)
|
||||
people_insured_num = ValidateAttr(field="people_insured_num", type=int, default=None)
|
||||
micro_company = ValidateAttr(field="micro_company", type=str, default=None)
|
||||
|
||||
fields_map = {
|
||||
"status": "企业状态",
|
||||
"legal_person": "法定代表人",
|
||||
"company_type": "企业类型",
|
||||
"taxpayer_id": "纳税人识别号",
|
||||
"business_scope": "经营范围",
|
||||
"registered_capital": "注册资本",
|
||||
"paid_capital": "实缴资本",
|
||||
"registered_address": "注册地址",
|
||||
"registration_authority": "登记机关",
|
||||
"industry": "行业",
|
||||
"staff_size": "人员规模",
|
||||
"people_insured_num": "参保人数",
|
||||
"micro_company": "小微企业"
|
||||
}
|
||||
|
||||
|
||||
class ShareHolder(SpecObject):
|
||||
"""股东信息"""
|
||||
|
||||
name = ValidateAttr(field='name', type=str)
|
||||
share_holder_type = ValidateAttr(field='share_holder_type', type=str, default=None)
|
||||
share_holding_ratio = ValidateAttr(field="share_holding_ratio", type=str, default=None)
|
||||
subscription_amount = ValidateAttr(field="subscription_amount", type=str, default=None)
|
||||
subscription_date = ValidateAttr(field="subscription_date", type=str, default=None)
|
||||
paid_amount = ValidateAttr(field="paid_amount", type=list, default=[])
|
||||
payment_method = ValidateAttr(field="payment_method", type=list, default=[])
|
||||
payment_time = ValidateAttr(field="payment_time", type=list, default=[])
|
||||
|
||||
fields_map = {
|
||||
"name": "股东",
|
||||
"share_holder_type": "股东类型",
|
||||
"share_holding_ratio": "持股比例",
|
||||
"subscription_amount": "认缴金额",
|
||||
"subscription_date": "认缴日期",
|
||||
"paid_amount": "实缴金额",
|
||||
"payment_method": "实缴方式",
|
||||
"payment_time": "实缴时间"
|
||||
}
|
||||
|
||||
|
||||
class MainMember(SpecObject):
|
||||
"""主要成员"""
|
||||
|
||||
name = ValidateAttr(field="name", type=str)
|
||||
job_title = ValidateAttr(field="job_title", type=list, default=None)
|
||||
|
||||
fields_map = {
|
||||
"name": "姓名",
|
||||
"job_title": "职务"
|
||||
}
|
||||
|
||||
|
||||
class BalanceSheet(SpecObject):
|
||||
"""资产负债表"""
|
||||
accounts_receivable = ValidateAttr(field='accounts_receivable', type=float)
|
||||
stock = ValidateAttr(field='stock', type=float)
|
||||
total_current_assets = ValidateAttr(field='total_current_assets', type=float)
|
||||
total_assets = ValidateAttr(field='total_assets', type=float)
|
||||
short_loan = ValidateAttr(field='short_loan', type=float)
|
||||
one_year_liabilities = ValidateAttr(field='one_year_liabilities', type=float)
|
||||
total_current_liabilities = ValidateAttr(field='total_current_liabilities', type=float)
|
||||
long_term_loan = ValidateAttr(field='long_term_loan', type=float)
|
||||
total_liabilities = ValidateAttr(field='total_liabilities', type=float)
|
||||
total_owners_equity = ValidateAttr(field='total_owners_equity', type=float)
|
||||
|
||||
fields_map = {
|
||||
'accounts_receivable': '应收账款',
|
||||
'stock': '存货',
|
||||
'total_current_assets': '流动资产合计',
|
||||
'total_assets': '资产总计',
|
||||
'short_loan': '短期借款',
|
||||
'one_year_liabilities': '一年内到期非流动负债',
|
||||
'total_current_liabilities': '流动负债合计',
|
||||
'long_term_loan': '长期借款',
|
||||
'total_liabilities': '负债合计',
|
||||
'total_owners_equity': '所有者权益合计'
|
||||
}
|
||||
|
||||
|
||||
class ProfitSheet(SpecObject):
|
||||
"""利润表"""
|
||||
operating_income = ValidateAttr(field='operating_income', type=float)
|
||||
operating_cost = ValidateAttr(field='operating_cost', type=float)
|
||||
total_profit = ValidateAttr(field='total_profit', type=float)
|
||||
net_profit = ValidateAttr(field='net_profit', type=float)
|
||||
|
||||
fields_map = {
|
||||
'operating_income': '营业收入',
|
||||
'operating_cost': '营业成本',
|
||||
'total_profit': '利润总额',
|
||||
'net_profit': '净利润'
|
||||
}
|
||||
|
||||
|
||||
class AppendixDataSheet(SpecObject):
|
||||
"""补充数据表"""
|
||||
rd_expenses = ValidateAttr(field='rd_expenses', type=float)
|
||||
interest_disbursement = ValidateAttr(field='interest_disbursement', type=float)
|
||||
interest_expense = ValidateAttr(field='interest_expense', type=float)
|
||||
|
||||
fields_map = {
|
||||
'rd_expenses': '研发费用异常',
|
||||
'interest_disbursement': '计入财务费的利息支出异常',
|
||||
'interest_expense': '资本化利息支出异常'
|
||||
}
|
||||
|
||||
|
||||
class FinancialIndex(SpecObject):
|
||||
"""财务指标"""
|
||||
roe = ValidateAttr(field="roe", type=float)
|
||||
inventory_turnover = ValidateAttr(field="inventory_turnover", type=float)
|
||||
interest_multiple = ValidateAttr(field="interest_multiple", type=float)
|
||||
accounts_receivable_turnover = ValidateAttr(field="accounts_receivable_turnover", type=float)
|
||||
total_asset_turnover = ValidateAttr(field="total_asset_turnover", type=float)
|
||||
total_asset_growth_rate = ValidateAttr(field="total_asset_growth_rate", type=float)
|
||||
roa = ValidateAttr(field="roa", type=float)
|
||||
technology_investment_ratio = ValidateAttr(field="technology_investment_ratio", type=float)
|
||||
operating_growth_rate = ValidateAttr(field="operating_growth_rate", type=float)
|
||||
assets_and_liabilities = ValidateAttr(field="assets_and_liabilities", type=float)
|
||||
quick_ratio = ValidateAttr(field="quick_ratio", type=float)
|
||||
|
||||
fields_map = {
|
||||
"roe": "净资产收益率",
|
||||
"inventory_turnover": "存货周转率",
|
||||
"interest_multiple": "已获利息倍数",
|
||||
"accounts_receivable_turnover": "应收账款周转率",
|
||||
"total_asset_turnover": "总资产周转率",
|
||||
"total_asset_growth_rate": "总资产增长率",
|
||||
"roa": "总资产报酬率",
|
||||
"technology_investment_ratio": "技术投入比率",
|
||||
"operating_growth_rate": "营业增长率",
|
||||
"assets_and_liabilities": "资产负债率",
|
||||
"quick_ratio": "速动比率"
|
||||
}
|
||||
|
||||
|
||||
class CompanyData(SpecObject):
|
||||
"""企业数据"""
|
||||
|
||||
cid = ValidateAttr(field='cid', type=str, length=8)
|
||||
name = ValidateAttr(field='name', type=str)
|
||||
industry_l1 = ValidateAttr(field='industry_l1', type=str)
|
||||
industry_l2 = ValidateAttr(field='industry_l2', type=str)
|
||||
basic_info = ValidateAttr(field='basic_info', type=BasicInfo)
|
||||
share_holder = ValidateAttr(field='share_holders', instance_list=ShareHolder)
|
||||
main_members = ValidateAttr(field="main_members", instance_list=MainMember)
|
||||
balance_sheet = ValidateAttr(field='balance_sheet', type=BalanceSheet)
|
||||
profit_sheet = ValidateAttr(field='profit_sheet', type=ProfitSheet)
|
||||
# income_sheet = {}
|
||||
appendix_sheet = ValidateAttr(field='appendix_sheet', type=AppendixDataSheet)
|
||||
fin_index = ValidateAttr(field='fin_index', type=FinancialIndex)
|
||||
cc_rating_result = ValidateAttr(field='cc_rating_result', type=dict)
|
||||
esg_rating_result = ValidateAttr(field='esg_rating_result', type=dict)
|
||||
update_time = ValidateAttr(field='update_time', type=dict)
|
||||
|
||||
fields_map = {
|
||||
"cid": "企业ID",
|
||||
"name": "企业名称",
|
||||
"industry_l1": "一级行业",
|
||||
"industry_l2": "二级行业",
|
||||
"basic_info": "工商信息",
|
||||
"share_holders": "股东信息",
|
||||
"main_members": "主要成员",
|
||||
"balance_sheet": "资产负债表",
|
||||
"profit_sheet": "利润表",
|
||||
"income_sheet": "现金流量表",
|
||||
"appendix_sheet": "财务补充数据",
|
||||
"fin_index": "财务指标",
|
||||
"cc_rating_result": "综信评价结果",
|
||||
"esg_rating_result": "ESG评价结果",
|
||||
"update_time": "更新时间"
|
||||
}
|
||||
|
||||
def drag_data_from_tyc_api(self):
|
||||
"""拉取天眼查接口数据"""
|
||||
|
||||
def prepare_company_init_data(self):
|
||||
"""准备企业初始化数据"""
|
|
@ -1,5 +1,7 @@
|
|||
from flask import Blueprint, request
|
||||
|
||||
from CompanyData.CompanyDataImpl import CompanyDataImpl
|
||||
from Utils.ErrorUtil import ReturnConditionCheckFailed
|
||||
from common.utils import verify_token
|
||||
from CompanyData.job_scripts import *
|
||||
|
||||
|
@ -11,25 +13,20 @@ company_route = Blueprint('company', __name__)
|
|||
def basic_info():
|
||||
"""
|
||||
初始化企业数据
|
||||
包括 天眼查数据库、企业基本信息
|
||||
Parameters:
|
||||
cid 企业ID
|
||||
company_name 企业名称
|
||||
Returns:
|
||||
res: desc
|
||||
"""
|
||||
cid = json.loads(request.data)['cid']
|
||||
company_name = json.loads(request.data)['company_name']
|
||||
|
||||
# 拉取企业数据
|
||||
info1 = drag_company_data_request(company_name)
|
||||
if info1 != "企业数据拉取成功":
|
||||
return {"info": "数据拉取失败"}, 210
|
||||
|
||||
# 清洗基本信息数据
|
||||
basic_info_etl(cid, company_name)
|
||||
|
||||
return {"info": "企业基本信息初始化成功"}
|
||||
try:
|
||||
req = request.json
|
||||
company_data = CompanyDataImpl()
|
||||
company_data.cid = req['cid']
|
||||
company_data.name = req['company_name']
|
||||
company_data.drag_data_from_tyc_api()
|
||||
company_data.prepare_company_init_data()
|
||||
return {"info": "企业基本信息初始化成功"}
|
||||
except ReturnConditionCheckFailed as e:
|
||||
e.log_error()
|
||||
return {"info": e.failed_info}, e.status_code
|
||||
except KeyError:
|
||||
return {"info": "参数错误"}, 400
|
||||
|
||||
|
||||
@company_route.route('/company_index_etl', methods=['POST'])
|
|
@ -0,0 +1,12 @@
|
|||
{
|
||||
"MongoDB": {
|
||||
"tfse_v0.21": "root:UTlC9cCoglD1cI1*@116.63.130.34:27021",
|
||||
"tyc": "root:gP@DwMSVd5Sh6EiH@116.63.130.34:27019"
|
||||
},
|
||||
"Mysql": {
|
||||
|
||||
},
|
||||
"Redis": {
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,231 @@
|
|||
import re
|
||||
import os
|
||||
import json
|
||||
|
||||
import gridfs
|
||||
import pymongo
|
||||
|
||||
from urllib import parse
|
||||
from bson import ObjectId
|
||||
from gridfs import GridFS
|
||||
|
||||
|
||||
class MongoHelper:
|
||||
|
||||
def __init__(self, param):
|
||||
"""
|
||||
param:
|
||||
type:str
|
||||
desc: 选择连接哪个MongoDB数据库
|
||||
"""
|
||||
with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f:
|
||||
db_configs = json.load(f)
|
||||
this_mongo_cfg = db_configs['MongoDB'][param]
|
||||
m = re.match('([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg)
|
||||
parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)), m.group(3))
|
||||
self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config))
|
||||
|
||||
def find_single_column(self, param1, param2, param3, param4):
|
||||
"""
|
||||
查询符合条件的第一条数据的某个指定字段值
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: str 一个指定查询字段
|
||||
return:
|
||||
type: None or dict
|
||||
desc: 查询结果为空,返回None; 查询结果正常,返回查询结果的第一条数据;
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
column = {**{'_id': False}, **{param4: 1}}
|
||||
record = list(collection.find(param3, column))
|
||||
return None if record == [] else record[0][param4]
|
||||
|
||||
def find_single_data(self, param1, param2, param3, param4):
|
||||
"""
|
||||
查询符合条件的第一条数据
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: list 查询字段
|
||||
return:
|
||||
type: bool or dict
|
||||
desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据;
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
|
||||
record = list(collection.find(param3, columns))
|
||||
return False if record == [] else record[0]
|
||||
|
||||
def find_single_data_with_single_sort(self, param1, param2, param3, param4, param5):
|
||||
"""
|
||||
查询符合条件的第一条数据,按单个排序条件返回
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: list 查询字段
|
||||
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
|
||||
return:
|
||||
type: bool or dict
|
||||
desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据;
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
|
||||
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(1))
|
||||
return False if record == [] else record[0]
|
||||
|
||||
def find_all_data(self, param1, param2, param3, param4):
|
||||
"""
|
||||
查询符合条件的所有数据
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: list 查询字段
|
||||
return:
|
||||
type: list
|
||||
desc: 查询结果
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
|
||||
record = list(collection.find(param3, columns))
|
||||
return record
|
||||
|
||||
def find_all_data_with_count(self, param1, param2, param3):
|
||||
"""
|
||||
查询所有符合条件的数据,并返回统计数量
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: str 查询条件
|
||||
return: int 符合条件的数据数量
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
num = collection.find(param3).count()
|
||||
return num
|
||||
|
||||
def find_all_data_with_single_sort(self, param1, param2, param3, param4, param5):
|
||||
"""
|
||||
查询符合条件的数据,按单个排序条件返回
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: list 查询字段
|
||||
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
|
||||
return:
|
||||
type: bool or dict
|
||||
desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据;
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
|
||||
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]))
|
||||
return False if record == [] else record
|
||||
|
||||
def find_data_with_aggregate(self, param1, param2, param3):
|
||||
"""
|
||||
根据聚合条件查询
|
||||
param1: str 数据库
|
||||
param2: str 数据集
|
||||
param3:
|
||||
type: list
|
||||
desc: 聚合条件
|
||||
demo: [{'$match':{'price':{'$gte':50}}}, {'$group': {'_id': "$fName", 'count': {'$sum': 1}}}]
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
data = list(collection.aggregate(param3))
|
||||
return data
|
||||
|
||||
def find_data_by_page_with_sort(self, param1, param2, param3, param4, param5, param6, param7):
|
||||
"""
|
||||
根据聚合翻页查询,且按照需求字段排序返回
|
||||
param1: str 数据库
|
||||
param2: str 数据集
|
||||
param3: dict 查询条件
|
||||
param4: list 显示字段
|
||||
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
|
||||
param6: int 即 page_size 每页数据条数
|
||||
param7: int 即 page_no 当前页码
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
|
||||
page_size = int(param6)
|
||||
page_no = int(param7)
|
||||
skip_num = page_size * (page_no - 1)
|
||||
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(page_size).skip(skip_num))
|
||||
return False if record == [] else record
|
||||
|
||||
def insert_single_data(self, param1, param2, param3):
|
||||
"""
|
||||
插入一条数据
|
||||
param1: str 数据库
|
||||
param2: str 数据集
|
||||
param3: obj 插入数据
|
||||
return: None
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
collection.insert_one(param3)
|
||||
|
||||
def upsert_single_data(self, param1, param2, param3, param4):
|
||||
"""
|
||||
插入单条数据
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: dict 更新或新插入的数据
|
||||
return:
|
||||
None
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
collection.update_one(param3, {"$set": param4}, upsert=True)
|
||||
|
||||
def update_single_data(self, param1, param2, param3, param4):
|
||||
"""
|
||||
插入单条数据
|
||||
param1: str 数据库
|
||||
param2: str 数据表
|
||||
param3: dict 查询条件
|
||||
param4: dict 更新或新插入的数据
|
||||
return:
|
||||
None
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
collection.update_one(param3, {"$set": param4})
|
||||
|
||||
def delete_single_data(self, param1, param2, param3):
|
||||
"""
|
||||
根据查询条件删除一条文档
|
||||
param1: str 数据库
|
||||
param2: str 数据集
|
||||
param3: obj 查询条件
|
||||
return: None
|
||||
"""
|
||||
collection = self.client[param1][param2]
|
||||
collection.delete_one(param3)
|
||||
return True
|
||||
|
||||
def find_file(self, param1, param2, param3):
|
||||
"""
|
||||
读取一个文件
|
||||
param1: str 数据库
|
||||
param2: str 存储桶
|
||||
param3: str 文件id
|
||||
return:
|
||||
type: binary?
|
||||
desc: 二进制文件流
|
||||
"""
|
||||
try:
|
||||
# 实例化一个文件存储器
|
||||
gfs = GridFS(self.client[param1], collection=param2)
|
||||
# 二进制读取文件
|
||||
data_stream = gfs.get(ObjectId(param3)).read()
|
||||
# 返回文件二进制流
|
||||
return data_stream
|
||||
except gridfs.errors.NoFile:
|
||||
return False
|
||||
|
||||
def delete_file(self, param1, param2, param3):
|
||||
"""
|
||||
根据id删除文件
|
||||
param1: str 数据库
|
||||
param2: str 存储桶
|
||||
param3: str 文件fid
|
||||
"""
|
||||
fs = GridFS(self.client[param1], param2)
|
||||
fs.delete(ObjectId(param3))
|
|
@ -1,3 +1,5 @@
|
|||
import time
|
||||
|
||||
import pymongo
|
||||
|
||||
HOST = "116.63.130.34"
|
||||
|
@ -8,26 +10,120 @@ PORT = "27021"
|
|||
mongo_client = pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format(USER, PASS, HOST, PORT))
|
||||
|
||||
|
||||
class CompanyData:
|
||||
|
||||
def __init__(self):
|
||||
self.cid = None
|
||||
self.name = None
|
||||
self.industry_l1 = None
|
||||
self.basic_info = {}
|
||||
self.share_holders = []
|
||||
self.main_members = []
|
||||
self.balance_sheet = {}
|
||||
self.profit_sheet = {}
|
||||
self.income_sheet = {}
|
||||
self.appendix_sheet = {}
|
||||
self.fin_index = {}
|
||||
self.cc_rating_result = {}
|
||||
self.esg_rating_result = {}
|
||||
self.update_time = {
|
||||
"工商信息": None,
|
||||
"财务信息": None,
|
||||
"综信评价": None,
|
||||
"ESG评价": None
|
||||
}
|
||||
|
||||
def dict_to_save(self):
|
||||
_dict_ = {
|
||||
"企业ID": self.cid,
|
||||
"企业名称": self.name,
|
||||
"一级行业": self.industry_l1,
|
||||
"工商信息": self.basic_info,
|
||||
"股东信息": self.share_holders,
|
||||
"主要成员": self.main_members,
|
||||
"资产负债表": self.balance_sheet,
|
||||
"利润表": self.profit_sheet,
|
||||
"现金流量表": self.income_sheet,
|
||||
"财务补充数据": self.appendix_sheet,
|
||||
"财务指标": self.fin_index,
|
||||
"综信评价结果": self.cc_rating_result,
|
||||
"ESG评价结果": self.esg_rating_result,
|
||||
"更新时间": {
|
||||
"工商信息": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||||
"财务信息": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||||
"综信评价": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
|
||||
"ESG评价": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
}
|
||||
}
|
||||
|
||||
return _dict_
|
||||
|
||||
|
||||
def all_data():
|
||||
collection = mongo_client['应用端']['企业用户']
|
||||
|
||||
col_basic_info = mongo_client['企业数据']['基本工商信息']
|
||||
col_tyc_industry = mongo_client['模型端']['天眼查公司行业分类']
|
||||
data = list(collection.find({}, {'_id': False}))
|
||||
col_fin_data = mongo_client['企业数据']['基本财务数据']
|
||||
col_cc_result = mongo_client['综信评价数据']['评价结果']
|
||||
col_esg_result = mongo_client['ESG评价数据']['评价结果']
|
||||
|
||||
records = list(collection.find({}, {'_id': False}))
|
||||
|
||||
###
|
||||
for d in data:
|
||||
cid = d['企业ID']
|
||||
try:
|
||||
tyc_industry_l2 = list(col_basic_info.find({"企业ID": cid}, {'_id': False, "工商信息.行业": 1}))[0]["工商信息"]['行业']
|
||||
yd_industry_l1 = list(col_tyc_industry.find({"二级行业": tyc_industry_l2}, {'_id': False, "远东_一级行业": 1}))[0][
|
||||
"远东_一级行业"]
|
||||
|
||||
collection.update_one({"企业ID": cid}, {"$set": {"一级行业": yd_industry_l1}}, upsert=True)
|
||||
except Exception:
|
||||
pass
|
||||
for data in records:
|
||||
company_data = CompanyData()
|
||||
cid = data['企业ID']
|
||||
basic_info = list(col_basic_info.find({"企业ID": cid}, {'_id': False}))
|
||||
fin_info = list(col_fin_data.find({"企业ID": cid}, {'_id': False}).sort('报告期', -1).limit(1))
|
||||
cc_result = list(col_cc_result.find({"企业ID": cid}, {'_id': False}))
|
||||
esg_result = list(col_esg_result.find({"企业ID": cid}, {'_id': False}))
|
||||
|
||||
company_data.cid = cid
|
||||
company_data.name = data['企业名称']
|
||||
company_data.industry_l1 = data['一级行业']
|
||||
|
||||
if basic_info:
|
||||
company_data.basic_info = basic_info[0]['工商信息']
|
||||
company_data.share_holders = basic_info[0]['股东信息']
|
||||
company_data.main_members = basic_info[0]['主要成员']
|
||||
|
||||
if fin_info:
|
||||
company_data.balance_sheet = fin_info[0]['资产负债表']
|
||||
company_data.profit_sheet = fin_info[0]['利润表']
|
||||
company_data.income_sheet = {}
|
||||
company_data.appendix_sheet = fin_info[0]['补充数据表']
|
||||
company_data.fin_index = fin_info[0]['财务指标']
|
||||
|
||||
if cc_result:
|
||||
company_data.cc_rating_result = {
|
||||
"评价ID": cc_result[0]["评价ID"],
|
||||
"信用等级": cc_result[0]['信用等级'],
|
||||
"评价年度": cc_result[0]["评价年度"],
|
||||
"信用评分": cc_result[0]['信用评分'],
|
||||
"经营评分": cc_result[0]["经营评分"],
|
||||
"财务评分": cc_result[0]["财务评分"],
|
||||
"风险评分": cc_result[0]["风险评分"],
|
||||
"评价时间": cc_result[0]["评价时间"],
|
||||
}
|
||||
|
||||
if esg_result:
|
||||
company_data.esg_rating_result = {
|
||||
"评价ID": esg_result[0]["评价ID"],
|
||||
"评价等级": esg_result[0]['评价等级'],
|
||||
"评价年度": esg_result[0]["评价年度"],
|
||||
"ESG得分": esg_result[0]['ESG得分'],
|
||||
"环境得分": esg_result[0]["环境得分"],
|
||||
"社会得分": esg_result[0]["社会得分"],
|
||||
"公司治理得分": esg_result[0]["公司治理得分"],
|
||||
"评价时间": esg_result[0]["评价时间"],
|
||||
}
|
||||
|
||||
col = mongo_client['企业数据']['企业数据_更新汇总']
|
||||
col.update_one({"企业ID": company_data.cid}, {"$set": company_data.dict_to_save()}, upsert=True)
|
||||
|
||||
###
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
all_data()
|
||||
all_data()
|
|
@ -0,0 +1,54 @@
|
|||
import time
|
||||
import traceback
|
||||
|
||||
from flask import request
|
||||
|
||||
from DBHelper.MongoHelper import MongoHelper
|
||||
|
||||
|
||||
class CheckFailed(RuntimeError):
|
||||
"""检查异常"""
|
||||
|
||||
def __init__(self, failed_info, status_code):
|
||||
self.failed_info = failed_info # 失败信息
|
||||
self.status_code = status_code # 状态码
|
||||
|
||||
def __str__(self):
|
||||
return self.failed_info
|
||||
|
||||
def log_error(self):
|
||||
db = MongoHelper("tfse_v0.21")
|
||||
|
||||
info = {
|
||||
"ip": request.remote_addr,
|
||||
"request_info": {
|
||||
"path": request.path,
|
||||
"method": request.method,
|
||||
"headers": request.headers.__str__(),
|
||||
"args": request.args.__str__(),
|
||||
"json": request.json.__str__()
|
||||
},
|
||||
"traceback": traceback.format_exc(),
|
||||
"exception": type(self).__name__,
|
||||
"is_solved": "no",
|
||||
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
|
||||
}
|
||||
|
||||
db.insert_single_data(
|
||||
"日志",
|
||||
"异常日志",
|
||||
info
|
||||
)
|
||||
|
||||
|
||||
class LogConditionCheckFailed(CheckFailed):
|
||||
"""直接记录检查异常"""
|
||||
|
||||
def __init__(self, failed_info, status_code):
|
||||
self.failed_info = failed_info # 失败信息
|
||||
self.status_code = status_code # 状态码
|
||||
self.log_error()
|
||||
|
||||
|
||||
class ReturnConditionCheckFailed(CheckFailed):
|
||||
"""条件检查失败 抛出异常 接口返回失败原因和状态码"""
|
|
@ -0,0 +1,33 @@
|
|||
|
||||
|
||||
class SpecObject(object):
|
||||
"""自定义类"""
|
||||
|
||||
fields_map = {}
|
||||
|
||||
def dict_to_show(self, **kwargs):
|
||||
"""显示对象"""
|
||||
|
||||
def dict_to_save(self, **kwargs):
|
||||
"""存储对象"""
|
||||
_dict_ = dict()
|
||||
|
||||
for key in self.__dict__.keys():
|
||||
if type(self.__dict__[key]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
|
||||
_dict_[self.fields_map[key]] = self.__dict__[key]
|
||||
elif type(self.__dict__[key]).__name__ == 'list':
|
||||
if len(self.__dict__[key]) == 0:
|
||||
_dict_[self.fields_map[key]] = self.__dict__[key]
|
||||
elif type(self.__dict__[key][0]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
|
||||
_dict_[self.fields_map[key]] = self.__dict__[key]
|
||||
else:
|
||||
_dict_[self.fields_map[key]] = [item.dict_to_save() for item in self.__dict__[key]]
|
||||
elif self.__dict__[key] is None:
|
||||
_dict_[self.fields_map[key]] = self.__dict__[key]
|
||||
else:
|
||||
_dict_[self.fields_map[key]] = self.__dict__[key].dict_to_save()
|
||||
|
||||
if 'columns' in kwargs:
|
||||
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
|
||||
|
||||
return _dict_
|
|
@ -0,0 +1,92 @@
|
|||
import re
|
||||
|
||||
from Utils.ErrorUtil import ReturnConditionCheckFailed
|
||||
|
||||
|
||||
class Validate(object):
|
||||
|
||||
@staticmethod
|
||||
def email(param):
|
||||
""""""
|
||||
regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
|
||||
case = (len(param) > 7) and (re.match(regex, param) is not None)
|
||||
result = True if case else False
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
def password(param):
|
||||
""""""
|
||||
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
|
||||
case = (len(param) >= 8) and (re.match(regex, param) is not None)
|
||||
return True if case else False
|
||||
|
||||
@staticmethod
|
||||
def image(param):
|
||||
""""""
|
||||
mimetype = param.mimetype
|
||||
return True if mimetype in ['image/jpeg', 'image/png'] else False
|
||||
|
||||
|
||||
class ValidateAttr(object):
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
""""""
|
||||
self.kwargs = kwargs
|
||||
self.error_info = kwargs['error_info'] if 'error_info' in kwargs else ('{}异常'.format(kwargs['mark']) if 'mark' in kwargs else ('{}异常'.format(self.__dict__['fields_map'][self.kwargs['field']]) if 'fields_map' in self.__dict__ else None))
|
||||
self.error_code = kwargs['error_code'] if 'error_code' in kwargs else 200
|
||||
|
||||
def __get__(self, instance, owner):
|
||||
if self.kwargs['field'] in instance.__dict__:
|
||||
return instance.__dict__[self.kwargs['field']]
|
||||
|
||||
def __set__(self, instance, value):
|
||||
""""""
|
||||
|
||||
def not_default():
|
||||
"""无默认值时属性设值检查"""
|
||||
|
||||
if 'type' in self.kwargs:
|
||||
"""检查实例的属性类型"""
|
||||
if not isinstance(value, self.kwargs['type']):
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
|
||||
if 'length' in self.kwargs:
|
||||
"""检查实例的属性值长度(一般是str类型)"""
|
||||
if len(value) != self.kwargs['length']:
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
|
||||
if 'in_list' in self.kwargs:
|
||||
"""检查实例属性是否包含于列表中(属性有列表和非列表两种情况)"""
|
||||
if type(value) is not list:
|
||||
if value not in self.kwargs['in_list']:
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
else:
|
||||
for item in value:
|
||||
if item not in self.kwargs['in_list']:
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
|
||||
if 'instance_list' in self.kwargs:
|
||||
"""检查实例列表"""
|
||||
if type(value) is not list:
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
else:
|
||||
for item in value:
|
||||
if not isinstance(item, self.kwargs['instance_list']):
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
|
||||
if 'func' in self.kwargs:
|
||||
"""属性检查函数"""
|
||||
if not list(map(self.kwargs['func'], [value]))[0]:
|
||||
raise ReturnConditionCheckFailed(self.error_info, self.error_code)
|
||||
|
||||
instance.__dict__[self.kwargs['field']] = value
|
||||
|
||||
if 'default' in self.kwargs:
|
||||
"""实例属性默认值"""
|
||||
if value is None:
|
||||
value = self.kwargs['default']
|
||||
instance.__dict__[self.kwargs['field']] = value
|
||||
else:
|
||||
not_default()
|
||||
else:
|
||||
not_default()
|
Loading…
Reference in New Issue