wd-rating/utils/code_make_utils/make_from_model.py

252 lines
9.5 KiB
Python

import inspect
import json
import os
import shutil
import string
from pathlib import Path
from typing import Optional, List, Any, Type, Tuple
from pydantic import BaseModel
from sqlalchemy.orm import DeclarativeMeta
from utils.code_make_utils.table_model import TableModel, Column
from utils.sqlalchemy_common_utils import get_model_config
from mods.user.mods.user.models import User
from utils.data_utils import JsDict
def code_make(dir_path, replace_dic=None, save_path="temp"):
    """
    Generate CRUD code by copying a template directory and substituting
    placeholder words in every copied file.

    @param dir_path: directory holding the code templates; it is created
        (empty) when it does not exist yet
    @param replace_dic: mapping of placeholder -> replacement text, applied
        in the mapping's iteration order; None means "no replacements"
    @param save_path: destination directory; it must not exist yet because
        shutil.copytree is called without dirs_exist_ok
    """
    # A literal {} default would be one dict object shared by every call.
    replacements = {} if replace_dic is None else replace_dic

    code_path = Path(dir_path)
    if not code_path.exists():
        os.makedirs(code_path)
    if not code_path.is_dir():
        # dir_path points at something that is not a directory: nothing to copy.
        return

    save_path = Path(save_path)
    shutil.copytree(code_path, save_path)

    for root, _dirs, files in os.walk(save_path):
        for file in files:
            file_path = Path(root) / file
            try:
                item_code_str = file_path.read_text(encoding="utf-8")
            except UnicodeDecodeError:
                # Not a UTF-8 text file (e.g. a compiled .pyc that ended up in
                # the template tree); keep the verbatim copy and move on.
                continue
            new_code_str = item_code_str
            for replace_word, replaced_word in replacements.items():
                new_code_str = new_code_str.replace(replace_word, replaced_word)
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(new_code_str)
def to_snake(name):
    """
    Convert a CamelCase identifier to snake_case.

    Every ASCII uppercase letter is lowercased and, unless it is the very
    first character, prefixed with an underscore. All other characters
    (digits, lowercase letters, non-ASCII letters) are copied unchanged, so
    runs of capitals are split letter by letter ("HTTPServer" becomes
    "h_t_t_p_server").

    @param name: identifier to convert
    @return: the snake_case form of name
    """
    pieces = []
    for position, char in enumerate(name):
        if char in string.ascii_uppercase:
            lowered = char.lower()
            # Only the leading capital is emitted without a separator.
            pieces.append(lowered if position == 0 else "_" + lowered)
        else:
            pieces.append(char)
    return "".join(pieces)
def to_upper_camel_case(name):
    """
    Convert a snake_case identifier to UpperCamelCase.

    The name is split on underscores and the first character of each
    segment is upper-cased; the remainder of each segment keeps its case.
    Empty segments, produced by leading, trailing or doubled underscores
    or by an empty name, are skipped instead of raising IndexError.

    @param name: identifier to convert
    @return: the UpperCamelCase form of name ("" for an empty name)
    """
    return "".join(
        segment[0].upper() + segment[1:]
        for segment in name.split("_")
        if segment
    )
