dataplatform/Utils/TycApi/CommonUtil.py

224 lines
5.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import os
import yaml
import random
import time
TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2luZm8iOnsidWlkIjoiVUlEMDAwMSIsImVtYWlsIjoiZmVjcmliZEBmZWNyLmNvbS5jbiIsIm5hbWUiOiJyb290Iiwicm9sZSI6Ilx1N2JhMVx1NzQwNlx1NTQ1OCIsImRlcGFydG1lbnQiOiJcdTY1NzBcdTViNTdcdTUzMTZcdTkwZTgiLCJyb2xlX2lkIjoiUk9MRTAxIiwiZGVwYXJ0bWVudF9pZCI6IkQwMDEifSwiZXhwIjoxNjgzNDc2OTA4fQ.8girdw3n0WDktRuK0aSgGor10eb11nIFvRJqUtPZum4"
p_map = {
"heb": "河北省",
"sx": "山西省",
"ln": "辽宁省",
"jl": "吉林省",
"hlj": "黑龙江省",
"js": "江苏省",
"zj": "浙江省",
"ah": "安徽省",
"fj": "福建省",
"jx": "江西省",
"sd": "山东省",
"hen": "河南省",
"hub": "湖北省",
"hun": "湖南省",
"gd": "广东省",
"han": "海南省",
"sc": "四川省",
"gz": "贵州省",
"yn": "云南省",
"snx": "陕西省",
"gs": "甘肃省",
"qh": "青海省",
"tw": "台湾省",
"nmg": "内蒙古自治区",
"gx": "广西壮族自治区",
"xz": "西藏自治区",
"nx": "宁夏回族自治区",
"xj": "新疆维吾尔自治区",
"bj": "北京市",
"tj": "天津市",
"sh": "上海市",
"cq": "重庆市",
"xg": "香港特别行政区",
"am": "澳门特别行政区"
}
class CommonUtils(object):
@staticmethod
def get_current_time():
"""获取当前时间"""
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@staticmethod
def random_code(length):
"""生成随机编码"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
code = ''
for i in range(length):
code += random.choice(choices)
return code
@staticmethod
def time_stamp(param):
"""
时间戳转换
Parameters:
param: 时间戳数据
Returns:
转换后时间数据
"""
if param is None:
return '-'
elif isinstance(param, str) and len(param) == 0:
return '-'
else:
param = int(param)
estiblish_time = param / 1000
put_date = time.localtime(estiblish_time)
date = time.strftime("%Y-%m-%d", put_date)
return date
@staticmethod
def excel_time_stamp(param):
"""
excel时间戳转换
Parameters:
param: 时间戳数据
Returns:
转换后时间数据
"""
if param is None:
return '-'
elif isinstance(param, str) and len(param) == 0:
return '-'
else:
param = float(param)
delta = datetime.timedelta(days=param)
date_1 = datetime.datetime.strptime('1899-12-30', '%Y-%m-%d') + delta
date = datetime.datetime.strftime(date_1, '%Y-%m-%d')
return date
@staticmethod
def province_conversion(param):
"""
省份转换
Parameters:
param: 省份简称
Returns:
转换后省份全称
"""
res = [v for k, v in p_map.items() if k == param][0]
return res
def sub_dict(param1, param2):
"""
获取字典的子集
Parameters:
param1: 原字典
param2: 子集字段
Returns:
子集
"""
return dict((key, value) for key, value in param1.items() if key in param2)
def df_iterrows(param):
"""
按行以数组形式返回DataFrame的index、data
Parameters:
param: DataFrame 某个df对象
Returns:
result: list 遍历df对象每行数据包括index
"""
result = []
for row in param.iterrows():
index, data = row
result.append([index] + data.tolist())
return result
def get_attr(_dict_, _keys_, **kwargs):
"""
获取字典属性值
若发生了键值异常或类型异常,则返回设置的默认值
若没有默认值,则返回None
"""
value = kwargs['default'] if kwargs.__contains__('default') else None
try:
text = "_dict_"
for _key_ in _keys_:
text = text + "['{}']".format(_key_)
value = eval(text)
except KeyError:
pass
except TypeError:
pass
return value
def trans_fields_name(_dict_, names1, names2):
"""
替换字典键值名称
_dict_: 替换字典
names1: 原字典键值
names2: 新字典键值
"""
for i in range(len(names1)):
_dict_[names2[i]] = _dict_.pop(names1[i])
return _dict_
def get_yaml(**kwargs):
file_rel_path = kwargs['file_rel_path']
file_path = os.path.abspath(os.path.dirname(__file__))
index = file_path.find('Utils')
if index != -1:
abs_path_root = file_path[:index]
file_abs_path = os.path.abspath(abs_path_root + '/Modules/' + file_rel_path)
doc_yaml = open(file_abs_path, 'r', encoding="utf-8").read()
return yaml.full_load(doc_yaml)
def make_multistage_update_body(input_dict):
def get_dict_keys(_dict_):
if isinstance(_dict_, dict):
for key in _dict_.keys():
if isinstance(_dict_[key], dict):
get_dict_keys(_dict_[key])
update_keys.append(key)
return update_keys
def get_dict_value(data, keys):
keys_list = keys.split('.')
if isinstance(data, dict):
for key in keys_list:
values = data.get(key)
data = values
return data
try:
update_key = ''
update_keys = []
get_dict_keys(input_dict)
update_keys.reverse()
floor = len(update_keys)
for i in range(floor):
if i == floor - 1:
update_key = update_key + update_keys[i]
else:
update_key = update_key + update_keys[i] + '.'
update_value = get_dict_value(input_dict, update_key)
return {update_key: update_value}
except Exception:
return None