api-datamanager/Utils/CommonUtil.py

133 lines
3.3 KiB
Python
Raw Normal View History

2022-06-23 16:44:09 +08:00
import os
import yaml
import random
import time
class CommonUtils(object):
    """Stateless helpers for timestamps and random alphanumeric codes."""

    @staticmethod
    def get_current_time():
        """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
        return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    @staticmethod
    def random_code(length):
        """Generate a random code made of ASCII letters and digits.

        Parameters:
            length: number of characters to generate; zero or a negative
                value yields an empty string.
        Returns:
            A string of ``length`` characters drawn from A-Z, a-z and 0-9.
        """
        choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
        # One random.choice call per character, joined once instead of
        # growing the string with ``+=`` on every iteration.
        # NOTE(review): ``random`` is not a cryptographic source; if these
        # codes are ever used as secrets, switch to the ``secrets`` module.
        return ''.join(random.choice(choices) for _ in range(length))
2022-06-23 16:44:09 +08:00
def sub_dict(param1, param2):
    """
    Return a subset of a dictionary.

    Parameters:
        param1: the source dictionary
        param2: the keys to keep; anything supporting the ``in`` operator
    Returns:
        A new dict holding the entries of ``param1`` whose key is found in
        ``param2``, in ``param1``'s iteration order. Keys listed in
        ``param2`` but absent from ``param1`` are ignored.
    """
    return {key: value for key, value in param1.items() if key in param2}
def df_iterrows(param):
    """
    Return the rows of a DataFrame as lists, index value first.

    Parameters:
        param: the DataFrame to traverse (anything whose ``iterrows()``
            yields ``(index, row)`` pairs where ``row`` has ``tolist()``)
    Returns:
        result: list with one entry per row, each entry being
            ``[index, value_1, value_2, ...]``
    """
    # iterrows() already yields (index, row) pairs, so unpack them directly
    # and build the result in a single pass.
    return [[index] + data.tolist() for index, data in param.iterrows()]
def get_attr(_dict_, _keys_, **kwargs):
    """
    Look up a nested value by following a sequence of keys.

    Each key is converted to its string form before the lookup, so ``1``
    addresses the key ``'1'``.

    Parameters:
        _dict_: the (possibly nested) dictionary to read from
        _keys_: iterable of keys, outermost first
        default (keyword only): value returned when the lookup fails
    Returns:
        The value found at the end of the key path. If a KeyError or
        TypeError occurs anywhere along the path, ``default`` is returned,
        or None when no default was supplied. An empty ``_keys_`` returns
        ``_dict_`` itself.
    """
    default = kwargs.get('default')
    try:
        current = _dict_
        for _key_ in _keys_:
            # Subscript directly rather than building source text for
            # eval(): keys containing quotes or backslashes are looked up
            # literally and can never be executed as code.
            current = current['{}'.format(_key_)]
    except (KeyError, TypeError):
        return default
    return current
def trans_fields_name(_dict_, names1, names2):
    """
    Rename keys of a dictionary in place.

    Renames are applied one after another in the order given, so a later
    entry of ``names1`` may refer to a key produced by an earlier rename.

    Parameters:
        _dict_: the dictionary to modify
        names1: the existing key names
        names2: the replacement key names; ``names2[i]`` replaces
            ``names1[i]``. Extra trailing entries are ignored.
    Returns:
        The same ``_dict_`` object, after renaming.
    Raises:
        IndexError: ``names2`` has fewer entries than ``names1``; raised
            before anything is modified.
        KeyError: a name in ``names1`` is not present when its turn comes;
            renames performed before that point remain applied.
    """
    # Check the lengths first: otherwise a value would be popped and then
    # dropped when the lookup of its new name failed.
    if len(names2) < len(names1):
        raise IndexError('names2 has fewer entries than names1')
    for old_name, new_name in zip(names1, names2):
        _dict_[new_name] = _dict_.pop(old_name)
    return _dict_
def get_yaml(**kwargs):
    """
    Load a YAML document located under the project's ``Modules`` directory.

    The project root is taken to be the part of this file's absolute
    directory path that precedes the first occurrence of ``'Utils'``.

    Parameters:
        file_rel_path (keyword, required): path of the YAML file relative
            to ``<root>/Modules/``
    Returns:
        The object produced by ``yaml.full_load`` for the file content, or
        None when ``'Utils'`` does not occur in this file's directory path.
    Raises:
        KeyError: ``file_rel_path`` was not supplied.
        OSError: the file cannot be opened or read.
    """
    file_rel_path = kwargs['file_rel_path']
    file_path = os.path.abspath(os.path.dirname(__file__))
    index = file_path.find('Utils')
    if index == -1:
        # Without the 'Utils' marker there is no root to resolve against;
        # bail out explicitly instead of using an undefined root path.
        return None
    abs_path_root = file_path[:index]
    file_abs_path = os.path.abspath(abs_path_root + '/Modules/' + file_rel_path)
    # The context manager closes the handle even if reading fails.
    with open(file_abs_path, 'r', encoding="utf-8") as yaml_file:
        doc_yaml = yaml_file.read()
    # NOTE(review): full_load is kept for compatibility; prefer
    # yaml.safe_load if these files can ever come from an untrusted source.
    return yaml.full_load(doc_yaml)
def make_multistage_update_body(input_dict):
    """
    Collapse a nested dictionary into a single dotted-key entry.

    Example: ``{'a': {'b': {'c': 1}}}`` becomes ``{'a.b.c': 1}``.

    The dotted key is built from every key found in ``input_dict``, so the
    result is only meaningful when each nesting level holds a single key.

    Parameters:
        input_dict: the nested dictionary to collapse
    Returns:
        A one-entry dict mapping the dotted key path to the value found by
        following that path, or None if any exception is raised while
        building or following it (for example a non-string key, or a path
        that runs into a non-dict value). An empty dict produces
        ``{'': None}``; a non-dict input produces ``{'': input_dict}``.
    """
    def get_dict_keys(_dict_):
        # Post-order walk: keys of nested dicts are appended before the key
        # that contains them, so reversing the list afterwards puts the
        # outermost key first.
        if isinstance(_dict_, dict):
            for key in _dict_:
                if isinstance(_dict_[key], dict):
                    get_dict_keys(_dict_[key])
                update_keys.append(key)
        return update_keys

    def get_dict_value(data, keys):
        # Follow the dotted path one segment at a time. Only the starting
        # object is type-checked; a non-dict met further down raises
        # AttributeError, which the caller turns into None.
        if isinstance(data, dict):
            for key in keys.split('.'):
                data = data.get(key)
        return data

    try:
        update_keys = []
        get_dict_keys(input_dict)
        update_keys.reverse()
        # join() raises TypeError for non-string keys, just as the previous
        # manual concatenation did, so such inputs still yield None.
        update_key = '.'.join(update_keys)
        update_value = get_dict_value(input_dict, update_key)
        return {update_key: update_value}
    except Exception:
        # Deliberate best-effort: any failure is reported as None.
        return None