import inspect import json import os import shutil import string from pathlib import Path from typing import Optional, List, Any, Type, Tuple from pydantic import BaseModel from sqlalchemy.orm import DeclarativeMeta from utils.code_make_utils.table_model import TableModel, Column from utils.sqlalchemy_common_utils import get_model_config from mods.user.mods.user.models import User from utils.data_utils import JsDict def code_make(dir_path, replace_dic={}, save_path="temp"): """ 增删改查代码生成 @param dir_path: 代码模板 @param replace_dic: 需要替换的名称 @param save_path: 保存路径 """ code_path = Path(dir_path) if not code_path.exists(): os.makedirs(code_path) if code_path.is_dir(): save_path = Path(save_path) shutil.copytree(code_path, save_path) for root, dirs, files in os.walk(save_path): for file in files: file_path = Path(os.path.join(root, file)) item_code_str = file_path.read_text(encoding="utf-8") new_code_str = item_code_str for replace_word, replaced_word in replace_dic.items(): new_code_str = new_code_str.replace(replace_word, replaced_word) with open(file_path, "w", encoding="utf-8") as f: f.write(new_code_str) def to_snake(name): new_name = "" for i in range(len(name)): k = name[i] if name[i] in string.ascii_letters: if name[i].isupper(): if i == 0: k = name[i].lower() else: k = '_' + name[i].lower() new_name += k return new_name def to_upper_camel_case(name): items = name.split("_") new_name = "" for item in items: new_name += item[0].upper() + item[1:] return new_name def make_code_from_model(template_path, save_path, model: Type[DeclarativeMeta], chinese_name: str): """ UpperCamelCaseName snake_name chinese_name """ model_define = inspect.getsource(model) config = get_model_config(model) model_name = model.__name__ upper_camel_case_name = model_name snake_name = to_snake(model_name) add_schema_contents = [] add_optional_schema_contents = [] keys = list(config.cols.keys()) keys = sorted(keys, key=lambda key: model_define.index(key)) for key in keys: col_config = config.cols[key] col = getattr(model, key) add_code_line = "" col_base_type = col_config.col_base_type # id先pass if key == config.id_key: continue if col_base_type in ['int', 'float', 'date', 'datetime', 'bool', 'str']: if col.server_default or col.default or col.nullable: add_code_line = f"{key}: Optional[{col_base_type}]" else: add_code_line = f"{key}: {col_base_type}" elif col_base_type in ['relation']: if col_config.relation.relation_type == 'o2o': add_code_line = f"{key}: {'Optional[Any]'}" elif col_config.relation.relation_type == 'o2m': add_code_line = f"{key}: {'Optional[List[Any]]'}" elif col_config.relation.relation_type == 'm2m': add_code_line = f"{key}: {'Optional[List[Any]]'}" elif col_config.relation.relation_type == 'm2o': add_code_line = f"{key}: {'Optional[Any]'}" else: if col_base_type in ["json", 'jsonb']: add_code_line = f"{key}: {'Optional[Dict[Any,Any]]'} = {'{}'}" else: # 其他类型暂时any add_code_line = f"{key}: {'Any'}" if add_code_line: add_schema_contents.append(add_code_line) add_schema_content = "\n ".join(add_schema_contents) replace_dic = { '"""model_define"""': inspect.getsource(model), '"""add_schema"""': add_schema_content, 'UpperCamelCaseName': upper_camel_case_name, 'snake_name': snake_name, 'chinese_name': chinese_name, 'id_key': config.id_key, } code_make(template_path, replace_dic, save_path=f"{save_path}/{snake_name}") def make_mod_from_models(mod_name: str, save_path: str, model_with_name_list: List[Tuple[Type[DeclarativeMeta], str]]): mod_path = Path(save_path) / mod_name mods_path = Path(save_path) / mod_name / f"mods" if not mods_path.exists(): os.makedirs(mods_path) crud_template_path = Path(__file__).parent / "templates" / "crud_without_model" import_codes = [] router_include_codes = [] for item in model_with_name_list: model = item[0] name = item[1] make_code_from_model(crud_template_path, mods_path, model, name) snake_name = to_snake(model.__name__) import_code_line = f"from .mods.{snake_name}.router import router as {snake_name}_router" router_include_line = f"router.include_router({snake_name}_router)" import_codes.append(import_code_line) router_include_codes.append(router_include_line) # make router.py import_code_content = "\n".join(import_codes) router_include_code_content = "\n".join(router_include_codes) router_code = f'from fastapi import APIRouter\n{import_code_content}\n\nrouter = APIRouter(prefix="/{mod_name}")\n{router_include_code_content}' (mod_path / 'router.py').write_text(router_code, encoding="utf-8") # make common.py (mod_path / 'common.py').write_text( "from context.common import common_db\n\nget_db = common_db.get_db\nBase = common_db.Base", encoding="utf-8") routes = make_routes(mod_name, model_with_name_list) (mod_path / 'routes.json').write_text(json.dumps([routes], ensure_ascii=False), encoding='utf-8') def make_table_model(model: Type[DeclarativeMeta], chinese_name: str): model_config = get_model_config(model) config: TableModel = JsDict({}) config.tableName = model.__tablename__ config.name = chinese_name config.idKey = model_config.id_key config.columns = [] keys = list(model_config.cols.keys()) model_define = inspect.getsource(model) keys = sorted(keys, key=lambda key: model_define.index(key)) for key in keys: col_conf = model_config.cols[key] col = getattr(model, key) column: Column = JsDict({}) column.key = key if col_conf.col_org_type not in ['DeclarativeMeta']: column.name = col.comment or key if col.default or col.server_default or (col.primary_key and col.autoincrement): column.addNeed = False pass else: if not col.nullable: column.addNeed = True column.updateNeed = True else: column.name = key column.hidden = True if col_conf.col_org_type in ['String']: column.type = 'string' elif col_conf.col_org_type in ['BigInteger', 'Integer', "SmallInteger"]: column.type = 'int' elif col_conf.col_org_type in ["Boolean"]: column.type = 'bool' elif col_conf.col_org_type in ["Date"]: column.type = 'date' elif col_conf.col_org_type in ["DateTime"]: column.type = 'datetime' elif col_conf.col_org_type in ["Double", "Float"]: column.type = 'float' elif col_conf.col_org_type in ["UnicodeText", "Text", "Unicode"]: column.type = 'text' column.hidden = True elif col_conf.col_org_type in ["JSON", "JSONB"]: column.hidden = True args = model.__annotations__[key].__args__ if args and type(args[0]).__name__ == 'ModelMetaclass': column.type = 'json' column.config = {} column.config.vueJsonSchemaForm = { "schema": args[0].schema() } elif col_conf.col_org_type in ["Enum"]: column.type = 'enum' elif col_conf.col_org_type in ['DeclarativeMeta']: column.hidden = True if col_conf.relation.relation_type == 'o2o': column.type = 'json' if col_conf.relation.relation_type == 'o2m': column.type = 'set' if col_conf.relation.relation_type == 'm2m': column.type = 'set' if col_conf.relation.relation_type == 'm2o': column.type = 'json' config.columns.append(column) print(config) return config def make_routes(mod_name: str, model_with_name_list: List[Tuple[Type[DeclarativeMeta], str]]): router = { "path": f"/{mod_name}", "name": f"{mod_name}", "component": "Layout", "redirect": f"/{mod_name}/{to_snake(model_with_name_list[0][0].__name__)}", "meta": { "icon": "calendar", "title": "城投管理" }, "children": []} table_models = [] for item in model_with_name_list: table_model = make_table_model(item[0], item[1]) table_model.baseUrl = f"/fastapi_crud_template/{mod_name}/{table_model.tableName}" table_model.name = item[1] table_models.append(table_model) route = { "path": f"/{mod_name}/{table_model.tableName}", "name": table_model.name, "component": "tablePlus/index", "meta": { "keepAlive": True, "title": item[1], "tableModel": table_model } } router['children'].append(route) return router