252 lines
9.5 KiB
Python
252 lines
9.5 KiB
Python
import inspect
|
|
import json
|
|
import os
|
|
import shutil
|
|
import string
|
|
from pathlib import Path
|
|
from typing import Optional, List, Any, Type, Tuple
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import DeclarativeMeta
|
|
|
|
from utils.code_make_utils.table_model import TableModel, Column
|
|
from utils.sqlalchemy_common_utils import get_model_config
|
|
from mods.user.mods.user.models import User
|
|
from utils.data_utils import JsDict
|
|
|
|
|
|
def code_make(dir_path, replace_dic=None, save_path="temp"):
    """
    Generate CRUD code by copying a template directory and substituting names.

    The template directory is copied to ``save_path`` with ``shutil.copytree``
    and every text file in the copy is rewritten with each key of
    ``replace_dic`` replaced by its value (applied in the mapping's iteration
    order, so later replacements also see the output of earlier ones).

    @param dir_path: template directory; it is created (empty) when missing.
        Nothing is copied when the path exists but is not a directory.
    @param replace_dic: mapping of placeholder text -> replacement text;
        ``None`` (the default) means "no substitutions"
    @param save_path: destination directory; ``shutil.copytree`` raises
        ``FileExistsError`` when it already exists
    """
    # Use a per-call mapping instead of a shared mutable default argument.
    replacements = {} if replace_dic is None else replace_dic

    code_path = Path(dir_path)
    if not code_path.exists():
        os.makedirs(code_path)
    if code_path.is_dir():
        save_path = Path(save_path)
        shutil.copytree(code_path, save_path)
        for root, _dirs, files in os.walk(save_path):
            for file in files:
                file_path = Path(root) / file
                try:
                    new_code_str = file_path.read_text(encoding="utf-8")
                except UnicodeDecodeError:
                    # Not UTF-8 text (e.g. a compiled .pyc or an image):
                    # copytree already copied it, so keep it untouched
                    # rather than aborting the whole generation run.
                    continue
                for replace_word, replaced_word in replacements.items():
                    new_code_str = new_code_str.replace(replace_word, replaced_word)
                file_path.write_text(new_code_str, encoding="utf-8")
|
|
|
|
|
|
def to_snake(name):
    """
    Convert a CamelCase identifier to snake_case.

    Every ASCII uppercase letter is lower-cased and, unless it is the very
    first character, prefixed with an underscore. All other characters
    (lowercase letters, digits, underscores, non-ASCII letters) are copied
    through unchanged, so consecutive capitals are split one by one
    (``"HTTPServer"`` -> ``"h_t_t_p_server"``).
    """
    pieces = []
    for position, char in enumerate(name):
        if char in string.ascii_uppercase:
            separator = "_" if position else ""
            pieces.append(separator + char.lower())
        else:
            pieces.append(char)
    return "".join(pieces)
|
|
|
|
|
|
def to_upper_camel_case(name):
    """
    Convert a snake_case identifier to UpperCamelCase.

    The name is split on underscores and the first character of every segment
    is upper-cased; the rest of each segment is kept as written. Empty
    segments (from an empty name, or from leading, trailing or doubled
    underscores) contribute nothing instead of raising ``IndexError``.
    """
    # part[:1] is "" for an empty segment, whereas part[0] would raise.
    return "".join(part[:1].upper() + part[1:] for part in name.split("_"))
