import json
import os
import re

import cpca
import pandas as pd
import requests
import xlrd

from DBHelper.MongoHelperInstance import DB_GUA, DB_TYC
from Modules.Company.CompanyObject import BasicBusinessInfo, FinancialData, CustomerInfo, RegionalDistribution, \
    IndustryDistribution, GuaranteedBalanceDistribution, GuaranteeInfo
from Modules.Company.CompanyUtils import CompanyUtils, ExcelSheetParser
from Modules.Company.static.province_map import p_map
from Utils.ObjUtil import SpecObject

# Endpoints of the TYC (Tianyancha) crawler service used to back-fill missing documents.
_TYC_BASIC_INFO_URL = "http://api.fecribd.com/api/tyc/basic_info"
_TYC_SHAREHOLDERS_URL = "http://api.fecribd.com/api/tyc/shareholders_info"
# NOTE(review): credential committed to source control. Flagged, not changed here;
# it should be moved to configuration / a secret store and rotated.
_TYC_API_TOKEN = 'uzdq51N4!I0%HY4sCaQ!aeCSIDIVIdAM'
# Seconds. Without a timeout `requests` waits forever on a stalled server.
_TYC_API_TIMEOUT = 60
# Folder holding one sub-folder of supplementary workbooks per company.
# Raw string: the path contains backslashes that must not be read as escapes.
_EDUCATION_DATA_DIR = r'E:\Project\guarantee-admin-api-v0.2\Modules\Company\补充数据'


def _open_workbook(file):
    """Open *file* with xlrd, accepting a path or a file-like object with ``read()``.

    NOTE(review): xlrd 2.0+ only reads ``.xls``; the script at the bottom of this
    module feeds ``.xlsx`` files, which requires xlrd < 2.0 — confirm the pinned version.
    """
    if hasattr(file, 'read'):
        return xlrd.open_workbook(file_contents=file.read())
    return xlrd.open_workbook(file)


def _request_tyc(url, name):
    """POST the company name to a TYC crawler endpoint and return the response."""
    return requests.post(
        url=url,
        headers={'token': _TYC_API_TOKEN},
        data=json.dumps({"企业名称": name}),
        timeout=_TYC_API_TIMEOUT,
    )


def _find_or_fetch(collection, name, fields, url):
    """Return the TYC document for *name*, calling the crawler API once on a miss.

    The document is read with ``DB_TYC.find_single_data``; when nothing is found the
    endpoint *url* is called and, on HTTP 200, the lookup is repeated (presumably the
    service stores what it fetched — verify). The result is falsy when still missing.
    """
    query = {"企业名称": name}
    record = DB_TYC.find_single_data('公司背景', collection, query, fields)
    if not record:
        response = _request_tyc(url, name)
        if response.status_code == 200:
            record = DB_TYC.find_single_data('公司背景', collection, query, fields)
    return record


def _normalize_phone(value):
    """Render a numeric phone cell as text without a trailing ``.0``.

    Floats such as ``13800000000.0`` (how the original code expected numeric cells to
    arrive) become ``'13800000000'``; non-float values are returned unchanged.
    """
    if isinstance(value, float):
        return str(int(value)) if value.is_integer() else str(value)
    return value


def _capital_in_wan(excel_value, tyc_value):
    """Return a capital amount in 万元, preferring the Excel value over the TYC string.

    Returns ``None`` when the Excel cell is empty and the TYC value is missing or is
    not a plain ``<number>万人民币`` string (e.g. another currency).
    """
    if excel_value:
        return float(excel_value)
    try:
        return float(tyc_value.replace('万人民币', ''))
    except (AttributeError, ValueError):
        return None


