usermod/Router/UserRegisterRouter.py

140 lines
6.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from Context.common import ctx
from Crud.UserCrud import get_user_by_email, create_user, rest_user_password
from Crud import AppCrud
from CrudModel.AllowedEmail import allowed_email_crud, AllowedEmailQuery
from Schemas import UserSchemas
from Schemas.ApiCommonSchemas import CommonRes
from Utils.AuthUtils import Token, token_data_depend, create_token
from Utils.EncyptUtil import get_encrypt_password
from Utils.SqlAlchemyUtils import get_db
from Utils.VerifyCodeUtils import EmailVerifyCode, ImageCaptchaVerify, EmailVerifyType
router = APIRouter(
tags=["用户注册操作"],
prefix="/api/user_mod/user_register",
)
@router.get("/get_image_captcha", summary="获取图片验证码", response_model=UserSchemas.ImageCaptchaRes)
def get_image_captcha():
# 是否加上时间对ip的时间验证
captcha_id, image_bytes, captcha_code = ImageCaptchaVerify.make_captcha_image()
img_data = "data:image/png;base64," + base64.b64encode(image_bytes).decode()
# 注意测试时才传入captcha_code
if ctx.env == 'test':
return UserSchemas.ImageCaptchaRes(captcha_id=captcha_id, img_data=img_data, captcha_code=captcha_code)
else:
return UserSchemas.ImageCaptchaRes(captcha_id=captcha_id, img_data=img_data)
# return UserSchemas.ImageCaptchaRes(captcha_id=captcha_id, img_data=img_data)
@router.post("/get_register_email_verify_code", summary="获取邮箱验证码", response_model=CommonRes)
def get_register_email_verify_code(body: UserSchemas.EmailSendReqBody):
email = body.email
EmailVerifyCode.send_register_code(email)
return {"msg": "验证码已发送至邮箱,请查看", "state": 1}
@router.post("/login", summary="登录", response_model=UserSchemas.LoginResBody)
def login(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)):
# 图片验证码校验
check_res = ImageCaptchaVerify.check_code(body.captcha_id, body.captcha_code)
if not check_res:
raise HTTPException(status_code=202, detail="验证码过期或错误")
# 密码验证
# 是否在前端传输时进行一次base64处理
user_obj = get_user_by_email(db, body.email)
if user_obj is None:
raise HTTPException(status_code=404, detail="用户未注册")
encrypt_password = get_encrypt_password(body.password)
if encrypt_password != user_obj.password:
raise HTTPException(status_code=202, detail="密码错误")
if user_obj.disable:
raise HTTPException(status_code=202, detail="该账户被禁用")
user_info = user_obj.as_info()
# 生成&加密token信息
token = create_token({"uid": user_obj.id, "role": user_obj.role,
"name": user_obj.name, 'post': user_obj.post, "auth_data": user_obj.auth_data})
return UserSchemas.LoginResBody(**user_info, access_token=token, token_type='bearer')
@router.post("/login_to_app", summary="授权应用登录", response_model=UserSchemas.LoginResBody)
def login(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)):
if not body.app_id:
raise HTTPException(status_code=202, detail="未传入应用ID")
app = AppCrud.get_app_by_id(db, body.app_id)
if not app:
raise HTTPException(status_code=202, detail="应用信息错误")
# 图片验证码校验
check_res = ImageCaptchaVerify.check_code(body.captcha_id, body.captcha_code)
if not check_res:
raise HTTPException(status_code=202, detail="验证码过期或错误")
# 密码验证
# 是否在前端传输时进行一次base64处理
user_obj = get_user_by_email(db, body.email)
if user_obj is None:
raise HTTPException(status_code=404, detail="用户未注册")
encrypt_password = get_encrypt_password(body.password)
if encrypt_password != user_obj.password:
raise HTTPException(status_code=202, detail="密码错误")
if user_obj.disable:
raise HTTPException(status_code=202, detail="该账户被禁用")
user_info = user_obj.as_info()
# 生成&加密token信息
token = create_token({"uid": user_obj.id, "role": user_obj.role,
"name": user_obj.name, 'post': user_obj.post, "auth_data": user_obj.auth_data}, app.token_key)
return UserSchemas.LoginResBody(**user_info, access_token=token, token_type='bearer', app_id=app.id)
@router.post("/register", summary="注册", response_model=CommonRes)
def register(body: UserSchemas.RegisterReqBody, db: Session = Depends(get_db)):
# 用户是否存在
user_obj = get_user_by_email(db, body.email)
if user_obj is not None:
raise HTTPException(status_code=202, detail="该邮箱已注册")
# 判断邮箱是否合规
count, item_list = allowed_email_crud.query(db, AllowedEmailQuery())
email_checked = False
for item in item_list:
if body.email.endswith(item.email):
email_checked = True
break
if not email_checked:
raise HTTPException(status_code=202, detail="该域名邮箱不允许注册")
# 邮箱验证码校验
check_res = EmailVerifyCode.check_code(body.email, body.email_verify_code)
if not check_res:
raise HTTPException(status_code=202, detail="验证码过期或错误")
create_user(db, body.email, body.password)
return {"state": 1, "msg": "注册成功"}
@router.post("/rest_password", summary="密码重置", response_model=CommonRes)
def rest_password(body: UserSchemas.RestPasswordReqBody, db: Session = Depends(get_db)):
# 用户是否存在
user_obj = get_user_by_email(db, body.email)
if user_obj is None:
raise HTTPException(status_code=202, detail="该邮箱未注册")
# 邮箱验证码校验
check_res = EmailVerifyCode.check_code(body.email, body.email_verify_code, EmailVerifyType.reset_password)
if not check_res:
raise HTTPException(status_code=202, detail="验证码过期或错误")
rest_user_password(db, body.email, body.password)
return {"state": 1, "msg": "密码修改成功"}
@router.post("/get_rest_password_email_verify_code", summary="获取邮箱验证码", response_model=CommonRes)
def get_rest_password_email_verify_code(body: UserSchemas.EmailSendReqBody):
EmailVerifyCode.send_rest_code(body.email)
return {"msg": "验证码已发送至邮箱,请查看", "state": 1}