增加to_safe_dict 模型转字典时不转密码

This commit is contained in:
wcq 2023-06-13 11:00:25 +08:00
parent 8f36b4d95c
commit 2a0a4e4e45
8 changed files with 29 additions and 18 deletions

View File

@ -11,6 +11,7 @@ from Utils.SqlAlchemyUtils import SqlalchemyConnect
from Utils.VerifyCodeUtils import EmailCodeVerify, ImageCaptchaVerify, PhoneVerifyCode
from pydantic import BaseModel
from typing import Optional
from pathlib import Path
# 权限验证工具类
@ -19,7 +20,7 @@ uvicorn_log = getLogger('uvicorn')
# 配置数据
conf = ConfigParser()
conf.read("Config/common.ini", encoding='utf-8-sig')
conf.read(Path(__file__).parent.parent / 'Config' / 'common.ini', encoding='utf-8-sig')
class CompanyUserTokenDataModel(BaseModel):

View File

@ -21,3 +21,7 @@ class AdminUser(common_db.Base):
def to_dict(self):
data = {c.name: getattr(self, c.name) for c in self.__table__.columns}
return data
def to_safe_dict(self):
data = {c.name: getattr(self, c.name) for c in self.__table__.columns if c.name != 'passwd'}
return data

View File

@ -1,19 +1,12 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from Context.common import common_db, conf, email_code_verify, admin_user_auth_util, tianyancha_api, \
AdminUserTokenDataModel, AdminUserTokenDataModel, admin_user_auth_util
from Context.common import common_db, email_code_verify, AdminUserTokenDataModel, \
admin_user_auth_util
from Utils.PasswdUtils import get_encrypt_password, decode_b64_pwd
from Utils.SqlAlchemyUtils import QueryParams, query_common
from Utils.ValidateUtil import Validate
from Utils.VerifyCodeUtils import EmailVerifyType
from . import Schemas
from . import Crud
from .Models import AdminUser
# from ..UploadFile import Crud as UploadFileCrud
# from Utils.OcrUtils import ocr_business_license
router = APIRouter(tags=["管理员用户"], prefix="/admin_user")
@ -66,7 +59,7 @@ def func(body: Schemas.GetEmailVerifyCode):
def func(token_data: AdminUserTokenDataModel = Depends(admin_user_auth_util.token_data_depend),
db: Session = Depends(common_db.get_db)):
user = Crud.admin_user_get(db, token_data.id)
user_data = user.to_dict()
user_data = user.to_safe_dict()
if not user:
raise HTTPException(detail="无用户", status_code=403)
return Schemas.GetUserInfoRes(**user_data)

View File

@ -23,3 +23,7 @@ class CompanyUser(common_db.Base):
def to_dict(self):
data = {c.name: getattr(self, c.name) for c in self.__table__.columns}
return data
def to_safe_dict(self):
data = {c.name: getattr(self, c.name) for c in self.__table__.columns if c.name != 'passwd'}
return data

View File

