import base64 from fastapi import APIRouter, Depends, HTTPException from sqlalchemy.orm import Session from Context.common import ctx from Crud.UserCrud import get_user_by_email, create_user, rest_user_password from Crud import AppCrud from CrudModel.AllowedEmail import allowed_email_crud, AllowedEmailQuery from Schemas import UserSchemas from Schemas.ApiCommonSchemas import CommonRes from Utils.AuthUtils import Token, token_data_depend, create_token from Utils.EncyptUtil import get_encrypt_password from Utils.SqlAlchemyUtils import get_db from Utils.VerifyCodeUtils import EmailVerifyCode, ImageCaptchaVerify, EmailVerifyType router = APIRouter( tags=["用户注册操作"], prefix="/api/user_mod/user_register", ) @router.get("/get_image_captcha", summary="获取图片验证码", response_model=UserSchemas.ImageCaptchaRes) def get_image_captcha(): # 是否加上时间对ip的时间验证 captcha_id, image_bytes, captcha_code = ImageCaptchaVerify.make_captcha_image() img_data = "data:image/png;base64," + base64.b64encode(image_bytes).decode() # 注意,测试时才传入captcha_code if ctx.env == 'test': return UserSchemas.ImageCaptchaRes(captcha_id=captcha_id, img_data=img_data, captcha_code=captcha_code) else: return UserSchemas.ImageCaptchaRes(captcha_id=captcha_id, img_data=img_data) # return UserSchemas.ImageCaptchaRes(captcha_id=captcha_id, img_data=img_data) @router.post("/get_register_email_verify_code", summary="获取邮箱验证码", response_model=CommonRes) def get_register_email_verify_code(body: UserSchemas.EmailSendReqBody): email = body.email EmailVerifyCode.send_register_code(email) return {"msg": "验证码已发送至邮箱,请查看", "state": 1} @router.post("/login", summary="登录", response_model=UserSchemas.LoginResBody) def login(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)): # 图片验证码校验 check_res = ImageCaptchaVerify.check_code(body.captcha_id, body.captcha_code) if not check_res: raise HTTPException(status_code=202, detail="验证码过期或错误") # 密码验证 # 是否在前端传输时进行一次base64处理 user_obj = get_user_by_email(db, body.email) if user_obj is None: raise HTTPException(status_code=404, detail="用户未注册") encrypt_password = get_encrypt_password(body.password) if encrypt_password != user_obj.password: raise HTTPException(status_code=202, detail="密码错误") if user_obj.disable: raise HTTPException(status_code=202, detail="该账户被禁用") user_info = user_obj.as_info() # 生成&加密token信息 token = create_token({"uid": user_obj.id, "role": user_obj.role, "name": user_obj.name, 'post': user_obj.post, "auth_data": user_obj.auth_data}) return UserSchemas.LoginResBody(**user_info, access_token=token, token_type='bearer') @router.post("/login_to_app", summary="授权应用登录", response_model=UserSchemas.LoginResBody) def login(body: UserSchemas.LoginReqBody, db: Session = Depends(get_db)): if not body.app_id: raise HTTPException(status_code=202, detail="未传入应用ID") app = AppCrud.get_app_by_id(db, body.app_id) if not app: raise HTTPException(status_code=202, detail="应用信息错误") # 图片验证码校验 check_res = ImageCaptchaVerify.check_code(body.captcha_id, body.captcha_code) if not check_res: raise HTTPException(status_code=202, detail="验证码过期或错误") # 密码验证 # 是否在前端传输时进行一次base64处理 user_obj = get_user_by_email(db, body.email) if user_obj is None: raise HTTPException(status_code=404, detail="用户未注册") encrypt_password = get_encrypt_password(body.password) if encrypt_password != user_obj.password: raise HTTPException(status_code=202, detail="密码错误") if user_obj.disable: raise HTTPException(status_code=202, detail="该账户被禁用") user_info = user_obj.as_info() # 生成&加密token信息 token = create_token({"uid": user_obj.id, "role": user_obj.role, "name": user_obj.name, 'post': user_obj.post, "auth_data": user_obj.auth_data}, app.token_key) return UserSchemas.LoginResBody(**user_info, access_token=token, token_type='bearer', app_id=app.id) @router.post("/register", summary="注册", response_model=CommonRes) def register(body: UserSchemas.RegisterReqBody, db: Session = Depends(get_db)): # 用户是否存在 user_obj = get_user_by_email(db, body.email) if user_obj is not None: raise HTTPException(status_code=202, detail="该邮箱已注册") # 判断邮箱是否合规 count, item_list = allowed_email_crud.query(db, AllowedEmailQuery()) email_checked = False for item in item_list: if body.email.endswith(item.email): email_checked = True break if not email_checked: raise HTTPException(status_code=202, detail="该域名邮箱不允许注册") # 邮箱验证码校验 check_res = EmailVerifyCode.check_code(body.email, body.email_verify_code) if not check_res: raise HTTPException(status_code=202, detail="验证码过期或错误") create_user(db, body.email, body.password) return {"state": 1, "msg": "注册成功"} @router.post("/rest_password", summary="密码重置", response_model=CommonRes) def rest_password(body: UserSchemas.RestPasswordReqBody, db: Session = Depends(get_db)): # 用户是否存在 user_obj = get_user_by_email(db, body.email) if user_obj is None: raise HTTPException(status_code=202, detail="该邮箱未注册") # 邮箱验证码校验 check_res = EmailVerifyCode.check_code(body.email, body.email_verify_code, EmailVerifyType.reset_password) if not check_res: raise HTTPException(status_code=202, detail="验证码过期或错误") rest_user_password(db, body.email, body.password) return {"state": 1, "msg": "密码修改成功"} @router.post("/get_rest_password_email_verify_code", summary="获取邮箱验证码", response_model=CommonRes) def get_rest_password_email_verify_code(body: UserSchemas.EmailSendReqBody): EmailVerifyCode.send_rest_code(body.email) return {"msg": "验证码已发送至邮箱,请查看", "state": 1}