import uuid
from enum import Enum

from captcha.image import ImageCaptcha
from fastapi import HTTPException

from Utils.EmailUtils import send_email
from Utils.PhoneMsgUtils import send_phone_code
from Utils.RandomUtils import get_random_num_code, get_random_letter_and_num_code
from Utils.RedisUtils import redis_pool

# Suffix of the companion Redis key that marks "a code was sent recently".
_SIGN_SUFFIX = "_sign"
# Lifetime of the companion key, i.e. the minimum delay between two sends.
_RESEND_INTERVAL_S = 60


def _as_text(value):
    """Return ``value`` as ``str`` when the Redis client handed back bytes.

    redis-py returns ``bytes`` unless the connection was created with
    ``decode_responses=True``; how ``redis_pool`` is configured is not visible
    from this module, so both shapes are accepted. ``None`` and ``str`` pass
    through unchanged.
    """
    if isinstance(value, (bytes, bytearray)):
        return value.decode("utf-8")
    return value


class EmailVerifyType(Enum):
    """Business scenario an e-mail verification code is issued for."""

    login = 1
    register = 2
    reset_password = 3
    change = 4


class PhoneVerifyType(Enum):
    """Business scenario a phone (SMS) verification code is issued for."""

    login = 1
    register = 2
    reset_password = 3
    change = 4


class ImageCaptchaVerify:
    """Create image captchas and verify the answers, using Redis as the store."""

    @classmethod
    def make_captcha_image(cls, code=None, expire_time_s: int = 120):
        """Render a captcha image and remember its answer in Redis.

        :param code: text to render; a random 4-character letter/digit code is
            generated when this is empty or ``None``.
        :param expire_time_s: lifetime of the stored answer, in seconds.
        :return: ``(captcha_id, image_bytes, code)``.
        """
        captcha_id = str(uuid.uuid4())
        image = ImageCaptcha()
        if not code:
            code = get_random_letter_and_num_code(4)
        image_bytes = image.generate(code).getvalue()
        redis_pool.conn.set(captcha_id, code, expire_time_s)
        return captcha_id, image_bytes, code

    @classmethod
    def check_code(cls, captcha_id, code: str) -> bool:
        """Verify a captcha answer (case-insensitive) and consume the captcha.

        The stored answer is deleted on every attempt, so a captcha can be
        tried only once.

        :param captcha_id: id returned by :meth:`make_captcha_image`.
        :param code: answer supplied by the user.
        :return: ``True`` only when a stored answer exists and matches.
        """
        stored_code = _as_text(redis_pool.conn.get(captcha_id))
        redis_pool.conn.delete(captcha_id)
        # A missing/expired captcha or an empty answer must fail; treating it
        # as success would let any unknown captcha_id pass verification.
        if not stored_code or not code:
            return False
        return stored_code.lower() == code.lower()


class PhoneVerifyCode:
    """Issue, send and verify SMS verification codes stored in Redis."""

    @classmethod
    def get_phone_id(cls, phone, verify_type: PhoneVerifyType = PhoneVerifyType.login):
        """Build the Redis key for a phone number and verification type.

        :param phone: phone number the code belongs to.
        :param verify_type: each business verification type maps to its own id.
        :return: str
        """
        # NOTE(review): the literal says "Email" although this is the phone
        # key; left unchanged because other code may build the same key.
        return f"{phone}_EmailVerifyCodeId_{verify_type.name}"

    @classmethod
    def make_code(cls, phone, expire_time_s=60 * 5, verify_type: PhoneVerifyType = PhoneVerifyType.login):
        """Generate a 4-digit code and store it together with the resend marker.

        :param phone: phone number the code belongs to.
        :param expire_time_s: lifetime of the code in seconds; a falsy value
            stores the code without expiry.
        :param verify_type: business scenario the code is valid for.
        :return: the generated code.
        """
        client = redis_pool.get_redis_client()
        phone_id = cls.get_phone_id(phone, verify_type)
        code = get_random_num_code(4)
        # Value and TTL are written in one SET so that a failure between two
        # separate commands cannot leave a key behind without expiry.
        client.set(phone_id, code, ex=expire_time_s or None)
        client.set(phone_id + _SIGN_SUFFIX, code, ex=_RESEND_INTERVAL_S)
        return code

    @classmethod
    def send_code(cls, phone, verify_type: PhoneVerifyType):
        """Send a fresh code by SMS unless one was sent within the last minute.

        :param phone: destination phone number.
        :param verify_type: business scenario the code is valid for.
        :raises HTTPException: status 303 while the resend marker still exists.
        """
        phone_id = cls.get_phone_id(phone, verify_type=verify_type)
        client = redis_pool.get_redis_client()
        if client.get(phone_id + _SIGN_SUFFIX):
            raise HTTPException(status_code=303, detail="验证码已经发送成功,请一分钟后再试")
        code = cls.make_code(phone, verify_type=verify_type)
        send_phone_code(phone, code)

    @classmethod
    def check_code(cls, phone, code, verify_type: PhoneVerifyType = PhoneVerifyType.login) -> bool:
        """Tell whether ``code`` equals the code stored for ``phone``.

        The stored code is not removed by this call.

        :param phone: phone number the code was issued for.
        :param code: code supplied by the user.
        :param verify_type: business scenario the code was issued for.
        :return: ``True`` on a match; ``False`` for an empty, wrong or expired code.
        """
        phone_id = cls.get_phone_id(phone, verify_type=verify_type)
        client = redis_pool.get_redis_client()
        stored_code = _as_text(client.get(phone_id))
        return bool(code) and code == stored_code


