tfse-app-api-v0.2/input/input_utils.py

117 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import json
import random
import requests
from input.input_db import FIND_RATING_RECORDS, find_rating_inputs
def get_data_template(template_name):
data_template_path = '/static/template/{}.json'.format(template_name)
with open(os.path.dirname(__file__) + data_template_path, "r", encoding='utf-8') as f:
data_template = json.load(f)
return data_template
def make_id(num):
"""
随机生成字符串
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
def update_process_status(rid, update_data):
"""
调用更新流程接口
Parameters:
update_data dict 需要更新的数据
Returns:
result str 更新结果
"""
url = "http://api.fecribd.com/etl_tfse/company/update_process_data"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
data = json.dumps({"rid": rid, "types": "综合评价流程", "renew_data": update_data})
requests.post(url, headers=headers, data=data)
def prepare_risk_data(cid, rid, company_name):
# 调用清洗数据接口,执行风险数据清洗程序
url = "http://api.fecribd.com/etl_tfse/rating/risk_data"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
data = json.dumps({"company_id": cid, "company_name": company_name, "evaluation_id": rid})
requests.post(url, headers=headers, data=data)
# 风险数据清洗完成后更新流程状态
update_process_status(rid, {"评价流程.清洗风险数据": 1})
def get_current_year():
"""
获取当前年度
Parameters:
-
Returns:
current_year 当前年度
"""
current_year = time.strftime("%Y", time.localtime()) + ""
return current_year
def make_3_report_dates():
"""
生成最近三年报告期 日期
Parameters:
-
Returns:
report_dates 三年报告期
"""
current_year = int(time.strftime("%Y", time.localtime()))-1
date_01 = str(current_year) + '-12-31'
date_02 = str(current_year-1) + '-12-31'
date_03 = str(current_year-2) + '-12-31'
report_dates = [date_01, date_02, date_03]
return report_dates
def gen_new_rid(param):
"""
生成新的评价ID如果该ID存在则重新生成
Parameters:
param: 生成长度
Returns:
new_id: 生成的企业ID
"""
# 生成新ID
new_id = make_id(param)
# 检查新ID是否存在如果存在则继续生成新ID
case = FIND_RATING_RECORDS({"评价ID": new_id}) is []
while case:
new_id = make_id(param)
# 新ID可使用返回新ID
return new_id
def get_company_type(cid):
"""
获取企业填报的企业类型
Parameters:
cid: 企业ID
Returns:
industry
若没有查询到行业 返回空值
"""
# 根据企业ID获取企业当年填报的企业类型
choose_industry = find_rating_inputs({"企业ID": cid, "评价年度": get_current_year()})[0]['行业选择']
if not len(choose_industry) > 1:
return None
else:
return choose_industry[0]