def make_code_from_model(template_path, save_path, model: Type[DeclarativeMeta], chinese_name: str):
    """
    Generate a CRUD package for one model from a template directory.

    Placeholders replaced inside the copied template files:
        '\"\"\"model_define\"\"\"'  -> source code of the model class
        '\"\"\"add_schema\"\"\"'    -> generated schema field lines
        UpperCamelCaseName      -> model class name
        snake_name              -> snake_case form of the class name
        chinese_name            -> the chinese_name argument
        id_key                  -> config.id_key of the model

    @param template_path: directory containing the template files
    @param save_path: parent directory; output goes to save_path/snake_name
    @param model: model class to generate code for
    @param chinese_name: display name substituted for 'chinese_name'
    """
    import re  # local import: only this function's ordering helper needs it

    model_define = inspect.getsource(model)
    config = get_model_config(model)
    upper_camel_case_name = model.__name__
    snake_name = to_snake(upper_camel_case_name)

    def declaration_offset(key):
        # Position at which `key` is declared in the class source. Prefer a
        # line that starts with "key =" / "key:" so that "name" is not matched
        # inside "user_name"; fall back to the first plain occurrence.
        declared = re.search(
            r"^[ \t]*" + re.escape(key) + r"[ \t]*[:=]", model_define, re.MULTILINE
        )
        if declared:
            return declared.start()
        offset = model_define.find(key)
        # Keys absent from this class's source (e.g. columns that come from a
        # base class) sort last instead of raising ValueError.
        return offset if offset >= 0 else len(model_define)

    scalar_types = ('int', 'float', 'date', 'datetime', 'bool', 'str')
    relation_annotations = {
        'o2o': 'Optional[Any]',
        'o2m': 'Optional[List[Any]]',
        'm2m': 'Optional[List[Any]]',
        'm2o': 'Optional[Any]',
    }

    add_schema_contents = []
    for key in sorted(config.cols.keys(), key=declaration_offset):
        col_config = config.cols[key]
        col = getattr(model, key)
        col_base_type = col_config.col_base_type
        # The id column is skipped for now.
        if key == config.id_key:
            continue

        add_code_line = ""
        if col_base_type in scalar_types:
            # Anything the database can fill in by itself is optional on input.
            if col.server_default or col.default or col.nullable:
                add_code_line = f"{key}: Optional[{col_base_type}]"
            else:
                add_code_line = f"{key}: {col_base_type}"
        elif col_base_type in ['relation']:
            # Unknown relation types produce no schema line at all.
            annotation = relation_annotations.get(col_config.relation.relation_type)
            if annotation is not None:
                add_code_line = f"{key}: {annotation}"
        elif col_base_type in ["json", 'jsonb']:
            add_code_line = key + ": Optional[Dict[Any,Any]] = {}"
        else:
            # Other types fall back to Any for now.
            add_code_line = f"{key}: Any"

        if add_code_line:
            add_schema_contents.append(add_code_line)

    add_schema_content = "\n ".join(add_schema_contents)
    replace_dic = {
        '"""model_define"""': model_define,
        '"""add_schema"""': add_schema_content,
        'UpperCamelCaseName': upper_camel_case_name,
        'snake_name': snake_name,
        'chinese_name': chinese_name,
        'id_key': config.id_key,
    }
    code_make(template_path, replace_dic, save_path=f"{save_path}/{snake_name}")
def make_mod_from_models(mod_name: str, save_path: str, model_with_name_list: List[Tuple[Type[DeclarativeMeta], str]]):
    """
    Generate a complete module directory for a list of models.

    Under save_path/mod_name this writes:
        mods/<snake_name>/  one CRUD package per model, generated from the
                            "templates/crud_without_model" directory located
                            next to this file
        router.py           an APIRouter that includes every sub-router
        common.py           re-exports get_db and Base from context.common
        routes.json         the structure returned by make_routes, wrapped
                            in a one-element list

    @param mod_name: module name; used as directory name and router prefix
    @param save_path: directory in which the module directory is created
    @param model_with_name_list: (model class, display name) pairs
    """
    mod_path = Path(save_path) / mod_name
    mods_path = mod_path / "mods"
    if not mods_path.exists():
        os.makedirs(mods_path)

    crud_template_path = Path(__file__).parent / "templates" / "crud_without_model"

    import_codes = []
    router_include_codes = []
    for entry in model_with_name_list:
        model, name = entry[0], entry[1]
        make_code_from_model(crud_template_path, mods_path, model, name)
        snake_name = to_snake(model.__name__)
        import_codes.append(
            f"from .mods.{snake_name}.router import router as {snake_name}_router"
        )
        router_include_codes.append(f"router.include_router({snake_name}_router)")

    # router.py: the imports, a blank line, the router object, the includes.
    router_lines = [
        "from fastapi import APIRouter",
        "\n".join(import_codes),
        "",
        f'router = APIRouter(prefix="/{mod_name}")',
        "\n".join(router_include_codes),
    ]
    (mod_path / 'router.py').write_text("\n".join(router_lines), encoding="utf-8")

    # common.py: shared database handles for the generated packages.
    common_code = "from context.common import common_db\n\nget_db = common_db.get_db\nBase = common_db.Base"
    (mod_path / 'common.py').write_text(common_code, encoding="utf-8")

    # routes.json: route description produced by make_routes.
    routes = make_routes(mod_name, model_with_name_list)
    routes_json = json.dumps([routes], ensure_ascii=False)
    (mod_path / 'routes.json').write_text(routes_json, encoding='utf-8')
