wd-rating/mods/user/router.py

123 lines
5.5 KiB
Python

from fastapi import Depends, APIRouter, HTTPException
from sqlalchemy.orm import Session
from context.common import email_verify_code, phone_verify_code, auth_util, AdminUserTokenDataModel
from utils.verify_code_utils import PhoneVerifyType, EmailVerifyType
from .common import get_db
from .mods.user.models import User
from .mods.user import crud as user_crud
from .mods.user.router import router as user_router
from .mods.post.router import router as post_router
from .mods.department.router import router as department_router
from .mods.auth_rule.router import router as auth_rule_router
from . import schemas
from .mods.user.schemas import UserInfo
router = APIRouter(prefix="/user")
router.include_router(user_router)
router.include_router(post_router)
router.include_router(department_router)
router.include_router(auth_rule_router)
@router.post("/get_login_verify_code", tags=["用户登录"], summary="获取登录验证码接口")
def get_login_verify_code(body: schemas.GetLoginVerifyCodeReq, db: Session = Depends(get_db)):
try:
account = body.account
user = db.query(User).filter(User.email == account).first()
if user:
account_type = 'email'
else:
user = db.query(User).filter(User.phone == account).first()
if user:
account_type = "phone"
else:
raise HTTPException(detail="账号未录入系统", status_code=303)
if account_type == 'email':
email_verify_code.send_code(account, EmailVerifyType.login)
if account_type == 'phone':
phone_verify_code.send_code(account, PhoneVerifyType.login)
except HTTPException as e:
raise e
except Exception as e:
print(e)
raise HTTPException(detail="验证码发送失败", status_code=403)
return {"msg": f"验证码已发送至{'邮箱' if account_type == 'email' else '手机'},请查看", "state": 1}
# @router.post('/login_by_verify_code', tags=["用户登录"], summary='验证码登录接口')
# def login_by_verify_code(req: schemas.LoginByVerifyCode,
# db: Session = Depends(get_db)):
# account = req.account
# if phone_verify_code.check_code(account, req.code, PhoneVerifyType.login):
# account_type = 'phone'
# else:
# if email_verify_code.check_code(account, req.code, EmailVerifyType.login):
# account_type = 'email'
# else:
# raise HTTPException(detail="验证码错误", status_code=303)
# user = None
# if account_type == 'phone':
# user = db.query(User).filter(User.phone == account).first()
# if account_type == 'email':
# user = db.query(User).filter(User.email == account).first()
# if not user:
# raise HTTPException(detail="账号未录入系统", status_code=303)
# auth_data = user_crud.get_user_auth(user)
# user_data = user.to_full_dict(relation_use_id=True)
# user_data["auth_data"] = auth_data
# token_data = auth_util.tokenDataModel(**user_data).dict()
# token = auth_util.create_token(token_data)
# return {'token': token, 'user_info': user_data}
# 改为了只允许邮箱登录
@router.post('/login_by_verify_code', tags=["用户登录"], summary='验证码登录接口')
def login_by_verify_code(req: schemas.LoginByVerifyCode,
db: Session = Depends(get_db)):
account = req.account
# if phone_verify_code.check_code(account, req.code, PhoneVerifyType.login):
# account_type = 'phone'
# else:
if email_verify_code.check_code(account, req.code, EmailVerifyType.login):
account_type = 'email'
else:
raise HTTPException(detail="验证码错误", status_code=303)
user = None
# if account_type == 'phone':
# user = db.query(User).filter(User.phone == account).first()
if account_type == 'email':
user = db.query(User).filter(User.email == account).first()
if not user:
raise HTTPException(detail="账号未录入系统", status_code=303)
auth_data = user_crud.get_user_auth(user)
user_data = user.to_full_dict(relation_use_id=True)
user_data["auth_data"] = auth_data
token_data = auth_util.tokenDataModel(**user_data).dict()
token = auth_util.create_token(token_data)
return {'token': token, 'user_info': user_data}
@router.post('/refresh_token', tags=["用户登录"], summary='token刷新接口')
def refresh_token(token: AdminUserTokenDataModel = Depends(auth_util.token_data_depend), db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == token.id).first()
if not user:
raise HTTPException(detail="账号未录入系统", status_code=303)
auth_data = user_crud.get_user_auth(user)
user_data = user.to_full_dict(relation_use_id=True)
user_data["auth_data"] = auth_data
token_data = auth_util.tokenDataModel(**user_data).dict()
token = auth_util.create_token(token_data)
return {'token': token, 'user_info': user_data}
@router.post('/get_user_info', tags=["用户登录"], summary='用户信息获取接口', response_model=UserInfo)
def get_user_info(token: AdminUserTokenDataModel = Depends(auth_util.token_data_depend), db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == token.id).first()
if not user:
raise HTTPException(detail="账号未录入系统", status_code=303)
auth_data = user_crud.get_user_auth(user)
user_data = user.to_full_dict(relation_use_id=True)
user_data["auth_data"] = auth_data
return user_data