|
|
|
|
|
|
def make_code_from_model(template_path, save_path, model: Type[DeclarativeMeta], chinese_name: str):
    """
    Generate CRUD code for one model from a template directory.

    One schema field line is built per model column (the id column is
    skipped), then ``code_make`` copies the template to
    ``<save_path>/<snake_name>`` replacing these placeholders:

    - the triple-quoted ``model_define`` marker -> source code of the model
    - the triple-quoted ``add_schema`` marker   -> generated schema field lines
    - ``UpperCamelCaseName`` -> the model class name
    - ``snake_name``         -> ``to_snake`` of the model class name
    - ``chinese_name``       -> the ``chinese_name`` argument
    - ``id_key``             -> ``config.id_key``

    @param template_path: template directory handed to ``code_make``
    @param save_path: parent directory of the generated package
    @param model: model class to generate code for
    @param chinese_name: display name substituted for ``chinese_name``
    """
    model_define = inspect.getsource(model)
    config = get_model_config(model)
    upper_camel_case_name = model.__name__
    snake_name = to_snake(upper_camel_case_name)

    def source_position(key):
        # Order columns by where their name first appears in the class source.
        # Columns inherited from a base class or mixin do not appear there at
        # all; sort them after the local ones instead of letting str.index
        # raise ValueError.
        position = model_define.find(key)
        return position if position >= 0 else len(model_define)

    relation_annotations = {
        'o2o': 'Optional[Any]',
        'o2m': 'Optional[List[Any]]',
        'm2m': 'Optional[List[Any]]',
        'm2o': 'Optional[Any]',
    }

    add_schema_contents = []
    for key in sorted(config.cols.keys(), key=source_position):
        # skip the id column for now
        if key == config.id_key:
            continue
        col_config = config.cols[key]
        col = getattr(model, key)
        col_base_type = col_config.col_base_type
        add_code_line = ""
        if col_base_type in ['int', 'float', 'date', 'datetime', 'bool', 'str']:
            # A column with a default, a server default, or NULL allowed does
            # not have to be supplied by the caller.
            if col.server_default or col.default or col.nullable:
                add_code_line = f"{key}: Optional[{col_base_type}]"
            else:
                add_code_line = f"{key}: {col_base_type}"
        elif col_base_type in ['relation']:
            # Unknown relation types produce no schema line at all.
            annotation = relation_annotations.get(col_config.relation.relation_type)
            if annotation:
                add_code_line = f"{key}: {annotation}"
        elif col_base_type in ["json", 'jsonb']:
            add_code_line = f"{key}: Optional[Dict[Any,Any]] = {{}}"
        else:
            # other types fall back to Any for now
            add_code_line = f"{key}: Any"
        if add_code_line:
            add_schema_contents.append(add_code_line)

    # NOTE(review): the separator is a newline plus one space as written here;
    # confirm it matches the indentation the template expects for class fields.
    add_schema_content = "\n ".join(add_schema_contents)
    replace_dic = {
        '"""model_define"""': model_define,
        '"""add_schema"""': add_schema_content,
        'UpperCamelCaseName': upper_camel_case_name,
        'snake_name': snake_name,
        'chinese_name': chinese_name,
        'id_key': config.id_key,
    }
    code_make(template_path, replace_dic, save_path=f"{save_path}/{snake_name}")
|
|
|
|
|
|
def make_mod_from_models(mod_name: str, save_path: str, model_with_name_list: List[Tuple[Type[DeclarativeMeta], str]]):
    """
    Generate a complete module for a list of models.

    For every ``(model, display name)`` entry a CRUD sub-package is generated
    under ``<save_path>/<mod_name>/mods`` from the ``crud_without_model``
    template located next to this file. Afterwards three module-level files
    are written into ``<save_path>/<mod_name>``:

    - ``router.py``: an ``APIRouter`` that includes every generated router
    - ``common.py``: re-exports ``get_db`` and ``Base`` from ``common_db``
    - ``routes.json``: a one-element list holding the ``make_routes`` result

    @param mod_name: module name; used as directory name and router prefix
    @param save_path: directory under which the module directory is created
    @param model_with_name_list: ``(model class, display name)`` entries
    """
    module_dir = Path(save_path) / mod_name
    sub_mods_dir = module_dir / "mods"
    if not sub_mods_dir.exists():
        os.makedirs(sub_mods_dir)

    template_dir = Path(__file__).parent / "templates" / "crud_without_model"
    import_lines = []
    include_lines = []
    for entry in model_with_name_list:
        model_cls, display_name = entry[0], entry[1]
        make_code_from_model(template_dir, sub_mods_dir, model_cls, display_name)
        sub_name = to_snake(model_cls.__name__)
        import_lines.append(f"from .mods.{sub_name}.router import router as {sub_name}_router")
        include_lines.append(f"router.include_router({sub_name}_router)")

    # router.py: imports, the module router, then one include per sub-router
    router_source = (
        'from fastapi import APIRouter\n'
        + "\n".join(import_lines)
        + f'\n\nrouter = APIRouter(prefix="/{mod_name}")\n'
        + "\n".join(include_lines)
    )
    (module_dir / 'router.py').write_text(router_source, encoding="utf-8")

    # common.py: shared database handles for the generated sub-packages
    common_source = "from context.common import common_db\n\nget_db = common_db.get_db\nBase = common_db.Base"
    (module_dir / 'common.py').write_text(common_source, encoding="utf-8")

    # routes.json: route description of the module
    module_routes = make_routes(mod_name, model_with_name_list)
    routes_json = json.dumps([module_routes], ensure_ascii=False)
    (module_dir / 'routes.json').write_text(routes_json, encoding='utf-8')