def _fill_basic_info(basic_business_info, basic, new_cid):
    """Merge TYC registration data into the parsed '企业信息' row and load it.

    Raises:
        ValueError: when no TYC basic-info document can be obtained for the company.
    """
    name = basic['企业名称']
    record = _find_or_fetch('基本信息', name, ['企业名称', '基本信息'], _TYC_BASIC_INFO_URL)
    if not record:
        raise ValueError(
            f'TYC basic info for {name!r} is not available '
            f'(not cached and the crawler API did not provide it)'
        )
    info = record['基本信息']

    basic_business_info.cid = new_cid
    basic['企业类型'] = info['companyOrgType']
    basic['联系电话'] = _normalize_phone(basic['联系电话'])
    # The original stored companyOrgType here (copy-paste slip). 'creditCode' is the
    # unified social credit code in the TYC payload; keep the Excel value if TYC has none.
    basic['统一社会信用代码'] = info.get('creditCode') or basic.get('统一社会信用代码')
    basic['企业性质'] = '国有企业' if '国企' in (info['tags'] or '') else '民营企业'
    # Parsed independently so one missing TYC figure does not discard the other.
    basic['注册资本(万元)'] = _capital_in_wan(basic['注册资本(万元)'], info.get('regCapital'))
    basic['实缴资本(万元)'] = _capital_in_wan(basic['实缴资本(万元)'], info.get('actualCapital'))
    basic['参保人数'] = int(basic['参保人数']) if basic['参保人数'] else basic['参保人数']
    basic_business_info.set_instance(data=basic)


def _fetch_tyc_shareholders(name):
    """Return TYC shareholders of *name* as dicts with the Excel template's keys.

    Companies whose ``companyOrgType`` contains '上市' read the '十大股东' document,
    all others read '企业股东'. Returns an empty list when data is unavailable.
    """
    basic_info = DB_TYC.find_single_data('公司背景', '基本信息', {"企业名称": name}, ['基本信息'])
    if not basic_info:
        return []

    if '上市' in (basic_info['基本信息']['companyOrgType'] or ''):
        record = _find_or_fetch('十大股东', name, ['十大股东'], _TYC_SHAREHOLDERS_URL)
        if not record or not record['十大股东']:
            return []
        return [
            {
                '股东名称': holder['name'],
                '持股比例': holder['proportion'],
                '股东性质': '-',
                '持股数量(股)': holder['holdingNum'],
            }
            for holder in record['十大股东']['holderList']
        ]

    record = _find_or_fetch('企业股东', name, ['企业股东'], _TYC_SHAREHOLDERS_URL)
    if not record or not record['企业股东']:
        return []
    shareholders = []
    for holder in record['企业股东']['result']:
        # Only the first capital entry is used; tolerate holders without any.
        capital = (holder.get('capital') or [{}])[0]
        shareholders.append({
            '股东名称': holder['name'],
            '持股比例': capital.get('percent'),
            '股东性质': '-',
            # NOTE(review): 'amomon' is the key the original code reads; it looks like a
            # typo but presumably mirrors the TYC payload — do not rename without checking.
            '持股数量(股)': capital.get('amomon'),
        })
    return shareholders


def _load_employee_education(name):
    """Read employee education head-counts for *name* from the supplementary workbooks.

    Steps: find the company's folder under ``_EDUCATION_DATA_DIR``, take the last
    workbook by sorted file name (assumes names sort chronologically — TODO confirm),
    read its '机构人员情况' sheet and pick the education rows.

    Returns:
        dict with keys '本科', '研究生', '大专及以下'; empty when no folder matches.
    """
    education = dict()
    for entry in os.listdir(_EDUCATION_DATA_DIR):
        # NOTE(review): the pattern removes every "<digits><any char>" run, not just a
        # leading index, and '.' is unescaped — kept as-is pending the naming convention.
        if re.sub(r'[0-9]+.', '', entry) != name:
            continue
        company_dir = os.path.join(_EDUCATION_DATA_DIR, entry)
        # os.listdir order is arbitrary; sort so "last" is deterministic.
        workbooks = sorted(os.listdir(company_dir))
        if not workbooks:
            continue
        frame = pd.read_excel(os.path.join(company_dir, workbooks[-1]), sheet_name='机构人员情况')
        frame = frame.dropna(axis=1, how='all')
        frame = frame.dropna(axis=0, how='any')
        frame.loc[:, '融资担保公司及人员情况'] = frame['融资担保公司及人员情况'].str.strip()
        frame = frame.set_index('融资担保公司及人员情况')
        education['本科'] = int(frame.loc['本科', 'Unnamed: 2'])
        education['研究生'] = int(frame.loc['其中:研究生', 'Unnamed: 2'])
        education['大专及以下'] = int(frame.loc['大专及以下', 'Unnamed: 2'])
    return education


