"""API data-fetching classes for the Tianyancha open API."""
import json
import math
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import requests
import requests as rq

from utils.data_utils import JsDict
from utils.mongo_utils import MongoConnect


def make_query_url(url, params_dict: dict):
    """Append ``params_dict`` to ``url`` as a ``key=value&...`` query string.

    Entries whose value is ``None`` or the empty string are dropped. Other
    falsy values such as ``0`` are kept.

    NOTE(review): keys and values are inserted verbatim (no percent-encoding).
    The resulting string is also used as the cache key in
    ``TianyanchaApi.get_data_core``, so switching to an encoding helper would
    change the keys of already cached documents.

    Args:
        url: Base URL without a query string.
        params_dict: Query parameters.

    Returns:
        ``"<url>?<k1>=<v1>&<k2>=<v2>..."``; ``"<url>?"`` when nothing is kept.
    """
    pairs = [
        f"{key}={v}"
        for key, v in params_dict.items()
        if v is not None and v != ""
    ]
    add_content = "&".join(pairs)
    return f"{url}?{add_content}"


# Human-readable messages for the ``error_code`` field of an API response.
error_code_dic = {
    0: "请求成功",
    300000: "无数据",
    300001: "请求失败",
    300002: "账号失效",
    300003: "账号过期",
    300004: "访问频率过快",
    300005: "无权限访问此api",
    300006: "余额不足",
    300007: "剩余次数不足",
    300008: "缺少必要参数",
    300009: "账号信息有误",
    300010: "URL不存在",
    300011: "此IP无权限访问此api",
    300012: "报告生成中",
}


def _error_message(code):
    """Return the message for ``code``, tolerating codes missing from the table."""
    return error_code_dic.get(code, f"未知错误码: {code}")


# Endpoint registry: display name -> {"type": "dict" | "list", "url": ...}.
# "list" endpoints are fetched page by page, "dict" endpoints with one request.
# NOTE(review): most endpoints use plain http while the token is sent in the
# Authorization header; confirm whether https is available for them.
api_config = {
    "司法风险": {
        "type": "dict",
        "url": "http://open.api.tianyancha.com/services/open/cb/judicial/2.0"
    },
    "司法解析": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/jr/judicialCase/2.0"
    },
    "资质证书": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/m/certificate/2.0"
    },
    "历史经营异常": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/hi/abnormal/2.0"
    },
    "双随机抽查": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/m/doubleRandomCheck/2.0"
    },
    "双随机抽查详情": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/m/doubleRandomCheckDetail/2.0"
    },
    "重要股东质押": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/stockPledge/shareholder/2.0"
    },
    "重要股东质押详情": {
        "type": "dict",
        "url": "http://open.api.tianyancha.com/services/open/mr/stockPledge/shareholder/detail/2.0"
    },
    "破产重整": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/jr/bankruptcy/2.0"
    },
    "司法拍卖": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/judicialSale/3.0"
    },
    "企业基本信息": {
        "type": "dict",
        "url": "https://open.api.tianyancha.com/services/open/ic/baseinfo/normal"
    },
    "企业专利信息": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/ipr/patents/3.0"
    },
    "知识产权出质": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/v4/open/getPledgeReg"
    },
    "税务评级": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/m/taxCredit/2.0"
    },
    "进出口信用": {
        "type": "dict",
        "url": "http://open.api.tianyancha.com/services/open/m/importAndExport/2.0"
    },
    "税收违法": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/taxContravention/2.0"
    },
    "欠税公告": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/ownTax/2.0"
    },
    "行政许可": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/m/getAdministrativeLicense/2.0"
    },
    "行政处罚": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/punishmentInfo/3.0"
    },
    "经营异常": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/abnormal/2.0"
    },
    "严重违法": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/mr/illegalinfo/2.0"
    },
    "终本案件": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/jr/endCase/2.0"
    },
    "失信被执行人": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/v4/open/human/dishonest"
    },
    "被执行人": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/jr/zhixinginfo/2.0"
    },
    "限制消费令": {
        "type": "list",
        "url": "http://open.api.tianyancha.com/services/open/jr/consumptionRestriction/2.0"
    },
    "质押比例": {
        "type": "dict",
        "url": "http://open.api.tianyancha.com/services/open/mr/stockPledge/ratio/2.0"
    },
    "企业三要素": {
        "type": "dict",
        "url": "http://open.api.tianyancha.com/services/open/ic/verify/2.0"
    }
}


