import json import random from datetime import datetime, timedelta from DBHelper.MongoHelperInstance import DB_GUA from Modules.Common.CommonUtils import CommonUtils class CompanyUtils(object): # 生成新的企业ID,如果该ID存在,则重新生成 @staticmethod def make_new_cid(): new_cid = CommonUtils.random_code(8) case = DB_GUA.find_single_column( "管理端", "企业数据", {"企业ID": new_cid}, "企业ID" ) is not None while case: new_cid = CommonUtils.random_code(8) return new_cid class ExcelParserUtil(object): # 转换excel日期 @staticmethod def trans_date_fmt_1(days): if type(days) == str: # 特殊情况:比如某些高管信息出生日期只有年份,则返回年份 return days else: delta = timedelta(days=days) # 将1899-12-30转化为可以计算的时间格式并加上要转化的日期戳 _date = datetime.strptime('1899/12/30', '%Y/%m/%d') + delta return datetime.strftime(_date, '%Y年%m月%d日') # 转换excel日期 @staticmethod def trans_date_fmt_2(days): delta = timedelta(days=days) # 将1899-12-30转化为可以计算的时间格式并加上要转化的日期戳 _date = datetime.strptime('1899/12/30', '%Y/%m/%d') + delta return datetime.strftime(_date, '%Y-%m-%d') # 转换excel日期 @staticmethod def trans_date_fmt_3(_date): """""" # 删除列表中所有'' @staticmethod def list_remove_blank(_list): while '' in _list: _list.remove('') return _list # 列表中的''转换为None @staticmethod def list_to_none(_list): return list(map(lambda x: None if x == '' else x, _list)) # 列表中的float类型数据都保留2位小数 @staticmethod def list_decimal(_list): return list(map(lambda _v: round(_v, 2) if isinstance(_v, float) else _v, _list)) # 列表中的float类型数据都转为为int类型 @staticmethod def list_to_int(_list): return list(map(lambda _v: int(_v) if isinstance(_v, float) else _v, _list)) class ExcelSheetParser(object): def __init__(self, sheet): self._sheet = sheet # 解析企业信息 def parse_sheet1(self): # 表头 cols_tag = self._sheet.col_values(1) cols_tag.pop(0) # 表值 cols_val = self._sheet.col_values(2) cols_val.pop(0) # 组织代码 cols_val[5] = str(cols_val[5]) # 参保人数 cols_val[16] = int(cols_val[16]) if type(cols_val[16]) == float else cols_val[16] # 数组中的空字符串转为None list_0 = ExcelParserUtil.list_to_none(cols_val) # 数组中的float类型数据都保留2位小数 list_1 = ExcelParserUtil.list_decimal(list_0) # 拼接 data = json.loads(json.dumps(dict(zip(cols_tag, list_1)))) # 转换日期 data['成立日期'] = ExcelParserUtil.trans_date_fmt_1(data['成立日期']) return data # 解析高管信息、股东信息 def parse_sheet2(self): # 高管信息 cols_tag1 = self._sheet.row_values(1)[1:7] n_rows1 = len(ExcelParserUtil.list_remove_blank(self._sheet.col_values(1)[2:])) return_data1 = [] for i in range(2, n_rows1 + 2): list_0 = ExcelParserUtil.list_to_none(self._sheet.row_values(i)[1:7]) list_1 = ExcelParserUtil.list_decimal(list_0) # 出生日期 list_1[3] = ExcelParserUtil.trans_date_fmt_1(list_1[3]) data = json.loads(json.dumps(dict(zip(cols_tag1, list_1)))) return_data1.append(data) # 股东信息 cols_tag2 = self._sheet.row_values(1)[8:] n_rows2 = len(ExcelParserUtil.list_remove_blank(self._sheet.col_values(8)[2:])) return_data2 = [] for i in range(2, n_rows2 + 2): list_0 = ExcelParserUtil.list_to_none(self._sheet.row_values(i)[8:]) list_1 = ExcelParserUtil.list_decimal(list_0) data = json.loads(json.dumps(dict(zip(cols_tag2, list_1)))) return_data2.append(data) return return_data1, return_data2 # 解析补充数据表 def parse_sheet3(self): cols_tag = self._sheet.col_values(0) return_data = [] for i in range(1, 5): list_0 = ExcelParserUtil.list_to_none(self._sheet.col_values(i)) list_1 = ExcelParserUtil.list_decimal(list_0) data = json.loads(json.dumps(dict(zip(cols_tag, list_1)))) return_data.append(data) return return_data # 解析客户信息 def parse_sheet4(self): cols_tag = self._sheet.row_values(1)[1:] cols_tag[0] = "报告期" n_rows = len(ExcelParserUtil.list_remove_blank(self._sheet.col_values(1)[2:])) return_data = [] for i in range(2, n_rows + 2): list_0 = ExcelParserUtil.list_to_none(self._sheet.row_values(i)[1:]) list_1 = ExcelParserUtil.list_decimal(list_0) data = json.loads(json.dumps(dict(zip(cols_tag, list_1)))) data['担保起始日'] = ExcelParserUtil.trans_date_fmt_2(data['担保起始日']) data['担保截止日'] = ExcelParserUtil.trans_date_fmt_2(data['担保截止日']) return_data.append(data) return return_data # 解析区域分布、行业分布 def parse_sheet5(self): cols_tag = self._sheet.row_values(1)[1:] cols_tag[0] = "报告期" n_rows = len(ExcelParserUtil.list_remove_blank(self._sheet.col_values(1)[2:])) return_data = [] for i in range(2, n_rows + 2): list_0 = ExcelParserUtil.list_to_none(self._sheet.row_values(i)[1:]) list_1 = ExcelParserUtil.list_decimal(list_0) data = json.loads(json.dumps(dict(zip(cols_tag, list_1)))) return_data.append(data) return return_data # 解析担保金额分布 def parse_sheet6(self): _keys = ["金额<=100w", "100w<金额<=500w", "500w<金额<=1000w", "1000w<金额<=3000w", "3000w<金额<=5000w", "5000w<金额<=8000w", "8000w<金额<=10000w", "金额>10000w", "合计"] _dict_list = list() for c in range(2, 13, 3): # c: col_num _year = self._sheet.col_values(c)[1] _dict = {"报告期": _year} for s in range(0, 3): # s: step _values = ExcelParserUtil.list_to_none(self._sheet.col_values(c+s)[3:13]) _name = self._sheet.col_values(c+s)[2] if _name == "担保责任余额": _values = ExcelParserUtil.list_decimal(_values) else: _values = ExcelParserUtil.list_to_int(_values) _dict[_name] = json.loads(json.dumps(dict(zip(_keys, _values)))) _dict_list.append(_dict) return _dict_list # 解析资产负债表 def parse_sheet7(self): cols_tag = self._sheet.col_values(0) return_data = [] for i in range(1, 5): list_0 = ExcelParserUtil.list_to_none(self._sheet.col_values(i)) list_1 = ExcelParserUtil.list_decimal(list_0) data = json.loads(json.dumps(dict(zip(cols_tag, list_1)))) data.pop('流动资产:') data.pop('非流动资产:') data.pop('流动负债:') data.pop('非流动负债:') data.pop('所有者权益:') return_data.append(data) return return_data # 解析利润表 def parse_sheet8(self): cols_tag = list(map(lambda x: x.strip().replace('减:', '').replace('加:', ''), self._sheet.col_values(0))) return_data = [] for i in range(1, 5): list_0 = ExcelParserUtil.list_to_none(self._sheet.col_values(i)) list_1 = ExcelParserUtil.list_decimal(list_0) data = json.loads(json.dumps(dict(zip(cols_tag, list_1)))) return_data.append(data) return return_data