三方api接口缓存加上mongo查询

This commit is contained in:
wcq 2023-03-31 11:15:24 +08:00
parent 94ffbffb1a
commit 98329a5ff0
6 changed files with 116 additions and 71 deletions

View File

@ -91,8 +91,7 @@ def yujingtong_api_to_company_shareholder_list(res: List[YujingtongSchemas.Share
new_item.company_id = company_id
new_item.name = item.ShareholderName
new_item.type = item.ShareholderType
# todo 单位换算
new_item.amount = item.ShouldCapi
new_item.amount = item.ShouldCapi/10000
new_item.number = item.ShareholdeCode
new_item.percent = float(item.ShareholdingRatio)
new_item.reg_date = date_value_to_date(item.ContributionTime)

View File

@ -56,8 +56,8 @@ def company_data_load(db: Session, company_name, require_info: CompanyPlusProper
db.add(new_company_control_item)
# 公司股东信息录入
company_share_holder_res = yujingtong_api.get_company_share_holder(company_name)
new_company_shareholder_item_list = yujingtong_api_to_company_shareholder_list(company_share_holder_res,
company_shareholder_res = yujingtong_api.get_company_shareholder(company_name)
new_company_shareholder_item_list = yujingtong_api_to_company_shareholder_list(company_shareholder_res,
company_id)
db.add_all(new_company_shareholder_item_list)

View File

@ -106,16 +106,16 @@ def auto_create_company(db: Session, req: Schemas.AutoCreateCompanyReq):
# area=area_item.area_id if area_item else None
)
db.add(company_plus_property)
add_company_share_holder_from_api(db, company_id, company_name)
add_company_shareholder_from_api(db, company_id, company_name)
add_company_control_from_api(db, company_id, company_name)
db.commit()
def add_company_share_holder_from_api(db: Session, company_id, company_name):
def add_company_shareholder_from_api(db: Session, company_id, company_name):
"""
从接口导入股东信息
"""
holder_list = yujingtong_api.get_company_share_holder(company_name)
holder_list = yujingtong_api.get_company_shareholder(company_name)
for holder in holder_list:
if not db.query(CompanyShareholder).filter(CompanyShareholder.company_id == company_id,
CompanyShareholder.name == holder.ShareholderName,

View File

@ -11,7 +11,7 @@ class CompanyShareholder(common_db.Base):
company_id = Column(Integer, comment="企业ID")
name = Column(String(255), comment="股东名称")
type = Column(String(50), comment="股东类型")
amount = Column(Double, comment="认缴金额")
amount = Column(Double, comment="认缴金额(万)")
reg_date = Column(Date, comment="认缴时间")
percent = Column(Double, comment="持股比例")
number = Column(Double, comment="持股数量")

View File

@ -1,3 +1,4 @@
from datetime import datetime
from typing import List
import requests as rq
from . import Schemas
@ -31,13 +32,22 @@ error_code_dic = {
}
class TianyanchaApiDataType:
工商信息 = "工商信息"
股东信息列表 = "股东信息列表"
公司实控人列表 = "公司实控人列表"
法律诉讼信息列表 = "法律诉讼信息列表"
舆情信息列表 = "舆情信息列表"
组合工商信息 = "组合工商信息"
class TianyanchaConfig:
# 公司工商信息接口
company_base_info_url = "http://open.api.tianyancha.com/services/open/ic/baseinfo/normal"
# 公司搜索接口
search_url = "http://open.api.tianyancha.com/services/open/search/2.0"
# 公司股东信息获取接口
company_share_holder_url = "http://open.api.tianyancha.com/services/open/ic/holder/2.0"
company_shareholder_url = "http://open.api.tianyancha.com/services/open/ic/holder/2.0"
# 实控人
actual_control_url = "http://open.api.tianyancha.com/services/open/ic/actualControl/3.0"
# 法律诉讼
@ -63,62 +73,63 @@ class TianyanchaApi:
self.mongo_connect = mongo_connect
self.mongo_db_name = mongo_db_name
def get_data(self, api_url, keyword):
def get_data(self, api_url, keyword, query_info={}):
url = f"{api_url}?keyword={keyword}"
return self.get_data_core(url)
return self.get_data_core(url, query_info)
def get_data_core(self, full_url):
def get_data_core(self, full_url, query_info={}):
if self.mongo_connect:
item = self.mongo_connect.db[self.mongo_db_name].find_one({"url": full_url})
return JsDict(item['data'])
if item:
print("请求缓存成功")
return JsDict(item['data'])
headers = {'Authorization': self.token}
res = self.rq.get(full_url, headers=headers)
data = res.json()
if data['error_code'] == 0:
print("请求成功")
if self.mongo_connect:
self.mongo_connect.db[self.mongo_db_name].insert_one({
"url": full_url,
"data": data['result']
"data": data['result'],
"query_info": query_info,
"create_time": datetime.now(),
"update_time": datetime.now()
})
return JsDict(data['result'])
else:
raise Exception(error_code_dic[data['error_code']])
def get_list(self, api_url, keyword):
def get_list(self, api_url, keyword, query_info={}):
"""
获取列表
"""
query_url = f"{api_url}?keyword={keyword}"
return self.get_list_core(query_url)
return self.get_list_core(query_url, query_info)
def get_list_core(self, query_url):
def get_list_core(self, query_url, query_info={}):
headers = {'Authorization': self.token}
page_num = 1
page_size = 20
item_list = []
# 查询第一页的获取total
url = f"{query_url}&pageNum={page_num}&pageSize={page_size}"
res = self.rq.get(url, headers=headers)
data = res.json()
if data['error_code'] == 0:
total = data['result']['total']
temp_list = [JsDict(item) for item in data['result']['items']]
item_list.extend(temp_list)
# todo:是否获取所有列表
# if total > len(item_list):
# for i in range(1, (total - len(item_list)) // page_size + 1):
# url = f"{api_url}?keyword={keyword}&pageNum={page_num}&pageSize={page_size}"
# res = self.rq.get(url, headers=headers)
# data = res.json()
# if data['error_code'] == 0:
# if data['result']['total']:
# temp_list = [JsDict(item) for item in data['result']['items']]
# item_list.extend(temp_list)
#
result = self.get_data_core(url, query_info)
total = min(result['total'], 50)
temp_list = [JsDict(item) for item in result['items']]
item_list.extend(temp_list)
# todo:是否获取所有列表
# if total > len(item_list):
# for i in range(1, (total - len(item_list)) // page_size + 1):
# url = f"{query_url}&pageNum={page_num}&pageSize={page_size}"
# result = self.get_data_core(url)
# temp_list = [JsDict(item) for item in result['items']]
# item_list.extend(temp_list)
return item_list
def get_company_base_info(self, keyword) -> Schemas.CompanyBaseInfoRes:
return self.get_data(self.config.company_base_info_url, keyword)
return self.get_data(self.config.company_base_info_url, keyword,
query_info={"公司名称": keyword, "数据类型": TianyanchaApiDataType.工商信息})
def company_name_search(self, keyword) -> List[str]:
"""
@ -135,31 +146,28 @@ class TianyanchaApi:
print(data)
return company_names
def get_company_share_holder(self, keyword) -> List[Schemas.CompanyShareHolderItemInfo]:
def get_company_shareholder(self, keyword) -> List[Schemas.CompanyShareHolderItemInfo]:
"""
返回公司股东信息列表
"""
item_list = self.get_list(self.config.company_share_holder_url, keyword)
item_list = self.get_list(self.config.company_shareholder_url, keyword,
query_info={"公司名称": keyword, "数据类型": TianyanchaApiDataType.股东信息列表})
return item_list
def get_actual_control(self, keyword) -> List[Schemas.ActualControlItemInfo]:
"""
查询公司实控人多个结果则为疑似
"""
url = f"{self.config.company_share_holder_url}?keyword={keyword}"
headers = {'Authorization': self.token}
res = self.rq.get(url, headers=headers)
data = res.json()
item_list = []
if data['error_code'] == 0:
item_list.extend([JsDict(item) for item in data['result']["actualControllerList"]])
item_list = self.get_list(self.config.actual_control_url, keyword,
{"公司名称": keyword, "数据类型": TianyanchaApiDataType.公司实控人列表})
return item_list
def get_law_suit(self, keyword) -> List[Schemas.LawSuitItemInfo]:
"""
查询公司法律诉讼信息列表
"""
item_list = self.get_list(self.config.law_suit_url, keyword)
item_list = self.get_list(self.config.law_suit_url, keyword,
{"公司名称": keyword, "数据类型": TianyanchaApiDataType.法律诉讼信息列表})
return item_list
def get_news(self, name, start_time=None, end_time=None, tags=[]) -> List[Schemas.CompanyNewsItemInfo]:
@ -173,7 +181,9 @@ class TianyanchaApi:
"""
query_url = make_query_url({"name": name, "startTime": start_time, "endTime": end_time, "tags": ",".join(tags)})
full_query_url = f"{self.config.news_url}?{query_url}"
item_list = self.get_list_core(full_query_url)
item_list = self.get_list_core(full_query_url,
{"公司名称": name, "数据类型": TianyanchaApiDataType.舆情信息列表, "start_time": start_time,
"end_time": end_time, "tags": tags})
return item_list
def get_full_base_info(self, keyword) -> Schemas.FullBaseInfo:
@ -182,4 +192,4 @@ class TianyanchaApi:
@param keyword:
@return:
"""
return self.get_data(self.config.full_base_info_url, keyword)
return self.get_data(self.config.full_base_info_url, keyword, {"公司名称": keyword, "数据类型": "组合工商信息"})

View File

@ -1,3 +1,4 @@
from datetime import datetime
from typing import List
import requests as rq
from . import Schemas
@ -74,6 +75,23 @@ class YujingtongConfig:
other_financing_url = "https://openapi.qyyjt.cn/getDateInfo/financing/Other_financing"
class YujingtongApiDataType:
公司股东信息列表 = "公司股东信息列表"
主体评级信息 = "主体评级信息"
被担保信息列表 = "被担保信息列表"
被对外信息列表 = "被对外信息列表"
裁判文书信息列表 = "裁判文书信息列表"
区域经济 = "区域经济"
区域财政 = "区域财政"
区域债务 = "区域债务"
创投融资 = "创投融资"
股票融资 = "股票融资"
银行借款 = "银行借款"
信托融资 = "信托融资"
租赁融资 = "租赁融资"
其他融资 = "其他融资"
class YujingtongApi:
"""
预警通接口
@ -87,7 +105,7 @@ class YujingtongApi:
self.mongo_db_name = mongo_db_name
self.rq = rq
def get_list(self, api_url, keyword):
def get_list(self, api_url, keyword, query_info={}):
"""
获取列表
"""
@ -99,7 +117,7 @@ class YujingtongApi:
for i in range(max_time):
url = f"{api_url}?objs={keyword}&from={skip}&size={size}"
skip += size
data = self.get_data_core(url)
data = self.get_data_core(url, query_info)
if data:
temp_list = data['data']
item_list.extend(temp_list)
@ -109,63 +127,72 @@ class YujingtongApi:
break
return item_list
def get_data(self, api_url, keyword):
def get_data(self, api_url, keyword, query_info={}):
url = f"{api_url}?objs={keyword}"
return self.get_data_core(url)
return self.get_data_core(url, query_info)
def get_data_core(self, full_url):
def get_data_core(self, full_url, query_info={}):
if self.mongo_connect:
item = self.mongo_connect.db[self.mongo_db_name].find_one({"url": full_url})
if item:
data_value = item['data']
print("缓存请求成功")
return JsDict(data_value)
headers = {'token': self.token}
res = self.rq.get(full_url + f"&token={self.token}", headers=headers)
data = res.json()
if data['returncode'] == 0:
data_value = data['data']
data_value = data
if self.mongo_connect:
self.mongo_connect.db[self.mongo_db_name].insert_one({
"url": full_url,
"data": data_value
"data": data_value,
"query_info": query_info,
"create_time": datetime.now(),
"update_time": datetime.now()
})
print("请求成功")
return JsDict(data_value)
else:
# print(full_url,error_code_dic[data['returncode']])
raise Exception(error_code_dic[data['returncode']])
def get_company_share_holder(self, keyword) -> List[Schemas.ShareHolderItemInfo]:
def get_company_shareholder(self, keyword) -> List[Schemas.ShareHolderItemInfo]:
"""
返回公司股东信息列表
"""
item_list = self.get_list(self.config.shareholder_info_url, keyword)
item_list = self.get_list(self.config.shareholder_info_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.公司股东信息列表})
return item_list
def get_main_rating(self, keyword) -> Schemas.MainRatingDataInfo:
"""
获取主体评级信息
"""
return self.get_data(self.config.main_rating_url, keyword)
return self.get_data(self.config.main_rating_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.主体评级信息})
def get_guarantor_details(self, keyword) -> List[Schemas.GuarantorDetailsItemInfo]:
"""
返回被担保信息列表
"""
item_list = self.get_list(self.config.guarantor_details_url, keyword)
item_list = self.get_list(self.config.guarantor_details_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.被担保信息列表})
return item_list
def get_guarantee_details(self, keyword) -> List[Schemas.GuaranteeDetailsItemInfo]:
"""
返回被对外信息列表
"""
item_list = self.get_list(self.config.guarantee_details_url, keyword)
item_list = self.get_list(self.config.guarantee_details_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.被对外信息列表})
return item_list
def get_judgement_doc(self, keyword) -> List[Schemas.JudgementDocItemInfo]:
"""
获取裁判文书信息列表
"""
item_list = self.get_list(self.config.judgement_doc_url, keyword)
item_list = self.get_list(self.config.judgement_doc_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.裁判文书信息列表})
return item_list
def get_area_economic(self, area_id: int, year: int) -> Schemas.AreaEconomicInfo:
@ -176,7 +203,8 @@ class YujingtongApi:
@return:
"""
full_url = f"{self.config.area_economic_url}?objs={area_id}&year={year}"
data = self.get_data_core(full_url)
data = self.get_data_core(full_url,
{"区域编码": area_id, "年度": year, "数据类型": YujingtongApiDataType.区域经济})
return data
def get_area_finance(self, area_id: int, year: int) -> Schemas.AreaFinanceInfo:
@ -187,7 +215,8 @@ class YujingtongApi:
@return:
"""
full_url = f"{self.config.area_finance_url}?objs={area_id}&year={year}"
data = self.get_data_core(full_url)
data = self.get_data_core(full_url,
{"区域编码": area_id, "年度": year, "数据类型": YujingtongApiDataType.区域财政})
return data
def get_area_debt(self, area_id: int, year: int) -> Schemas.AreaDebtInfo:
@ -198,7 +227,8 @@ class YujingtongApi:
@return:
"""
full_url = f"{self.config.area_debt_url}?objs={area_id}&year={year}"
data = self.get_data_core(full_url)
data = self.get_data_core(full_url,
{"区域编码": area_id, "年度": year, "数据类型": YujingtongApiDataType.区域债务})
return data
def get_financing_PEVC_list(self, keyword) -> List[Schemas.FinancingPEVCItemInfo]:
@ -207,7 +237,8 @@ class YujingtongApi:
@param keyword:
@return:
"""
item_list = self.get_list(self.config.financing_PEVC_url, keyword)
item_list = self.get_list(self.config.financing_PEVC_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.创投融资})
return item_list
def get_share_financing_list(self, keyword) -> List[Schemas.ShareFinancingItemInfo]:
@ -216,16 +247,18 @@ class YujingtongApi:
@param keyword:
@return:
"""
item_list = self.get_list(self.config.share_financing_url, keyword)
item_list = self.get_list(self.config.share_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.股票融资})
return item_list
def get_bank_loan_list(self, keyword) -> List[Schemas.BankLoanItemInfo]:
"""
银行
银行
@param keyword:
@return:
"""
item_list = self.get_list(self.config.bank_loan_url, keyword)
item_list = self.get_list(self.config.bank_loan_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.银行借款})
return item_list
def get_trust_financing_list(self, keyword) -> List[Schemas.TrustFinancingItemInfo]:
@ -234,7 +267,8 @@ class YujingtongApi:
@param keyword:
@return:
"""
item_list = self.get_list(self.config.trust_financing_url, keyword)
item_list = self.get_list(self.config.trust_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.信托融资})
return item_list
def get_leasing_financing_list(self, keyword) -> List[Schemas.LeasingFinancingItemInfo]:
@ -243,7 +277,8 @@ class YujingtongApi:
@param keyword:
@return:
"""
item_list = self.get_list(self.config.leasing_financing_url, keyword)
item_list = self.get_list(self.config.leasing_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.租赁融资})
return item_list
def get_other_financing_list(self, keyword) -> List[Schemas.OtherFinancingItemInfo]:
@ -252,5 +287,6 @@ class YujingtongApi:
@param keyword:
@return:
"""
item_list = self.get_list(self.config.other_financing_url, keyword)
item_list = self.get_list(self.config.other_financing_url, keyword,
{"公司名称": keyword, "数据类型": YujingtongApiDataType.其他融资})
return item_list