class EmailVerifyCode:
    """Issue, send and verify e-mail verification codes stored in Redis."""

    @classmethod
    def get_email_id(cls, email, verify_type: EmailVerifyType = EmailVerifyType.login):
        """Build the Redis key for an e-mail address and verification type.

        :param email: e-mail address the code belongs to.
        :param verify_type: each business verification type maps to its own id.
        :return: str
        """
        return f"{email}_EmailVerifyCodeId_{verify_type.name}"

    @classmethod
    def make_code(cls, email, expire_time_s=60 * 5, verify_type: EmailVerifyType = EmailVerifyType.login):
        """Generate a 4-digit code unless one was issued within the last minute.

        :param email: e-mail address the code belongs to.
        :param expire_time_s: lifetime of the code in seconds; a falsy value
            stores the code without expiry.
        :param verify_type: business scenario the code is valid for.
        :return: the generated code.
        :raises HTTPException: status 303 while the resend marker still exists.
        """
        client = redis_pool.get_redis_client()
        email_id = cls.get_email_id(email, verify_type)
        if client.get(email_id + _SIGN_SUFFIX):
            raise HTTPException(detail='验证码已经发送,请一分钟后再试', status_code=303)
        code = get_random_num_code(4)
        # Value and TTL are written in one SET so that a failure between two
        # separate commands cannot leave a key behind without expiry.
        client.set(email_id, code, ex=expire_time_s or None)
        client.set(email_id + _SIGN_SUFFIX, code, ex=_RESEND_INTERVAL_S)
        return code

    @classmethod
    def send_login_code(cls, email):
        """Call ``send_email`` for the login flow.

        NOTE(review): no code is generated or stored here and the first and
        third arguments are empty strings; this looks unfinished - confirm
        the intended behaviour before relying on it.
        """
        send_email("", email, "")

    @classmethod
    def send_register_code(cls, email):
        """Generate a code and mail it with the registration subject.

        NOTE(review): ``make_code`` is called without ``verify_type``, so the
        code is stored under the default ``login`` type, not ``register``;
        left as is because callers may verify with the default type.
        """
        code = cls.make_code(email)
        send_email("注册验证码", email, code)

    @classmethod
    def send_rest_code(cls, email):
        """Generate a ``reset_password`` code and mail it to ``email``."""
        code = cls.make_code(email, verify_type=EmailVerifyType.reset_password)
        send_email("密码重置验证码", email, code)

    @classmethod
    def send_change_code(cls, email):
        """Generate a ``change`` code and mail it to ``email``."""
        code = cls.make_code(email, verify_type=EmailVerifyType.change)
        send_email("日报小程序邮箱验证码", email, code)

    @classmethod
    def check_code(cls, email, code, verify_type: EmailVerifyType = EmailVerifyType.login) -> bool:
        """Tell whether ``code`` equals the code stored for ``email``.

        The stored code is not removed by this call; use :meth:`remove_code`.

        :param email: e-mail address the code was issued for.
        :param code: code supplied by the user.
        :param verify_type: business scenario the code was issued for.
        :return: ``True`` on a match; ``False`` for an empty, wrong or expired code.
        """
        email_id = cls.get_email_id(email, verify_type)
        client = redis_pool.get_redis_client()
        stored_code = _as_text(client.get(email_id))
        return bool(code) and code == stored_code

    @classmethod
    def remove_code(cls, email, verify_type: EmailVerifyType = EmailVerifyType.login):
        """Delete the stored code for ``email`` and ``verify_type``.

        :param email: e-mail address the code was issued for.
        :param verify_type: business scenario whose code should be removed.
        """
        client = redis_pool.get_redis_client()
        # Pass verify_type through; omitting it always removed the ``login``
        # code regardless of the type requested by the caller.
        email_id = cls.get_email_id(email, verify_type)
        client.delete(email_id)