def _fill_people_info(basic_business_info, sheet, company_name):
    """Load executives, shareholders and employee education onto the company object.

    Shareholders come from the sheet when present, otherwise from TYC.
    """
    tyc_shareholders = _fetch_tyc_shareholders(company_name)
    employee_education = _load_employee_education(company_name)
    executives, shareholders = ExcelSheetParser(sheet=sheet).parse_sheet2()
    shareholders = shareholders if shareholders else tyc_shareholders

    basic_business_info.shareholder_information = list()
    basic_business_info.executive_information = list()
    basic_business_info.executive_information.extend(
        SpecObject.set_specify_instance(instance=BasicBusinessInfo.ExecutiveInformation, data=item)
        for item in executives
    )
    basic_business_info.shareholder_information.extend(
        SpecObject.set_specify_instance(instance=BasicBusinessInfo.ShareholderInformation, data=item)
        for item in shareholders
    )
    basic_business_info.employee_education = SpecObject.set_specify_instance(
        instance=BasicBusinessInfo.EmployeeEducation, data=employee_education
    )


def _collect_client_records(client_info):
    """Map each client's company name to its TYC '基本信息' payload, where available.

    Clients without a document are simply absent from the result; the guarantee
    builder falls back to name-based location parsing for them.
    """
    records = dict()
    for client in client_info:
        record = _find_or_fetch(
            '基本信息', client['客户名称'], ['企业名称', '基本信息'], _TYC_BASIC_INFO_URL
        )
        if record:
            records[record['企业名称']] = record['基本信息']
    return records


def _build_history_rows(model, rows, cid, company_name):
    """Instantiate *model* once per row, tagging each with company id and name."""
    items = list()
    for row in rows:
        item = model()
        item.cid = cid
        item.company_name = company_name
        item.set_instance(data=row)
        items.append(item)
    return items


def _build_guarantee_distribution(rows, cid, company_name):
    """Build one GuaranteedBalanceDistribution per row of the '担保金额分布' sheet."""
    items = list()
    for amount in rows:
        item = GuaranteedBalanceDistribution()
        item.cid = cid
        item.company_name = company_name
        item.report_period = amount["报告期"]
        item.guarantee_account = SpecObject.set_specify_instance(
            instance=GuaranteedBalanceDistribution.GuaranteeBalance, data=amount["担保户数"]
        )
        item.guarantee_number = SpecObject.set_specify_instance(
            instance=GuaranteedBalanceDistribution.GuaranteeBalance, data=amount["担保笔数"]
        )
        item.guarantee_liability_balance = SpecObject.set_specify_instance(
            instance=GuaranteedBalanceDistribution.GuaranteeBalance, data=amount["担保责任余额"]
        )
        items.append(item)
    return items


def _build_guarantee_info(info, cid, records, company_row):
    """Build the GuaranteeInfo for one client row.

    Project name, industry and location come from the client's TYC payload in
    *records*; when that payload (or one of its keys) is missing, the project name is
    derived from the client name and the location is parsed from it with ``cpca``.
    Clients whose name lacks '公司' take the guarantor's own province and city from
    *company_row*.
    """
    client_name = info['客户名称']
    balance = info['在保余额']

    guarantee_info = GuaranteeInfo()
    guarantee_info.cid = cid
    guarantee_info.guarantee_id = CompanyUtils.make_new_cid()
    guarantee_info.warrantee = client_name
    guarantee_info.guarantee_type = '融资担保'
    guarantee_info.guarantee_method = '银行借款担保'
    # A textual balance cell carries no usable amount.
    guarantee_info.guarantee_balance = None if isinstance(balance, str) else balance
    guarantee_info.release_amount = 0
    guarantee_info.liability_guarantee_balance = info['融资担保责任余额']
    guarantee_info.guarantee_start_date = info['担保起始日']
    guarantee_info.guarantee_end_date = info['担保截止日']
    guarantee_info.remark = None

    start_compact = info['担保起始日'].replace('-', '')
    is_company = '公司' in client_name
    try:
        record = records[client_name]
        guarantee_info.project_name = record['alias'] + start_compact
        guarantee_info.industry = record['industry']
        if is_company:
            province = record['base']
            # Unmapped abbreviations yield None instead of aborting the import.
            guarantee_info.province = None if province == '' else p_map.get(province)
            guarantee_info.city = record['city']
        else:
            guarantee_info.province = company_row['所在省份']
            guarantee_info.city = company_row['所在城市']
    except KeyError:
        guarantee_info.project_name = client_name + start_compact
        if is_company:
            located = cpca.transform([client_name])
            guarantee_info.province = located.loc[0, '省']
            guarantee_info.city = located.loc[0, '市']
        else:
            guarantee_info.province = company_row['所在省份']
            guarantee_info.city = company_row['所在城市']
    return guarantee_info


