304 lines
11 KiB
Python
304 lines
11 KiB
Python
from bson import ObjectId
|
|
from datetime import datetime
|
|
from Company.Basic.Business.BusinessImpl import BusinessImpl
|
|
from Company.CompanyUtils import CompanyUtils
|
|
from Company.Ip.Patent.PatentObj import PatentObj
|
|
from DBHelper.MongoHelperInstance import DB_TYC, DB_TEST
|
|
from Utils.CommonUtil import CommonUtils
|
|
|
|
|
|
class PatentImpl(object):
    """Patent information: automatic/manual import, delete, edit and view.

    Patent records are read from and written to ``C5.2_专利信息`` through
    ``DB_TEST``; automatic updates pull their raw data through ``DB_TYC``.
    """

    # Positional arguments shared by every DB_TEST call in this class
    # (presumably database name and collection name -- confirm against
    # the MongoHelper signatures).
    _DB_NAME = 'WR_DataBase_v2_test'
    _PATENT_COLLECTION = 'C5.2_专利信息'
    _BUSINESS_COLLECTION = 'C3.1_工商信息'

    # Format of the timestamps stored in the '更新时间' field and returned by
    # CommonUtils.get_current_time().
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
    # Minimum number of seconds between two updates of the same company.
    _MIN_UPDATE_INTERVAL = 60
    # Upper bound applied to the page size requested in view().
    _MAX_PAGE_SIZE = 10

    # (stored field, key in the raw DB_TYC item) pairs, in insertion order.
    _REMOTE_FIELD_MAP = (
        ('申请日', 'applicationTime'),
        ('专利名称', 'patentName'),
        ('专利类型', 'patentType'),
        ('专利状态', 'patentStatus'),
        ('申请号', 'appnumber'),
        ('公开号', 'pubnumber'),
        ('公开日', 'grantDate'),
        ('发明人', 'inventor'),
    )

    @staticmethod
    def _interval_elapsed(begin_time, end_time):
        """Return True when more than the minimum interval separates two times.

        Args:
            begin_time: Earlier timestamp, formatted as ``%Y-%m-%d %H:%M:%S``.
            end_time: Later timestamp, same format.

        Returns:
            bool: True if ``end_time - begin_time`` exceeds 60 seconds.
        """
        elapsed = (
            datetime.strptime(end_time, PatentImpl._TIME_FORMAT)
            - datetime.strptime(begin_time, PatentImpl._TIME_FORMAT)
        )
        # total_seconds() includes the day component; timedelta.seconds does
        # not, so a record updated "1 day and 30 s" ago used to look fresh.
        return elapsed.total_seconds() > PatentImpl._MIN_UPDATE_INTERVAL

    @staticmethod
    def _find_cid(cname):
        """Look up the company ID stored for ``cname`` in the business collection."""
        return DB_TEST.find_single_column(
            PatentImpl._DB_NAME,
            PatentImpl._BUSINESS_COLLECTION,
            {'企业名称': cname},
            '企业ID',
        )

    @classmethod
    def _resolve_cid(cls, cname):
        """Return the company ID for ``cname``, registering the company if needed.

        When no ID is stored yet, ``BusinessImpl.auto_update`` is asked to
        create the business record and the lookup is repeated.

        Returns:
            The company ID, or None when it could not be obtained.
        """
        cid = cls._find_cid(cname)
        if cid:
            return cid
        created = BusinessImpl.auto_update(condition={"cname": cname, "cid": ""})
        if created != 'success':
            return None
        return cls._find_cid(cname) or None

    @staticmethod
    def _collect_remote_patents(cname, cid):
        """Build the patent rows to store for a company from the DB_TYC data.

        Args:
            cname: Company name used to query the raw patent data.
            cid: Company ID written into every row as ``企业ID``.

        Returns:
            list[dict]: One row per raw patent item; empty when nothing is found.
        """
        sources = DB_TYC.find_all_data(
            '知识产权',
            '专利信息',
            {'企业名称': cname},
            ['专利信息'],
        )
        if not sources:
            return []

        # One timestamp for the whole batch keeps the rows consistent.
        now = CommonUtils.get_current_time()
        rows = []
        for source in sources:
            for item in source['专利信息']['result']:
                row = {'企业ID': cid}
                for field, raw_key in PatentImpl._REMOTE_FIELD_MAP:
                    row[field] = item[raw_key]
                row['更新时间'] = now
                rows.append(row)
        return rows

    @staticmethod
    def _build_manual_records(cid, data):
        """Convert parsed spreadsheet rows into storable patent records.

        Args:
            cid: Company ID assigned to every record.
            data: Iterable of row dicts produced by
                ``CompanyUtils.excel_sheet_parser``.

        Returns:
            list: The ``PatentObj.fields_toggle()`` result for each row.
        """
        now = CommonUtils.get_current_time()
        records = []
        for item in data:
            patent = PatentObj()
            patent.cid = cid
            patent.application_id = item['申请号']
            patent.application_date = CommonUtils.excel_time_stamp(item['申请日'])
            patent.patent_name = item['专利名称']
            patent.patent_type = item['专利类型']
            patent.patent_status = item['专利状态']
            patent.public_id = item['公开号']
            patent.public_date = CommonUtils.excel_time_stamp(item['公开日'])
            patent.inventor = item['发明人']
            patent.update_time = now
            records.append(patent.fields_toggle())
        return records

    @staticmethod
    def verify_time(**kwargs):
        """Check whether the stored patent data is old enough to be updated again.

        Keyword Args:
            condition: Query filter selecting the patent records to inspect.

        Returns:
            bool: True when no record matches, or when the ``更新时间`` of the
            last returned record is more than 60 seconds old; False otherwise.
        """
        condition = kwargs['condition']
        stored = DB_TEST.find_all_data(
            PatentImpl._DB_NAME,
            PatentImpl._PATENT_COLLECTION,
            condition,
            ['更新时间'],
        )
        if not stored:
            return True
        return PatentImpl._interval_elapsed(
            stored[-1]['更新时间'],
            CommonUtils.get_current_time(),
        )

    def auto_update(self, **kwargs):
        """Create or refresh a company's patent records automatically.

        Keyword Args:
            condition: Dict with the keys ``cid`` and ``cname``; ``cid`` takes
                precedence when it is non-empty.

        Returns:
            str: ``'success'``, ``'update failed'`` (by ID, nothing to store),
            ``'new failed'`` (by name, company or data unavailable),
            ``'Please try again in a minute'`` (updated too recently) or
            ``'parameter exception'`` (neither ``cid`` nor ``cname`` given).
        """
        condition = kwargs['condition']

        if condition['cid']:
            cid = condition['cid']
            cname = DB_TEST.find_single_column(
                self._DB_NAME,
                self._BUSINESS_COLLECTION,
                {'企业ID': cid},
                '企业名称',
            )
            failure = 'update failed'
        elif condition['cname']:
            cname = condition['cname']
            cid = self._resolve_cid(cname)
            if not cid:
                return 'new failed'
            failure = 'new failed'
        else:
            return 'parameter exception'

        # Refuse to refresh a company that was updated within the last minute.
        if not self.verify_time(condition={"企业ID": cid}):
            return 'Please try again in a minute'

        rows = self._collect_remote_patents(cname, cid)
        if not rows:
            return failure

        DB_TEST.insert_many_data(
            self._DB_NAME,
            self._PATENT_COLLECTION,
            rows,
        )
        return 'success'

    def manu_update(self, **kwargs):
        """Create or refresh a company's patent records from an uploaded sheet.

        Keyword Args:
            cid: Company ID; takes precedence when non-empty.
            cname: Company name, used to resolve the ID when ``cid`` is empty.
            template: Spreadsheet handed to ``CompanyUtils.excel_sheet_parser``.

        Returns:
            str: ``'success'``, ``'update failed'`` (by ID, sheet had no rows),
            ``'new failed'`` (by name, company unavailable or sheet had no
            rows), ``'Please try again in a minute'`` or
            ``'parameter exception'``.
        """
        cid = kwargs['cid']
        cname = kwargs['cname']
        template = kwargs['template']
        data = CompanyUtils.excel_sheet_parser(file=template)

        if cid:
            failure = 'update failed'
        elif cname:
            cid = self._resolve_cid(cname)
            if not cid:
                return 'new failed'
            failure = 'new failed'
        else:
            return 'parameter exception'

        if not self.verify_time(condition={"企业ID": cid}):
            return 'Please try again in a minute'

        # Both branches now report an empty sheet explicitly instead of the
        # by-name branch silently returning None.
        if not data:
            return failure

        DB_TEST.insert_many_data(
            self._DB_NAME,
            self._PATENT_COLLECTION,
            self._build_manual_records(cid, data),
        )
        return 'success'

    @staticmethod
    def delete(**kwargs):
        """Delete a single patent record.

        Keyword Args:
            param: Dict whose ``_id`` is the string form of the record's ObjectId.

        Returns:
            dict: ``{"info": "success"}`` when the delete call returns a truthy
            value, otherwise ``{"info": "no result"}``.
        """
        param = kwargs['param']
        record_id = param['_id']

        deleted = DB_TEST.delete_single_data(
            PatentImpl._DB_NAME,
            PatentImpl._PATENT_COLLECTION,
            {"_id": ObjectId(record_id)},
        )
        return {"info": "success" if deleted else "no result"}

    @staticmethod
    def edit(**kwargs):
        """Overwrite the editable fields of a single patent record.

        Keyword Args:
            param: Dict with ``_id`` (string form of the record's ObjectId) and
                ``body`` (dict holding the new field values).

        Returns:
            str: ``'success'``, or ``'edit failed'`` when building or storing
            the update raises any exception.
        """
        param = kwargs['param']
        record_id = param['_id']
        body = param['body']
        try:
            patent = PatentObj()
            patent.application_date = body['申请日']
            patent.patent_name = body['专利名称']
            patent.patent_type = body['专利类型']
            patent.patent_status = body['专利状态']
            patent.application_id = body['申请号']
            patent.public_id = body['公开号']
            patent.public_date = body['公开日']
            patent.inventor = body['发明人']
            patent.update_time = CommonUtils.get_current_time()

            DB_TEST.update_single_data(
                PatentImpl._DB_NAME,
                PatentImpl._PATENT_COLLECTION,
                {"_id": ObjectId(record_id)},
                patent.fields_toggle(),
            )
        except Exception:
            # Deliberately broad: callers only receive a status string.
            return 'edit failed'
        return 'success'

    @staticmethod
    def view(**kwargs):
        """Return one page of a company's patent records.

        Keyword Args:
            query: Dict with ``cid``, ``page_num`` and ``page_size``; the page
                size is capped at 10.

        Returns:
            dict: ``{"info": "no result"}`` when the page is empty, otherwise
            ``{"info": "success", "result": {...}}`` holding ``total``,
            ``update_time`` (the ``更新时间`` of the last record on the page)
            and ``records`` (with ``_id`` converted to ``str``).
        """
        query = kwargs['query']
        cid = query['cid']
        page_num = int(query['page_num'])
        page_size = min(int(query['page_size']), PatentImpl._MAX_PAGE_SIZE)

        total = DB_TEST.find_all_data_with_count(
            PatentImpl._DB_NAME,
            PatentImpl._PATENT_COLLECTION,
            {"企业ID": cid},
        )
        records = DB_TEST.find_data_by_page_with_sort(
            PatentImpl._DB_NAME,
            PatentImpl._PATENT_COLLECTION,
            {"企业ID": cid},
            [],
            {'_id': 1},
            page_size,
            page_num,
        )

        if not records:
            return {"info": "no result"}

        # ObjectId values are replaced by their string form in the response.
        for rec in records:
            rec['_id'] = str(rec['_id'])

        return {
            "info": "success",
            "result": {
                "total": total,
                "update_time": records[-1]['更新时间'],
                "records": records,
            },
        }
|