"""Company-user HTTP endpoints: login, e-mail verification codes, registration, profile.

Endpoint documentation below is written as ``#`` comments rather than function
docstrings on purpose: FastAPI publishes a route function's docstring as the
operation description in the generated OpenAPI schema, and that output is left
unchanged here.
"""
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session

from context.common import (
    common_db,
    conf,
    email_verify_code,
    company_user_auth_util,
    tianyancha_api,
    CompanyUserTokenDataModel,
)
from utils.email_utils import email_check
from utils.passwd_utils import get_encrypt_password, decode_b64_pwd
from utils.validate_utils import Validate
from utils.verify_code_utils import EmailVerifyType
from utils.sqlalchemy_common_utils import QueryParams, query_common
from utils.ocr_utils import ocr_business_license

from . import schemas
from . import crud
from .models import CompanyUser
from ..form_file import crud as FormFileCrud

router = APIRouter(tags=["企业用户"])


def _create_token_for(user):
    """Return a token for ``user``.

    The token payload is built from ``user.to_dict()`` via
    ``company_user_auth_util.tokenDataModel`` and passed to
    ``company_user_auth_util.create_token``.
    """
    token_data = company_user_auth_util.tokenDataModel(**user.to_dict())
    return company_user_auth_util.create_token(token_data)


def _send_email_verify_code(email, verify_type):
    """Validate ``email`` and send a verification code of ``verify_type`` to it.

    Returns the success payload shared by both "get verify code" endpoints.

    Raises:
        HTTPException: 403 with "错误的邮箱格式" when ``Validate.email`` rejects
            the address; 403 with "邮箱验证码发送失败" when anything else fails.
    """
    try:
        if not Validate.email(email):
            raise HTTPException(detail="错误的邮箱格式", status_code=403)
        email_verify_code.send_code(email, verify_type)
    except HTTPException:
        # Let deliberate HTTP errors (e.g. the format error above) reach the
        # client as-is instead of being rewritten into a generic send failure.
        raise
    except Exception as e:
        print(e)
        raise HTTPException(detail="邮箱验证码发送失败", status_code=403) from e
    return {'state': 1, "msg": "邮箱验证码已发送至邮箱,请查看"}


# Log in with an e-mail address plus an e-mail verification code.
# Responds with a token on success; raises HTTPException(303) when the code
# check fails or when no user is found for the e-mail address.
@router.post('/company_user/login_by_email', summary='邮箱登录', response_model=schemas.LoginByEmailRes)
def login_by_email(req: schemas.LoginByEmailReq, db: Session = Depends(common_db.get_db)):
    checked = email_verify_code.check_code(req.email, req.code, EmailVerifyType.login)
    if not checked:
        raise HTTPException(detail="邮箱验证码错误", status_code=303)

    user = crud.company_user_get_by_email(db, req.email)
    if not user:
        raise HTTPException(detail="该账户未注册", status_code=303)

    return schemas.LoginByEmailRes(token=_create_token_for(user))


# Log in with an e-mail address plus a base64-encoded password.
# The password is decoded, hashed with get_encrypt_password and handed to
# crud.login_by_passwd_check; raises HTTPException(303) when that finds no user.
@router.post('/company_user/login_by_passwd', summary='邮箱密码登录', response_model=schemas.LoginByPasswdRes)
def login_by_passwd(req: schemas.LoginByPasswdReq, db: Session = Depends(common_db.get_db)):
    # The e-mail verification code check is disabled for password login:
    # checked = email_verify_code.check_code(req.email, req.code, EmailVerifyType.login)
    org_passwd = decode_b64_pwd(req.passwd)
    hashed_passwd = get_encrypt_password(org_passwd)

    user = crud.login_by_passwd_check(db, req.email, hashed_passwd)
    if not user:
        raise HTTPException(detail="邮箱或密码错误", status_code=303)

    # NOTE(review): the declared response_model is LoginByPasswdRes but a
    # LoginByEmailRes is returned; kept as-is because the schemas are not
    # visible here -- confirm both models expose the same `token` field.
    return schemas.LoginByEmailRes(token=_create_token_for(user))


# Send a login verification code to the e-mail address in the request body.
@router.post("/company_user/get_email_login_verify_code", summary="获取登录邮箱验证码")
def get_email_login_verify_code(body: schemas.GetEmailVerifyCode):
    return _send_email_verify_code(body.email, EmailVerifyType.login)


# Send a registration verification code to the e-mail address in the request body.
@router.post("/company_user/get_email_register_verify_code", summary="获取注册邮箱验证码")
def get_email_register_verify_code(body: schemas.GetEmailVerifyCode):
    return _send_email_verify_code(body.email, EmailVerifyType.register)