def _build_balance_sheet(financial_data, sheet_data):
    """Group one balance-sheet row into the nested BalanceSheet sections.

    A key goes to the first section whose ``fields_map`` values contain it; keys that
    match no section (e.g. the report period) are ignored.
    """
    bs_obj = financial_data.BalanceSheet()
    sections = [
        (list(bs_obj.CurrentAssets.fields_map.values()), dict()),
        (list(bs_obj.NonCurrentAssets.fields_map.values()), dict()),
        (list(bs_obj.CurrentLiabilities.fields_map.values()), dict()),
        (list(bs_obj.NonCurrentLiabilities.fields_map.values()), dict()),
        (list(bs_obj.OwnerEquity.fields_map.values()), dict()),
    ]
    for key, value in sheet_data.items():
        for known_keys, bucket in sections:
            if key in known_keys:
                bucket[key] = value
                break
    (current_assets, non_current_assets, current_liabilities,
     non_current_liabilities, owner_equity) = [bucket for _, bucket in sections]

    bs_obj.current_assets = SpecObject.set_specify_instance(
        instance=bs_obj.CurrentAssets, data=current_assets
    )
    bs_obj.non_current_assets = SpecObject.set_specify_instance(
        instance=bs_obj.NonCurrentAssets, data=non_current_assets
    )
    bs_obj.total_assets = sheet_data['资产总计']
    bs_obj.current_liabilities = SpecObject.set_specify_instance(
        instance=bs_obj.CurrentLiabilities, data=current_liabilities
    )
    bs_obj.non_current_liabilities = SpecObject.set_specify_instance(
        instance=bs_obj.NonCurrentLiabilities, data=non_current_liabilities
    )
    bs_obj.total_liabilities = sheet_data['负债合计']
    bs_obj.owner_equity = SpecObject.set_specify_instance(
        instance=bs_obj.OwnerEquity, data=owner_equity
    )
    return bs_obj


def _build_flat_sheet(instance, sheet_data):
    """Build an income/appendix sheet object from a row, dropping its report period."""
    sheet_data.pop('报告期')
    return SpecObject.set_specify_instance(instance=instance, data=sheet_data)


def _build_financial_data(work_book, cid, company_name):
    """Build one FinancialData per report period from the three financial sheets.

    Rows of '资产负债表', '利润表' and '补充数据表' are paired by position.
    """
    balance = ExcelSheetParser(sheet=work_book.sheet_by_name('资产负债表')).parse_sheet7()
    income = ExcelSheetParser(sheet=work_book.sheet_by_name('利润表')).parse_sheet8()
    appendix = ExcelSheetParser(sheet=work_book.sheet_by_name('补充数据表')).parse_sheet3()

    items = list()
    for index, balance_row in enumerate(balance):
        financial_data = FinancialData()
        financial_data.cid = cid
        financial_data.company_name = company_name
        financial_data.report_period = balance_row['报告期']
        financial_data.balance_sheet = _build_balance_sheet(financial_data, balance_row)
        financial_data.income_sheet = _build_flat_sheet(financial_data.IncomeSheet, income[index])
        financial_data.appendix_sheet = _build_flat_sheet(financial_data.AppendixSheet, appendix[index])
        items.append(financial_data)
    return items


def _insert_many(database, collection, items):
    """Insert the toggled form of *items*; an empty list is skipped."""
    if items:
        DB_GUA.insert_many_data(database, collection, [item.fields_toggle() for item in items])


