wd-smebiz/mods/smebiz_rate/company_rate/crud.py

237 lines
7.8 KiB
Python

import datetime
import json
import sys
from copy import deepcopy
from typing import List
from fastapi import HTTPException
from sqlalchemy.orm import Session
from context.common import tianyancha_api
from utils.data_utils import JsDict
from utils.print_utils import print_error
from . import schemas
from .models import CompanyRate
from ..form_file import crud as FormFileCrud
from .pytypes.BjRateRes import BjRateRes
from .rate_utils.utils import load_api_data, load_question_excel, sme_rate_interface, pre_process_data, RateRes
# Key templates filled in by load_bj_model_dict(); every value is None.
# bj_model_dict collects keys from both column lists read by the loader,
# man_data_dict only those of the second (qualitative-indicator) list.
bj_model_dict = {}
man_data_dict = {}


def load_bj_model_dict() -> None:
    """Populate the module-level key templates from ``static/bj_model.json``.

    The path is relative to the current working directory. Each column entry's
    ``key`` is registered with a ``None`` value; existing entries in the two
    dicts are kept and overwritten key by key.

    Raises:
        OSError: if the JSON file cannot be opened.
        KeyError: if the expected sections/columns are missing from the file.
    """
    # Context manager so the file handle is closed even if parsing fails.
    with open('static/bj_model.json', 'rb') as model_file:
        data = json.load(model_file)

    for column in data['model']['columns']:
        bj_model_dict[column['key']] = None

    for column in data['尽调核实数据']['定性指标']['columns']:
        man_data_dict[column['key']] = None
        bj_model_dict[column['key']] = None


# The loader is not invoked at import time; the call below is disabled.
# load_bj_model_dict()
def company_rate_add(db: Session, data: schemas.CompanyRateAddInfo):
    """Create a CompanyRate row from ``data`` and return the persisted record.

    The new record starts with fixed file/rate/publish states, is committed,
    and is then passed through ``marge_data_to_db`` so the derived readiness
    states are stored before the refreshed record is returned.
    """
    record = CompanyRate(**data.dict())

    # Initial workflow states of a freshly created record.
    initial_states = (
        ('file_data_state', "未上传"),
        ('rate_state', '未评级'),
        ('publish_state', '未披露'),
    )
    for attribute, state in initial_states:
        setattr(record, attribute, state)

    db.add(record)
    db.commit()
    # Refresh first so the database-assigned id is available for the merge.
    db.refresh(record)

    marge_data_to_db(db, record.id)
    # Pick up the state columns written by the merge step.
    db.refresh(record)
    return record
def company_rate_file_data_load(db: Session, item_id, file_md):
    """Parse an uploaded questionnaire file and store it on a rating record.

    The file identified by ``file_md`` is looked up through ``FormFileCrud``,
    parsed with ``load_question_excel`` and saved as JSON text in both
    ``wj_data`` and ``man_data``; the record is then marked as file-ready.

    Args:
        db: active SQLAlchemy session.
        item_id: primary key of the CompanyRate record to update.
        file_md: identifier passed to ``FormFileCrud.file_get``.

    Returns:
        The updated CompanyRate record.

    Raises:
        HTTPException: 404 when no record has ``item_id``; 301 (the status
            this module uses for its other errors) when the file cannot be
            resolved or parsed.
    """
    item = db.query(CompanyRate).filter(CompanyRate.id == item_id).first()
    if item is None:
        # Fail before doing the parse; previously this surfaced later as an
        # AttributeError on the first attribute assignment.
        raise HTTPException(detail='评级记录不存在', status_code=404)

    file_info = FormFileCrud.file_get(db, file_md)
    try:
        file_path = file_info.file_url
        # The first character of the stored URL is dropped before parsing
        # (presumably a leading "/" — TODO confirm against FormFileCrud).
        wj_data = json.dumps(load_question_excel(file_path[1:]), ensure_ascii=False)
    except Exception as e:
        print_error(e)
        sys.stderr.write(str(e))
        raise HTTPException(detail='文件解析错误,请重新上传', status_code=301) from e

    item.wj_data = wj_data
    # man_data starts out as the same JSON text (strings are immutable, so no
    # copy is needed).
    item.man_data = wj_data
    item.file_md = file_md
    item.file_data_state = "已就绪"
    db.commit()
    return item
def company_rate_delete(db: Session, item_id: int):
    """Delete the CompanyRate row(s) whose id equals ``item_id`` and commit."""
    matching_rows = db.query(CompanyRate).filter_by(id=item_id)
    matching_rows.delete()
    db.commit()
def company_rate_update(db: Session, data: schemas.CompanyRateUpdateInfo):
    """Apply the non-None fields of ``data`` to the record with ``data.id``.

    After the update the readiness states are recomputed through
    ``marge_data_by_item``.

    Returns:
        The refreshed CompanyRate record, or ``None`` when no record has the
        given id.
    """
    # Fields left as None in the payload are not written.
    changes = {key: v for key, v in data.dict().items() if v is not None}
    db.query(CompanyRate).filter_by(id=data.id).update(changes)
    db.commit()

    item = db.query(CompanyRate).filter_by(id=data.id).first()
    if item is None:
        # Unknown id: nothing was updated and there is nothing to merge.
        return None

    marge_data_by_item(db, item)
    db.commit()
    db.refresh(item)
    return item
def company_rate_get(db: Session, item_id: int):
    """Return the first CompanyRate row with the given id (``None`` if absent)."""
    return db.query(CompanyRate).filter_by(id=item_id).first()
def company_rate_query(db: Session, params: schemas.CompanyRateQuery):
    """Search CompanyRate rows using the non-None fields of ``params``.

    Filtering rules, applied per field (``page`` / ``page_size`` excluded):
      * plain ``str`` values match as a substring (``LIKE %value%``), except
        ``rate_level`` which must match exactly;
      * every other value must be equal to the column.

    Pagination is applied only when both ``page`` (1-based) and ``page_size``
    are set; ``page_size`` is capped at 100.

    Returns:
        ``(count, rows)`` where ``count`` is the number of matches before
        pagination and ``rows`` is always a list of CompanyRate records.
    """
    params_dict = params.dict()
    query = db.query(CompanyRate)

    for key, value in params_dict.items():
        if key in ('page', 'page_size') or value is None:
            continue
        column = getattr(CompanyRate, key)
        # Exact type check on purpose: str subclasses keep exact matching.
        if type(value) is str and key != 'rate_level':
            query = query.filter(column.like(f'%{value}%'))
        else:
            query = query.filter(column == value)

    count = query.count()

    page = params_dict.get('page')
    page_size = params_dict.get('page_size')
    if page is not None and page_size is not None:
        # Page size is capped at 100 rows.
        page_size = min(page_size, 100)
        query = query.offset((page - 1) * page_size).limit(page_size)

    # Materialise in both cases so callers always receive a list rather than
    # a list when paginated and a Query object otherwise.
    return count, query.all()
def json_parse(data):
    """Decode a JSON text, treating empty input as an empty dict.

    Args:
        data: JSON text, or a falsy value (``None``, ``""``) for "no data".

    Returns:
        The decoded value, or ``{}`` when ``data`` is falsy.

    Raises:
        HTTPException: status 301 when ``data`` is present but cannot be
            decoded. The earlier version returned from a ``finally`` clause,
            which discarded this exception and yielded ``{}`` instead.
    """
    if not data:
        return {}
    try:
        return json.loads(data)
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError; TypeError covers non-text input.
        print(e)
        raise HTTPException(detail="数据解析错误", status_code=301) from e
def load_api_data_to_db(db: Session, item_id):
    """Fetch API data for a record's company name and store it as JSON text.

    Returns the refreshed record, or the falsy lookup result unchanged when no
    record with ``item_id`` exists.
    """
    record = company_rate_get(db, item_id)
    if not record:
        return record

    fetched = load_api_data(record.company_name)
    record.api_data = json.dumps(fetched)
    marge_data_by_item(db, record)
    # Assigned after the merge step, so it replaces the api_data_state that
    # marge_data_by_item just computed.
    record.api_data_state = "已就绪"
    db.commit()
    db.refresh(record)
    return record
def marge_data_to_db(db: Session, item_id):
    """
    Merge data: recompute the stored readiness states of the record with
    ``item_id``. Does nothing when the record does not exist.
    """
    record = company_rate_get(db, item_id)
    if not record:
        return
    marge_data_by_item(db, record)
def check_none_in_dict_recursive(dictionary):
    """Return True if any value in ``dictionary`` or its nested dicts is None.

    Only dict values are descended into; the contents of lists and other
    containers are not inspected.
    """
    return any(
        value is None
        or (isinstance(value, dict) and check_none_in_dict_recursive(value))
        for value in dictionary.values()
    )
def marge_data_by_item(db: Session, item):
    """Recompute and persist the readiness states of a rating record.

    * ``api_data_state`` is ready only when API data has been stored and the
      parsed data contains no ``None`` value (nested dicts included).
    * ``mgr_data_state`` is ready only when both the API data and the uploaded
      file data are ready.

    Absent API data used to be parsed to ``{}`` and reported as ready, which
    let ``company_rate`` (it parses ``api_data`` unconditionally) run on a
    record whose API data was never loaded.

    Args:
        db: active SQLAlchemy session; committed before returning.
        item: the CompanyRate record to update in place.
    """
    api_data = json_parse(item.api_data)

    api_ready = bool(item.api_data) and not check_none_in_dict_recursive(api_data)
    item.api_data_state = "已就绪" if api_ready else '未就绪'

    all_ready = api_ready and item.file_data_state == '已就绪'
    item.mgr_data_state = "已就绪" if all_ready else "未就绪"
    db.commit()
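# Sample payload in the shape that company_rate() reads below (presumably the
# sme_rate_interface response; truncated -- TODO confirm against rate_utils):
# {
#     "message": "计算成功",
#     "data": {
#         "评级结果": {
#             "级别": "AA",
#             "总分": 71.03
#         },
#         "指标数值与得分": [
#             {
#                 "指标": "经营异常",
#                 "数值": 0,
#                 "单位": "条",
#                 "权重": 2.5,
#                 "得分": 2.5
#             },
#             ...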
def company_rate(db: Session, item_id):
    """Run the rating model for a record and store the result on it.

    The questionnaire sections from ``man_data`` and the stored API data are
    pre-processed and passed to ``sme_rate_interface``. When a non-empty
    result comes back, the full result (plus ``final_scope`` / ``final_level``
    entries), the level and the rating time are saved and the record is
    marked as rated. An empty result leaves the record untouched.

    Score adjustments and custom level thresholds are currently disabled, so
    the final score and level are exactly the ones returned by the model.

    Raises:
        HTTPException: status 301 when the record does not exist or its
            merged data is not ready.
    """
    item = company_rate_get(db, item_id)
    # A missing record is reported the same way as incomplete data instead of
    # failing with AttributeError.
    if item is None or item.mgr_data_state != '已就绪':
        raise HTTPException(detail='数据不完整,无法评分', status_code=301)

    man_data = json.loads(item.man_data)
    business_answers = man_data["经营问卷"]
    finance_answers = man_data["财务问卷"]
    api_data = json.loads(item.api_data)

    prepared = pre_process_data(business_answers, finance_answers, api_data)
    data: RateRes = JsDict(sme_rate_interface(prepared))
    if not data:
        return

    rating = data.data.评级结果
    # 'final_scope' is the key stored in rate_data; it holds the total score.
    data.data['final_scope'] = rating.总分
    level = rating.级别
    data.data['final_level'] = level

    item.rate_data = json.dumps(data, ensure_ascii=False)
    item.rate_level = level
    item.rate_time = datetime.datetime.now()
    item.rate_state = '已评级'
    db.commit()
################
def get_full_info(db: Session, item_id: int):
    """Assemble the detail view of a rating record.

    Returns:
        ``{}`` when the record does not exist; otherwise a dict with
        ``baseinfo`` (first element of the tianyancha lookup for the company
        name), ``radar_data`` (taken from the stored rating result, ``{}``
        when absent) and ``rate_full_data`` (``item.to_dict()``).
    """
    item = company_rate_get(db, item_id)
    if not item:
        return {}

    # rate_data stays empty until the record has been rated; json.loads(None)
    # would raise TypeError, so fall back to an empty result.
    rate_data = json.loads(item.rate_data) if item.rate_data else {}
    radar_data = rate_data.get('radar_data', {})

    baseinfo, _ = tianyancha_api.get('企业基本信息', {'keyword': item.company_name})
    return {'baseinfo': baseinfo, 'radar_data': radar_data, 'rate_full_data': item.to_dict()}