tfse-app-api-v0.2/Implements/Company/Index.py

233 lines
12 KiB
Python

from DBHelper.MongoHelper import MongoHelper
from ObjectsCommon.Company.FECRCompany import BasicBusinessInfo, CCRatingAnalysis, ESGRatingAnalysis
from ObjectsCommon.Industry.FECRIndustry import IndustryAnalysis
from ObjectsInProject.Company.Index import CompanyIndex, ThisFinancialElementsAnalysis, ThisOperatingRiskAnalysis
class CompanyIndexImpl(CompanyIndex):
"""企业主页实现类"""
db = MongoHelper("tfse_v0.21")
def get_head_info(self):
"""企业主要 头部信息"""
verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
company_data = self.db.find_single_data(
"企业数据",
"企业数据_更新汇总",
{"企业ID": self.cid},
["企业名称", "综信评价结果", "ESG评价结果"]
)
head_info = CompanyIndex.HeadInfo()
head_info.name = company_data['企业名称']
head_info.verify_status = verify_status
latest_cc_rating = CompanyIndex.HeadInfo.LatestCCRating()
latest_esg_rating = CompanyIndex.HeadInfo.LatestESGRating()
try:
cc_rating = company_data['综信评价结果']
latest_cc_rating.level = cc_rating['信用等级']
latest_cc_rating.score = str(cc_rating['信用评分'])
latest_cc_rating.rating_time = cc_rating['评价时间']
except KeyError:
latest_cc_rating.level = None
latest_cc_rating.score = None
latest_cc_rating.rating_time = None
try:
esg_rating = company_data['ESG评价结果']
latest_esg_rating.level = esg_rating['评价等级']
latest_esg_rating.score = str(esg_rating['ESG得分'])
latest_esg_rating.rating_time = esg_rating['评价时间']
except KeyError:
latest_esg_rating.level = None
latest_esg_rating.score = None
latest_esg_rating.rating_time = None
head_info.latest_cc_rating = latest_cc_rating
head_info.latest_esg_rating = latest_esg_rating
self.head_info = head_info
def get_business_info(self):
"""企业主页 基本信息"""
data = self.db.find_single_data(
"企业数据",
"企业数据_更新汇总",
{"企业ID": self.cid},
["更新时间", "企业名称", "工商信息", "股东信息", "主要成员"]
)
business_info = BasicBusinessInfo()
business_info.name = data['企业名称']
business_info.update_time = data['更新时间']['工商信息']
business_info.business_info = self.dict_to_set(instance=BasicBusinessInfo.BusinessInfo, data=data['工商信息'])
business_info.share_holder = self.instance_list_to_set(instance=BasicBusinessInfo.ShareHolder, data=data['股东信息'])
business_info.main_member = self.instance_list_to_set(instance=BasicBusinessInfo.MainMember, data=data['主要成员'])
business_info.update_time = data['更新时间']['工商信息']
self.business_info = business_info
def get_industry_analysis(self):
"""企业主页 行业分析"""
industry = self.db.find_single_column(
"企业数据",
"企业数据_更新汇总",
{"企业ID": self.cid},
"一级行业"
)
result = self.db.find_single_data(
"行业数据",
"行业分析",
{"行业": industry},
["行业", "行业简介", "更新时间", "正面因素", "负面因素", "增长趋势", "规模分布"]
)
industry_analysis = IndustryAnalysis()
industry_analysis.name = result['行业']
industry_analysis.introduction = result['行业简介']
industry_analysis.update_time = result['更新时间']
industry_analysis.positive_factor = self.instance_list_to_set(instance=IndustryAnalysis.PositiveFactor, data=result['正面因素'])
industry_analysis.negative_factor = self.instance_list_to_set(instance=IndustryAnalysis.NegativeFactor, data=result['负面因素'])
result['增长趋势']['数据'] = self.instance_list_to_set(instance=IndustryAnalysis.IncreaseTrend.DataItem, data=result['增长趋势']['数据'])
industry_analysis.increase_trend = self.dict_to_set(instance=IndustryAnalysis.IncreaseTrend, data=result["增长趋势"])
result['规模分布']['数据'] = self.instance_list_to_set(instance=IndustryAnalysis.IndustrySize.DataItem, data=result['规模分布']['数据'])
industry_analysis.industry_size = self.dict_to_set(instance=IndustryAnalysis.IndustrySize, data=result["规模分布"])
self.industry_analysis = industry_analysis
def get_cc_rating_analysis(self):
"""综合信用评价"""
result = self.db.find_single_data_with_single_sort(
"企业数据",
"综合信用分析",
{"企业ID": self.cid},
["更新时间", "综合信用表现", "信用分析", "指标表格", "历史级别"],
{"更新时间": -1}
)
if result:
cc_rating_analysis = CCRatingAnalysis()
cc_rating_analysis.update_time = result['更新时间']
performance = CCRatingAnalysis.CreditPerformance()
indicator_radar = CCRatingAnalysis.CreditPerformance.IndicatorRadar()
indicator_radar.index_score = self.dict_to_set(instance=CCRatingAnalysis.CreditPerformance.IndicatorRadar.IndexScore, data=result['综合信用表现']['指标雷达']['指标得分'])
indicator_radar.max_score = self.dict_to_set(instance=CCRatingAnalysis.CreditPerformance.IndicatorRadar.MaxScore, data=result['综合信用表现']['指标雷达']['最大分数'])
performance.indicator_radar = indicator_radar
cc_rating_analysis.performance = performance
cc_rating_analysis.credit_analysis = self.dict_to_set(instance=CCRatingAnalysis.CreditAnalysis, data=result['信用分析'])
financial_indicators = self.dict_to_set(instance=CCRatingAnalysis.IndexTable.FinancialIndicators, data=result['指标表格']['财务指标'])
risk_indicators = self.dict_to_set(instance=CCRatingAnalysis.IndexTable.RiskIndicators, data=result['指标表格']['风险指标'])
operating_indicators = self.dict_to_set(instance=CCRatingAnalysis.IndexTable.OperatingIndicators, data=result['指标表格']['经营指标'])
index_table = cc_rating_analysis.IndexTable()
index_table.financial_indicators = financial_indicators
index_table.risk_indicators = risk_indicators
index_table.operating_indicators = operating_indicators
cc_rating_analysis.index_table = index_table
history_level = self.instance_list_to_set(instance=CCRatingAnalysis.HistoryLevel, data=result['历史级别'])
cc_rating_analysis.history_level = history_level
self.cc_rating_analysis = cc_rating_analysis
def get_esg_rating_analysis(self):
"""ESG评价分析"""
result = self.db.find_single_data_with_single_sort(
"企业数据",
"ESG评价分析",
{"企业ID": self.cid},
["评价ID", "更新日期", "ESG评级", "维度得分情况"],
{"更新日期": -1}
)
esg_rating_analysis = ESGRatingAnalysis()
if result:
esg_rating_analysis.rid = result['评价ID']
esg_rating_analysis.update_time = result["更新日期"]
esg_rating_analysis.esg_rating = self.dict_to_set(instance=ESGRatingAnalysis.EsgRating, data=result['ESG评级'])
esg_rating_analysis.dimension_score = self.instance_list_to_set(instance=ESGRatingAnalysis.DimensionScore, data=result['维度得分情况'])
self.esg_rating_analysis = esg_rating_analysis
def get_financial_analysis(self):
"""财务要素分析"""
result = self.db.find_single_data_with_single_sort(
"企业数据",
"财务要素分析",
{"企业ID": self.cid},
["评价ID", '更新日期', "财报期", "财务得分年度比较", "财务得分同行比较", "财务得分较去年变化", "财务得分较同行差异", "指标详情"],
{"更新日期": -1}
)
if result:
instance = ThisFinancialElementsAnalysis
financial_analysis = instance()
financial_analysis.rid = result['评价ID']
financial_analysis.report_date = result['财报期']
financial_analysis.update_time = result['更新日期']
for item in result['财务得分年度比较']:
item["指标"] = self.dict_to_set(instance=instance.CompareToYears.FinancialIndex, data=item['指标'])
financial_analysis.compare_to_years = self.instance_list_to_set(instance=instance.CompareToYears, data=result['财务得分年度比较'])
result['财务得分同行比较']['公司水平'] = self.dict_to_set(instance=instance.CompareToIndustry.FinancialIndex, data=result['财务得分同行比较']['公司水平'])
result['财务得分同行比较']['平均水平'] = self.dict_to_set(instance=instance.CompareToIndustry.FinancialIndex, data=result['财务得分同行比较']['平均水平'])
financial_analysis.compare_to_industry = self.dict_to_set(instance=instance.CompareToIndustry, data=result['财务得分同行比较'])
financial_analysis.delta_to_years = self.dict_to_set(instance=instance.CompareToYears.FinancialIndex, data=result['财务得分较去年变化'])
financial_analysis.delta_to_industry = self.dict_to_set(instance=instance.CompareToIndustry.FinancialIndex, data=result['财务得分较同行差异'])
financial_analysis.index_details = self.instance_list_to_set(instance=instance.FinancialAnalysisDetails, data=result['指标详情'])
self.financial_analysis = financial_analysis
def get_operating_risk_analysis(self):
"""获取经营风险分析数据"""
result = self.db.find_single_data_with_single_sort(
"企业数据",
"经营风险分析",
{"企业ID": self.cid},
["评价ID", "更新日期", "风险级别", "风险数量", "风险分数", "列入失信名单", "合规风险", "经营风险", "周边风险", "变更记录"],
{"更新日期": -1}
)
if result:
instance = ThisOperatingRiskAnalysis
operating_risk = instance()
operating_risk.rid = result['评价ID']
operating_risk.update_time = result["更新日期"]
operating_risk.risk_level = result['风险级别']
operating_risk.risk_score = result['风险分数']
operating_risk.risk_num = result['风险数量']
operating_risk.in_untrustworthy_list = result['列入失信名单']
operating_risk.compliance_risk = self.instance_list_to_set(instance=instance.InfluenceObj, data=result['合规风险'])
operating_risk.operating_risk = self.instance_list_to_set(instance=instance.InfluenceObj, data=result['经营风险'])
operating_risk.peripheral_risk = self.instance_list_to_set(instance=instance.DetailObj, data=result['周边风险'])
operating_risk.change_records = self.instance_list_to_set(instance=instance.InfluenceObj, data=result['变更记录'])
self.operating_risk_analysis = operating_risk