class BasicBusinessInfoImpl(object):

    @staticmethod
    def parse_excel_and_create_company(file):
        """Parse the Excel template and create the company's records.

        Sheets are handled in dependency order ('企业信息' first, because later steps
        read the company name it sets) regardless of their order in the workbook.
        Optional sheet groups are skipped when their trigger sheet is absent.

        Args:
            file: path of the workbook, or a file-like object exposing ``read()``.

        Raises:
            ValueError: when TYC basic info for the company cannot be obtained.
        """
        new_cid = CompanyUtils.make_new_cid()
        work_book = _open_workbook(file)
        sheet_names = set(work_book.sheet_names())

        # Parsed once; the name is captured before the row is enriched in place.
        basic = ExcelSheetParser(sheet=work_book.sheet_by_name('企业信息')).parse_sheet1()
        company_name = basic['企业名称']
        basic_business_info = BasicBusinessInfo()
        _fill_basic_info(basic_business_info, basic, new_cid)

        if '高管股东信息' in sheet_names:
            _fill_people_info(
                basic_business_info, work_book.sheet_by_name('高管股东信息'), company_name
            )

        guarantee_info_list = list()
        customer_info_list = list()
        region_distribution_list = list()
        industry_distribution_list = list()
        guarantee_distribution_list = list()
        if '客户信息' in sheet_names:
            # Fresh, unmodified copy of the company row for the location fallback.
            company_row = ExcelSheetParser(sheet=work_book.sheet_by_name('企业信息')).parse_sheet1()
            client_info = ExcelSheetParser(sheet=work_book.sheet_by_name('客户信息')).parse_sheet4()
            area_info = ExcelSheetParser(sheet=work_book.sheet_by_name('区域分布')).parse_sheet5()
            industry_info = ExcelSheetParser(sheet=work_book.sheet_by_name('行业分布')).parse_sheet5()
            guarantee_amount = ExcelSheetParser(sheet=work_book.sheet_by_name('担保金额分布')).parse_sheet6()

            records = _collect_client_records(client_info)
            owner_name = basic_business_info.company_name
            for client in client_info:
                # A textual balance cell carries no usable amount.
                client['在保余额'] = None if isinstance(client['在保余额'], str) else client['在保余额']
            customer_info_list = _build_history_rows(CustomerInfo, client_info, new_cid, owner_name)
            region_distribution_list = _build_history_rows(
                RegionalDistribution, area_info, new_cid, owner_name
            )
            industry_distribution_list = _build_history_rows(
                IndustryDistribution, industry_info, new_cid, owner_name
            )
            guarantee_distribution_list = _build_guarantee_distribution(
                guarantee_amount, new_cid, owner_name
            )
            guarantee_info_list = [
                _build_guarantee_info(info, new_cid, records, company_row) for info in client_info
            ]

        financial_data_list = list()
        if '资产负债表' in sheet_names:
            financial_data_list = _build_financial_data(
                work_book, new_cid, basic_business_info.company_name
            )

        DB_GUA.insert_single_data("企业数据", "工商信息", basic_business_info.fields_toggle())
        _insert_many("企业数据", "财务数据", financial_data_list)
        _insert_many("企业数据", "担保数据", guarantee_info_list)
        _insert_many("历史数据", "客户信息", customer_info_list)
        _insert_many("历史数据", "担保区域分布", region_distribution_list)
        _insert_many("历史数据", "担保行业分布", industry_distribution_list)
        _insert_many("历史数据", "担保金额分布", guarantee_distribution_list)


def _company_exists(company_name):
    """Return True when a stored company matches *company_name* by full or short name."""
    for field in ('企业名称', '企业简称'):
        if DB_GUA.find_single_data('企业数据', '工商信息', {field: company_name}, ['企业ID']):
            return True
    return False


def main():
    """Import every ``.xlsx`` template under the data folder that is not stored yet."""
    for root_dir, _sub_dirs, files in os.walk(r"E:\Project\text"):
        for file in files:
            if not file.endswith(".xlsx"):
                continue
            # NOTE(review): same "<digits><any char>" pattern as the education lookup;
            # it also strips digit runs inside a company name — confirm file naming.
            company_name = re.sub(r'[0-9]+.', '', file)
            company_name = company_name.replace('.xlsx', '')
            if _company_exists(company_name):
                print('数据已存在', file)
                continue
            BasicBusinessInfoImpl.parse_excel_and_create_company(os.path.join(root_dir, file))
            print('储存完成', file)


if __name__ == '__main__':
    main()