user-wsc/AppUser/Router/UserRouter.py

135 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
from typing import List
from sqlalchemy.orm import Session
from werkzeug.security import check_password_hash
from fastapi import APIRouter, Depends, HTTPException, Header
from AppUser.Crud import UserCrud, VerifyCodeCrud, RoleCrud
from AppUser.Model import UserModel
from AppUser.Schemas import UserSchemas
from Utils.DataBase.SqlAlchemyUtils import engine, get_db
from Utils.Authentication import Config, TokenUtil
from Utils.EncryptProcess import EncyptUtil
from Utils.MessageUtils import EmailHelper
from Utils.UniqueCoder import RandomNumUtils
# Runs at import time: create the tables registered on UserModel.Base's
# metadata against the shared engine (presumably SQLAlchemy metadata, which
# skips tables that already exist -- confirm against UserModel).
UserModel.Base.metadata.create_all(bind=engine)

# Every endpoint in this module is mounted under /api/user.
router = APIRouter(prefix="/api/user", tags=["用户操作"])
@router.post("/login", summary="登录", response_model=UserSchemas.LoginResBody)
def func01(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)):
    """Log a user in with e-mail, password and an e-mailed verification code.

    The checks run in this order: the verification code stored for
    ``body.email`` (present, matching, inside its validity window), then the
    account (exists, password hash matches, active). On success the stored
    verification code is deleted and an access token is issued.

    Args:
        body: Login request; ``email``, ``verify_code`` and ``passwd`` are read.
            ``passwd`` is passed through ``EncyptUtil.decrypt_data`` before it
            is checked against the stored password hash.
        db: Database session injected by FastAPI.

    Returns:
        The user's ``TokenBody`` fields as a dict, extended with a ``token``
        key holding the generated access token.

    Raises:
        HTTPException: 202 with detail "VerifyCode Incorrect",
            "Code Has Expired", "Password Incorrect" or "User Inactive";
            404 with detail "User Not Found".
    """
    # Verification-code check.
    verify_code_item = VerifyCodeCrud.get_verify_code_by_email(db=db, email=body.email)
    if not verify_code_item:
        raise HTTPException(status_code=202, detail="VerifyCode Incorrect")
    verify_code_row = dict(verify_code_item)
    if verify_code_row.get("verify_code") != body.verify_code:
        raise HTTPException(status_code=202, detail="VerifyCode Incorrect")
    # Compare the whole timedelta, not its ``.seconds`` field: ``.seconds``
    # ignores ``days``, so for a code that is already past ``expire_time``
    # (negative delta) it wraps around and drops below 300 again for five
    # minutes every 24 hours, which re-validated stale codes.
    remaining = verify_code_row.get("expire_time") - datetime.datetime.now()
    if not datetime.timedelta(0) <= remaining < datetime.timedelta(seconds=300):
        raise HTTPException(status_code=202, detail="Code Has Expired")

    # E-mail, password and account-status check.
    user_info_item = UserCrud.get_user_by_email(db=db, email=body.email)
    if not user_info_item:
        raise HTTPException(status_code=404, detail="User Not Found")
    if not check_password_hash(user_info_item.passwd, EncyptUtil.decrypt_data(encrypt_msg=body.passwd)):
        raise HTTPException(status_code=202, detail="Password Incorrect")
    if not user_info_item.is_active:
        raise HTTPException(status_code=202, detail="User Inactive")

    # The code is single-use: remove it once every check has passed.
    VerifyCodeCrud.delete_verify_code_by_email(db, body.email)
    user_info = UserSchemas.TokenBody(**user_info_item.to_dict()).dict()

    # Build the token from the user's TokenBody fields under the "user_info" key.
    access_token = TokenUtil.create_token(
        key="user_info",
        data=user_info,
        expires_delta=datetime.timedelta(days=Config.TOKEN_EXPIRE_DAYS),
    )
    user_info.update({"token": access_token})
    return user_info