class TianyanchaApi:
    """Client for the Tianyancha open API with an optional MongoDB cache.

    When ``mongo_connect`` is given, every successful response is stored in
    ``mongo_connect.db[mongo_db_name]`` keyed by the full request URL, and
    later requests for the same URL are answered from that collection.
    """

    def __init__(self,
                 token: str,
                 mongo_connect: MongoConnect = None,
                 list_max_count=500,
                 mongo_db_name="天眼查",
                 api_config: Dict[str, Dict] = api_config
                 ):
        """Store the configuration; no request is made here.

        Args:
            token: Value sent in the ``Authorization`` request header.
            mongo_connect: Optional connection used as a response cache.
            list_max_count: Default cap on the item count of list endpoints;
                a falsy value disables the cap.
            mongo_db_name: Name looked up on ``mongo_connect.db`` for the cache.
            api_config: Endpoint registry; defaults to the module-level table.
        """
        self.config = api_config
        self.token = token
        self.rq = rq
        self.mongo_connect = mongo_connect
        self.mongo_db_name = mongo_db_name
        self.list_max_count = list_max_count

    def _resolve_url(self, name, query: dict):
        """Return ``(endpoint_config, full_url)`` for ``name``; raise if unknown."""
        url_config = self.config.get(name)
        if not url_config:
            raise Exception(f"无 {name}")
        return url_config, make_query_url(url_config['url'], query)

    def get_data_core(self, full_url, query_info: Optional[dict] = None):
        """Fetch one URL, preferring the cache, and return its ``result``.

        Args:
            full_url: Complete request URL; also the cache key.
            query_info: Extra metadata stored next to a newly cached response.

        Returns:
            ``JsDict`` wrapping the response's ``result`` (or the cached
            ``data``), or ``None`` when the API reports a non-zero
            ``error_code`` (the message is printed instead of raised).
        """
        # A fresh dict per call instead of a shared mutable default.
        if query_info is None:
            query_info = {}
        print(full_url)
        if self.mongo_connect:
            item = self.mongo_connect.db[self.mongo_db_name].find_one({"url": full_url})
            if item:
                print('item', item)
                print("请求缓存成功")
                return JsDict(item['data'])

        headers = {'Authorization': self.token}
        res = self.rq.get(full_url, headers=headers)
        data = res.json()
        code = data['error_code']
        if code != 0:
            # Unknown codes must not turn a reported API error into a KeyError.
            print(_error_message(code))
            return None

        print("请求成功")
        if self.mongo_connect:
            now = datetime.now()
            self.mongo_connect.db[self.mongo_db_name].insert_one({
                "url": full_url,
                "data": data['result'],
                "query_info": query_info,
                "create_time": now,
                "update_time": now
            })
        return JsDict(data['result'])

    def get(self, name, query: dict, query_info: Optional[dict] = None):
        """Fetch the endpoint registered as ``name``.

        Args:
            name: Key of the endpoint in the registry.
            query: Query-string parameters for the request.
            query_info: Extra metadata merged over ``query`` and stored with
                cached responses.

        Returns:
            ``(data, total)``. For ``"list"`` endpoints ``data`` is the list of
            items and ``total`` the (possibly capped) item count; for other
            endpoints ``data`` is the result (or ``None`` on error) and
            ``total`` is ``0``.

        Raises:
            Exception: If ``name`` is not in the registry.
        """
        url_config, full_url = self._resolve_url(name, query)
        merged_info = {**query, **(query_info or {})}
        if url_config["type"] == "list":
            return self.get_list_core(full_url, merged_info, url_config.get('list_max_count'))
        return self.get_data_core(full_url, merged_info), 0

    def get_total(self, name, query: dict, query_info: Optional[dict] = None):
        """Return the item count reported by a list endpoint.

        Only the first page is requested. Endpoints that are not of type
        ``"list"`` yield ``0`` without any request.

        Raises:
            Exception: If ``name`` is not in the registry.
        """
        url_config, full_url = self._resolve_url(name, query)
        total = 0
        if url_config["type"] == "list":
            total = self.get_total_core(full_url, {**query, **(query_info or {})})
        return int(total)

    def get_total_core(self, query_url, query_info: Optional[dict] = None):
        """Request page 1 of ``query_url`` and return its ``total`` (0 on failure)."""
        page_num = 1
        page_size = 20
        # The first page is enough to learn the total.
        url = f"{query_url}&pageNum={page_num}&pageSize={page_size}"
        result = self.get_data_core(url, query_info)
        if not result:
            return 0
        return result['total']

    def get_list_core(self, query_url, query_info: Optional[dict] = None, list_max_count=None):
        """Fetch the pages of a list endpoint and concatenate their items.

        Args:
            query_url: URL that already contains a ``?`` query part; paging
                parameters are appended with ``&``.
            query_info: Extra metadata stored with cached responses.
            list_max_count: Per-call cap on the item count; falls back to
                ``self.list_max_count`` when falsy.

        Returns:
            ``(items, total)`` where ``total`` is the reported count limited by
            the cap, or ``([], 0)`` when the first page cannot be fetched.
            Whole pages are fetched, so ``items`` may hold more than ``total``
            entries.
        """
        page_num = 1
        page_size = 20
        # The first page also tells us how many items exist in total.
        url = f"{query_url}&pageNum={page_num}&pageSize={page_size}"
        result = self.get_data_core(url, query_info)
        if not result:
            return [], 0

        total = result['total']
        print(f"共{total}条数据")
        max_count = list_max_count or self.list_max_count
        if max_count:
            total = min(result['total'], max_count)

        item_list = [JsDict(item) for item in result['items']]
        if total > len(item_list):
            for page_num in range(2, math.ceil(total / page_size) + 1):
                url = f"{query_url}&pageNum={page_num}&pageSize={page_size}"
                result = self.get_data_core(url, query_info)
                if not result:
                    # A failed page contributes nothing; previously this
                    # crashed by subscripting the ``[]`` fallback.
                    continue
                item_list.extend(JsDict(item) for item in result['items'])
        return item_list, total