# Register a company user.
# Steps: validate the decoded password, check the e-mail verification code,
# OCR the uploaded business licence, verify the OCR result through
# tianyancha_api, reject duplicate company names, then store the new user.
# Every failure is reported as HTTPException(303).
@router.post("/company_user/register", summary="注册企业用户")
def company_user_register(req: schemas.CompanyUserRegisterReq, db: Session = Depends(common_db.get_db)):
    org_passwd = decode_b64_pwd(req.passwd)
    if not Validate.password(org_passwd):
        raise HTTPException(detail="密码格式不规范", status_code=303)

    checked = email_verify_code.check_code(req.email, req.email_verified_code, EmailVerifyType.register)
    if not checked:
        raise HTTPException(detail='邮箱验证码错误', status_code=303)

    file = FormFileCrud.file_get(db, req.license_file_md)
    if not file:
        # Without this guard a missing licence file surfaces as an
        # AttributeError (HTTP 500) on `file.file_url` below.
        raise HTTPException(detail="营业执照文件不存在", status_code=303)

    # Public URL of the licence image: the first two '/'-separated segments of
    # the stored file_url are dropped and the rest is appended to the static
    # prefix on the configured host.
    image_url = conf['app']['host'] + '/wd_bj_static/' + "/".join(file.file_url.split('/')[2:])
    license_res = ocr_business_license(image_url)

    license_check_res, _ = tianyancha_api.get(
        '企业三要素',
        {'code': license_res.credit, 'name': license_res.name, 'legalPersonName': license_res.owner},
    )
    if not (license_check_res and license_check_res['result'] == 1):
        if license_check_res:
            # The verification service answered but rejected the data; relay its remark.
            raise HTTPException(detail=license_check_res['remark'], status_code=303)
        raise HTTPException(detail="营业执照信息验证失败", status_code=303)

    company_name = license_res.name
    if crud.company_user_get_by_company_name(db, company_name):
        raise HTTPException(detail="公司已经被注册", status_code=303)

    passwd_hashed = get_encrypt_password(org_passwd)
    crud.company_user_add(
        db,
        schemas.CompanyUserAddInfo(company_name=company_name, email=req.email,
                                   passwd=passwd_hashed, verified=True),
    )
    return {'msg': "注册成功", 'state': 1}


# Return the profile of the user identified by the request token.
# Raises HTTPException(403) when crud.company_user_get finds no such user.
@router.post('/get_user_info', summary='获取用户信息', response_model=schemas.GetUserInfoRes)
def get_user_info(token_data: CompanyUserTokenDataModel = Depends(company_user_auth_util.token_data_depend),
                  db: Session = Depends(common_db.get_db)):
    user = crud.company_user_get(db, token_data.id)
    # Check for a missing user *before* touching it; calling to_dict() first
    # would raise AttributeError and make the 403 below unreachable.
    if not user:
        raise HTTPException(detail="无用户", status_code=403)
    return schemas.GetUserInfoRes(**user.to_dict())


# Disabled generic CRUD routes, kept for reference.
# @router.post("/company_user/add", summary="添加企业用户", response_model=schemas.CompanyUserAddRes)
# def company_user_add(req: schemas.CompanyUserAddReq, db: Session = Depends(common_db.get_db)):
#     item = crud.company_user_add(db, req)
#     return schemas.CompanyUserAddRes(**item.to_dict())
#
#
# @router.post("/company_user/delete", summary="删除企业用户")
# def company_user_delete(req: schemas.CompanyUserDeleteReq, db: Session = Depends(common_db.get_db)):
#     crud.company_user_delete(db, req.id)
#     return "删除成功"
#
#
# @router.post("/company_user/update", summary="更新企业用户", response_model=schemas.CompanyUserUpdateRes)
# def company_user_update(req: schemas.CompanyUserUpdateReq, db: Session = Depends(common_db.get_db)):
#     item = crud.company_user_update(db, req)
#     return schemas.CompanyUserUpdateRes(**item.to_dict())
#
#
# @router.post("/company_user/get", summary="获取企业用户", response_model=schemas.CompanyUserGetRes)
# def company_user_get(req: schemas.CompanyUserGetReq, db: Session = Depends(common_db.get_db)):
#     item = crud.company_user_get(db, req.id)
#     if not item:
#         raise HTTPException(detail="未查询到信息", status_code=404)
#     return schemas.CompanyUserGetRes(**item.to_dict())
#
#
# @router.post("/company_user/query", summary="查询企业用户", response_model=schemas.CompanyUserQueryRes)
# def company_user_query(req: schemas.CompanyUserQueryReq, db: Session = Depends(common_db.get_db)):
#     count, query = crud.company_user_query(db, req)
#     items = [schemas.CompanyUserInfo(**item.to_dict()) for item in query]
#     return schemas.CompanyUserQueryRes(count=count, items=items)
#
#
# @router.post("/company_user/query_common", summary="通用查询企业用户",
#              response_model=schemas.CompanyUserQueryRes)
# def company_user_query_common(req: QueryParams, db: Session = Depends(common_db.get_db)):
#     count, query = query_common(db, CompanyUser, req)
#     items = [schemas.CompanyUserInfo(**item.to_dict()) for item in query]
#     return schemas.CompanyUserQueryRes(count=count, items=items)
#########