diff --git a/CompanyData/CompanyDataImpl.py b/CompanyData/CompanyDataImpl.py new file mode 100644 index 0000000..0cbec2e --- /dev/null +++ b/CompanyData/CompanyDataImpl.py @@ -0,0 +1,136 @@ +import json +import requests + +from DBHelper.MongoHelper import MongoHelper +from Utils.ErrorUtil import LogConditionCheckFailed +from CompanyData.CompanyDataObj import CompanyData, BasicInfo, ShareHolder, MainMember + + +class CompanyDataImpl(CompanyData): + """企业数据实现类""" + + db_tyc = MongoHelper("tyc") + db_tfse = MongoHelper("tfse_v0.21") + + def drag_data_from_tyc_api(self): + """拉取天眼查接口数据""" + + url = "http://api.fecribd.com/api/tyc/drag_data" + headers = {'token': "uzdq51N4!I0%HY4sCaQ!aeCSIDIVIdAM"} + data = {"企业名称": self.name} + res = requests.post(url=url, headers=headers, data=json.dumps(data)) + return True if res.status_code == 200 else False + + def prepare_company_init_data(self): + """""" + + def prepare_business_data(): + """""" + data = self.db_tyc.find_single_column( + "公司背景", + "基本信息", + {"企业名称": self.name}, + "基本信息" + ) + + basic_info = BasicInfo() + basic_info.status = data['regStatus'] + basic_info.legal_person = data['legalPersonName'] + basic_info.company_type = data['companyOrgType'] + basic_info.taxpayer_id = data['taxNumber'] + basic_info.business_scope = data['businessScope'] + basic_info.registered_capital = data['regCapital'] + basic_info.paid_capital = data['actualCapital'] + basic_info.registered_address = data['regLocation'] + basic_info.registration_authority = data['regInstitute'] + basic_info.industry = data['industry'] + basic_info.staff_size = data['staffNumRange'] + basic_info.people_insured_num = data['socialStaffNum'] + basic_info.micro_company = "是" if data['regStatus'] == 1 else "否" + + self.basic_info = basic_info + + self.db_tfse.upsert_single_data( + "企业数据", + "企业数据_更新汇总", + {"企业ID": self.cid}, + self.dict_to_save() + ) + + def prepare_share_holders(): + """""" + + self.share_holder = list() + + data = self.db_tyc.find_single_column( + "公司背景", + "企业股东", + {"企业名称": self.name}, + "企业股东" + ) + + try: + results = data['result'] + except KeyError: + raise LogConditionCheckFailed("企业股东数据异常", 200) + + for result in results: + share_holder = ShareHolder() + share_holder.name = result['name'] + share_holder.share_holder_type = "公司" if result['type'] == 1 else ("个人" if result['type'] == 2 else "其他") + + if result['capital']: + share_holder.share_holding_ratio = None if result['capital'][0]['percent'] == '' else result['capital'][0]['percent'] + share_holder.subscription_amount = None if result['capital'][0]['amomon'] == '' else result['capital'][0]['amomon'] + share_holder.subscription_date = None if result['capital'][0]['time'] == '' else result['capital'][0]['time'] + + if result['capitalActl']: + share_holder.paid_amount = [capital_actl['amomon'] for capital_actl in result['capitalActl']] + share_holder.payment_method = [capital_actl['paymet'] for capital_actl in result['capitalActl']] + share_holder.payment_time = [capital_actl['time'] for capital_actl in result['capitalActl']] + + self.share_holder.append(share_holder) + + self.db_tfse.upsert_single_data( + "企业数据", + "企业数据_更新汇总", + {"企业ID": self.cid}, + self.dict_to_save() + ) + + def prepare_main_members(): + """""" + + self.main_members = list() + + data = self.db_tyc.find_single_column( + "公司背景", + "主要人员", + {"企业名称": self.name}, + "主要人员" + ) + + try: + results = data['result'] + except KeyError: + raise LogConditionCheckFailed("主要人员数据异常", 200) + + for result in results: + main_member = MainMember() + main_member.name = result['name'] + main_member.job_title = result['typeJoin'] + self.main_members.append(main_member) + + self.db_tfse.upsert_single_data( + "企业数据", + "企业数据_更新汇总", + {"企业ID": self.cid}, + self.dict_to_save() + ) + + def __main__(): + prepare_business_data() + prepare_share_holders() + prepare_main_members() + + __main__() diff --git a/CompanyData/CompanyDataObj.py b/CompanyData/CompanyDataObj.py new file mode 100644 index 0000000..5e6d7ad --- /dev/null +++ b/CompanyData/CompanyDataObj.py @@ -0,0 +1,200 @@ +from Utils.ObjUtil import SpecObject +from Utils.ValidateUtil import ValidateAttr + + +class BasicInfo(SpecObject): + """基本工商信息""" + + status = ValidateAttr(field="status", type=str, default=None) + legal_person = ValidateAttr(field="legal_person", type=str, default=None) + company_type = ValidateAttr(field="company_type", type=str, default=None) + taxpayer_id = ValidateAttr(field="taxpayer_id", type=str, default=None) + business_scope = ValidateAttr(field="business_scope", type=str, default=None) + registered_capital = ValidateAttr(field="registered_capital", type=str, default=None) + paid_capital = ValidateAttr(field="paid_capital", type=str, default=None) + registered_address = ValidateAttr(field="registered_address", type=str, default=None) + registration_authority = ValidateAttr(field="registration_authority", type=str, default=None) + industry = ValidateAttr(field="industry", type=str, default=None) + staff_size = ValidateAttr(field="staff_size", type=str, default=None) + people_insured_num = ValidateAttr(field="people_insured_num", type=int, default=None) + micro_company = ValidateAttr(field="micro_company", type=str, default=None) + + fields_map = { + "status": "企业状态", + "legal_person": "法定代表人", + "company_type": "企业类型", + "taxpayer_id": "纳税人识别号", + "business_scope": "经营范围", + "registered_capital": "注册资本", + "paid_capital": "实缴资本", + "registered_address": "注册地址", + "registration_authority": "登记机关", + "industry": "行业", + "staff_size": "人员规模", + "people_insured_num": "参保人数", + "micro_company": "小微企业" + } + + +class ShareHolder(SpecObject): + """股东信息""" + + name = ValidateAttr(field='name', type=str) + share_holder_type = ValidateAttr(field='share_holder_type', type=str, default=None) + share_holding_ratio = ValidateAttr(field="share_holding_ratio", type=str, default=None) + subscription_amount = ValidateAttr(field="subscription_amount", type=str, default=None) + subscription_date = ValidateAttr(field="subscription_date", type=str, default=None) + paid_amount = ValidateAttr(field="paid_amount", type=list, default=[]) + payment_method = ValidateAttr(field="payment_method", type=list, default=[]) + payment_time = ValidateAttr(field="payment_time", type=list, default=[]) + + fields_map = { + "name": "股东", + "share_holder_type": "股东类型", + "share_holding_ratio": "持股比例", + "subscription_amount": "认缴金额", + "subscription_date": "认缴日期", + "paid_amount": "实缴金额", + "payment_method": "实缴方式", + "payment_time": "实缴时间" + } + + +class MainMember(SpecObject): + """主要成员""" + + name = ValidateAttr(field="name", type=str) + job_title = ValidateAttr(field="job_title", type=list, default=None) + + fields_map = { + "name": "姓名", + "job_title": "职务" + } + + +class BalanceSheet(SpecObject): + """资产负债表""" + accounts_receivable = ValidateAttr(field='accounts_receivable', type=float) + stock = ValidateAttr(field='stock', type=float) + total_current_assets = ValidateAttr(field='total_current_assets', type=float) + total_assets = ValidateAttr(field='total_assets', type=float) + short_loan = ValidateAttr(field='short_loan', type=float) + one_year_liabilities = ValidateAttr(field='one_year_liabilities', type=float) + total_current_liabilities = ValidateAttr(field='total_current_liabilities', type=float) + long_term_loan = ValidateAttr(field='long_term_loan', type=float) + total_liabilities = ValidateAttr(field='total_liabilities', type=float) + total_owners_equity = ValidateAttr(field='total_owners_equity', type=float) + + fields_map = { + 'accounts_receivable': '应收账款', + 'stock': '存货', + 'total_current_assets': '流动资产合计', + 'total_assets': '资产总计', + 'short_loan': '短期借款', + 'one_year_liabilities': '一年内到期非流动负债', + 'total_current_liabilities': '流动负债合计', + 'long_term_loan': '长期借款', + 'total_liabilities': '负债合计', + 'total_owners_equity': '所有者权益合计' + } + + +class ProfitSheet(SpecObject): + """利润表""" + operating_income = ValidateAttr(field='operating_income', type=float) + operating_cost = ValidateAttr(field='operating_cost', type=float) + total_profit = ValidateAttr(field='total_profit', type=float) + net_profit = ValidateAttr(field='net_profit', type=float) + + fields_map = { + 'operating_income': '营业收入', + 'operating_cost': '营业成本', + 'total_profit': '利润总额', + 'net_profit': '净利润' + } + + +class AppendixDataSheet(SpecObject): + """补充数据表""" + rd_expenses = ValidateAttr(field='rd_expenses', type=float) + interest_disbursement = ValidateAttr(field='interest_disbursement', type=float) + interest_expense = ValidateAttr(field='interest_expense', type=float) + + fields_map = { + 'rd_expenses': '研发费用异常', + 'interest_disbursement': '计入财务费的利息支出异常', + 'interest_expense': '资本化利息支出异常' + } + + +class FinancialIndex(SpecObject): + """财务指标""" + roe = ValidateAttr(field="roe", type=float) + inventory_turnover = ValidateAttr(field="inventory_turnover", type=float) + interest_multiple = ValidateAttr(field="interest_multiple", type=float) + accounts_receivable_turnover = ValidateAttr(field="accounts_receivable_turnover", type=float) + total_asset_turnover = ValidateAttr(field="total_asset_turnover", type=float) + total_asset_growth_rate = ValidateAttr(field="total_asset_growth_rate", type=float) + roa = ValidateAttr(field="roa", type=float) + technology_investment_ratio = ValidateAttr(field="technology_investment_ratio", type=float) + operating_growth_rate = ValidateAttr(field="operating_growth_rate", type=float) + assets_and_liabilities = ValidateAttr(field="assets_and_liabilities", type=float) + quick_ratio = ValidateAttr(field="quick_ratio", type=float) + + fields_map = { + "roe": "净资产收益率", + "inventory_turnover": "存货周转率", + "interest_multiple": "已获利息倍数", + "accounts_receivable_turnover": "应收账款周转率", + "total_asset_turnover": "总资产周转率", + "total_asset_growth_rate": "总资产增长率", + "roa": "总资产报酬率", + "technology_investment_ratio": "技术投入比率", + "operating_growth_rate": "营业增长率", + "assets_and_liabilities": "资产负债率", + "quick_ratio": "速动比率" + } + + +class CompanyData(SpecObject): + """企业数据""" + + cid = ValidateAttr(field='cid', type=str, length=8) + name = ValidateAttr(field='name', type=str) + industry_l1 = ValidateAttr(field='industry_l1', type=str) + industry_l2 = ValidateAttr(field='industry_l2', type=str) + basic_info = ValidateAttr(field='basic_info', type=BasicInfo) + share_holder = ValidateAttr(field='share_holders', instance_list=ShareHolder) + main_members = ValidateAttr(field="main_members", instance_list=MainMember) + balance_sheet = ValidateAttr(field='balance_sheet', type=BalanceSheet) + profit_sheet = ValidateAttr(field='profit_sheet', type=ProfitSheet) + # income_sheet = {} + appendix_sheet = ValidateAttr(field='appendix_sheet', type=AppendixDataSheet) + fin_index = ValidateAttr(field='fin_index', type=FinancialIndex) + cc_rating_result = ValidateAttr(field='cc_rating_result', type=dict) + esg_rating_result = ValidateAttr(field='esg_rating_result', type=dict) + update_time = ValidateAttr(field='update_time', type=dict) + + fields_map = { + "cid": "企业ID", + "name": "企业名称", + "industry_l1": "一级行业", + "industry_l2": "二级行业", + "basic_info": "工商信息", + "share_holders": "股东信息", + "main_members": "主要成员", + "balance_sheet": "资产负债表", + "profit_sheet": "利润表", + "income_sheet": "现金流量表", + "appendix_sheet": "财务补充数据", + "fin_index": "财务指标", + "cc_rating_result": "综信评价结果", + "esg_rating_result": "ESG评价结果", + "update_time": "更新时间" + } + + def drag_data_from_tyc_api(self): + """拉取天眼查接口数据""" + + def prepare_company_init_data(self): + """准备企业初始化数据""" diff --git a/CompanyData/routes.py b/CompanyData/CompanyDataRoutes.py similarity index 74% rename from CompanyData/routes.py rename to CompanyData/CompanyDataRoutes.py index b14bd43..aa81308 100644 --- a/CompanyData/routes.py +++ b/CompanyData/CompanyDataRoutes.py @@ -1,5 +1,7 @@ from flask import Blueprint, request +from CompanyData.CompanyDataImpl import CompanyDataImpl +from Utils.ErrorUtil import ReturnConditionCheckFailed from common.utils import verify_token from CompanyData.job_scripts import * @@ -11,25 +13,20 @@ company_route = Blueprint('company', __name__) def basic_info(): """ 初始化企业数据 - 包括 天眼查数据库、企业基本信息 - Parameters: - cid 企业ID - company_name 企业名称 - Returns: - res: desc """ - cid = json.loads(request.data)['cid'] - company_name = json.loads(request.data)['company_name'] - - # 拉取企业数据 - info1 = drag_company_data_request(company_name) - if info1 != "企业数据拉取成功": - return {"info": "数据拉取失败"}, 210 - - # 清洗基本信息数据 - basic_info_etl(cid, company_name) - - return {"info": "企业基本信息初始化成功"} + try: + req = request.json + company_data = CompanyDataImpl() + company_data.cid = req['cid'] + company_data.name = req['company_name'] + company_data.drag_data_from_tyc_api() + company_data.prepare_company_init_data() + return {"info": "企业基本信息初始化成功"} + except ReturnConditionCheckFailed as e: + e.log_error() + return {"info": e.failed_info}, e.status_code + except KeyError: + return {"info": "参数错误"}, 400 @company_route.route('/company_index_etl', methods=['POST']) diff --git a/DBHelper/DBConfig.json b/DBHelper/DBConfig.json new file mode 100644 index 0000000..0e3d1de --- /dev/null +++ b/DBHelper/DBConfig.json @@ -0,0 +1,12 @@ +{ + "MongoDB": { + "tfse_v0.21": "root:UTlC9cCoglD1cI1*@116.63.130.34:27021", + "tyc": "root:gP@DwMSVd5Sh6EiH@116.63.130.34:27019" + }, + "Mysql": { + + }, + "Redis": { + + } +} \ No newline at end of file diff --git a/DBHelper/MongoHelper.py b/DBHelper/MongoHelper.py new file mode 100644 index 0000000..dc25953 --- /dev/null +++ b/DBHelper/MongoHelper.py @@ -0,0 +1,231 @@ +import re +import os +import json + +import gridfs +import pymongo + +from urllib import parse +from bson import ObjectId +from gridfs import GridFS + + +class MongoHelper: + + def __init__(self, param): + """ + param: + type:str + desc: 选择连接哪个MongoDB数据库 + """ + with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f: + db_configs = json.load(f) + this_mongo_cfg = db_configs['MongoDB'][param] + m = re.match('([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg) + parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)), m.group(3)) + self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config)) + + def find_single_column(self, param1, param2, param3, param4): + """ + 查询符合条件的第一条数据的某个指定字段值 + param2: str 数据表 + param3: dict 查询条件 + param4: str 一个指定查询字段 + return: + type: None or dict + desc: 查询结果为空,返回None; 查询结果正常,返回查询结果的第一条数据; + """ + collection = self.client[param1][param2] + column = {**{'_id': False}, **{param4: 1}} + record = list(collection.find(param3, column)) + return None if record == [] else record[0][param4] + + def find_single_data(self, param1, param2, param3, param4): + """ + 查询符合条件的第一条数据 + param1: str 数据库 + param2: str 数据表 + param3: dict 查询条件 + param4: list 查询字段 + return: + type: bool or dict + desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据; + """ + collection = self.client[param1][param2] + columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} + record = list(collection.find(param3, columns)) + return False if record == [] else record[0] + + def find_single_data_with_single_sort(self, param1, param2, param3, param4, param5): + """ + 查询符合条件的第一条数据,按单个排序条件返回 + param1: str 数据库 + param2: str 数据表 + param3: dict 查询条件 + param4: list 查询字段 + param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回 + return: + type: bool or dict + desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据; + """ + collection = self.client[param1][param2] + columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} + record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(1)) + return False if record == [] else record[0] + + def find_all_data(self, param1, param2, param3, param4): + """ + 查询符合条件的所有数据 + param1: str 数据库 + param2: str 数据表 + param3: dict 查询条件 + param4: list 查询字段 + return: + type: list + desc: 查询结果 + """ + collection = self.client[param1][param2] + columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} + record = list(collection.find(param3, columns)) + return record + + def find_all_data_with_count(self, param1, param2, param3): + """ + 查询所有符合条件的数据,并返回统计数量 + param1: str 数据库 + param2: str 数据表 + param3: str 查询条件 + return: int 符合条件的数据数量 + """ + collection = self.client[param1][param2] + num = collection.find(param3).count() + return num + + def find_all_data_with_single_sort(self, param1, param2, param3, param4, param5): + """ + 查询符合条件的数据,按单个排序条件返回 + param1: str 数据库 + param2: str 数据表 + param3: dict 查询条件 + param4: list 查询字段 + param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回 + return: + type: bool or dict + desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据; + """ + collection = self.client[param1][param2] + columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} + record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0])) + return False if record == [] else record + + def find_data_with_aggregate(self, param1, param2, param3): + """ + 根据聚合条件查询 + param1: str 数据库 + param2: str 数据集 + param3: + type: list + desc: 聚合条件 + demo: [{'$match':{'price':{'$gte':50}}}, {'$group': {'_id': "$fName", 'count': {'$sum': 1}}}] + """ + collection = self.client[param1][param2] + data = list(collection.aggregate(param3)) + return data + + def find_data_by_page_with_sort(self, param1, param2, param3, param4, param5, param6, param7): + """ + 根据聚合翻页查询,且按照需求字段排序返回 + param1: str 数据库 + param2: str 数据集 + param3: dict 查询条件 + param4: list 显示字段 + param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回 + param6: int 即 page_size 每页数据条数 + param7: int 即 page_no 当前页码 + """ + collection = self.client[param1][param2] + columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} + page_size = int(param6) + page_no = int(param7) + skip_num = page_size * (page_no - 1) + record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(page_size).skip(skip_num)) + return False if record == [] else record + + def insert_single_data(self, param1, param2, param3): + """ + 插入一条数据 + param1: str 数据库 + param2: str 数据集 + param3: obj 插入数据 + return: None + """ + collection = self.client[param1][param2] + collection.insert_one(param3) + + def upsert_single_data(self, param1, param2, param3, param4): + """ + 插入单条数据 + param1: str 数据库 + param2: str 数据表 + param3: dict 查询条件 + param4: dict 更新或新插入的数据 + return: + None + """ + collection = self.client[param1][param2] + collection.update_one(param3, {"$set": param4}, upsert=True) + + def update_single_data(self, param1, param2, param3, param4): + """ + 插入单条数据 + param1: str 数据库 + param2: str 数据表 + param3: dict 查询条件 + param4: dict 更新或新插入的数据 + return: + None + """ + collection = self.client[param1][param2] + collection.update_one(param3, {"$set": param4}) + + def delete_single_data(self, param1, param2, param3): + """ + 根据查询条件删除一条文档 + param1: str 数据库 + param2: str 数据集 + param3: obj 查询条件 + return: None + """ + collection = self.client[param1][param2] + collection.delete_one(param3) + return True + + def find_file(self, param1, param2, param3): + """ + 读取一个文件 + param1: str 数据库 + param2: str 存储桶 + param3: str 文件id + return: + type: binary? + desc: 二进制文件流 + """ + try: + # 实例化一个文件存储器 + gfs = GridFS(self.client[param1], collection=param2) + # 二进制读取文件 + data_stream = gfs.get(ObjectId(param3)).read() + # 返回文件二进制流 + return data_stream + except gridfs.errors.NoFile: + return False + + def delete_file(self, param1, param2, param3): + """ + 根据id删除文件 + param1: str 数据库 + param2: str 存储桶 + param3: str 文件fid + """ + fs = GridFS(self.client[param1], param2) + fs.delete(ObjectId(param3)) diff --git a/DBHelper/__init__.py b/DBHelper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/DBInitial/demo.py b/DBInitial/demo.py index 8c2bf88..e8a1aca 100644 --- a/DBInitial/demo.py +++ b/DBInitial/demo.py @@ -1,3 +1,5 @@ +import time + import pymongo HOST = "116.63.130.34" @@ -8,26 +10,120 @@ PORT = "27021" mongo_client = pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format(USER, PASS, HOST, PORT)) +class CompanyData: + + def __init__(self): + self.cid = None + self.name = None + self.industry_l1 = None + self.basic_info = {} + self.share_holders = [] + self.main_members = [] + self.balance_sheet = {} + self.profit_sheet = {} + self.income_sheet = {} + self.appendix_sheet = {} + self.fin_index = {} + self.cc_rating_result = {} + self.esg_rating_result = {} + self.update_time = { + "工商信息": None, + "财务信息": None, + "综信评价": None, + "ESG评价": None + } + + def dict_to_save(self): + _dict_ = { + "企业ID": self.cid, + "企业名称": self.name, + "一级行业": self.industry_l1, + "工商信息": self.basic_info, + "股东信息": self.share_holders, + "主要成员": self.main_members, + "资产负债表": self.balance_sheet, + "利润表": self.profit_sheet, + "现金流量表": self.income_sheet, + "财务补充数据": self.appendix_sheet, + "财务指标": self.fin_index, + "综信评价结果": self.cc_rating_result, + "ESG评价结果": self.esg_rating_result, + "更新时间": { + "工商信息": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "财务信息": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "综信评价": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "ESG评价": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + } + } + + return _dict_ + + def all_data(): collection = mongo_client['应用端']['企业用户'] + col_basic_info = mongo_client['企业数据']['基本工商信息'] - col_tyc_industry = mongo_client['模型端']['天眼查公司行业分类'] - data = list(collection.find({}, {'_id': False})) + col_fin_data = mongo_client['企业数据']['基本财务数据'] + col_cc_result = mongo_client['综信评价数据']['评价结果'] + col_esg_result = mongo_client['ESG评价数据']['评价结果'] + + records = list(collection.find({}, {'_id': False})) ### - for d in data: - cid = d['企业ID'] - try: - tyc_industry_l2 = list(col_basic_info.find({"企业ID": cid}, {'_id': False, "工商信息.行业": 1}))[0]["工商信息"]['行业'] - yd_industry_l1 = list(col_tyc_industry.find({"二级行业": tyc_industry_l2}, {'_id': False, "远东_一级行业": 1}))[0][ - "远东_一级行业"] - collection.update_one({"企业ID": cid}, {"$set": {"一级行业": yd_industry_l1}}, upsert=True) - except Exception: - pass + for data in records: + company_data = CompanyData() + cid = data['企业ID'] + basic_info = list(col_basic_info.find({"企业ID": cid}, {'_id': False})) + fin_info = list(col_fin_data.find({"企业ID": cid}, {'_id': False}).sort('报告期', -1).limit(1)) + cc_result = list(col_cc_result.find({"企业ID": cid}, {'_id': False})) + esg_result = list(col_esg_result.find({"企业ID": cid}, {'_id': False})) + + company_data.cid = cid + company_data.name = data['企业名称'] + company_data.industry_l1 = data['一级行业'] + + if basic_info: + company_data.basic_info = basic_info[0]['工商信息'] + company_data.share_holders = basic_info[0]['股东信息'] + company_data.main_members = basic_info[0]['主要成员'] + + if fin_info: + company_data.balance_sheet = fin_info[0]['资产负债表'] + company_data.profit_sheet = fin_info[0]['利润表'] + company_data.income_sheet = {} + company_data.appendix_sheet = fin_info[0]['补充数据表'] + company_data.fin_index = fin_info[0]['财务指标'] + + if cc_result: + company_data.cc_rating_result = { + "评价ID": cc_result[0]["评价ID"], + "信用等级": cc_result[0]['信用等级'], + "评价年度": cc_result[0]["评价年度"], + "信用评分": cc_result[0]['信用评分'], + "经营评分": cc_result[0]["经营评分"], + "财务评分": cc_result[0]["财务评分"], + "风险评分": cc_result[0]["风险评分"], + "评价时间": cc_result[0]["评价时间"], + } + + if esg_result: + company_data.esg_rating_result = { + "评价ID": esg_result[0]["评价ID"], + "评价等级": esg_result[0]['评价等级'], + "评价年度": esg_result[0]["评价年度"], + "ESG得分": esg_result[0]['ESG得分'], + "环境得分": esg_result[0]["环境得分"], + "社会得分": esg_result[0]["社会得分"], + "公司治理得分": esg_result[0]["公司治理得分"], + "评价时间": esg_result[0]["评价时间"], + } + + col = mongo_client['企业数据']['企业数据_更新汇总'] + col.update_one({"企业ID": company_data.cid}, {"$set": company_data.dict_to_save()}, upsert=True) ### if __name__ == '__main__': - all_data() + all_data() \ No newline at end of file diff --git a/Utils/ErrorUtil.py b/Utils/ErrorUtil.py new file mode 100644 index 0000000..575f26b --- /dev/null +++ b/Utils/ErrorUtil.py @@ -0,0 +1,54 @@ +import time +import traceback + +from flask import request + +from DBHelper.MongoHelper import MongoHelper + + +class CheckFailed(RuntimeError): + """检查异常""" + + def __init__(self, failed_info, status_code): + self.failed_info = failed_info # 失败信息 + self.status_code = status_code # 状态码 + + def __str__(self): + return self.failed_info + + def log_error(self): + db = MongoHelper("tfse_v0.21") + + info = { + "ip": request.remote_addr, + "request_info": { + "path": request.path, + "method": request.method, + "headers": request.headers.__str__(), + "args": request.args.__str__(), + "json": request.json.__str__() + }, + "traceback": traceback.format_exc(), + "exception": type(self).__name__, + "is_solved": "no", + "time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + } + + db.insert_single_data( + "日志", + "异常日志", + info + ) + + +class LogConditionCheckFailed(CheckFailed): + """直接记录检查异常""" + + def __init__(self, failed_info, status_code): + self.failed_info = failed_info # 失败信息 + self.status_code = status_code # 状态码 + self.log_error() + + +class ReturnConditionCheckFailed(CheckFailed): + """条件检查失败 抛出异常 接口返回失败原因和状态码""" diff --git a/Utils/ObjUtil.py b/Utils/ObjUtil.py new file mode 100644 index 0000000..a31e258 --- /dev/null +++ b/Utils/ObjUtil.py @@ -0,0 +1,33 @@ + + +class SpecObject(object): + """自定义类""" + + fields_map = {} + + def dict_to_show(self, **kwargs): + """显示对象""" + + def dict_to_save(self, **kwargs): + """存储对象""" + _dict_ = dict() + + for key in self.__dict__.keys(): + if type(self.__dict__[key]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']: + _dict_[self.fields_map[key]] = self.__dict__[key] + elif type(self.__dict__[key]).__name__ == 'list': + if len(self.__dict__[key]) == 0: + _dict_[self.fields_map[key]] = self.__dict__[key] + elif type(self.__dict__[key][0]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']: + _dict_[self.fields_map[key]] = self.__dict__[key] + else: + _dict_[self.fields_map[key]] = [item.dict_to_save() for item in self.__dict__[key]] + elif self.__dict__[key] is None: + _dict_[self.fields_map[key]] = self.__dict__[key] + else: + _dict_[self.fields_map[key]] = self.__dict__[key].dict_to_save() + + if 'columns' in kwargs: + _dict_ = {key: _dict_[key] for key in kwargs['columns']} + + return _dict_ diff --git a/Utils/ValidateUtil.py b/Utils/ValidateUtil.py new file mode 100644 index 0000000..7a58e56 --- /dev/null +++ b/Utils/ValidateUtil.py @@ -0,0 +1,92 @@ +import re + +from Utils.ErrorUtil import ReturnConditionCheckFailed + + +class Validate(object): + + @staticmethod + def email(param): + """""" + regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$" + case = (len(param) > 7) and (re.match(regex, param) is not None) + result = True if case else False + return result + + @staticmethod + def password(param): + """""" + regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$" + case = (len(param) >= 8) and (re.match(regex, param) is not None) + return True if case else False + + @staticmethod + def image(param): + """""" + mimetype = param.mimetype + return True if mimetype in ['image/jpeg', 'image/png'] else False + + +class ValidateAttr(object): + + def __init__(self, **kwargs): + """""" + self.kwargs = kwargs + self.error_info = kwargs['error_info'] if 'error_info' in kwargs else ('{}异常'.format(kwargs['mark']) if 'mark' in kwargs else ('{}异常'.format(self.__dict__['fields_map'][self.kwargs['field']]) if 'fields_map' in self.__dict__ else None)) + self.error_code = kwargs['error_code'] if 'error_code' in kwargs else 200 + + def __get__(self, instance, owner): + if self.kwargs['field'] in instance.__dict__: + return instance.__dict__[self.kwargs['field']] + + def __set__(self, instance, value): + """""" + + def not_default(): + """无默认值时属性设值检查""" + + if 'type' in self.kwargs: + """检查实例的属性类型""" + if not isinstance(value, self.kwargs['type']): + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + + if 'length' in self.kwargs: + """检查实例的属性值长度(一般是str类型)""" + if len(value) != self.kwargs['length']: + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + + if 'in_list' in self.kwargs: + """检查实例属性是否包含于列表中(属性有列表和非列表两种情况)""" + if type(value) is not list: + if value not in self.kwargs['in_list']: + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + else: + for item in value: + if item not in self.kwargs['in_list']: + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + + if 'instance_list' in self.kwargs: + """检查实例列表""" + if type(value) is not list: + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + else: + for item in value: + if not isinstance(item, self.kwargs['instance_list']): + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + + if 'func' in self.kwargs: + """属性检查函数""" + if not list(map(self.kwargs['func'], [value]))[0]: + raise ReturnConditionCheckFailed(self.error_info, self.error_code) + + instance.__dict__[self.kwargs['field']] = value + + if 'default' in self.kwargs: + """实例属性默认值""" + if value is None: + value = self.kwargs['default'] + instance.__dict__[self.kwargs['field']] = value + else: + not_default() + else: + not_default() diff --git a/Utils/__init__.py b/Utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app.py b/app.py index e6bb5c1..b0468c6 100644 --- a/app.py +++ b/app.py @@ -1,7 +1,7 @@ from flask import Flask from flask_cors import * -from CompanyData.routes import company_route +from CompanyData.CompanyDataRoutes import company_route from RatingData.routes import rating_route app = Flask(__name__)