@router.post("/login_email", summary="登录邮件")
def func02(email: UserSchemas.EMailRegex, db: Session = Depends(get_db)):
    """Generate a login verification code and e-mail it to an active user.

    Args:
        email: Address of the account requesting a login code.
        db: Database session injected by FastAPI.

    Returns:
        ``{"info": "Email Has Sent"}`` once the e-mail helper reports success.

    Raises:
        HTTPException: 404 "User Not Found" when no account matches ``email``;
            202 "User Inactive" when the account is disabled;
            400 "Email Not Send" when the e-mail helper reports failure.
    """
    # Check that the address belongs to an existing, active account.
    user_info_item = UserCrud.get_user_by_email(db=db, email=email)
    if not user_info_item:
        raise HTTPException(status_code=404, detail="User Not Found")
    if not user_info_item.is_active:
        raise HTTPException(status_code=202, detail="User Inactive")

    # Generate and store a random 6-character verification code.
    verify_code = RandomNumUtils.get_random_num_code(length=6)
    VerifyCodeCrud.set_verify_code(db=db, email=email, verify_code=verify_code)

    # Send the verification e-mail. The code is wrapped in a closed bracket
    # pair so that it cannot visually run into the "5" of the validity notice.
    title = "【远东资信】邮箱登录验证码"
    msg_body = "您的验证码为 【{}】，5分钟内有效。".format(verify_code)
    is_sent_email = EmailHelper.send_email(title=title, email=email, msg_body=msg_body)
    if not is_sent_email:
        raise HTTPException(status_code=400, detail="Email Not Send")
    return {"info": "Email Has Sent"}
@router.get("/detail", summary="用户信息", response_model=UserSchemas.DetailResBody)
def func03(token: str = Header(None), db: Session = Depends(get_db)):
    """Return the stored details of the user identified by the ``token`` header.

    Args:
        token: Value of the ``token`` request header (``None`` when absent).
        db: Database session injected by FastAPI.

    Returns:
        ``to_dict()`` of the user row whose id matches the token's ``uid``.

    Raises:
        HTTPException: 201 "Token Has Expired" when decoding yields "Expired";
            401 "Invalid Token" when decoding yields "Invalid" or the payload
            lacks a usable ``user_info`` / ``uid``;
            404 "User Not found" when no user has that id.
    """
    # Token validation: decode_token's result is compared against the two
    # sentinel strings before it is used as a mapping.
    payload = TokenUtil.decode_token(token)
    if payload == "Expired":
        raise HTTPException(status_code=201, detail="Token Has Expired")
    if payload == "Invalid":
        raise HTTPException(status_code=401, detail="Invalid Token")

    # A missing/empty "user_info" and a missing/empty "uid" are rejected alike.
    claims = payload.get("user_info")
    uid = claims.get("uid") if claims else None
    if not uid:
        raise HTTPException(status_code=401, detail="Invalid Token")

    # Look the user up by id.
    matched_user = UserCrud.get_user_by_id(db=db, uid=uid)
    if matched_user is None:
        raise HTTPException(status_code=404, detail="User Not found")
    return matched_user.to_dict()
@router.get("/menu", summary="用户菜单", response_model=List[str])
def func04(token: str = Header(None), db: Session = Depends(get_db)):
    """Return the menu subjects available to the role carried in the token.

    Args:
        token: Value of the ``token`` request header (``None`` when absent).
        db: Database session injected by FastAPI.

    Returns:
        The ``subject`` attribute of every entry returned by
        ``RoleCrud.get_role_menu`` for the token's ``role_id``, in order.

    Raises:
        HTTPException: 201 "Token Has Expired" when decoding yields "Expired";
            401 "Invalid Token" when decoding yields "Invalid" or the payload
            lacks a usable ``user_info`` / ``role_id``.
    """
    # Token validation: decode_token's result is compared against the two
    # sentinel strings before it is used as a mapping.
    payload = TokenUtil.decode_token(token)
    if payload == "Expired":
        raise HTTPException(status_code=201, detail="Token Has Expired")
    if payload == "Invalid":
        raise HTTPException(status_code=401, detail="Invalid Token")

    # A missing/empty "user_info" and a missing/empty "role_id" are rejected alike.
    claims = payload.get("user_info")
    role_id = claims.get("role_id") if claims else None
    if not role_id:
        raise HTTPException(status_code=401, detail="Invalid Token")

    # Collect the subject of each menu entry bound to the role.
    menu_subjects = []
    for menu_entry in RoleCrud.get_role_menu(db=db, role_id=role_id):
        menu_subjects.append(menu_entry.subject)
    return menu_subjects