from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from APPData.Crud import CompanyCrud, DishonestCrud
from Utils.AccessControl.AccessUtil import rbac
from Utils.DataBase.MongoHelperUtils import get_mongodb, get_tyc_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
from Utils.TycApi.Configure import DB_SFFX
from Utils.TycApi.Dishonest import get_tyc_data

router = APIRouter(
    tags=["失信人"],
    prefix="/api/wide_data/dishonest",
    dependencies=[Depends(rbac)]
)


def _load_tyc_dishonest_items(tyc_mongodb, name):
    """Refresh the Tianyancha data for *name* and return its dishonest-record items.

    Calls ``get_tyc_data(name)`` first, then reads the stored document back
    through ``tyc_mongodb.find_data``.

    Raises:
        HTTPException: 401 when ``get_tyc_data`` does not return ``'success'``
            (its return value becomes the detail), or when the follow-up
            query returns nothing.
    """
    # NOTE(review): 401 is kept for backward compatibility with existing
    # clients; an upstream/data failure would normally map to a 4xx/5xx other
    # than "Unauthorized" -- confirm before changing.
    tyc_res = get_tyc_data(name)
    if tyc_res != 'success':
        raise HTTPException(status_code=401, detail=tyc_res)

    tyc_data = tyc_mongodb.find_data(DB_SFFX, '失信人', {"企业名称": name}, ['失信人'])
    if not tyc_data:
        raise HTTPException(status_code=401, detail='Data query failed')

    # `or` guards against the key being present with a null value, which would
    # otherwise fail on the chained .get() / iteration.
    section = tyc_data.get('失信人') or {}
    return section.get('result') or []


def _build_dishonest_record(cid, item):
    """Map one raw Tianyancha item onto the stored record layout for company *cid*."""
    return {
        '企业ID': cid,
        '案号': item.get('casecode', None),
        '发布日期': CommonUtils.time_stamp(item.get('publishdate', None)),
        '失信行为': item.get('disrupttypename', None),
        '履行情况': item.get('performance', None),
        '执行依据文号': item.get('gistid', None),
        '执行法院': item.get('courtname', None),
        '涉案金额(元)': item.get('duty', None),
        '立案日期': CommonUtils.time_stamp(item.get('regdate', None)),
        '更新时间': CommonUtils.get_current_time(),
    }


@router.get("/pull", summary="拉取数据")
def func01(name: str, db: Session = Depends(get_db)):
    """Pull dishonest-record data for the company called *name* and store it.

    The company is created when it does not exist yet. If records for the
    company are already stored they are returned as-is and nothing is pulled.
    """
    mongodb = get_mongodb()
    tyc_mongodb = get_tyc_mongodb()
    try:
        # Look the company up by name; create it when missing so we have an id.
        company = CompanyCrud.get_company_by_name(db=db, name=name)
        if not company:
            company = CompanyCrud.create_company_by_name(db=db, name=name)
        cid = company.id

        # Skip the pull when records for this company are already stored.
        # NOTE(review): assumes find_all_data returns a materialised result
        # (not a lazy cursor), since the client is closed before the response
        # is serialised -- confirm. The "Datail" key spelling is kept because
        # clients may already rely on it.
        search_data = mongodb.find_all_data("WR_DataBase", 'C9.5_失信人', {"企业ID": cid}, [])
        if search_data:
            return {"info": "Data already exists", "Datail": search_data}

        # Pull the Tianyancha data into MongoDB, then persist every record.
        for item in _load_tyc_dishonest_items(tyc_mongodb, name):
            DishonestCrud.create_dishonest_by_id(
                db=db,
                mongodb=mongodb,
                _id=cid,
                content=_build_dishonest_record(cid, item),
            )
        return {"info": "Pull succeeded"}
    finally:
        # Close both clients on every exit path (early return and errors too).
        mongodb.client.close()
        tyc_mongodb.client.close()


@router.post("/update", summary="更新数据")
def func02(cid: str, db: Session = Depends(get_db)):
    """Re-pull dishonest-record data for company *cid* and update the stored records.

    Each record is updated using the company id plus its case number as the
    search filter.
    """
    mongodb = get_mongodb()
    tyc_mongodb = get_tyc_mongodb()
    try:
        # Resolve the company in the relational database.
        db_item = CompanyCrud.get_company_by_id(db, _id=cid)
        if not db_item:
            # NOTE(review): 204 is kept for backward compatibility; a missing
            # company would normally be a 404 -- confirm before changing.
            raise HTTPException(status_code=204, detail="Company Not Found")

        # Pull the Tianyancha data into MongoDB, then update every record.
        for item in _load_tyc_dishonest_items(tyc_mongodb, db_item.name):
            record = _build_dishonest_record(cid, item)
            # `serach` is the keyword name used by the callee in the original
            # call; it is kept verbatim.
            DishonestCrud.update_dishonest(
                mongodb=mongodb,
                serach={"企业ID": cid, "案号": record['案号']},
                content=record,
            )
        return {"info": "Update succeeded"}
    finally:
        # Close both clients on every exit path (errors included).
        mongodb.client.close()
        tyc_mongodb.client.close()


@router.get("/view", summary="查看数据")
def func03(cid: str, db: Session = Depends(get_db)):
    """Return the stored dishonest-record data for company *cid*."""
    mongodb = get_mongodb()
    try:
        # Resolve the company in the relational database.
        db_item = CompanyCrud.get_company_by_id(db, _id=cid)
        if not db_item:
            # NOTE(review): 204 is kept for backward compatibility; a missing
            # company would normally be a 404 -- confirm before changing.
            raise HTTPException(status_code=204, detail="Company Not Found")

        # Fetch the records from MongoDB by company id.
        # NOTE(review): assumes find_dishonest_by_id returns a materialised
        # result rather than a lazy cursor, since the client is closed before
        # the response is serialised -- confirm.
        return DishonestCrud.find_dishonest_by_id(db=db, mongodb=mongodb, _id=cid)
    finally:
        # The original never released this client.
        mongodb.client.close()