# api-datamanager/Modules/File/FileImpl.py
#
# 190 lines
# 7.4 KiB
# Python

import os
import sys
import zipfile
class FileObjImpl(object):
    """Generate object-definition modules and package skeletons.

    Everything this class creates or reads lives beneath the ``static``
    directory located next to this module; see ``get_project_abs_path``.
    """

    # Parameter keys that describe a nested class, paired with the "kind"
    # that decides how a ``classmethod``-typed field references that class.
    _NESTED_KINDS = (('instance_list', 'list'), ('instance_dict', 'dict'))

    @staticmethod
    def get_project_abs_path(**kwargs):
        """Return the absolute path of ``static/<name>`` next to this module.

        Keyword Args:
            name: File or directory name relative to the ``static`` directory.

        Returns:
            The normalised absolute path as a string.

        Raises:
            KeyError: If ``name`` is not supplied.
        """
        name = kwargs['name']
        module_dir = os.path.abspath(os.path.dirname(__file__))
        return os.path.abspath(module_dir + '/static/{}'.format(name))

    def create_package(self, path, route_path, obj_path, impl_path):
        """Create a package directory containing four empty modules.

        The directory ``static/<path>`` is created together with an empty
        ``__init__.py`` and empty ``<route_path>.py``, ``<obj_path>.py`` and
        ``<impl_path>.py`` files.

        Args:
            path: Package name, relative to the ``static`` directory.
            route_path: Name (without ``.py``) of the route module.
            obj_path: Name (without ``.py``) of the object module.
            impl_path: Name (without ``.py``) of the implementation module.

        Returns:
            The absolute path of the package directory when it already
            existed (nothing is created in that case); ``None`` after a new
            package has been created.
        """
        package_dir = self.get_project_abs_path(name=path)
        if os.path.exists(package_dir):
            # Report the real location rather than a machine-specific path.
            return package_dir
        os.makedirs(package_dir)
        for module_name in ('__init__', route_path, obj_path, impl_path):
            module_file = os.path.join(package_dir, '{}.py'.format(module_name))
            # Opening in write mode is enough to create the empty file.
            with open(module_file, 'w'):
                pass
        return None

    @staticmethod
    def _render_field(field, nested_class_name, kind, indent):
        """Render one ``name = ValidateAttr(...)`` line of generated code.

        Args:
            field: Parameter description; reads ``name`` and ``types`` and
                the optional ``length`` and ``in_list`` keys.
            nested_class_name: Class referenced when ``types`` is
                ``'classmethod'``.
            kind: ``'list'``, ``'dict'`` or ``None``.
            indent: Leading whitespace for the generated line.

        Returns:
            The generated line, terminated by a newline.
        """
        field_type = field['types']
        field_name = field['name']
        args = ["field='{}'".format(field_name)]
        if 'length' in field:
            args.append('length={}'.format(field['length']))

        type_arg = None
        if field_type == 'classmethod' and kind == 'list':
            # A list of nested objects carries no separate type argument.
            args.append('instance_list={}'.format(nested_class_name))
        elif field_type == 'classmethod' and kind == 'dict':
            type_arg = 'type={}'.format(nested_class_name)
        elif field_type == 'date':
            type_arg = 'func=Validate.date_format'
        elif field_type == 'time':
            type_arg = 'func=Validate.time_format'
        else:
            type_arg = 'type={}'.format(field_type)

        if 'in_list' in field:
            args.append('in_list={}'.format(field['in_list']))
        if type_arg is not None:
            args.append(type_arg)

        # Joining guarantees exactly one ", " between arguments, whichever
        # optional ones are present.
        return '{}{} = ValidateAttr({})\n'.format(
            indent, field_name, ', '.join(args))

    @classmethod
    def _render_nested_class(cls, spec, kind):
        """Render a nested ``SpecObject`` subclass, indented one tab.

        Args:
            spec: Mapping with ``class_name`` and ``parameter`` keys.
            kind: ``'list'`` or ``'dict'``, forwarded to every field.

        Returns:
            The generated source text of the nested class.
        """
        nested_name = spec['class_name']
        chunks = ['\n', '\tclass {}(SpecObject):\n'.format(nested_name)]
        fields_map = {}
        for field in spec['parameter']:
            fields_map[field['name']] = field['chinese']
            chunks.append(cls._render_field(field, nested_name, kind, '\t\t'))
        chunks.append('\n')
        chunks.append('\t\tfields_map = {}\n'.format(str(fields_map)))
        return ''.join(chunks)

    def new_file_obj(self, **kwargs):
        """Generate a ``SpecObject`` subclass and save it as a ``.py`` file.

        The source is written to ``static/<class_name>.py``. Nested classes
        described by ``instance_list`` / ``instance_dict`` entries are
        emitted first, followed by the top-level ``ValidateAttr`` attributes
        and a ``fields_map`` dictionary mapping each field name to its
        ``chinese`` label.

        Keyword Args:
            data: Mapping with ``class_name`` (str) and ``parameter`` (list
                of field mappings with ``name``, ``chinese``, ``types`` and
                the optional ``length``, ``in_list``, ``instance_list`` and
                ``instance_dict`` keys).

        Returns:
            The status string ``'新建成功'``.

        Raises:
            KeyError: If a required key is missing. The source is rendered
                completely before the file is opened, so no partial file is
                left behind in that case.
        """
        data = kwargs['data']
        class_name = data['class_name']
        parameters = data['parameter']
        target = self.get_project_abs_path(name='{}.py'.format(class_name))

        chunks = [
            'from Utils.ObjUtil import SpecObject\n',
            'from Utils.ValidateUtil import ValidateAttr, Validate\n',
            '\n',
            '\n',
            'class {}(SpecObject):\n'.format(class_name),
        ]

        # First pass: nested classes are written before the attributes that
        # reference them.
        for param in parameters:
            for key, kind in self._NESTED_KINDS:
                if key in param:
                    chunks.append(self._render_nested_class(param[key], kind))
        chunks.append('\n')

        # Second pass: the top-level attributes themselves.
        fields_map = {}
        for param in parameters:
            fields_map[param['name']] = param['chinese']
            nested_name = None
            nested_kind = None
            for key, kind in self._NESTED_KINDS:
                if key in param:
                    # When both keys are present the dict description wins.
                    nested_name = param[key]['class_name']
                    nested_kind = kind
            chunks.append(
                self._render_field(param, nested_name, nested_kind, '\t'))
        chunks.append('\n')
        chunks.append('\tfields_map = {}\n'.format(str(fields_map)))

        # The static directory may not exist on a fresh checkout.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, 'w', encoding="utf-8") as out:
            out.write(''.join(chunks))
        return '新建成功'

    def new_file_package(self, **kwargs):
        """Create a package skeleton and return its directory path.

        This method does not build a zip archive itself; use
        ``write_file_to_zip`` on the returned path for that.

        Keyword Args:
            data: Mapping with ``package_name``, ``route_name``,
                ``obj_name`` and ``impl_name`` keys.

        Returns:
            The absolute path of ``static/<package_name>``.
        """
        package_data = kwargs['data']
        package_name = package_data['package_name']
        route_name = package_data['route_name']
        obj_name = package_data['obj_name']
        impl_name = package_data['impl_name']
        self.create_package(package_name, route_name, obj_name, impl_name)
        # Resolve the location from this module instead of a fixed drive path.
        return self.get_project_abs_path(name=package_name)

    @staticmethod
    def write_file_to_zip(**kwargs):
        """Compress a directory tree into ``<src_dir>.zip``.

        Archive member names are relative to ``src_dir``.

        Keyword Args:
            src_dir: Directory whose files are added to the archive.
        """
        src_dir = kwargs['src_dir']
        zip_name = src_dir + '.zip'
        # The context manager closes the archive even if a write fails.
        with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as archive:
            for dirpath, _dirnames, filenames in os.walk(src_dir):
                # relpath strips only the leading src_dir, unlike
                # str.replace, which would also remove later occurrences.
                rel_dir = os.path.relpath(dirpath, src_dir)
                for filename in filenames:
                    if rel_dir == os.curdir:
                        arcname = filename
                    else:
                        arcname = os.path.join(rel_dir, filename)
                    archive.write(os.path.join(dirpath, filename), arcname)

    @staticmethod
    def return_file_data(**kwargs):
        """Return the raw bytes of ``static/<package_name>.zip``.

        Keyword Args:
            package_name: Name of the package whose archive is read.

        Returns:
            The archive content as ``bytes``.
        """
        package_name = kwargs['package_name']
        zip_path = FileObjImpl.get_project_abs_path(
            name='{}.zip'.format(package_name))
        with open(zip_path, 'rb') as zip_file:
            return zip_file.read()