urban-investment-research/ThirdPartyApi/Yujingtong/Apis.py

127 lines
4.1 KiB
Python

import json
import urllib.parse
import urllib.request
from typing import List, Optional

from Utils.DataUtils import JsDict

from . import Schemas
class YujingtongConfig:
    """Endpoint URLs read by the Yujingtong API client."""

    # Common prefix shared by the openapi.qyyjt.cn endpoints below.
    _QYYJT_ROOT = "https://openapi.qyyjt.cn/getDateInfo"

    # Company shareholder information.
    # NOTE(review): this URL is on a tianyancha.com host, unlike every other
    # endpoint in this class -- confirm that is intended.
    shareholder_info_url = "http://open.api.tianyancha.com/services/open/ic/holder/2.0"

    # Issuer (entity) rating information.
    main_rating_url = _QYYJT_ROOT + "/credit/mainrating"

    # Details of guarantees received by the company.
    guarantor_details_url = _QYYJT_ROOT + "/business_info/guarantordetails"

    # Details of external guarantees.
    guarantee_details_url = _QYYJT_ROOT + "/business_info/guaranteedetails"

    # Court judgment documents.
    judgement_doc_url = _QYYJT_ROOT + "/dpd/judgement/doc"

    # Regional economy.
    area_economic_url = _QYYJT_ROOT + "/dpd/area/economic"

    # Regional public finance.
    area_finance_url = _QYYJT_ROOT + "/dpd/area/finance"

    # Regional debt.
    area_debt_url = _QYYJT_ROOT + "/dpd/area/debt"
class _UrllibResponse:
    """Response wrapper exposing the ``json()`` method that ``YujingtongApi`` calls."""

    def __init__(self, body: bytes) -> None:
        self._body = body

    def json(self):
        """Decode the raw response body as JSON."""
        return json.loads(self._body)


class _UrllibClient:
    """Standard-library HTTP client offering ``get(url, headers=...)``."""

    def __init__(self, timeout: float = 10.0) -> None:
        self.timeout = timeout

    def get(self, url: str, headers=None) -> _UrllibResponse:
        """Issue a GET request and return the fully read response.

        ``urllib`` raises ``urllib.error.HTTPError`` for HTTP error statuses
        and ``urllib.error.URLError`` for connection failures.
        """
        request = urllib.request.Request(url, headers=headers or {})
        with urllib.request.urlopen(request, timeout=self.timeout) as response:
            return _UrllibResponse(response.read())


class YujingtongApi:
    """
    Client for the Yujingtong open API.

    Every request is a GET carrying the token in the ``Authorization`` header.
    A response is treated as successful when its ``returncode`` field is 0, in
    which case the payload is read from ``response["data"]["data"]``.
    """

    def __init__(self, token: str, config: "Optional[YujingtongConfig]" = None, rq=None):
        """
        Args:
            token: Value sent in the ``Authorization`` header.
            config: Object exposing the ``*_url`` endpoint attributes.
                ``YujingtongConfig`` is used when omitted or ``None``.
            rq: HTTP client providing ``get(url, headers=...)`` whose result
                has a ``json()`` method. When omitted, a standard-library
                ``urllib`` based client is used.
        """
        self.config = YujingtongConfig if config is None else config
        self.token = token
        # ``self.rq`` is read by every request; it has to exist after
        # construction, otherwise the first call fails with AttributeError.
        self.rq = _UrllibClient() if rq is None else rq

    @staticmethod
    def _build_url(api_url: str, params: dict) -> str:
        """Append ``params`` to ``api_url`` as a percent-encoded query string."""
        # Encoding keeps characters such as '&', '=', '#' or spaces inside a
        # value from being parsed as query syntax.
        query = urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
        return f"{api_url}?{query}"

    def _fetch_json(self, full_url: str):
        """GET ``full_url`` with the authorization header and decode the JSON body."""
        headers = {'Authorization': self.token}
        return self.rq.get(full_url, headers=headers).json()

    def get_list(self, api_url, keyword, size=20, max_time=4):
        """
        Fetch a paginated list.

        Args:
            api_url: Endpoint URL without a query string.
            keyword: Value sent as the ``objs`` query parameter.
            size: Page size sent as the ``size`` query parameter.
            max_time: Upper bound on the number of pages requested, to avoid
                querying too many times.

        Returns:
            A list of ``JsDict`` items gathered from the fetched pages. Paging
            stops at the first unsuccessful response or the first page holding
            fewer than ``size`` items; items collected so far are returned.
        """
        item_list = []
        for page in range(max_time):
            url = self._build_url(
                api_url,
                {"objs": keyword, "from": page * size, "size": size},
            )
            data = self._fetch_json(url)
            if data.get('returncode') != 0:
                break
            # A successful response may carry a null payload; treat it as an
            # empty page instead of failing while iterating over None.
            rows = (data.get('data') or {}).get('data') or []
            temp_list = [JsDict(item) for item in rows]
            item_list.extend(temp_list)
            if len(temp_list) < size:
                break
        return item_list

    def get_data(self, api_url, keyword):
        """Fetch a single record from ``api_url`` with ``keyword`` as ``objs``."""
        return self.get_data_core(self._build_url(api_url, {"objs": keyword}))

    def get_data_core(self, full_url):
        """
        Fetch a single record from a fully built URL.

        Returns:
            The payload wrapped in ``JsDict`` when ``returncode`` is 0,
            otherwise ``None``.
        """
        data = self._fetch_json(full_url)
        if data['returncode'] == 0:
            return JsDict(data['data']['data'])
        return None

    def _get_area_data(self, api_url, area_id, year):
        """Fetch one area record for ``area_id`` (sent as ``objs``) and ``year``."""
        full_url = self._build_url(api_url, {"objs": area_id, "year": year})
        return self.get_data_core(full_url)

    def get_company_share_holder(self, keyword) -> "List[Schemas.ShareHolderItemInfo]":
        """
        Return the list of company shareholder records.
        """
        return self.get_list(self.config.shareholder_info_url, keyword)

    def get_main_rating(self, keyword) -> "Schemas.MainRatingDataInfo":
        """
        Return the issuer rating information (``None`` on an unsuccessful response).
        """
        return self.get_data(self.config.main_rating_url, keyword)

    def get_guarantor_details(self, keyword) -> "List[Schemas.GuarantorDetailsItemInfo]":
        """
        Return the list of guarantees received.
        """
        return self.get_list(self.config.guarantor_details_url, keyword)

    def get_guarantee_details(self, keyword) -> "List[Schemas.GuaranteeDetailsItemInfo]":
        """
        Return the list of external guarantees.
        """
        return self.get_list(self.config.guarantee_details_url, keyword)

    def get_judgement_doc(self, keyword) -> "List[Schemas.JudgementDocItemInfo]":
        """
        Return the list of court judgment documents.
        """
        return self.get_list(self.config.judgement_doc_url, keyword)

    def get_area_economic(self, area_id: int, year: int) -> "Schemas.AreaEconomicInfo":
        """Return regional economy data for ``area_id`` and ``year``."""
        return self._get_area_data(self.config.area_economic_url, area_id, year)

    def get_area_finance(self, area_id: int, year: int) -> "Schemas.AreaFinanceInfo":
        """Return regional public finance data for ``area_id`` and ``year``."""
        return self._get_area_data(self.config.area_finance_url, area_id, year)

    def get_area_debt(self, area_id: int, year: int) -> "Schemas.AreaDebtInfo":
        """Return regional debt data for ``area_id`` and ``year``."""
        return self._get_area_data(self.config.area_debt_url, area_id, year)