import datetime import os import yaml import random import time TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2luZm8iOnsidWlkIjoiVUlEMDAwMSIsImVtYWlsIjoiZmVjcmliZEBmZWNyLmNvbS5jbiIsIm5hbWUiOiJyb290Iiwicm9sZSI6Ilx1N2JhMVx1NzQwNlx1NTQ1OCIsImRlcGFydG1lbnQiOiJcdTY1NzBcdTViNTdcdTUzMTZcdTkwZTgiLCJyb2xlX2lkIjoiUk9MRTAxIiwiZGVwYXJ0bWVudF9pZCI6IkQwMDEifSwiZXhwIjoxNjgzNDc2OTA4fQ.8girdw3n0WDktRuK0aSgGor10eb11nIFvRJqUtPZum4" p_map = { "heb": "河北省", "sx": "山西省", "ln": "辽宁省", "jl": "吉林省", "hlj": "黑龙江省", "js": "江苏省", "zj": "浙江省", "ah": "安徽省", "fj": "福建省", "jx": "江西省", "sd": "山东省", "hen": "河南省", "hub": "湖北省", "hun": "湖南省", "gd": "广东省", "han": "海南省", "sc": "四川省", "gz": "贵州省", "yn": "云南省", "snx": "陕西省", "gs": "甘肃省", "qh": "青海省", "tw": "台湾省", "nmg": "内蒙古自治区", "gx": "广西壮族自治区", "xz": "西藏自治区", "nx": "宁夏回族自治区", "xj": "新疆维吾尔自治区", "bj": "北京市", "tj": "天津市", "sh": "上海市", "cq": "重庆市", "xg": "香港特别行政区", "am": "澳门特别行政区" } class CommonUtils(object): @staticmethod def get_current_time(): """获取当前时间""" return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) @staticmethod def random_code(length): """生成随机编码""" choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' code = '' for i in range(length): code += random.choice(choices) return code @staticmethod def time_stamp(param): """ 时间戳转换 Parameters: param: 时间戳数据 Returns: 转换后时间数据 """ if param is None: return '-' elif isinstance(param, str) and len(param) == 0: return '-' else: param = int(param) estiblish_time = param / 1000 put_date = time.localtime(estiblish_time) date = time.strftime("%Y-%m-%d", put_date) return date @staticmethod def excel_time_stamp(param): """ excel时间戳转换 Parameters: param: 时间戳数据 Returns: 转换后时间数据 """ if param is None: return '-' elif isinstance(param, str) and len(param) == 0: return '-' else: param = float(param) delta = datetime.timedelta(days=param) date_1 = datetime.datetime.strptime('1899-12-30', '%Y-%m-%d') + delta date = datetime.datetime.strftime(date_1, '%Y-%m-%d') return date @staticmethod def province_conversion(param): """ 省份转换 Parameters: param: 省份简称 Returns: 转换后省份全称 """ res = [v for k, v in p_map.items() if k == param][0] return res def sub_dict(param1, param2): """ 获取字典的子集 Parameters: param1: 原字典 param2: 子集字段 Returns: 子集 """ return dict((key, value) for key, value in param1.items() if key in param2) def df_iterrows(param): """ 按行以数组形式返回DataFrame的index、data Parameters: param: DataFrame 某个df对象 Returns: result: list 遍历df对象每行数据,包括index """ result = [] for row in param.iterrows(): index, data = row result.append([index] + data.tolist()) return result def get_attr(_dict_, _keys_, **kwargs): """ 获取字典属性值 若发生了键值异常或类型异常,则返回设置的默认值 若没有默认值,则返回None """ value = kwargs['default'] if kwargs.__contains__('default') else None try: text = "_dict_" for _key_ in _keys_: text = text + "['{}']".format(_key_) value = eval(text) except KeyError: pass except TypeError: pass return value def trans_fields_name(_dict_, names1, names2): """ 替换字典键值名称 _dict_: 替换字典 names1: 原字典键值 names2: 新字典键值 """ for i in range(len(names1)): _dict_[names2[i]] = _dict_.pop(names1[i]) return _dict_ def get_yaml(**kwargs): file_rel_path = kwargs['file_rel_path'] file_path = os.path.abspath(os.path.dirname(__file__)) index = file_path.find('Utils') if index != -1: abs_path_root = file_path[:index] file_abs_path = os.path.abspath(abs_path_root + '/Modules/' + file_rel_path) doc_yaml = open(file_abs_path, 'r', encoding="utf-8").read() return yaml.full_load(doc_yaml) def make_multistage_update_body(input_dict): def get_dict_keys(_dict_): if isinstance(_dict_, dict): for key in _dict_.keys(): if isinstance(_dict_[key], dict): get_dict_keys(_dict_[key]) update_keys.append(key) return update_keys def get_dict_value(data, keys): keys_list = keys.split('.') if isinstance(data, dict): for key in keys_list: values = data.get(key) data = values return data try: update_key = '' update_keys = [] get_dict_keys(input_dict) update_keys.reverse() floor = len(update_keys) for i in range(floor): if i == floor - 1: update_key = update_key + update_keys[i] else: update_key = update_key + update_keys[i] + '.' update_value = get_dict_value(input_dict, update_key) return {update_key: update_value} except Exception: return None