@ -105,7 +105,7 @@ def func(req: Schemas.CompanyUserRegisterReq, db: Session = Depends(common_db.ge
def func(token_data: CompanyUserTokenDataModel = Depends(company_user_auth_util.token_data_depend),
db: Session = Depends(common_db.get_db)):
user = Crud.company_user_get(db, token_data.id)
user_data = user.to_dict()
user_data = user.to_safe_dict()
if not user:
raise HTTPException(detail="无用户", status_code=403)
return Schemas.GetUserInfoRes(**user_data)

View File

@ -22,3 +22,7 @@ class InstitutionUser(common_db.Base):
def to_dict(self):
data = {c.name: getattr(self, c.name) for c in self.__table__.columns}
return data
def to_safe_dict(self):
data = {c.name: getattr(self, c.name) for c in self.__table__.columns if c.name != 'passwd'}
return data

View File

@ -79,7 +79,7 @@ def func(body: Schemas.GetEmailVerifyCode):
def func(token_data: InstitutionUserTokenDataModel = Depends(institution_user_auth_util.token_data_depend),
db: Session = Depends(common_db.get_db)):
user = Crud.institution_user_get(db, token_data.id)
user_data = user.to_dict()
user_data = user.to_safe_dict()
if not user:
raise HTTPException(detail="无用户", status_code=403)
return Schemas.GetUserInfoRes(**user_data)
@ -88,8 +88,9 @@ def func(token_data: InstitutionUserTokenDataModel = Depends(institution_user_au
@router.post("/add", summary="添加协会用户", response_model=Schemas.InstitutionUserAddRes)
def func(req: Schemas.InstitutionUserAddReq, db: Session = Depends(common_db.get_db),
token_data: AdminUserTokenDataModel = Depends(admin_user_auth_util.token_data_depend)):
req.passwd = get_encrypt_password(req.passwd)
item = Crud.institution_user_add(db, req)
return Schemas.InstitutionUserAddRes(**item.to_dict())
return Schemas.InstitutionUserAddRes(**item.to_safe_dict())
@router.post("/delete", summary="删除协会用户")
@ -102,8 +103,10 @@ def func(req: Schemas.InstitutionUserDeleteReq, db: Session = Depends(common_db.
@router.post("/update", summary="更新协会用户", response_model=Schemas.InstitutionUserUpdateRes)
def func(req: Schemas.InstitutionUserUpdateReq, db: Session = Depends(common_db.get_db),
token_data: AdminUserTokenDataModel = Depends(admin_user_auth_util.token_data_depend)):
if req.passwd:
req.passwd = get_encrypt_password(req.passwd)
item = Crud.institution_user_update(db, req)
return Schemas.InstitutionUserUpdateRes(**item.to_dict())
return Schemas.InstitutionUserUpdateRes(**item.to_safe_dict())
@router.post("/get", summary="获取协会用户", response_model=Schemas.InstitutionUserGetRes)
@ -112,7 +115,7 @@ def func(req: Schemas.InstitutionUserGetReq, db: Session = Depends(common_db.get
item = Crud.institution_user_get(db, req.id)
if not item:
raise HTTPException(detail="未查询到信息", status_code=404)
return Schemas.InstitutionUserGetRes(**item.to_dict())
return Schemas.InstitutionUserGetRes(**item.to_safe_dict())
@router.post("/query", summary="查询协会用户", response_model=Schemas.InstitutionUserQueryRes)
@ -129,7 +132,7 @@ def func(req: QueryParams, db: Session = Depends(common_db.get_db),
token_data: AdminUserTokenDataModel = Depends(
admin_user_auth_util.token_data_depend)):
count, query = query_common(db, InstitutionUser, req)
items = [Schemas.InstitutionUserInfo(**item.to_dict()) for item in query]
items = [Schemas.InstitutionUserInfo(**item.to_safe_dict()) for item in query]
return Schemas.InstitutionUserQueryRes(count=count, items=items)
#########

View File

@ -2,12 +2,14 @@ import unittest
from fastapi.testclient import TestClient
from main import app
admin_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpZCI6IjEiLCJlbWFpbCI6Ind1Y2h1bnF1YW5AZmVjci5jb20uY24iLCJwaG9uZSI6IjE4MDkwNDc4MTIzIiwiZXhwIjoxNjg2ODgyMjE3fQ.pA-3SqIgxr0mFxP_luXgs9FnhQo0jsGOyUTtaPclRf0"
client = TestClient(app)
class UserTestCase(unittest.TestCase):
def test_something(self):
self.assertEqual(True, False) # add assertion here
res = client.post('/rate_sys/user/admin_user/get_user_info', headers={"Authorization": "bearer " + admin_token})
print('/rate_sys/user/admin_user/get_user_info', res.json())
if __name__ == '__main__':