|
|
|
|
|
|
def make_table_model(model: Type[DeclarativeMeta], chinese_name: str):
    """
    Build the table description (a ``TableModel``-shaped ``JsDict``) of a model.

    The result carries ``tableName``, ``name``, ``idKey`` and a ``columns``
    list with one entry per key of the model config, ordered by where the
    column name first appears in the model's source code.

    @param model: model class to describe
    @param chinese_name: display name stored as ``name``
    @return: the populated table description
    """
    model_config = get_model_config(model)
    config: TableModel = JsDict({})
    config.tableName = model.__tablename__
    config.name = chinese_name
    config.idKey = model_config.id_key
    config.columns = []

    model_define = inspect.getsource(model)

    def source_position(key):
        # Columns inherited from a base class or mixin do not appear in this
        # class's source; sort them after the local ones instead of letting
        # str.index raise ValueError.
        position = model_define.find(key)
        return position if position >= 0 else len(model_define)

    relation_types = {'o2o': 'json', 'o2m': 'set', 'm2m': 'set', 'm2o': 'json'}

    for key in sorted(model_config.cols.keys(), key=source_position):
        col_conf = model_config.cols[key]
        col = getattr(model, key)
        column: Column = JsDict({})
        column.key = key

        if col_conf.col_org_type not in ['DeclarativeMeta']:
            column.name = col.comment or key
            if col.default or col.server_default or (col.primary_key and col.autoincrement):
                # The value is produced automatically, so the add form does
                # not need it.
                column.addNeed = False
            elif not col.nullable:
                # NOTE(review): the original indentation was not recoverable;
                # updateNeed is assumed to belong to this non-nullable branch
                # together with addNeed - confirm.
                column.addNeed = True
                column.updateNeed = True
        else:
            # Relationship attribute: labelled by its key and hidden.
            column.name = key
            column.hidden = True

        if col_conf.col_org_type in ['String']:
            column.type = 'string'
        elif col_conf.col_org_type in ['BigInteger', 'Integer', "SmallInteger"]:
            column.type = 'int'
        elif col_conf.col_org_type in ["Boolean"]:
            column.type = 'bool'
        elif col_conf.col_org_type in ["Date"]:
            column.type = 'date'
        elif col_conf.col_org_type in ["DateTime"]:
            column.type = 'datetime'
        elif col_conf.col_org_type in ["Double", "Float"]:
            column.type = 'float'
        elif col_conf.col_org_type in ["UnicodeText", "Text", "Unicode"]:
            column.type = 'text'
            column.hidden = True
        elif col_conf.col_org_type in ["JSON", "JSONB"]:
            column.hidden = True
            # The column may have no annotation, or a non-generic one; in both
            # cases there is simply no schema to attach.
            annotation = getattr(model, '__annotations__', {}).get(key)
            args = getattr(annotation, '__args__', None)
            if args and type(args[0]).__name__ == 'ModelMetaclass':
                # The first type argument is a pydantic model class: expose
                # its schema for the JSON form.
                column.type = 'json'
                column.config = {}
                column.config.vueJsonSchemaForm = {
                    "schema": args[0].schema()
                }
        elif col_conf.col_org_type in ["Enum"]:
            column.type = 'enum'
        elif col_conf.col_org_type in ['DeclarativeMeta']:
            column.hidden = True
            relation_type = col_conf.relation.relation_type
            # Unknown relation types leave column.type unset.
            if relation_type in relation_types:
                column.type = relation_types[relation_type]

        config.columns.append(column)
    return config
|
|
|
|
|
|
def make_routes(mod_name: str, model_with_name_list: List[Tuple[Type[DeclarativeMeta], str]],
                title: str = "城投管理"):
    """
    Build the route description of a module.

    The returned dict describes the module-level route (path ``/<mod_name>``,
    redirecting to the snake_case name of the first model) and holds one child
    route per model, whose ``meta.tableModel`` is the ``make_table_model``
    result extended with ``baseUrl``.

    @param mod_name: module name used in every path
    @param model_with_name_list: ``(model class, display name)`` entries; must
        not be empty, because the redirect target is taken from the first
        entry (``IndexError`` otherwise)
    @param title: value stored under ``meta.title`` of the module-level route;
        defaults to the previously hard-coded title
    @return: the module route dict including its ``children``
    """
    first_model = model_with_name_list[0][0]
    router = {
        "path": f"/{mod_name}",
        "name": f"{mod_name}",
        "component": "Layout",
        "redirect": f"/{mod_name}/{to_snake(first_model.__name__)}",
        "meta": {
            "icon": "calendar",
            "title": title
        },
        "children": []}

    for item in model_with_name_list:
        table_model = make_table_model(item[0], item[1])
        table_model.baseUrl = f"/fastapi_crud_template/{mod_name}/{table_model.tableName}"
        table_model.name = item[1]
        router['children'].append({
            "path": f"/{mod_name}/{table_model.tableName}",
            "name": table_model.name,
            "component": "tablePlus/index",
            "meta": {
                "keepAlive": True,
                "title": item[1],
                "tableModel": table_model
            }
        })
    return router
|