class ApiItemConfig:
    """Declared shape of one endpoint description consumed by ``Api``."""

    url: str  # endpoint URL
    data_schema: Dict[str, Any]
    paging: bool
    method: str = 'GET'  # HTTP method; ``Api.make_url`` only handles 'GET'


class Api:
    """Minimal API client driven by a name -> ``ApiItemConfig`` mapping."""

    def __init__(self, source_config: Dict[str, ApiItemConfig], token: str):
        """Keep the endpoint mapping and the ``Authorization`` token."""
        self.source_config = source_config
        self.token = token

    def get_list_data_total(self, name, params: Dict):
        """Placeholder; not implemented (returns ``None``)."""
        pass

    def get_data(self, name, params: Dict) -> Union[List[Any], Dict[str, Any]]:
        """Build the URL for ``name`` with ``params`` and fetch it.

        Raises:
            KeyError: If ``name`` is not in ``source_config``.
        """
        source = self.source_config[name]
        full_url = self.make_url(source.url, method=source.method, params=params)
        return self.get_core(url=full_url)

    def get_core(self, url):
        """GET ``url`` and return its ``result`` as ``JsDict``.

        Returns ``None`` (after printing the message) when the response's
        ``error_code`` is non-zero.
        """
        data = requests.get(url, headers={'Authorization': self.token}).json()
        code = data['error_code']
        if code == 0:
            return JsDict(data['result'])
        # Unknown codes fall back to a generic message instead of a KeyError.
        print(_error_message(code))
        return None

    def make_url(self, url: str, method='GET', params: Optional[Dict] = None, page=None, page_size=None):
        """Return ``url`` with ``params`` encoded into its query string.

        Only ``method == 'GET'`` alters the URL; any other method returns
        ``url`` unchanged. ``page`` and ``page_size`` are accepted but unused.
        """
        if method == 'GET':
            # Copy so that preparing the request can never touch the caller's dict.
            query = deepcopy(params) if params is not None else {}
            url = requests.Request('GET', url, params=query).prepare().url
        return url