def make_table_model(model: Type[DeclarativeMeta], chinese_name: str):
    """
    Build the table description (a TableModel-shaped JsDict) for one model.

    The result carries tableName, name, idKey and a `columns` list with one
    entry per key of the model config, ordered by where each key is declared
    in the model's source code.

    @param model: model class to describe
    @param chinese_name: display name stored in the result's `name`
    @return: the populated table description
    """
    import re  # local import: only this function's ordering helper needs it

    model_config = get_model_config(model)
    model_define = inspect.getsource(model)

    def declaration_offset(key):
        # Position at which `key` is declared in the class source. Prefer a
        # line that starts with "key =" / "key:" so that "name" is not matched
        # inside "user_name"; fall back to the first plain occurrence.
        declared = re.search(
            r"^[ \t]*" + re.escape(key) + r"[ \t]*[:=]", model_define, re.MULTILINE
        )
        if declared:
            return declared.start()
        offset = model_define.find(key)
        # Keys absent from this class's source (e.g. columns that come from a
        # base class) sort last instead of raising ValueError.
        return offset if offset >= 0 else len(model_define)

    # Original column type name -> column type written to the description.
    scalar_types = {
        'String': 'string',
        'BigInteger': 'int',
        'Integer': 'int',
        'SmallInteger': 'int',
        'Boolean': 'bool',
        'Date': 'date',
        'DateTime': 'datetime',
        'Double': 'float',
        'Float': 'float',
        'Enum': 'enum',
    }
    text_types = ("UnicodeText", "Text", "Unicode")
    json_types = ("JSON", "JSONB")
    relation_types = {'o2o': 'json', 'o2m': 'set', 'm2m': 'set', 'm2o': 'json'}

    config: TableModel = JsDict({})
    config.tableName = model.__tablename__
    config.name = chinese_name
    config.idKey = model_config.id_key
    config.columns = []

    for key in sorted(model_config.cols.keys(), key=declaration_offset):
        col_conf = model_config.cols[key]
        col = getattr(model, key)
        org_type = col_conf.col_org_type
        is_relation = org_type in ['DeclarativeMeta']

        column: Column = JsDict({})
        column.key = key
        if is_relation:
            column.name = key
            column.hidden = True
        else:
            column.name = col.comment or key
            if col.default or col.server_default or (col.primary_key and col.autoincrement):
                # The value can be produced without user input.
                column.addNeed = False
            elif not col.nullable:
                column.addNeed = True
                column.updateNeed = True

        if org_type in scalar_types:
            column.type = scalar_types[org_type]
        elif org_type in text_types:
            column.type = 'text'
            column.hidden = True
        elif org_type in json_types:
            column.hidden = True
            # A JSON column without a generic annotation simply gets no
            # schema, instead of raising KeyError / AttributeError.
            annotation = getattr(model, '__annotations__', {}).get(key)
            args = getattr(annotation, '__args__', None)
            if args and type(args[0]).__name__ == 'ModelMetaclass':
                column.type = 'json'
                # NOTE(review): attribute assignment on column.config only
                # works if JsDict wraps the plain dict assigned here - confirm.
                column.config = {}
                column.config.vueJsonSchemaForm = {
                    "schema": args[0].schema()
                }
        elif is_relation:
            column.hidden = True
            relation_type = relation_types.get(col_conf.relation.relation_type)
            if relation_type is not None:
                column.type = relation_type

        config.columns.append(column)
    return config
def make_routes(mod_name: str, model_with_name_list: List[Tuple[Type[DeclarativeMeta], str]],
                title: str = "城投管理"):
    """
    Build the route tree for one module: a "Layout" parent route with one
    "tablePlus/index" child per model.

    @param mod_name: module name; used for the parent path and name
    @param model_with_name_list: non-empty list of (model class, display
        name) pairs; the parent route redirects to the first model
    @param title: menu title of the parent route; the default is the value
        that used to be hard-coded
    @return: the parent route as a dict, children in input order
    @raise IndexError: when model_with_name_list is empty
    """
    if not model_with_name_list:
        # Same exception type that the [0][0] lookup below would raise, but
        # with a message that names the actual problem.
        raise IndexError("model_with_name_list must contain at least one (model, name) pair")

    first_model = model_with_name_list[0][0]
    # NOTE(review): the redirect is built from the snake_case class name while
    # the child paths use __tablename__; confirm the two always agree.
    router = {
        "path": f"/{mod_name}",
        "name": f"{mod_name}",
        "component": "Layout",
        "redirect": f"/{mod_name}/{to_snake(first_model.__name__)}",
        "meta": {
            "icon": "calendar",
            "title": title
        },
        "children": []}

    for item in model_with_name_list:
        model, display_name = item[0], item[1]
        table_model = make_table_model(model, display_name)
        table_model.baseUrl = f"/fastapi_crud_template/{mod_name}/{table_model.tableName}"
        table_model.name = display_name
        route = {
            "path": f"/{mod_name}/{table_model.tableName}",
            "name": table_model.name,
            "component": "tablePlus/index",
            "meta": {
                "keepAlive": True,
                "title": display_name,
                "tableModel": table_model
            }
        }
        router['children'].append(route)
    return router