urban-investment-research/ThirdPartyApi/Yujingtong/Apis.py

295 lines
11 KiB
Python

from datetime import datetime
from typing import List
import requests as rq
from . import Schemas
from Utils.DataUtils import JsDict
from Utils.MongoUtils import MongoConnect
error_code_dic = {
0: "查询成功",
1001: "无权限访问此数据接口",
1002: "URL不存在",
1003: "token异常",
1004: "返回数据为空",
1005: "参数设置错误或缺少参数,请重新设置",
1006: "此数据接口剩余流量不足",
1007: "请求数据的条目超过上限",
1008: "请求超过系统限制",
1009: "接口正在维护中",
1010: "接口已下线停用",
1011: "访问频率过快",
1012: "请求超时",
1013: "网络超时",
1014: "网络错误",
1015: "服务器拒绝请求",
1016: "数据接收失败",
1017: "错误的应答",
1018: "操作系统原因",
1019: "后台服务器不存在",
1020: "后台服务器不可用",
1099: "系统未知错误,请联系技术客服"
}
class YujingtongConfig:
# 公司股东信息获取接口
shareholder_info_url = "https://openapi.qyyjt.cn/getDateInfo/business_info/shareholder_info"
# 主体评级信息获取接口
main_rating_url = "https://openapi.qyyjt.cn/getDateInfo/credit/mainrating"
# 被担保明细接口
guarantor_details_url = "https://openapi.qyyjt.cn/getDateInfo/business_info/guarantordetails"
# 对外担保明细
guarantee_details_url = "https://openapi.qyyjt.cn/getDateInfo/business_info/guaranteedetails"
# 裁判文书
judgement_doc_url = "https://openapi.qyyjt.cn/getDateInfo/dpd/judgement/doc"
# 区域经济
area_economic_url = "https://openapi.qyyjt.cn/getDateInfo/dpd/area/economic"
# 区域财政
area_finance_url = "https://openapi.qyyjt.cn/getDateInfo/dpd/area/finance"
# 区域债务
area_debt_url = "https://openapi.qyyjt.cn/getDateInfo/dpd/area/debt"
# 创投融资
financing_PEVC_url = "https://openapi.qyyjt.cn/getDateInfo/financing/PEVC"
# 股票融资
share_financing_url = "https://openapi.qyyjt.cn/getDateInfo/financing/Share_financing"
# 银行结款
bank_loan_url = "https://openapi.qyyjt.cn/getDateInfo/financing/Bank_loan"
# 信托融资
trust_financing_url = "https://openapi.qyyjt.cn/getDateInfo/financing/Trust_financing"
# 租赁融资
leasing_financing_url = "https://openapi.qyyjt.cn/getDateInfo/financing/Leasing_financing"
# 其他融资
other_financing_url = "https://openapi.qyyjt.cn/getDateInfo/financing/Other_financing"
class YujingtongApiDataType:
公司股东信息列表 = "公司股东信息列表"
主体评级信息 = "主体评级信息"
被担保信息列表 = "被担保信息列表"
被对外信息列表 = "被对外信息列表"
裁判文书信息列表 = "裁判文书信息列表"
区域经济 = "区域经济"
区域财政 = "区域财政"
区域债务 = "区域债务"
创投融资 = "创投融资"
股票融资 = "股票融资"
银行借款 = "银行借款"
信托融资 = "信托融资"
租赁融资 = "租赁融资"
其他融资 = "其他融资"
class YujingtongApi:
"""
预警通接口
"""
def __init__(self, token: str, config: YujingtongConfig = YujingtongConfig, mongo_connect: MongoConnect = None,
mongo_db_name="预警通"):
self.config = config
self.token = token
self.mongo_connect = mongo_connect
self.mongo_db_name = mongo_db_name
self.rq = rq
def get_list(self, api_url, keyword, query_info={}):
"""
获取列表
"""
skip = 0
size = 20
item_list = []
# 避免查询太多次
max_time = 1
for i in range(max_time):
url = f"{api_url}?objs={keyword}&from={skip}&size={size}"
skip += size
data = self.get_data_core(url, query_info)
if data:
temp_list = data['data']
item_list.extend(temp_list)
if len(temp_list) < size:
break
else:
break
return item_list
def get_data(self, api_url, keyword, query_info={}):
url = f"{api_url}?objs={keyword}"
return self.get_data_core(url, query_info)
def get_data_core(self, full_url, query_info={}):
if self.mongo_connect:
item = self.mongo_connect.db[self.mongo_db_name].find_one({"url": full_url})
if item:
data_value = item['data']
print("缓存请求成功")
return JsDict(data_value)
headers = {'token': self.token}
res = self.rq.get(full_url + f"&token={self.token}", headers=headers)
data = res.json()
if data['returncode'] == 0:
data_value = data
if self.mongo_connect:
self.mongo_connect.db[self.mongo_db_name].insert_one({
"url": full_url,
"data": data_value,
"query_info": query_info,
"create_time": datetime.now(),
"update_time": datetime.now()
})
print("请求成功")
return JsDict(data_value)
else:
print(full_url)
print(error_code_dic[data['returncode']])
# raise Exception(error_code_dic[data['returncode']])
def get_company_shareholder(self, keyword) -> List[Schemas.ShareHolderItemInfo]:
"""
返回公司股东信息列表
"""
item_list = self.get_list(self.config.shareholder_info_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.公司股东信息列表})
return item_list
def get_main_rating(self, keyword) -> Schemas.MainRatingDataInfo:
"""
获取主体评级信息
"""
return self.get_data(self.config.main_rating_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.主体评级信息})
def get_guarantor_details(self, keyword) -> List[Schemas.GuarantorDetailsItemInfo]:
"""
返回被担保信息列表
"""
item_list = self.get_list(self.config.guarantor_details_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.被担保信息列表})
return item_list
def get_guarantee_details(self, keyword) -> List[Schemas.GuaranteeDetailsItemInfo]:
"""
返回被对外信息列表
"""
item_list = self.get_list(self.config.guarantee_details_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.被对外信息列表})
return item_list
def get_judgement_doc(self, keyword) -> List[Schemas.JudgementDocItemInfo]:
"""
获取裁判文书信息列表
"""
item_list = self.get_list(self.config.judgement_doc_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.裁判文书信息列表})
return item_list
def get_area_economic(self, area_id: int, year: int) -> Schemas.AreaEconomicInfo:
"""
区域经济
@param area_id:
@param year:
@return:
"""
full_url = f"{self.config.area_economic_url}?objs={area_id}&year={year}"
data = self.get_data_core(full_url,
{"区域编码": area_id, "年度": year, "数据类型": YujingtongApiDataType.区域经济})
return data
def get_area_finance(self, area_id: int, year: int) -> Schemas.AreaFinanceInfo:
"""
区域财政
@param area_id:
@param year:
@return:
"""
full_url = f"{self.config.area_finance_url}?objs={area_id}&year={year}"
data = self.get_data_core(full_url,
{"区域编码": area_id, "年度": year, "数据类型": YujingtongApiDataType.区域财政})
return data
def get_area_debt(self, area_id: int, year: int) -> Schemas.AreaDebtInfo:
"""
区域债务
@param area_id:
@param year:
@return:
"""
full_url = f"{self.config.area_debt_url}?objs={area_id}&year={year}"
data = self.get_data_core(full_url,
{"区域编码": area_id, "年度": year, "数据类型": YujingtongApiDataType.区域债务})
return data
def get_financing_PEVC_list(self, keyword) -> List[Schemas.FinancingPEVCItemInfo]:
"""
创投融资
@param keyword:
@return:
"""
item_list = self.get_list(self.config.financing_PEVC_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.创投融资})
return item_list
def get_share_financing_list(self, keyword) -> List[Schemas.ShareFinancingItemInfo]:
"""
股票融资
@param keyword:
@return:
"""
item_list = self.get_list(self.config.share_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.股票融资})
return item_list
def get_bank_loan_list(self, keyword) -> List[Schemas.BankLoanItemInfo]:
"""
银行借款
@param keyword:
@return:
"""
item_list = self.get_list(self.config.bank_loan_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.银行借款})
return item_list
def get_trust_financing_list(self, keyword) -> List[Schemas.TrustFinancingItemInfo]:
"""
信托融资
@param keyword:
@return:
"""
item_list = self.get_list(self.config.trust_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.信托融资})
return item_list
def get_leasing_financing_list(self, keyword) -> List[Schemas.LeasingFinancingItemInfo]:
"""
租赁融资
@param keyword:
@return:
"""
item_list = self.get_list(self.config.leasing_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.租赁融资})
return item_list
def get_other_financing_list(self, keyword) -> List[Schemas.OtherFinancingItemInfo]:
"""
其他融资
@param keyword:
@return:
"""
item_list = self.get_list(self.config.other_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.其他融资})
return item_list