api-datamanager/Company/Basic/Executives/ExecutivesImpl.py

280 lines
9.3 KiB
Python

from Company.Basic.Business.BusinessImpl import BusinessImpl
from Company.Basic.Executives.ExecutivesObj import ExecutiveList, ExecutiveObj
from Company.CompanyUtils import CompanyUtils
from DBHelper.MongoHelperInstance import DB_TYC, DB_TEST
from Utils.CommonUtil import CommonUtils
class ExecutivesImpl(object):
"""高管信息"""
@staticmethod
def auto_update(**kwargs):
"""新增/更新(自动)"""
condition = kwargs['condition']
def save_data(**args):
"""查询保存数据"""
def search_cid(name):
"""根据高管名称查询cid"""
cid_record = DB_TEST.find_single_column(
'WR_DataBase_v2_test',
'C3.1_工商信息',
{'企业名称': name},
'企业名称'
)
if cid_record:
return cid_record
else:
return None
company_name = args['cname']
company_id = args['cid']
records = DB_TYC.find_single_column(
'公司背景',
'主要人员',
{'企业名称': company_name},
'主要人员'
)
if records:
records = records['result']
data = dict()
list_data = list()
for record in records:
dict_data = dict()
dict_data['人物ID'] = search_cid(record['name'])
dict_data['姓名'] = record['name']
dict_data['职务'] = record['typeJoin']
list_data.append(dict_data)
data['企业ID'] = company_id
data['高管列表'] = list_data
data['更新时间'] = CommonUtils.get_current_time()
return data
else:
return False
if len(condition['cid']):
cid = condition['cid']
cname = DB_TEST.find_single_column(
'WR_DataBase_v2_test',
'C3.1_工商信息',
{'企业ID': cid},
'企业名称'
)
update = save_data(cname=cname, cid=cid)
if update:
DB_TEST.update_single_data(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业ID": cid},
update
)
return 'success'
else:
return 'update failed'
elif len(condition['cname']):
cname = condition['cname']
# 查询数据是否存在
cid = DB_TEST.find_single_column(
'WR_DataBase_v2_test',
'C3.1_工商信息',
{'企业名称': cname},
'企业ID'
)
if not cid:
res = BusinessImpl.auto_update(condition={"cname": cname, "cid": ""})
if res == 'success':
cid = DB_TEST.find_single_column(
'WR_DataBase_v2_test',
'C3.1_工商信息',
{'企业名称': cname},
'企业ID'
)
else:
return 'new failed'
insert = save_data(cname=cname, cid=cid)
if insert:
DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业名称": cname},
insert
)
return 'success'
else:
return 'new failed'
else:
return 'parameter exception'
@staticmethod
def manu_update(**kwargs):
"""新增/更新(手动)"""
cid = kwargs['cid']
cname = kwargs['cname']
template = kwargs['template']
data = CompanyUtils.excel_sheet_parser(file=template)
if len(cid):
if data:
for item in data:
name = item['姓名']
executive = ExecutiveList()
executive.pid = None
executive.name = item['姓名']
executive.duties = list()
executive.duties.append(item['职务'])
update_data = executive.fields_toggle()
DB_TEST.update_single_data_in_array(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业ID": cid},
{"高管列表": {"姓名": name}},
update_data
)
time_update = {"更新时间": CommonUtils.get_current_time()}
DB_TEST.update_single_data(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业ID": cid},
time_update
)
return 'success'
else:
return 'update failed'
elif len(cname):
cid = DB_TEST.find_single_column(
'WR_DataBase_v2_test',
'C3.1_工商信息',
{'企业名称': cname},
'企业ID'
)
if not cid:
res = BusinessImpl.auto_update(condition={"cname": cname, "cid": ""})
if res == 'success':
cid = DB_TEST.find_single_column(
'WR_DataBase_v2_test',
'C3.1_工商信息',
{'企业名称': cname},
'企业ID'
)
else:
return 'new failed'
obj = ExecutiveObj()
obj.cid = cid
obj.cname = cname
obj.executive_list = list()
obj.update_time = CommonUtils.get_current_time()
if data:
for item in data:
executive = ExecutiveList()
executive.pid = None
executive.name = item['姓名']
executive.duties = list()
executive.duties.append(item['职务'])
obj.executive_list.append(executive)
DB_TEST.upsert_single_data(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业名称": cname},
obj.fields_toggle()
)
return 'success'
else:
return 'parameter exception'
@staticmethod
def delete(**kwargs):
"""删除"""
param = kwargs['param']
cid = param['cid']
executive = param['executive']
res = DB_TEST.delete_single_data_in_array(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业ID": cid},
{"高管列表": {"姓名": executive}}
)
if res:
result = {
"info": "success",
}
else:
result = {
"info": "no result"
}
return result
@staticmethod
def edit(**kwargs):
"""编辑"""
param = kwargs['param']
cid = param['cid']
body = param['body']
name = param['executive']
try:
executive = ExecutiveList()
executive.pid = body['人物ID']
executive.name = body['姓名']
executive.duties = body['职务']
update_data = executive.fields_toggle()
DB_TEST.update_single_data_in_array(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业ID": cid},
{"高管列表": {"姓名": name}},
update_data
)
time_update = {"更新时间": CommonUtils.get_current_time()}
DB_TEST.update_single_data(
"WR_DataBase_v2_test",
"C3.3_高管信息",
{"企业ID": cid},
time_update
)
return 'success'
except Exception:
return 'edit failed'
@staticmethod
def view(**kwargs):
"""查看"""
query = kwargs['query']
cid = query['cid']
page_size = int(query['page_size'])
page_num = page_size * (int(query['page_num']) - 1)
res = DB_TEST.find_single_data(
'WR_DataBase_v2_test',
'C3.3_高管信息',
{"企业ID": cid},
['高管列表', '更新时间']
)
total = len(res['高管列表'])
update_time = res['更新时间']
record = DB_TEST.find_data_with_aggregate(
"WR_DataBase_v2_test",
"C3.3_高管信息",
[
{"$match": {"企业ID": cid}},
{"$project": {'_id': 0, '高管列表': {"$slice": ["${}".format('高管列表'), page_num, page_size]}}}
]
)
if record:
result = {
"info": "success",
"result": {
"total": total,
"update_time": update_time,
"records": record[0]['高管列表']
}
}
else:
result = {
"info": "no result"
}
return result