import datetime from typing import List from sqlalchemy.orm import Session from werkzeug.security import check_password_hash from fastapi import APIRouter, Depends, HTTPException, Header from AppUser.Crud import UserCrud, VerifyCodeCrud, RoleCrud from AppUser.Model import UserModel from AppUser.Schemas import UserSchemas from Utils.DataBase.SqlAlchemyUtils import engine, get_db from Utils.Authentication import Config, TokenUtil from Utils.EncryptProcess import EncyptUtil from Utils.MessageUtils import EmailHelper from Utils.UniqueCoder import RandomNumUtils UserModel.Base.metadata.create_all(bind=engine) router = APIRouter( tags=["用户操作"], prefix="/api/user" ) @router.post("/login", summary="登录", response_model=UserSchemas.LoginResBody) def func01(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)): # 验证码校验 verify_code_item = VerifyCodeCrud.get_verify_code_by_email(db=db, email=body.email) if not verify_code_item: raise HTTPException(status_code=202, detail="VerifyCode Incorrect") if dict(verify_code_item).get("verify_code") != body.verify_code: raise HTTPException(status_code=202, detail="VerifyCode Incorrect") if (dict(verify_code_item).get("expire_time") - datetime.datetime.now()).seconds >= 300: raise HTTPException(status_code=202, detail="Code Has Expired") # 邮箱&密码&状态校验 user_info_item = UserCrud.get_user_by_email(db=db, email=body.email) if not user_info_item: raise HTTPException(status_code=404, detail="User Not Found") if not check_password_hash(user_info_item.passwd, EncyptUtil.decrypt_data(encrypt_msg=body.passwd)): raise HTTPException(status_code=202, detail="Password Incorrect") if not user_info_item.is_active: raise HTTPException(status_code=202, detail="User Inactive") # 清除验证码 VerifyCodeCrud.delete_verify_code_by_email(db, body.email) user_info = UserSchemas.TokenBody(**user_info_item.to_dict()).dict() # 生成&加密token信息 access_token = TokenUtil.create_token( key="user_info", data=user_info, expires_delta=datetime.timedelta(days=Config.TOKEN_EXPIRE_DAYS) ) user_info.update({"token": access_token}) return user_info @router.post("/login_email", summary="登录邮件") def func02(email: UserSchemas.EMailRegex, db: Session = Depends(get_db)): # 检查邮箱&用户状态 user_info_item = UserCrud.get_user_by_email(db=db, email=email) if not user_info_item: raise HTTPException(status_code=404, detail="User Not Found") if not user_info_item.is_active: raise HTTPException(status_code=202, detail="User Inactive") # 生成&保存随机6位验证码 verify_code = RandomNumUtils.get_random_num_code(length=6) VerifyCodeCrud.set_verify_code(db=db, email=email, verify_code=verify_code) # 发送验证码邮件 title = "【远东资信】邮箱登录验证码" msg_body = "您的验证码为 【{}】,5分钟内有效。".format(verify_code) is_sent_email = EmailHelper.send_email(title=title, email=email, msg_body=msg_body) if not is_sent_email: raise HTTPException(status_code=400, detail="Email Not Send") return {"info": "Email Has Sent"} @router.get("/detail", summary="用户信息", response_model=UserSchemas.DetailResBody) def func03(token: str = Header(None), db: Session = Depends(get_db)): # 校验token decoded_token = TokenUtil.decode_token(token) if decoded_token == "Expired": raise HTTPException(status_code=201, detail="Token Has Expired") if decoded_token == "Invalid": raise HTTPException(status_code=401, detail="Invalid Token") # 校验用户信息 user_info = decoded_token.get("user_info") if not user_info: raise HTTPException(status_code=401, detail="Invalid Token") uid = user_info.get("uid") if not uid: raise HTTPException(status_code=401, detail="Invalid Token") # 查询用户信息 user_obj = UserCrud.get_user_by_id(db=db, uid=uid) if user_obj is None: raise HTTPException(status_code=404, detail="User Not found") return user_obj.to_dict() @router.get("/menu", summary="用户菜单", response_model=List[str]) def func04(token: str = Header(None), db: Session = Depends(get_db)): # 校验token decoded_token = TokenUtil.decode_token(token) if decoded_token == "Expired": raise HTTPException(status_code=201, detail="Token Has Expired") if decoded_token == "Invalid": raise HTTPException(status_code=401, detail="Invalid Token") # 校验用户信息 user_info = decoded_token.get("user_info") if not user_info: raise HTTPException(status_code=401, detail="Invalid Token") role_id = user_info.get("role_id") if not role_id: raise HTTPException(status_code=401, detail="Invalid Token") # 查询用户菜单 role_menu_list = RoleCrud.get_role_menu(db=db, role_id=role_id) return [sub.subject for sub in role_menu_list]