import datetime
from sqlalchemy.orm import Session
from werkzeug.security import check_password_hash
from fastapi import APIRouter, Depends, HTTPException, Header
from AppUser.Crud import UserCrud, VerifyCodeCrud
from AppUser.Model import UserModel
from AppUser.Schemas import UserSchemas
from Utils.DataBase.SqlAlchemyUtils import engine, get_db
from Utils.Authentication import Config, TokenUtil
from Utils.EncryptProcess import EncyptUtil
from Utils.MessageUtils import EmailHelper
from Utils.UniqueCoder import RandomNumUtils
# Create the tables declared on the user models' metadata when this module is
# imported.
UserModel.Base.metadata.create_all(bind=engine)

# Router for the user endpoints; every route below is served under /api/user.
router = APIRouter(prefix="/api/user", tags=["用户操作"])
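@router.post("/login", summary="登录")
def func01(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)):
    """Log a user in with an e-mailed verification code plus a password.

    The verification code stored for ``body.email`` is checked first and is
    deleted once it has been accepted. The account's password hash and active
    flag are checked next. On success the user's profile fields are returned
    together with a new access token and a new refresh token.

    Raises:
        HTTPException: status 202 with a ``detail`` naming the failed check
            (code missing, wrong or expired; unknown user; wrong password;
            inactive account). The 2xx status on failure is kept as-is.
    """
    # --- Verification-code check ---
    # NOTE(review): no attempt counter or rate limit is visible in this
    # handler, and a wrong guess leaves the stored code in place, so a
    # six-digit code can be guessed repeatedly until it expires. Confirm that
    # throttling is enforced elsewhere (middleware, gateway, VerifyCodeCrud).
    verify_code_item = VerifyCodeCrud.get_verify_code_by_email(db=db, email=body.email)
    if not verify_code_item:
        raise HTTPException(status_code=202, detail="VerifyCode Incorrect")

    code_record = dict(verify_code_item)
    if code_record.get("verify_code") != body.verify_code:
        raise HTTPException(status_code=202, detail="VerifyCode Incorrect")

    # Expiry check. ``timedelta.seconds`` is never negative: once
    # ``expire_time`` is in the past it wraps around (days=-1, seconds close to
    # 86400) and drops below 300 again for five minutes in every 24 hours, so
    # an expired code would be accepted again. ``total_seconds()`` keeps the
    # sign, which lets a past ``expire_time`` be rejected outright.
    # Assumes expire_time is the instant the code stops being valid; confirm
    # against VerifyCodeCrud.set_verify_code.
    remaining = (code_record.get("expire_time") - datetime.datetime.now()).total_seconds()
    if remaining <= 0 or remaining >= 300:
        raise HTTPException(status_code=202, detail="Code Has Expired")

    # The code is single-use: drop it before the password is examined.
    VerifyCodeCrud.delete_verify_code_by_email(db, body.email)

    # --- E-mail, password and account-state check ---
    user_info_item = UserCrud.get_user_by_email(db=db, email=body.email)
    if not user_info_item:
        raise HTTPException(status_code=202, detail="User Not Existed")
    # The submitted password arrives encrypted; it is decrypted and then
    # compared against the stored hash.
    plain_passwd = EncyptUtil.decrypt_data(encrypt_msg=body.passwd)
    if not check_password_hash(user_info_item.passwd, plain_passwd):
        raise HTTPException(status_code=202, detail="Password Incorrect")
    if not user_info_item.is_active:
        raise HTTPException(status_code=202, detail="User Inactive")

    # --- Build the token payload and issue both tokens ---
    user_info = {
        "uid": user_info_item.id,
        "email": user_info_item.email,
        "name": user_info_item.name,
        "role": str(user_info_item.role),
        "department": str(user_info_item.department),
    }
    access_token = TokenUtil.create_token(
        key="user_info",
        data=user_info,
        expires_delta=datetime.timedelta(minutes=Config.ACCESS_TOKEN_EXPIRE_MINUTES),
    )
    # The refresh token carries only the uid and uses the longer lifetime.
    refresh_token = TokenUtil.create_token(
        key="user_info",
        data={"uid": user_info_item.id},
        expires_delta=datetime.timedelta(days=Config.REFRESH_TOKEN_EXPIRE_DAYS),
    )
    user_info["token"] = access_token
    user_info["refresh_token"] = refresh_token

    return user_info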
@router.post("/login_email", summary="登录邮件")
def func02(email: UserSchemas.EMailRegex, db: Session = Depends(get_db)):
    """Generate a six-digit login code, store it and e-mail it to the user.

    Raises:
        HTTPException: status 202 when the address has no account or the
            account is inactive; status 400 when the e-mail was not sent.
    """
    # NOTE(review): this route takes no credentials, and the two 202 replies
    # below tell the caller whether an address is registered and active. No
    # throttling is visible here either, so confirm that repeated requests for
    # the same address are rate limited elsewhere.
    account = UserCrud.get_user_by_email(db=db, email=email)
    if not account:
        raise HTTPException(status_code=202, detail="User Not Existed")
    if not account.is_active:
        raise HTTPException(status_code=202, detail="User Inactive")

    # Create and persist the code before mailing it.
    # NOTE(review): the randomness source behind get_random_num_code is not
    # visible from this file; a login code should come from a CSPRNG such as
    # the ``secrets`` module. Confirm in Utils.UniqueCoder.
    code = RandomNumUtils.get_random_num_code(length=6)
    VerifyCodeCrud.set_verify_code(db=db, email=email, verify_code=code)

    # Mail the code to the user.
    subject = "【远东资信】邮箱登录验证码"
    message = "您的验证码为 【{}】,5分钟内有效。".format(code)
    delivered = EmailHelper.send_email(title=subject, email=email, msg_body=message)
    if not delivered:
        raise HTTPException(status_code=400, detail="Email Not Send")

    return {"info": "Email Has Sent"}
@router.get("/detail", summary="用户信息", response_model=UserSchemas.DetailResBody)
def func03(token: str = Header(...), db: Session = Depends(get_db)):
    """Return the profile of the user identified by the ``token`` header.

    Raises:
        HTTPException: status 401 when the decoded token has no ``user_info``
            entry or that entry has no ``uid``; status 404 when no user has
            that id.
    """
    # Token check.
    # NOTE(review): only the presence of user_info.uid is tested. The refresh
    # token issued by /login is built with the same "user_info" key and also
    # carries a uid, so unless TokenUtil tells the two token kinds apart, a
    # refresh token would be accepted here as well. Confirm.
    # NOTE(review): how decode_token reports a bad signature or an expired
    # token is not visible from this file.
    claims = TokenUtil.decode_token(token)
    payload = claims.get("user_info")
    uid = payload.get("uid") if payload else None
    if not uid:
        raise HTTPException(status_code=401, detail="Invalid Token")

    # Look the user up.
    # NOTE(review): unlike /login, this route does not test is_active, so a
    # deactivated account's unexpired token still returns the profile.
    user_obj = UserCrud.get_user_by_id(db=db, uid=uid)
    if user_obj is None:
        raise HTTPException(status_code=404, detail="User Not found")

    return user_obj.to_dict()
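@router.get("/refresh_token", summary="更新token")
def func04(refresh_token: str = Header(...), db: Session = Depends(get_db)):
    """Issue a new access token for the user named in the refresh token.

    Raises:
        HTTPException: status 401 when the decoded token has no ``user_info``
            entry or that entry has no ``uid``; status 404 when no user has
            that id; status 403 when the account is inactive.
    """
    # Token check.
    # NOTE(review): only the presence of user_info.uid is tested. The access
    # token issued by /login is built with the same "user_info" key and also
    # carries a uid, so unless TokenUtil tells the two token kinds apart, an
    # access token could be presented here to obtain a new one and outlive
    # the refresh-token lifetime. Confirm, and consider a token-type claim.
    decoded_token = TokenUtil.decode_token(refresh_token)
    user_info = decoded_token.get("user_info")
    if not user_info:
        raise HTTPException(status_code=401, detail="Invalid Token")
    uid = user_info.get("uid")
    if not uid:
        raise HTTPException(status_code=401, detail="Invalid Token")

    # Look the user up.
    user_info_item = UserCrud.get_user_by_id(db=db, uid=uid)
    if not user_info_item:
        raise HTTPException(status_code=404, detail="User Not Found")
    # /login refuses inactive accounts. The same test is applied here so that
    # a refresh token issued before deactivation cannot keep minting access
    # tokens for the rest of its lifetime.
    if not user_info_item.is_active:
        raise HTTPException(status_code=403, detail="User Inactive")

    # Build the new access token from the current database row, so that role
    # and department changes made since login are picked up.
    user_info = {
        "uid": user_info_item.id,
        "email": user_info_item.email,
        "name": user_info_item.name,
        "role": str(user_info_item.role),
        "department": str(user_info_item.department),
    }
    access_token_expires = datetime.timedelta(minutes=Config.ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = TokenUtil.create_token(key="user_info", data=user_info, expires_delta=access_token_expires)

    return {"token": access_token}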