changes 接口重写

This commit is contained in:
P3ngSaM 2022-11-17 15:18:34 +08:00
parent 95975b037a
commit a8b4a4a4ff
27 changed files with 723 additions and 775 deletions

View File

@ -0,0 +1,23 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_balance_by_id(db: Session, mongodb, _id, reportdate, condition, content):
item = Model.Balance()
item._id = _id
item.reportdate = reportdate
obj_id = mongodb.upsert_data(dbname="WR_DataBase_v2_test", sheet="C6.1_资产负债表", condition=condition, data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def find_balance_by_id(db: Session, mongodb, _id):
db_item = db.query(Model.Balance).filter_by(_id=_id).all()
data = list()
for item in db_item:
insert = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C6.1_资产负债表", _id=item.obj_id)
data.append(insert)
return data

View File

@ -0,0 +1,25 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_business_by_id(db: Session, mongodb, _id, content):
item = Model.Business()
item._id = _id
obj_id = mongodb.insert_data(dbname="WR_DataBase_v2_test", sheet="C3.1_工商信息", data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def update_business_by_id(db: Session, mongodb, _id, content):
item = db.query(Model.Business).filter_by(_id=_id).first()
mongodb.update_data_by_id(dbname="WR_DataBase_v2_test", sheet="C3.1_工商信息", _id=item.obj_id, data=content)
return True
def find_business_by_id(db: Session, mongodb, _id):
item = db.query(Model.Business).filter_by(_id=_id).first()
data = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C3.1_工商信息", _id=item.obj_id)
return data

View File

@ -0,0 +1,23 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_cashflow_by_id(db: Session, mongodb, _id, reportdate, condition, content):
item = Model.CashFlow()
item._id = _id
item.reportdate = reportdate
obj_id = mongodb.upsert_data(dbname="WR_DataBase_v2_test", sheet="C6.3_现金流量表", condition=condition, data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def find_cashflow_by_id(db: Session, mongodb, _id):
db_item = db.query(Model.CashFlow).filter_by(_id=_id).all()
data = list()
for item in db_item:
insert = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C6.3_现金流量表", _id=item.obj_id)
data.append(insert)
return data

View File

@ -42,10 +42,13 @@ def get_company_total(db: Session, name, page: int = 0, pagesize: int = 10):
:param pagesize:
:return:
"""
total = db.query(Company).filter(or_(Company.name.like("%{}%".format("" if name is None else name)), name is None)).count()
detail = db.query(Company).filter(or_(Company.name.like("%{}%".format("" if name is None else name)), name is None)).offset(
total = db.query(Company).filter(
or_(Company.name.like("%{}%".format("" if name is None else name)), name is None)).count()
detail = db.query(Company).filter(
or_(Company.name.like("%{}%".format("" if name is None else name)), name is None)).offset(
(page - 1) * pagesize).limit(pagesize).all()
return {
"total": total,
"detail": detail
}

View File

@ -0,0 +1,27 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_dishonest_by_id(db: Session, mongodb, _id, content):
item = Model.Dishonest()
item._id = _id
obj_id = mongodb.insert_data(dbname="WR_DataBase_v2_test", sheet="C9.5_失信人", data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def update_dishonest(mongodb, serach, content):
mongodb.update_data(dbname="WR_DataBase_v2_test", sheet="C9.5_失信人", search=serach, field=content)
return True
def find_dishonest_by_id(db: Session, mongodb, _id):
db_item = db.query(Model.Dishonest).filter_by(_id=_id).all()
data = list()
for item in db_item:
insert = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C9.5_失信人", _id=item.obj_id)
data.append(insert)
return data

View File

@ -0,0 +1,23 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_income_by_id(db: Session, mongodb, _id, reportdate, condition, content):
item = Model.Income()
item._id = _id
item.reportdate = reportdate
obj_id = mongodb.upsert_data(dbname="WR_DataBase_v2_test", sheet="C6.2_利润表", condition=condition, data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def find_income_by_id(db: Session, mongodb, _id):
db_item = db.query(Model.Income).filter_by(_id=_id).all()
data = list()
for item in db_item:
insert = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C6.2_利润表", _id=item.obj_id)
data.append(insert)
return data

View File

@ -0,0 +1,28 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_lawsuit_by_id(db: Session, mongodb, _id, content):
item = Model.Lawsuit()
item._id = _id
obj_id = mongodb.insert_data(dbname="WR_DataBase_v2_test", sheet="C9.6_法律诉讼", data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def update_lawsuit_by_id(mongodb, _id, content):
mongodb.update_data(dbname="WR_DataBase_v2_test", sheet="C9.6_法律诉讼", search={"id": _id}, field=content)
return True
def find_lawsuit_by_id(db: Session, mongodb, _id):
db_item = db.query(Model.Lawsuit).filter_by(_id=_id).all()
data = list()
for item in db_item:
insert = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C9.6_法律诉讼", _id=item.obj_id)
data.append(insert)
return data

View File

@ -0,0 +1,25 @@
from sqlalchemy.orm import Session
from APPData.Model import CompanyModel as Model
def create_shareholder_by_id(db: Session, mongodb, _id, content):
item = Model.Shareholder()
item._id = _id
obj_id = mongodb.insert_data(dbname="WR_DataBase_v2_test", sheet="C3.2_股东信息", data=content)
item.obj_id = obj_id
db.add(item)
db.commit()
return True
def update_shareholder_by_id(db: Session, mongodb, _id, content):
item = db.query(Model.Shareholder).filter_by(_id=_id).first()
mongodb.update_data_by_id(dbname="WR_DataBase_v2_test", sheet="C3.2_股东信息", _id=item.obj_id, data=content)
return True
def find_shareholder_by_id(db: Session, mongodb, _id):
item = db.query(Model.Shareholder).filter_by(_id=_id).first()
data = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C3.2_股东信息", _id=item.obj_id)
return data

View File

@ -1,9 +1,126 @@
from sqlalchemy import Column, String
from Utils.DataBase.SqlAlchemyUtils import Base
from sqlalchemy import Column, String, ForeignKey, Integer, Date
from sqlalchemy.dialects.mysql import LONGTEXT
from sqlalchemy.orm import relationship
from Utils.DataBase.MongoHelperUtils import get_mongodb
from Utils.DataBase.SqlAlchemyUtils import Base, engine
# 企业数据
class Company(Base):
__tablename__ = "company"
__tablename__ = "data_company"
id = Column(String(11), primary_key=True)
name = Column(String(32), index=True)
business_data = relationship("Business", backref="data_company", uselist=False)
shareholder_data = relationship("Shareholder", backref="data_company", uselist=False)
lawsuit_data = relationship("Lawsuit", backref="data_company")
dishonest_data = relationship("Dishonest", backref="data_company")
balance_sheet_data = relationship("Balance", backref="data_company")
income_sheet_data = relationship("Income", backref="data_company")
cashflow_sheet_data = relationship("CashFlow", backref="data_company")
# 工商信息
class Business(Base):
__tablename__ = "data_bussiness"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
_id = Column(String(11), ForeignKey("data_company.id"))
def dict(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C3.1_工商信息", _id=self.obj_id)
return item.get("data")
# 股东信息
class Shareholder(Base):
__tablename__ = "data_shareholder"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
_id = Column(String(11), ForeignKey("data_company.id"))
def dict(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C3.2_股东信息", _id=self.obj_id)
return item.get("data")
# 法律诉讼
class Lawsuit(Base):
__tablename__ = "data_lawsuit"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
_id = Column(String(11), ForeignKey("data_company.id"))
def list(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C9.6_法律诉讼", _id=self.obj_id)
return item.get("data")
# 失信人
class Dishonest(Base):
__tablename__ = "data_dishonest"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
_id = Column(String(11), ForeignKey("data_company.id"))
def list(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C9.5_失信人", _id=self.obj_id)
return item.get("data")
# 资产负债表
class Balance(Base):
__tablename__ = "data_balance"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
reportdate = Column(Date)
_id = Column(String(11), ForeignKey("data_company.id"))
def list(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C6.1_资产负债表", _id=self.obj_id)
return item.get("data")
# 利润表
class Income(Base):
__tablename__ = "data_income"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
reportdate = Column(Date)
_id = Column(String(11), ForeignKey("data_company.id"))
def list(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C6.2_利润表", _id=self.obj_id)
return item.get("data")
# 现金流量表
class CashFlow(Base):
__tablename__ = "data_cashflow"
id = Column(Integer, primary_key=True, autoincrement=True)
obj_id = Column(String(24))
reportdate = Column(Date)
_id = Column(String(11), ForeignKey("data_company.id"))
def list(self):
mongodb = get_mongodb()
item = mongodb.find_data_by_id(dbname="WR_DataBase_v2_test", sheet="C6.3_现金流量表", _id=self.obj_id)
return item.get("data")
Base.metadata.create_all(bind=engine)

View File

@ -4,9 +4,10 @@ import pandas as pd
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST
from APPData.Crud import CompanyCrud, BalanceCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
router = APIRouter(
tags=["资产负债表"],
@ -32,6 +33,7 @@ async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends
for num in range(1, col):
finance_dict = dict()
finance_dict['企业ID'] = cid
finance_dict['更新时间'] = CommonUtils.get_current_time()
for row_num in range(row):
try:
if isinstance(data.iloc[row_num, 0], float):
@ -56,29 +58,20 @@ async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends
finance_dict[key] = None
except TypeError:
continue
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C6.1_资产负债表",
{"企业ID": cid, "截止日期": finance_dict['截止日期']},
finance_dict
)
mongodb = get_mongodb()
BalanceCrud.create_balance_by_id(db=db, mongodb=mongodb, _id=cid, reportdate=finance_dict['截止日期'],
condition={"企业ID": cid, "截止日期": finance_dict['截止日期']}, content=finance_dict)
mongodb.client.close()
return {"info": "Success"}
@router.post("/view", summary="查看数据")
def func02(cid: str, report_date: str, db: Session = Depends(get_db)):
def func02(cid: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C6.1_资产负债表",
{"企业ID": cid, "截止日期": report_date},
[]
)
data = BalanceCrud.find_balance_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -1,9 +1,9 @@
import requests
import requests as requests
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST, MG_DB_TYC
from APPData.Crud import CompanyCrud, BusinessCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb, get_tyc_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.Business import get_tyc_data
from Utils.TycApi.CommonUtil import CommonUtils
@ -17,6 +17,8 @@ router = APIRouter(
@router.get("/pull", summary="拉取数据")
def func01(name: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 根据name查询企业是否存在
data = CompanyCrud.get_company_by_name(db=db, name=name)
if not data:
@ -24,111 +26,87 @@ def func01(name: str, db: Session = Depends(get_db)):
data = CompanyCrud.create_company_by_name(db=db, name=name)
cid = data.id
# 查询数据库是否有此条数据
search_data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
[]
)
search_data = mongodb.find_data("WR_DataBase_v2_test", 'C3.1_工商信息', {"企业ID": cid}, [])
if search_data:
return {"info": "Data already exists", "Datail": search_data}
# 拉取天眼查数据到mongodb
tyc_res = get_tyc_data(name)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
raise HTTPException(status_code=202, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_data(
DB_GSBJ,
'基本信息',
{"企业名称": name},
['基本信息']
)
tyc_data = tyc_mongodb.find_data(DB_GSBJ, '基本信息', {"企业名称": name}, ['基本信息'])
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
record = tyc_data['基本信息']
raise HTTPException(status_code=202, detail='Data query failed')
record = tyc_data.get('基本信息', dict())
data = dict()
data['企业ID'] = cid
data['企业名称'] = name
data['企业类型'] = record['companyOrgType']
data['经营状态'] = record['regStatus']
data['法定代表人'] = record['legalPersonName']
data['注册资本'] = record['regCapital']
data['实缴资本'] = record['actualCapital']
data['经营范围'] = record['businessScope']
data['社会统一信用代码'] = record['creditCode']
data['企业类型'] = record.get('companyOrgType', None)
data['经营状态'] = record.get('regStatus', None)
data['法定代表人'] = record.get('legalPersonName', None)
data['注册资本'] = record.get('regCapital', None)
data['实缴资本'] = record.get('actualCapital', None)
data['经营范围'] = record.get('businessScope', None)
data['社会统一信用代码'] = record.get('creditCode', None)
data['成立日期'] = CommonUtils.time_stamp(record['estiblishTime'])
data['注册地址'] = record['regLocation']
data['登记机关'] = record['regInstitute']
data['注册地址'] = record.get('regLocation', None)
data['登记机关'] = record.get('regInstitute', None)
data['核准日期'] = CommonUtils.time_stamp(record['approvedTime'])
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业名称": name},
data
)
# 保存到数据到data_business
BusinessCrud.create_business_by_id(db=db, mongodb=mongodb, _id=cid, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Pull succeeded"}
@router.post("/update", summary="更新数据")
def func02(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 查询企业是否存在
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 拉取天眼查数据到mongodb
name = db_item.name
tyc_res = get_tyc_data(name)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_data(
DB_GSBJ,
'基本信息',
{"企业名称": name},
['基本信息']
)
tyc_data = tyc_mongodb.find_data(DB_GSBJ, '基本信息', {"企业名称": name}, ['基本信息'])
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
record = tyc_data['基本信息']
record = tyc_data.get('基本信息', dict())
data = dict()
data['企业ID'] = cid
data['企业名称'] = name
data['企业类型'] = record['companyOrgType']
data['经营状态'] = record['regStatus']
data['法定代表人'] = record['legalPersonName']
data['注册资本'] = record['regCapital']
data['实缴资本'] = record['actualCapital']
data['经营范围'] = record['businessScope']
data['社会统一信用代码'] = record['creditCode']
data['企业类型'] = record.get('companyOrgType', None)
data['经营状态'] = record.get('regStatus', None)
data['法定代表人'] = record.get('legalPersonName', None)
data['注册资本'] = record.get('regCapital', None)
data['实缴资本'] = record.get('actualCapital', None)
data['经营范围'] = record.get('businessScope', None)
data['社会统一信用代码'] = record.get('creditCode', None)
data['成立日期'] = CommonUtils.time_stamp(record['estiblishTime'])
data['注册地址'] = record['regLocation']
data['登记机关'] = record['regInstitute']
data['注册地址'] = record.get('regLocation', None)
data['登记机关'] = record.get('regInstitute', None)
data['核准日期'] = CommonUtils.time_stamp(record['approvedTime'])
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业名称": name},
data
)
# 保存到数据到data_business
BusinessCrud.update_business_by_id(db=db, mongodb=mongodb, _id=cid, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Update succeeded"}
@router.get("/view", summary="查看数据")
def func03(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
mongodb = get_mongodb()
# 查询企业是否存在
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
[]
)
raise HTTPException(status_code=404, detail="Company Not Found")
data = BusinessCrud.find_business_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -4,9 +4,10 @@ import pandas as pd
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST
from APPData.Crud import CompanyCrud, CashFlowCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
router = APIRouter(
tags=["现金流量表"],
@ -32,6 +33,7 @@ async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends
for num in range(1, col):
finance_dict = dict()
finance_dict['企业ID'] = cid
finance_dict['更新时间'] = CommonUtils.get_current_time()
for row_num in range(row):
try:
if isinstance(data.iloc[row_num, 0], float):
@ -57,28 +59,22 @@ async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends
except TypeError:
continue
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C6.3_现金流量表",
{"企业ID": cid, "截止日期": finance_dict['截止日期']},
finance_dict
)
mongodb = get_mongodb()
CashFlowCrud.create_cashflow_by_id(db=db, mongodb=mongodb, _id=cid, reportdate=finance_dict['截止日期'],
condition={"企业ID": cid, "截止日期": finance_dict['截止日期']}, content=finance_dict)
mongodb.client.close()
return {"info": "Success"}
@router.post("/view", summary="查看数据")
def func02(cid: str, report_date: str, db: Session = Depends(get_db)):
def func02(cid: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C6.3_现金流量表",
{"企业ID": cid, "截止日期": report_date},
[]
)
data = CashFlowCrud.find_cashflow_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -1,9 +1,8 @@
import requests
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST, MG_DB_TYC
from APPData.Crud import CompanyCrud, DishonestCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb, get_tyc_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
@ -18,6 +17,8 @@ router = APIRouter(
@router.get("/pull", summary="拉取数据")
def func01(name: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 根据name查询企业是否存在
data = CompanyCrud.get_company_by_name(db=db, name=name)
if not data:
@ -25,12 +26,7 @@ def func01(name: str, db: Session = Depends(get_db)):
data = CompanyCrud.create_company_by_name(db=db, name=name)
cid = data.id
# 查询数据库是否有此条数据
search_data = MG_DB_TEST.find_all_data(
"WR_DataBase_v2_test",
"C9.5_失信人",
{"企业ID": cid},
[]
)
search_data = mongodb.find_all_data("WR_DataBase_v2_test", 'C9.5_失信人', {"企业ID": cid}, [])
if search_data:
return {"info": "Data already exists", "Datail": search_data}
# 拉取天眼查数据到mongodb
@ -38,42 +34,34 @@ def func01(name: str, db: Session = Depends(get_db)):
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_column(
DB_SFFX,
'失信人',
{"企业名称": name},
'失信人'
)
tyc_data = tyc_mongodb.find_data(DB_SFFX, '失信人', {"企业名称": name}, ['失信人'])
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
try:
record = tyc_data['result']
for item in record:
data = dict()
data['企业ID'] = cid
data['案号'] = item['casecode']
data['发布日期'] = CommonUtils.time_stamp(item['publishdate'])
data['失信行为'] = item['disrupttypename']
data['履行情况'] = item['performance']
data['执行依据文号'] = item['gistid']
data['执行法院'] = item['courtname']
data['涉案金额(元)'] = item['duty']
data['立案日期'] = CommonUtils.time_stamp(item['regdate'])
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C9.5_失信人",
{"企业ID": cid, "执行依据文号": data['执行依据文号']},
data
)
except KeyError:
pass
record = tyc_data.get('失信人', dict())
record = record.get('result', list())
for item in record:
data = dict()
data['企业ID'] = cid
data['案号'] = item.get('casecode', None)
data['发布日期'] = CommonUtils.time_stamp(item.get('publishdate', None))
data['失信行为'] = item.get('disrupttypename', None)
data['履行情况'] = item.get('performance', None)
data['执行依据文号'] = item.get('gistid', None)
data['执行法院'] = item.get('courtname', None)
data['涉案金额(元)'] = item.get('duty', None)
data['立案日期'] = CommonUtils.time_stamp(item.get('regdate', None))
data['更新时间'] = CommonUtils.get_current_time()
# 保存数据
DishonestCrud.create_dishonest_by_id(db=db, mongodb=mongodb, _id=cid, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Pull succeeded"}
@router.post("/update", summary="更新数据")
def func02(cid: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
@ -85,52 +73,37 @@ def func02(cid: str, db: Session = Depends(get_db)):
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_column(
DB_SFFX,
'失信人',
{"企业名称": name},
'失信人'
)
tyc_data = tyc_mongodb.find_data(DB_SFFX, '失信人', {"企业名称": name}, ['失信人'])
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
try:
record = tyc_data['result']
for item in record:
data = dict()
data['企业ID'] = cid
data['案号'] = item['casecode']
data['发布日期'] = CommonUtils.time_stamp(item['publishdate'])
data['失信行为'] = item['disrupttypename']
data['履行情况'] = item['performance']
data['执行依据文号'] = item['gistid']
data['执行法院'] = item['courtname']
data['涉案金额(元)'] = item['duty']
data['立案日期'] = CommonUtils.time_stamp(item['regdate'])
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C9.5_失信人",
{"企业ID": cid, "执行依据文号": data['执行依据文号']},
data
)
except KeyError:
pass
record = tyc_data.get('失信人', dict())
record = record.get('result', list())
for item in record:
data = dict()
data['企业ID'] = cid
data['案号'] = item.get('casecode', None)
data['发布日期'] = CommonUtils.time_stamp(item.get('publishdate', None))
data['失信行为'] = item.get('disrupttypename', None)
data['履行情况'] = item.get('performance', None)
data['执行依据文号'] = item.get('gistid', None)
data['执行法院'] = item.get('courtname', None)
data['涉案金额(元)'] = item.get('duty', None)
data['立案日期'] = CommonUtils.time_stamp(item.get('regdate', None))
data['更新时间'] = CommonUtils.get_current_time()
# 保存数据
DishonestCrud.update_dishonest(mongodb=mongodb, serach={"企业ID": cid, "案号": data['案号']}, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Update succeeded"}
@router.get("/view", summary="查看数据")
def func03(cid: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_all_data(
"WR_DataBase_v2_test",
"C9.5_失信人",
{"企业ID": cid},
[]
)
data = DishonestCrud.find_dishonest_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -4,9 +4,10 @@ import pandas as pd
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST
from APPData.Crud import CompanyCrud, IncomeCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
router = APIRouter(
tags=["利润表"],
@ -32,6 +33,7 @@ async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends
for num in range(1, col):
finance_dict = dict()
finance_dict['企业ID'] = cid
finance_dict['更新时间'] = CommonUtils.get_current_time()
for row_num in range(row):
try:
if isinstance(data.iloc[row_num, 0], float):
@ -57,28 +59,21 @@ async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends
except TypeError:
continue
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C6.2_利润表",
{"企业ID": cid, "截止日期": finance_dict['截止日期']},
finance_dict
)
mongodb = get_mongodb()
IncomeCrud.create_income_by_id(db=db, mongodb=mongodb, _id=cid, reportdate=finance_dict['截止日期'],
condition={"企业ID": cid, "截止日期": finance_dict['截止日期']}, content=finance_dict)
mongodb.client.close()
return {"info": "Success"}
@router.post("/view", summary="查看数据")
def func02(cid: str, report_date: str, db: Session = Depends(get_db)):
def func02(cid: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C6.2_利润表",
{"企业ID": cid, "截止日期": report_date},
[]
)
data = IncomeCrud.find_income_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -2,8 +2,8 @@ import requests
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST, MG_DB_TYC
from APPData.Crud import CompanyCrud, LawsuitCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb, get_tyc_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
from Utils.TycApi.Configure import DB_SFFX
@ -17,6 +17,8 @@ router = APIRouter(
@router.get("/pull", summary="拉取数据")
def func01(name: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 根据name查询企业是否存在
data = CompanyCrud.get_company_by_name(db=db, name=name)
if not data:
@ -24,12 +26,7 @@ def func01(name: str, db: Session = Depends(get_db)):
data = CompanyCrud.create_company_by_name(db=db, name=name)
cid = data.id
# 查询数据库是否有此条数据
search_data = MG_DB_TEST.find_all_data(
"WR_DataBase_v2_test",
"C9.6_法律诉讼",
{"企业ID": cid},
[]
)
search_data = mongodb.find_data("WR_DataBase_v2_test", 'C9.6_法律诉讼', {"企业ID": cid}, [])
if search_data:
return {"info": "Data already exists", "Datail": search_data}
# 拉取天眼查数据到mongodb
@ -37,109 +34,87 @@ def func01(name: str, db: Session = Depends(get_db)):
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_column(
DB_SFFX,
'法律诉讼',
{"企业名称": name},
'法律诉讼'
)
tyc_data = tyc_mongodb.find_data(DB_SFFX, '法律诉讼', {"企业名称": name}, ['法律诉讼'])
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
try:
record = tyc_data['result']
for item in record:
data = dict()
data['企业ID'] = cid
data['id'] = item['id']
data['标题'] = item['title']
data['案由'] = item['caseReason']
data['案号'] = item['caseNo']
data['涉案方'] = list()
for person in item['casePersons']:
person_dict = dict()
person_dict['案件身份'] = person['role']
person_dict['名称'] = person['name']
person_dict['裁判结果'] = person['result']
data['涉案方'].append(person_dict)
data['发布日期'] = CommonUtils.time_stamp(item['submitTime'])
data['裁判日期'] = item['judgeTime']
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C9.6_法律诉讼",
{"企业ID": cid, "id": data['id']},
data
)
except KeyError:
pass
record = tyc_data.get('法律诉讼', dict())
record = record.get('result', list())
for item in record:
data = dict()
data['企业ID'] = cid
data['id'] = item.get('id', None)
data['标题'] = item.get('title', None)
data['案由'] = item.get('caseReason', None)
data['案号'] = item.get('caseNo', None)
data['涉案方'] = list()
persons = item.get('casePersons', list())
for person in persons:
person_dict = dict()
person_dict['案件身份'] = person.get('role')
person_dict['名称'] = person.get('name')
person_dict['裁判结果'] = person.get('result')
data['涉案方'].append(person_dict)
data['发布日期'] = CommonUtils.time_stamp(item.get('submitTime', None))
data['裁判日期'] = item.get('judgeTime', None)
data['更新时间'] = CommonUtils.get_current_time()
# 保存到数据到data_business
LawsuitCrud.create_lawsuit_by_id(db=db, mongodb=mongodb, _id=cid, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Pull succeeded"}
@router.post("/update", summary="更新数据")
def func02(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 查询企业是否存在
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 拉取天眼查数据到mongodb
name = db_item.name
# 拉取天眼查数据到mongodb
tyc_res = get_tyc_data(name)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
tyc_data = MG_DB_TYC.find_single_column(
DB_SFFX,
'法律诉讼',
{"企业名称": name},
'法律诉讼'
)
tyc_data = tyc_mongodb.find_data(DB_SFFX, '法律诉讼', {"企业名称": name}, ['法律诉讼'])
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
try:
record = tyc_data['result']
for item in record:
data = dict()
data['企业ID'] = cid
data['id'] = item['id']
data['标题'] = item['title']
data['案由'] = item['caseReason']
data['案号'] = item['caseNo']
data['涉案方'] = list()
for person in item['casePersons']:
person_dict = dict()
person_dict['案件身份'] = person['role']
person_dict['名称'] = person['name']
person_dict['裁判结果'] = person['result']
data['涉案方'].append(person_dict)
data['发布日期'] = CommonUtils.time_stamp(item['submitTime'])
data['裁判日期'] = item['judgeTime']
data['更新时间'] = CommonUtils.get_current_time()
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C9.6_法律诉讼",
{"企业ID": cid, "id": data['id']},
data
)
except KeyError:
pass
record = tyc_data.get('法律诉讼', dict())
record = record.get('result', list())
for item in record:
data = dict()
data['企业ID'] = cid
data['id'] = item.get('id', None)
data['标题'] = item.get('title', None)
data['案由'] = item.get('caseReason', None)
data['案号'] = item.get('caseNo', None)
data['涉案方'] = list()
persons = item.get('casePersons', list())
for person in persons:
person_dict = dict()
person_dict['案件身份'] = person.get('role')
person_dict['名称'] = person.get('name')
person_dict['裁判结果'] = person.get('result')
data['涉案方'].append(person_dict)
data['发布日期'] = CommonUtils.time_stamp(item.get('submitTime', None))
data['裁判日期'] = item.get('judgeTime', None)
data['更新时间'] = CommonUtils.get_current_time()
# 保存到数据到data_lawsuit
LawsuitCrud.update_lawsuit_by_id(mongodb=mongodb, _id=data['id'], content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Update succeeded"}
@router.get("/view", summary="查看数据")
def func03(cid: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
# 查询mysql数据库中的_id
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_all_data(
"WR_DataBase_v2_test",
"C9.6_法律诉讼",
{"企业ID": cid},
[]
)
data = LawsuitCrud.find_lawsuit_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -0,0 +1,49 @@
import datetime
import math
import pandas as pd
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session
from Utils.DataBase.MongoHelperUtils import get_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils
router = APIRouter(
tags=["宏观数据"],
prefix="/api/macro_data"
)
@router.post("/upload", summary="上传经济实力表")
async def func01(region: str, excel: UploadFile = File(...), db: Session = Depends(get_db)):
mongodb = get_mongodb()
contents = await excel.read()
data = pd.read_excel(contents, skiprows=1)
data = data.set_index('序号')
for num in data.index.values:
df_line = data.loc[num, data.columns.values].to_dict()
df_line['省/直辖市'] = region
for key, value in df_line.items():
try:
if math.isnan(value):
df_line[key] = None
except TypeError:
continue
mongodb.upsert_data("WR_DataBase_v2_test", 'C12.2_区域经济',
{"省/直辖市": region, "地区": df_line['地区'], "年份": df_line["年份"]}, df_line)
return {"info": "Success"}
if __name__ == '__main__':
mongodb = get_mongodb()
all_data = mongodb.find_all_data("WR_DataBase_v2_test", 'C12.2_区域经济', {"年份": 2021}, [])
for item in all_data:
for key, value in item.items():
try:
if math.isnan(value):
item[key] = None
except TypeError:
continue
mongodb.upsert_data("WR_DataBase_v2_test", 'C12.2_区域经济',
{"省/直辖市": item['省/直辖市'], "地区": item['地区'], "年份": item["年份"]}, item)

View File

@ -2,10 +2,9 @@ import requests
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from APPData.Crud import CompanyCrud
from Utils.DBHelper.MongoDBInstance import MG_DB_TEST, MG_DB_TYC
from APPData.Crud import CompanyCrud, ShareholderCrud
from Utils.DataBase.MongoHelperUtils import get_mongodb, get_tyc_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.Business import get_tyc_data
from Utils.TycApi.CommonUtil import CommonUtils
from Utils.TycApi.Configure import DB_GSBJ
from Utils.TycApi.Shareholder import get_shareholer_data
@ -18,6 +17,8 @@ router = APIRouter(
@router.post("/pull", summary="拉取数据")
def func01(name: str, db: Session = Depends(get_db)):
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 根据name查询企业是否存在
data = CompanyCrud.get_company_by_name(db=db, name=name)
if not data:
@ -25,53 +26,31 @@ def func01(name: str, db: Session = Depends(get_db)):
data = CompanyCrud.create_company_by_name(db=db, name=name)
cid = data.id
# 查询数据库是否有此条数据
search_data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C3.2_股东信息",
{"企业ID": cid},
[]
)
search_data = mongodb.find_data("WR_DataBase_v2_test", 'C3.2_股东信息', {"企业ID": cid}, [])
if search_data:
return {"info": "Data already exists", "Datail": search_data}
# 获取企业类型,如果不存在则拉取工商信息
company_type = MG_DB_TEST.find_single_column(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
"企业类型"
)
company_type = mongodb.find_data("WR_DataBase_v2_test", 'C3.1_工商信息', {"企业ID": cid}, ["企业类型"])
if not company_type:
url = "http://127.0.0.1:8008/api/business/pull?name={}"
url = "http://127.0.0.1:8009/api/business/pull?name={}"
url = url.format(name)
# headers = {"token": token}
res = requests.get(url=url)
if res.status_code == 200:
company_type = MG_DB_TEST.find_single_column(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
"企业类型"
)
company_type = mongodb.find_data("WR_DataBase_v2_test", 'C3.1_工商信息', {"企业ID": cid}, ["企业类型"])
else:
raise HTTPException(status_code=401, detail="Failed to get company type")
raise HTTPException(status_code=202, detail="Failed to get company type")
company_type = company_type.get('企业类型', None)
tyc_res = get_shareholer_data(name, company_type)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
if '上市' in company_type:
tyc_data = MG_DB_TYC.find_single_column(
DB_GSBJ,
'十大股东',
{"企业名称": name},
'十大股东'
)
tyc_data = tyc_mongodb.find_data(DB_GSBJ, '十大股东', {"企业名称": name}, ["十大股东"])
tyc_data = tyc_data.get('十大股东', list())
else:
tyc_data = MG_DB_TYC.find_single_column(
DB_GSBJ,
'企业股东',
{"企业名称": name},
'企业股东'
)
tyc_data = tyc_mongodb.find_data(DB_GSBJ, '企业股东', {"企业名称": name}, ["企业股东"])
tyc_data = tyc_data.get('企业股东', dict())
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
data = dict()
@ -80,90 +59,74 @@ def func01(name: str, db: Session = Depends(get_db)):
data['股东列表'] = list()
data['更新时间'] = CommonUtils.get_current_time()
if '上市' in company_type:
try:
for item in tyc_data['holderList']:
shareholer = dict()
shareholer['股东ID'] = item['id']
shareholer['股东名称'] = item['name']
shareholer['股东类型'] = item['shareType']
shareholer['股东性质'] = None
shareholer['持股比例'] = item['proportion']
shareholer['持股数量'] = item['holdingNum']
shareholer['认缴出资日期'] = CommonUtils.time_stamp(item['publishDate'])
data['股东列表'].append(shareholer)
except KeyError:
pass
holder_list = tyc_data.get('holderList', list())
for item in holder_list:
shareholer = dict()
shareholer['股东ID'] = item.get('id', None)
shareholer['股东名称'] = item.get('name', None)
shareholer['股东类型'] = item.get('shareType', None)
shareholer['股东性质'] = None
shareholer['持股比例'] = item.get('proportion', None)
shareholer['持股数量'] = item.get('holdingNum', None)
shareholer['认缴出资日期'] = CommonUtils.time_stamp(item.get('publishDate', None))
data['股东列表'].append(shareholer)
else:
try:
for item in tyc_data['result']:
shareholer = dict()
shareholer['股东ID'] = item['id']
shareholer['股东名称'] = item['name']
shareholer['股东类型'] = item['type']
shareholer['股东性质'] = None
shareholer['持股比例'] = item['capital'][0]['percent']
shareholer['认缴出资额'] = item['capital'][0]['amomon']
shareholer['认缴出资日期'] = item['capital'][0]['time']
data['股东列表'].append(shareholer)
except KeyError:
pass
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.2_股东信息",
{"企业名称": name},
data
)
holder_list = tyc_data.get('result', list())
for item in holder_list:
shareholer = dict()
shareholer['股东ID'] = item.get('id', None)
shareholer['股东名称'] = item.get('name', None)
shareholer['股东类型'] = item.get('type', None)
shareholer['股东性质'] = None
capital_list = item.get('capital', list())
if capital_list:
shareholer['持股比例'] = capital_list[0].get('percent', None)
shareholer['认缴出资额'] = capital_list[0].get('amomon', None)
shareholer['认缴出资日期'] = capital_list[0].get('time', None)
else:
shareholer['持股比例'] = None
shareholer['认缴出资额'] = None
shareholer['认缴出资日期'] = None
data['股东列表'].append(shareholer)
# 保存到数据到data_business
ShareholderCrud.create_shareholder_by_id(db=db, mongodb=mongodb, _id=cid, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Pull succeeded"}
@router.post("/update", summary="更新数据")
def func02(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
mongodb = get_mongodb()
tyc_mongodb = get_tyc_mongodb()
# 查询企业是否存在
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
# 拉取天眼查数据到mongodb
name = db_item.name
# 获取企业类型,如果不存在则拉取工商信息
company_type = MG_DB_TEST.find_single_column(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
"企业类型"
)
company_type = mongodb.find_data("WR_DataBase_v2_test", 'C3.1_工商信息', {"企业ID": cid}, ["企业类型"])
if not company_type:
url = "http://127.0.0.1:8008/api/business/pull?name={}"
url = "http://127.0.0.1:8009/api/business/pull?name={}"
url = url.format(name)
# headers = {"token": token}
res = requests.get(url=url)
if res.status_code == 200:
company_type = MG_DB_TEST.find_single_column(
"WR_DataBase_v2_test",
"C3.1_工商信息",
{"企业ID": cid},
"企业类型"
)
company_type = mongodb.find_data("WR_DataBase_v2_test", 'C3.1_工商信息', {"企业ID": cid}, ["企业类型"])
else:
raise HTTPException(status_code=401, detail="Failed to get company type")
company_type = company_type.get('企业类型', None)
tyc_res = get_shareholer_data(name, company_type)
if tyc_res != 'success':
raise HTTPException(status_code=401, detail=tyc_res)
# 查询天眼查数据库
if '上市' in company_type:
tyc_data = MG_DB_TYC.find_single_column(
DB_GSBJ,
'十大股东',
{"企业名称": name},
'十大股东'
)
tyc_data = tyc_mongodb.find_data(DB_GSBJ, '十大股东', {"企业名称": name}, ["十大股东"])
tyc_data = tyc_data.get('十大股东', list())
else:
tyc_data = MG_DB_TYC.find_single_column(
DB_GSBJ,
'企业股东',
{"企业名称": name},
'企业股东'
)
tyc_data = tyc_mongodb.find_data(DB_GSBJ, '企业股东', {"企业名称": name}, ["企业股东"])
tyc_data = tyc_data.get('企业股东', dict())
if not tyc_data:
raise HTTPException(status_code=401, detail='Data query failed')
data = dict()
@ -172,55 +135,48 @@ def func02(cid: str, db: Session = Depends(get_db)):
data['股东列表'] = list()
data['更新时间'] = CommonUtils.get_current_time()
if '上市' in company_type:
try:
for item in tyc_data['holderList']:
shareholer = dict()
shareholer['股东ID'] = item['id']
shareholer['股东名称'] = item['name']
shareholer['股东类型'] = item['shareType']
shareholer['股东性质'] = None
shareholer['持股比例'] = item['proportion']
shareholer['持股数量'] = item['holdingNum']
shareholer['认缴出资日期'] = CommonUtils.time_stamp(item['publishDate'])
data['股东列表'].append(shareholer)
except KeyError:
pass
holder_list = tyc_data.get('holderList', list())
for item in holder_list:
shareholer = dict()
shareholer['股东ID'] = item.get('id', None)
shareholer['股东名称'] = item.get('name', None)
shareholer['股东类型'] = item.get('shareType', None)
shareholer['股东性质'] = None
shareholer['持股比例'] = item.get('proportion', None)
shareholer['持股数量'] = item.get('holdingNum', None)
shareholer['认缴出资日期'] = CommonUtils.time_stamp(item.get('publishDate', None))
data['股东列表'].append(shareholer)
else:
try:
for item in tyc_data['result']:
shareholer = dict()
shareholer['股东ID'] = item['id']
shareholer['股东名称'] = item['name']
shareholer['股东类型'] = item['type']
shareholer['股东性质'] = None
shareholer['持股比例'] = item['capital'][0]['percent']
shareholer['认缴出资额'] = item['capital'][0]['amomon']
shareholer['认缴出资日期'] = item['capital'][0]['time']
data['股东列表'].append(shareholer)
except KeyError:
pass
# 保存天眼查数据到企业数据库
MG_DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.2_股东信息",
{"企业名称": name},
data
)
holder_list = tyc_data.get('result', list())
for item in holder_list:
shareholer = dict()
shareholer['股东ID'] = item.get('id', None)
shareholer['股东名称'] = item.get('name', None)
shareholer['股东类型'] = item.get('type', None)
shareholer['股东性质'] = None
capital_list = item.get('capital', list())
if capital_list:
shareholer['持股比例'] = capital_list[0].get('percent', None)
shareholer['认缴出资额'] = capital_list[0].get('amomon', None)
shareholer['认缴出资日期'] = capital_list[0].get('time', None)
else:
shareholer['持股比例'] = None
shareholer['认缴出资额'] = None
shareholer['认缴出资日期'] = None
data['股东列表'].append(shareholer)
# 保存到数据到data_business
ShareholderCrud.create_shareholder_by_id(db=db, mongodb=mongodb, _id=cid, content=data)
mongodb.client.close()
tyc_mongodb.client.close()
return {"info": "Update succeeded"}
@router.get("/view", summary="查看数据")
def func03(cid: str, db: Session = Depends(get_db)):
# 查询mysql数据库中的_id
mongodb = get_mongodb()
db_item = CompanyCrud.get_company_by_id(db, _id=cid)
if not db_item:
raise HTTPException(status_code=204, detail="Company Not Found")
raise HTTPException(status_code=404, detail="Company Not Found")
# 根据_id查询mongodb数据库
data = MG_DB_TEST.find_single_data(
"WR_DataBase_v2_test",
"C3.2_股东信息",
{"企业ID": cid},
[]
)
data = ShareholderCrud.find_shareholder_by_id(db=db, mongodb=mongodb, _id=cid)
return data

View File

@ -1,12 +0,0 @@
{
"MongoDB": {
"log": "root:sromitdTW569kC#M@116.63.130.34:27018",
"tyc": "root:gP@DwMSVd5Sh6EiH@116.63.130.34:27019"
},
"Mysql": {
},
"Redis": {
}
}

View File

@ -1,6 +0,0 @@
from Utils.DBHelper.MongoHelper import MongoHelper
MG_DB_TEST = MongoHelper("log")
MG_DB_TYC = MongoHelper("tyc")

View File

@ -1,333 +0,0 @@
import re
import os
import json
import gridfs
import pymongo
from urllib import parse
from bson import ObjectId
from gridfs import GridFS
class MongoHelper:
def __init__(self, param):
"""
param:
typestr
desc: 选择连接哪个MongoDB数据库
"""
with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f:
db_configs = json.load(f)
this_mongo_cfg = db_configs['MongoDB'][param]
m = re.match('([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg)
parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)),
m.group(3))
self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config))
def find_single_column(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据的某个指定字段值
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: str 一个指定查询字段
return:
type: None or dict
desc: 查询结果为空返回None; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
column = {**{'_id': False}, **{param4: 1}}
record = list(collection.find(param3, column))
return None if record == [] else record[0][param4]
def find_single_data(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns))
return False if record == [] else record[0]
def find_single_data_id(self, param1, param2, param3, param4):
"""
查询符合条件的第一条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**dict(zip(param4, [1] * len(param4)))}
if columns:
record = list(collection.find(param3, columns))
else:
record = list(collection.find(param3))
return False if record == [] else record[0]
def find_single_data_with_single_sort(self, param1, param2, param3, param4, param5):
"""
查询符合条件的第一条数据,按单个排序条件返回
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(1))
return False if record == [] else record[0]
def find_all_data(self, param1, param2, param3, param4):
"""
查询符合条件的所有数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
return:
type: list
desc: 查询结果
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns))
return record
def find_all_data_with_count(self, param1, param2, param3):
"""
查询所有符合条件的数据并返回统计数量
param1: str 数据库
param2: str 数据表
param3: str 查询条件
return: int 符合条件的数据数量
"""
collection = self.client[param1][param2]
# num = collection.find(param3).count()
num = collection.estimated_document_count(param3)
return num
def find_all_data_with_single_sort(self, param1, param2, param3, param4, param5):
"""
查询符合条件的数据,按单个排序条件返回
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: list 查询字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
return:
type: bool or dict
desc: 查询结果为空返回False; 查询结果正常返回查询结果的第一条数据;
"""
collection = self.client[param1][param2]
columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))}
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]))
return False if record == [] else record
def find_data_with_aggregate(self, param1, param2, param3):
"""
根据聚合条件查询
param1: str 数据库
param2: str 数据集
param3:
type: list
desc: 聚合条件
demo: [{'$match':{'price':{'$gte':50}}}, {'$group': {'_id': "$fName", 'count': {'$sum': 1}}}]
"""
collection = self.client[param1][param2]
data = list(collection.aggregate(param3))
return data
def find_data_by_page_with_sort(self, param1, param2, param3, param4, param5, param6, param7):
"""
根据聚合翻页查询,且按照需求字段排序返回
param1: str 数据库
param2: str 数据集
param3: dict 查询条件
param4: list 显示字段
param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回
param6: int page_size 每页数据条数
param7: int page_no 当前页码
"""
collection = self.client[param1][param2]
columns = {**dict(zip(param4, [1] * len(param4)))}
page_size = int(param6)
page_no = int(param7)
skip_num = page_size * (page_no - 1)
if columns:
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(
page_size).skip(skip_num))
else:
record = list(collection.find(param3).sort(list(param5.keys())[0], list(param5.values())[0]).limit(
page_size).skip(skip_num))
return record
def insert_single_data(self, param1, param2, param3):
"""
插入一条数据
param1: str 数据库
param2: str 数据集
param3: obj 插入数据
return: None
"""
collection = self.client[param1][param2]
collection.insert_one(param3)
def insert_many_data(self, param1, param2, param3):
"""
插入多条数据
param1: str 数据库
param2: str 数据集
param3: obj 插入数据
return: None
"""
collection = self.client[param1][param2]
collection.insert_many(param3)
def upsert_single_data(self, param1, param2, param3, param4):
"""
插入单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$set": param4}, upsert=True)
def update_single_data(self, param1, param2, param3, param4):
"""
更新单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return: None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$set": param4})
def update_single_data_in_array(self, param1, param2, param3, param4, param5):
"""
更新数组中的单条数据
param1: str 数据库
param2: str 数据表
param3: dict 一级字段查询条件
param4: dict 二级字段查询条件
param5: dict 更新或新插入的数据
return: None
"""
collection = self.client[param1][param2]
try:
param4_key = list(param4.keys())[0]
param4_value = list(param4.values())[0]
search = {**param3, param4_key: {"$elemMatch": param4_value}}
condition = collection.find(search).count()
if condition:
for item in param5.items():
collection.update_one(search, {"$set": {"{}.$.{}".format(param4_key, item[0]): item[1]}})
else:
param6 = {list(param4.keys())[0]: param5}
collection.update_one(param3, {"$addToSet": param6})
except Exception:
pass
def delete_single_data_in_array(self, param1, param2, param3, param4):
"""
删除数组中的单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$pull": param4})
return True
def add_single_data_in_array(self, param1, param2, param3, param4):
"""
新增数组中的单条数据
param1: str 数据库
param2: str 数据表
param3: dict 查询条件
param4: dict 更新或新插入的数据
return:
None
"""
collection = self.client[param1][param2]
collection.update_one(param3, {"$addToSet": param4})
def delete_single_data(self, param1, param2, param3):
"""
根据查询条件删除一条文档
param1: str 数据库
param2: str 数据集
param3: obj 查询条件
return: None
"""
collection = self.client[param1][param2]
collection.delete_one(param3)
return True
def find_file(self, param1, param2, param3):
"""
读取一个文件
param1: str 数据库
param2: str 存储桶
param3: str 文件id
return:
type: binary?
desc: 二进制文件流
"""
try:
# 实例化一个文件存储器
gfs = GridFS(self.client[param1], collection=param2)
# 二进制读取文件
data_stream = gfs.get(ObjectId(param3)).read()
# 返回文件二进制流
return data_stream
except gridfs.errors.NoFile:
return False
def delete_file(self, param1, param2, param3):
"""
根据id删除文件
param1: str 数据库
param2: str 存储桶
param3: str 文件fid
"""
fs = GridFS(self.client[param1], param2)
fs.delete(ObjectId(param3))
def find_database_all_gather(self, param):
"""
查询数据库中的所有集合
param 数据库
"""
if param:
dbs = self.client[param].list_collection_names(session=None)
return dbs
else:
dbs = self.client.database_names()
return dbs

View File

@ -0,0 +1,9 @@
{
"Mysql": {
"wr_model_store": "mysql+pymysql://root:123456@localhost/wr_model_store?charset=utf8mb4"
},
"MongoDB": {
"test": "root:sromitdTW569kC#M@116.63.130.34:27018",
"tyc": "root:gP@DwMSVd5Sh6EiH@116.63.130.34:27019"
}
}

View File

@ -0,0 +1,77 @@
import re
import os
import json
import pymongo
from urllib import parse
from bson import ObjectId
class MongoHelper:
def __init__(self, param):
"""
param:
typestr
desc: 选择连接哪个MongoDB数据库
"""
with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f:
db_configs = json.load(f)
this_mongo_cfg = db_configs['MongoDB'][param]
m = re.match(r'([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg)
parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)),
m.group(3))
self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config))
def insert_data(self, dbname: str, sheet: str, data: dict):
collection = self.client[dbname][sheet]
item = collection.insert_one(data)
return item.inserted_id.__str__()
def upsert_data(self, dbname: str, sheet: str, condition: dict, data: dict):
collection = self.client[dbname][sheet]
collection.update_one(condition, {"$set": data}, upsert=True)
_id = collection.find_one(condition).get('_id', None)
return _id.__str__()
def delete_data_by_id(self, dbname: str, sheet: str, _id: str):
collection = self.client[dbname][sheet]
collection.delete_one({'_id': ObjectId(_id)})
return True
def find_data_by_id(self, dbname: str, sheet: str, _id: str):
collection = self.client[dbname][sheet]
return collection.find_one({'_id': ObjectId(_id)}, {"_id": False})
def update_data_by_id(self, dbname: str, sheet: str, _id: str, data: dict):
collection = self.client[dbname][sheet]
collection.update_one({'_id': ObjectId(_id)}, {"$set": data})
return True
def find_data(self, dbname: str, sheet: str, search: dict, field: list):
collection = self.client[dbname][sheet]
columns = {**{'_id': False}, **dict(zip(field, [1] * len(field)))}
record = list(collection.find(search, columns))
return False if record == [] else record[0]
def find_all_data(self, dbname: str, sheet: str, search: dict, field: list):
collection = self.client[dbname][sheet]
columns = {**{'_id': False}, **dict(zip(field, [1] * len(field)))}
record = list(collection.find(search, columns))
return False if record == [] else record
def update_data(self, dbname: str, sheet: str, search: dict, field: dict):
collection = self.client[dbname][sheet]
collection.update_one(search, {"$set": field})
return True
def get_mongodb():
db = MongoHelper("test")
return db
def get_tyc_mongodb():
db = MongoHelper("tyc")
return db

View File

@ -1,6 +1,6 @@
import time
from Utils.DBHelper.MongoDBInstance import MG_DB_TYC
from Utils.DataBase.MongoHelperUtils import get_tyc_mongodb
token = "32737167-cb63-4ce9-9397-d66169488f51"
headers = {'Authorization': token}
@ -58,12 +58,13 @@ def conserve_data(db, name, key, value):
保存天眼查api接口数据
传入数据库名称企业名称集合名称需要保存的数据
"""
tyc_mongodb = get_tyc_mongodb()
insert_data = dict()
insert_data['企业名称'] = name
insert_data['录入时间'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
insert_data[key] = value
# 数据库操作
MG_DB_TYC.upsert_single_data(
tyc_mongodb.upsert_data(
db,
key,
{"企业名称": name},

View File

@ -3,7 +3,7 @@ from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from APPData.Router import CompanyRouter, BusinessRouter, ShareholderRouter, LawsuitRouter, DishonestRouter, \
BalanceSheetRouter, IncomeSheetRouter, CashFlowStatementRouter
BalanceSheetRouter, IncomeSheetRouter, CashFlowStatementRouter, MacroDataRouter
app = FastAPI(
title="数据平台",
@ -27,6 +27,7 @@ app.include_router(DishonestRouter.router)
app.include_router(BalanceSheetRouter.router)
app.include_router(IncomeSheetRouter.router)
app.include_router(CashFlowStatementRouter.router)
app.include_router(MacroDataRouter.router)
if __name__ == "__main__":

View File

@ -10,4 +10,6 @@ casbin~=1.17.1
cryptography
pycryptodome
pymongo~=4.3.2
PyYAML~=6.0
PyYAML~=6.0
pandas~=1.5.1
openpyxl