dataplatform/APPData/Router/BusinessRouter.py

135 lines
4.7 KiB
Python
Raw Normal View History

2022-11-11 15:21:19 +08:00
import requests
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST, MG_DB_TYC
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.Business import get_tyc_data
from Utils.TycApi.CommonUtil import CommonUtils
from Utils.TycApi.Configure import DB_GSBJ
router = APIRouter(
tags=["工商信息"],
prefix="/api/business"
)
@router.get("/pull", summary="拉取数据")
def func01(name: str, db: Session = Depends(get_db)):
# 根据name查询企业是否存在
data = CompanyCrud.get_company_by_name(db=db, name=name)
if not data:
# 不能存在新建企业再返回cid
data = CompanyCrud.create_company_by_name(db=db, name=name)
cid = data.id
# 查询数据库是否有此条数据
search_data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
[]
)
if search_data:
return {"info": "Data already exists", "Datail": search_data}
# 拉取天眼查数据到mongodb
tyc_res = get_tyc_data(name)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_data(
DB_GSBJ,
'基本信息',
{"企业名称": name},
['基本信息']
)
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
record = tyc_data['基本信息']
data = dict()
data['企业ID'] = cid
data['企业名称'] = name
data['企业类型'] = record['companyOrgType']
data['经营状态'] = record['regStatus']
data['法定代表人'] = record['legalPersonName']
data['注册资本'] = record['regCapital']
data['实缴资本'] = record['actualCapital']
data['经营范围'] = record['businessScope']
data['社会统一信用代码'] = record['creditCode']
data['成立日期'] = CommonUtils.time_stamp(record['estiblishTime'])
data['注册地址'] = record['regLocation']
data['登记机关'] = record['regInstitute']
data['核准日期'] = CommonUtils.time_stamp(record['approvedTime'])
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业名称": name},
data
)
return {"info": "Pull succeeded"}
@router.post("/update", summary="更新数据")
def func02(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 拉取天眼查数据到mongodb
name = db_item.name
tyc_res = get_tyc_data(name)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_data(
DB_GSBJ,
'基本信息',
{"企业名称": name},
['基本信息']
)
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
record = tyc_data['基本信息']
data = dict()
data['企业ID'] = cid
data['企业名称'] = name
data['企业类型'] = record['companyOrgType']
data['经营状态'] = record['regStatus']
data['法定代表人'] = record['legalPersonName']
data['注册资本'] = record['regCapital']
data['实缴资本'] = record['actualCapital']
data['经营范围'] = record['businessScope']
data['社会统一信用代码'] = record['creditCode']
data['成立日期'] = CommonUtils.time_stamp(record['estiblishTime'])
data['注册地址'] = record['regLocation']
data['登记机关'] = record['regInstitute']
data['核准日期'] = CommonUtils.time_stamp(record['approvedTime'])
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业名称": name},
data
)
return {"info": "Update succeeded"}
@router.get("/view", summary="查看数据")
def func03(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
[]
)
return data