import os
import shutil
import sys
import zipfile


class FileObjImpl(object):
    """Generate the source files of a package from a description dict.

    The methods write three generated modules (object definitions, an impl
    class and Flask routes) into ``static/<package_name>`` next to this
    module, and offer helpers to zip, read back and remove that output.
    Generated code is indented with tabs.
    """

    # (impl_range keyword, action name, HTTP method, request attribute, info text)
    _ROUTE_SPECS = (
        ('增加', 'create', 'POST', 'json', '新增数据'),
        ('删除', 'delete', 'GET', 'args', '删除数据'),
        ('修改', 'update', 'POST', 'json', '修改数据'),
        ('查询', 'search', 'POST', 'json', '查询数据'),
    )

    # (impl_range keyword, method name) for impl methods generated as stubs.
    _STUB_SPECS = (
        ('删除', 'delete'),
        ('修改', 'update'),
        ('查询', 'search'),
    )

    @staticmethod
    def get_project_abs_path(**kwargs):
        """Return the absolute path of ``static/<name>`` beside this module.

        Keyword Args:
            name: Path fragment appended after ``static/``. An empty string
                yields the ``static`` directory itself.
        """
        module_dir = os.path.abspath(os.path.dirname(__file__))
        relative_path = '/static/{}'.format(kwargs['name'])
        return os.path.abspath(module_dir + relative_path)

    @staticmethod
    def _render_field(sub_data):
        """Return the ``ValidateAttr`` declaration line for one field description."""
        field_name = sub_data['attribute']
        types = sub_data['type']
        verify = sub_data['verify']

        if verify == 'type':
            # A single type is written bare; otherwise the whole list is written.
            type_arg = 'type={}'.format(types[0] if len(types) == 1 else types)
        elif verify == 'instance_list':
            type_arg = 'instance_list={}'.format(types[0])
        else:
            type_arg = 'func=Validate.{}'.format(types[0])

        # Default value: only emitted when present and non-empty.
        default = ''
        if 'default' in sub_data and len(sub_data['default']):
            default = 'default={}, '.format(sub_data['default'])

        # Supplementary checks: string length and allowed-value list.
        length = ''
        in_list = ''
        if 'complement' in sub_data:
            complement = sub_data['complement']
            if 'length' in complement and isinstance(complement['length'], int):
                length = 'length={}, '.format(complement['length'])
            if 'in_list' in complement and len(complement['in_list']):
                in_list = 'in_list={}, '.format(complement['in_list'])

        return (
            f"\t{field_name} = ValidateAttr(field='{field_name}', "
            f"{length}{default}{in_list}{type_arg})"
        )

    @staticmethod
    def write_obj_file(**kwargs):
        """Write the object-definition module.

        Keyword Args:
            obj_path: Destination file path.
            class_data: Iterable of class descriptions. Each needs
                ``class_name`` and ``parameter``; every parameter needs
                ``attribute``, ``chinese``, ``type`` and ``verify`` and may
                carry ``default`` and ``complement``.
        """
        path = kwargs['obj_path']
        class_data = kwargs['class_data']

        lines = [
            'from Utils.ObjUtil import SpecObject',
            'from Utils.ValidateUtil import ValidateAttr, Validate',
        ]
        for item in class_data:
            lines.extend(['', '', 'class {}(SpecObject):'.format(item['class_name'])])
            mapping_data = dict()
            for sub in item['parameter']:
                mapping_data[sub['attribute']] = sub['chinese']
                lines.append(FileObjImpl._render_field(sub))
            lines.extend(['', '\tfields_map = {}'.format(str(mapping_data))])

        with open(path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines) + '\n')

    @staticmethod
    def _render_create_body(class_data):
        """Return the generated lines that build ``func`` from ``data`` in ``create``."""
        # First description wins when a class name is repeated.
        classes_by_name = {}
        for class_item in class_data:
            classes_by_name.setdefault(class_item['class_name'], class_item)

        # The last described class is the one the impl creates.
        target = class_data[-1]
        lines = ['\t\tfunc = {}()'.format(target['class_name'])]
        for item in target['parameter']:
            attribute = item['attribute']
            type_name = item['type'][0]
            func_name = 'func_{}'.format(attribute)
            # A field is "nested" when its type is one of the described
            # classes. (str.istitle() misses CamelCase names like UserInfo.)
            nested = classes_by_name.get(type_name)

            if item['verify'] == 'type' and nested is not None:
                # Field holds one instance of another described class.
                lines.append('\t\t{} = {}()'.format(func_name, type_name))
                for sub_item in nested['parameter']:
                    lines.append('\t\t{func_name}.{param} = data["{param}"]'.format(
                        func_name=func_name, param=sub_item['attribute']))
                lines.append('\t\tfunc.{param} = {func_name}'.format(
                    param=attribute, func_name=func_name))
            elif item['verify'] == 'instance_list' and nested is not None:
                # Field holds a list of instances, one per incoming element.
                lines.append('\t\tfunc.{} = list()'.format(attribute))
                lines.append('\t\tfor item in data["{}"]:'.format(attribute))
                lines.append('\t\t\t{} = {}()'.format(func_name, type_name))
                for sub_item in nested['parameter']:
                    lines.append('\t\t\t{func_name}.{param} = item["{param}"]'.format(
                        func_name=func_name, param=sub_item['attribute']))
                lines.append('\t\t\tfunc.{}.append({})'.format(attribute, func_name))
            else:
                lines.append('\t\tfunc.{param} = data["{param}"]'.format(param=attribute))
        return lines

    @staticmethod
    def write_impl_file(**kwargs):
        """Write the impl module.

        Keyword Args:
            impl_path: Destination file path.
            package_data: Package description. Reads ``package_name``,
                ``obj_name``, ``impl_name``, ``impl_range``, ``data_base``,
                ``data_sheet`` and ``class_data``.

        ``impl_range`` selects the generated methods by keyword: ``create``
        gets a full body, ``delete``/``update``/``search`` are ``pass`` stubs.
        """
        path = kwargs['impl_path']
        package_data = kwargs['package_data']
        impl_range = package_data['impl_range']
        class_names = ', '.join(
            class_item['class_name'] for class_item in package_data['class_data'])

        lines = [
            'from DBHelper.MongoHelperInstance import DB_TEST',
            'from Modules.{package_name}.{obj_name} import {obj}'.format(
                package_name=package_data['package_name'],
                obj_name=package_data['obj_name'],
                obj=class_names),
            '',
            '',
            'class {}(object):'.format(package_data['impl_name']),
        ]

        if '增加' in impl_range:
            lines.extend([
                '',
                '\t@staticmethod',
                '\tdef create(**kwargs):',
                '\t\tdata = kwargs["data"]',
            ])
            lines.extend(FileObjImpl._render_create_body(package_data['class_data']))
            # Database operation.
            lines.extend([
                '\t\tDB_TEST.insert_single_data(',
                '\t\t\t"{}",'.format(package_data['data_base']),
                '\t\t\t"{}",'.format(package_data['data_sheet']),
                '\t\t\tfunc.fields_toggle()',
                '\t\t)',
            ])

        for keyword, method_name in FileObjImpl._STUB_SPECS:
            if keyword in impl_range:
                lines.extend([
                    '',
                    '\t@staticmethod',
                    '\tdef {}(**kwargs):'.format(method_name),
                    '\t\tpass',
                ])

        with open(path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines) + '\n')

    @staticmethod
    def write_route_file(**kwargs):
        """Write the Flask route module.

        Keyword Args:
            route_path: Destination file path.
            package_data: Package description. Reads ``package_name``,
                ``impl_name``, ``route_prefix`` and ``impl_range``.

        One route is generated for each keyword found in ``impl_range``.
        """
        path = kwargs['route_path']
        package_data = kwargs['package_data']
        impl_name = package_data['impl_name']
        route_prefix = package_data['route_prefix']
        impl_range = package_data['impl_range']

        lines = [
            'from flask import Blueprint, request',
            'from Utils.ErrorUtil import APIReturnError',
            'from Utils.RouteUtil import RouteParamsCheck',
            'from Modules.{package_name}.{impl_name} import {impl_name}'.format(
                package_name=package_data['package_name'], impl_name=impl_name),
            '',
            '{route_prefix}_route = Blueprint("{route_prefix}", __name__)'.format(
                route_prefix=route_prefix),
        ]

        for keyword, action, http_method, source, info in FileObjImpl._ROUTE_SPECS:
            if keyword not in impl_range:
                continue
            lines.extend([
                '',
                '',
                '@{}_route.route("/{}", methods=["{}"])'.format(
                    route_prefix, action, http_method),
                'def {}_route():'.format(action),
                '\ttry:',
                '\t\tRouteParamsCheck(req=request.{}, params=[]).required()'.format(source),
                '\t\tdata = request.{}'.format(source),
                '\t\timpl = {}()'.format(impl_name),
                '\t\tresult = impl.{}(data=data)'.format(action),
                '\t\treturn {"info": "' + info + '", "result": result}, 200',
                '\texcept APIReturnError as e:',
                '\t\treturn {"info": e.__str__()}, e.status_code',
            ])

        with open(path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines) + '\n')

    def create_package(self, **kwargs):
        """Create the package directory and its generated modules.

        Keyword Args:
            package_data: Package description. Reads ``package_name``,
                ``route_name``, ``obj_name``, ``impl_name`` and
                ``class_data`` here, plus whatever the writers need.

        Returns:
            ``False`` when the target directory already exists (nothing is
            written); otherwise ``None`` after the files are written.
        """
        package_data = kwargs['package_data']
        path = self.get_project_abs_path(name=package_data['package_name'])
        if os.path.exists(path):
            return False
        os.makedirs(path)

        init_path = os.path.join(path, '__init__.py')
        route_path = os.path.join(path, r'{}.py'.format(package_data['route_name']))
        obj_path = os.path.join(path, r'{}.py'.format(package_data['obj_name']))
        impl_path = os.path.join(path, r'{}.py'.format(package_data['impl_name']))

        # Empty __init__.py marks the directory as a package.
        with open(init_path, 'w'):
            pass

        self.write_obj_file(obj_path=obj_path, class_data=package_data['class_data'])
        self.write_impl_file(impl_path=impl_path, package_data=package_data)
        self.write_route_file(route_path=route_path, package_data=package_data)

    def new_file_package(self, **kwargs):
        """Create a new package and return its absolute directory path.

        Keyword Args:
            data: Package description, passed to ``create_package``.
        """
        package_data = kwargs['data']
        self.create_package(package_data=package_data)
        return self.get_project_abs_path(name=package_data['package_name'])

    @staticmethod
    def write_file_to_zip(**kwargs):
        """Zip a directory tree into ``<src_dir>.zip``.

        Keyword Args:
            src_dir: Directory to archive. Files are stored under paths
                relative to it; empty directories are not stored.
        """
        src_dir = kwargs['src_dir']
        zip_name = src_dir + '.zip'
        # The context manager closes the archive even if a write fails.
        with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as archive:
            for dirpath, _dirnames, filenames in os.walk(src_dir):
                for filename in filenames:
                    file_path = os.path.join(dirpath, filename)
                    archive.write(file_path, os.path.relpath(file_path, src_dir))

    def return_file_data(self, **kwargs):
        """Return the bytes of ``static/<package_name>.zip``.

        Keyword Args:
            package_name: Package name; ``.zip`` is appended to it.
        """
        zip_path = self.get_project_abs_path(
            name='{}.zip'.format(kwargs['package_name']))
        with open(zip_path, 'rb') as zip_file:
            return zip_file.read()

    def delete_all_file(self):
        """Remove the whole ``static`` output directory and its contents."""
        shutil.rmtree(self.get_project_abs_path(name=''))