from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.orm import Session from Models.DepartmentModel import Department from Models.PostModel import Post from Models.UserModel import User from Schemas import UserSchemas from Crud import UserCrud, AuthCrud from Schemas.UserSchemas import TokenData from Utils.AuthUtils import token_data_depend, create_token, registered_depend, check_auth from Utils.CrudUtils import auto_create_crud from Utils.SqlAlchemyUtils import get_db from Utils.VerifyCodeUtils import EmailVerifyCode, EmailVerifyType, PhoneVerifyCode, PhoneVerifyType from Utils.wxAppUtils import code2Session router = APIRouter( prefix="/api/daily/user" ) # 授权登录后调用的接口, @router.post('/login_to_app', summary="应用登录", tags=["用户接口"], response_model=UserSchemas.LoginToAppRes) def login_to_app(req: UserSchemas.LoginToAppReq, db: Session = Depends(get_db)): openid = code2Session(req.code).openid if not openid: raise HTTPException(detail="code无效", status_code=303) # auth_data = AuthCrud.get_user_auth(db, openid) # user_data = user.to_dict() # user_data["auth_data"] = auth_data # token_data = TokenData(**user_data) # token = create_token(token_data.dict()) # return UserSchemas.LoginToAppRes(**user.to_dict(), token=token) return UserSchemas.LoginToAppRes(openid=openid) @router.post('/get_user_info', summary='获取用户信息', tags=["用户接口"], response_model=UserSchemas.GetUserInfoRes) def get_user_info(token_data: TokenData = Depends(registered_depend), db: Session = Depends(get_db)): user_data = UserCrud.get_full_user_info(db, token_data.email) if not user_data: raise HTTPException(detail="用户未注册", status_code=403) if not user_data['email']: raise HTTPException(detail="用户未绑定邮箱", status_code=403) user_data['post_list'] = [UserSchemas.PostInfo(**item) for item in user_data['post_list']] user_data['department_list'] = [UserSchemas.DepartmentInfo(**item) for item in user_data['department_list']] auth_data = AuthCrud.get_user_auth(db, token_data.email) user_data["auth_data"] = auth_data return UserSchemas.GetUserInfoRes(**user_data) # 修改用户的邮箱、姓名、部门信息,需要进行邮箱验证码验证 @router.post('/change_user_info_and_email', tags=["用户接口"], summary='修改用户信息及email') def change_user_info_and_email(req: UserSchemas.ChangeUserInfoReq, token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)): checked = EmailVerifyCode.check_code(req.email, req.email_code, EmailVerifyType.change) if not checked: raise HTTPException(detail="邮箱验证码错误", status_code=303) if token_data.email != req.email: raise HTTPException(detail="无权限的操作", status_code=303) user = UserCrud.change_user_info(db, token_data.email, UserSchemas.UserInfoChange(**req.dict(), registered=True)) auth_data = AuthCrud.get_user_auth(db, token_data.email) user_data = user.to_dict() user_data["auth_data"] = auth_data new_token_data = TokenData(**user_data) token = create_token(new_token_data.dict()) return {'msg': "修改成功", 'state': 1, 'data': {'token': token}} @router.post('/change_user_info', tags=["用户接口"], summary='修改用户信息') def change_user_info(req: UserSchemas.ChangeUserInfoReq, token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)): if token_data.email != req.email: raise HTTPException(detail="无权限的操作", status_code=303) UserCrud.change_user_info(db, token_data.email, UserSchemas.UserInfoChange(**req.dict())) return {'msg': "修改成功", 'state': 1} @router.post('/bind_email', tags=["用户接口"], summary='邮箱登录') def bind_email(req: UserSchemas.BindEmailReq, db: Session = Depends(get_db)): email = req.email.replace(" ", "") checked = EmailVerifyCode.check_code(email, req.email_code, EmailVerifyType.change) if not checked: raise HTTPException(detail="邮箱验证码错误", status_code=303) user = UserCrud.get_user_info(db, email) # if user: # # 邮箱已绑定 # if user.email and user.email != req.email: # raise HTTPException(detail="该微信已绑定邮箱,请使用该微信号绑定的邮箱登录", status_code=403) if not user: user = UserCrud.create_user(db, email) if not user.registered: UserCrud.update_user_info_from_email(db, email) auth_data = AuthCrud.get_user_auth(db, email) user_data = user.to_dict() user_data["auth_data"] = auth_data token_data = TokenData(**user_data) token = create_token(token_data) return {'msg': "成功", 'state': 1, 'data': {'token': token, 'user_info': user_data}} @router.post("/get_phone_verify_code", tags=["用户接口"], summary="获取短信验证码") def get_phone_verify_code(body: UserSchemas.GetPhoneVerifyCodeReq, db=Depends(get_db)): phone = body.phone user = UserCrud.get_user_info_by_phone(db, phone) if not user: raise HTTPException(detail="手机号未录入系统", status_code=303) try: PhoneVerifyCode.send_code(phone, PhoneVerifyType.login) except HTTPException as e: raise e except Exception as e: print(e) raise HTTPException(detail=f"验证码发送失败", status_code=403) return {"msg": "验证码已发送至手机,请查看", "state": 1} @router.post('/login_by_phone', tags=["用户接口"], summary='手机号登录') def login_by_phone(req: UserSchemas.LoginByPhoneReq, db: Session = Depends(get_db)): phone = req.phone.replace(" ", "") checked = PhoneVerifyCode.check_code(phone, req.code, PhoneVerifyType.login) if not checked: raise HTTPException(detail="验证码错误", status_code=303) user = UserCrud.get_user_info_by_phone(db, phone) if not user: raise HTTPException(detail="手机号未录入系统", status_code=303) if user.disable: raise HTTPException(detail="账号已停用", status_code=303) auth_data = AuthCrud.get_user_auth(db, user.email) user_data = user.to_dict() user_data["auth_data"] = auth_data token_data = TokenData(**user_data).dict() token = create_token(token_data) return {'msg': "成功", 'state': 1, 'data': {'token': token, 'user_info': user_data}} @router.post("/get_email_verify_code", tags=["用户接口"], summary="获取邮箱验证码") def get_email_verify_code(body: UserSchemas.EmailSendReqBody, db: Session = Depends(get_db)): try: email = body.email user = UserCrud.get_user_info(db, email) # if user: # # 邮箱已绑定 # if user.email and user.email != req.email: # raise HTTPException(detail="该微信已绑定邮箱,请使用该微信号绑定的邮箱登录", status_code=403) if not user: raise HTTPException(detail="邮箱未录入系统", status_code=303) if user.disable: raise HTTPException(detail="账号已停用", status_code=303) EmailVerifyCode.send_change_code(email) except HTTPException as e: raise e except Exception as e: print(e) raise HTTPException(detail="邮箱验证码发送失败", status_code=403) return {"msg": "验证码已发送至邮箱,请查看", "state": 1} @router.post("/refresh_token", tags=["用户接口"], summary="重新获取token") def refresh_token(token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)): auth_data = AuthCrud.get_user_auth(db, token_data.email) user = UserCrud.get_user_info(db, token_data.email) if user.disable: raise HTTPException(detail="账号已停用", status_code=303) user_data = user.to_dict() user_data["auth_data"] = auth_data token_data = TokenData(**user_data) token = create_token(token_data) return {'token': token} class GetLoginVerifyCodeReq(BaseModel): account: str @router.post("/get_login_verify_code", tags=["用户接口"], summary="获取登录验证码") def get_login_verify_code(body: GetLoginVerifyCodeReq, db: Session = Depends(get_db)): try: account = body.account user = db.query(User).filter(User.email == account).first() if user: account_type = 'email' else: user = db.query(User).filter(User.phone == account).first() if user: account_type = "phone" else: raise HTTPException(detail="账号未录入系统", status_code=303) if account_type == 'email': EmailVerifyCode.send_login_code(account) if account_type == 'phone': PhoneVerifyCode.send_code(account, PhoneVerifyType.login) except HTTPException as e: raise e except Exception as e: print(e) raise HTTPException(detail="验证码发送失败", status_code=403) return {"msg": f"验证码已发送至{'邮箱' if account_type == 'email' else '手机'},请查看", "state": 1} class LoginByVerifyCode(BaseModel): account: str code: str @router.post('/login_by_verify_code', tags=["用户接口"], summary='验证码登录') def login_by_verify_code(req: LoginByVerifyCode, db: Session = Depends(get_db)): account = req.account if PhoneVerifyCode.check_code(account, req.code, PhoneVerifyType.login): account_type = 'phone' else: if EmailVerifyCode.check_code(account, req.code, EmailVerifyType.login): account_type = 'email' else: raise HTTPException(detail="验证码错误", status_code=303) user = None if account_type == 'phone': user = db.query(User).filter(User.phone == account).first() if account_type == 'email': user = db.query(User).filter(User.email == account).first() if not user: raise HTTPException(detail="账号未录入系统", status_code=303) auth_data = AuthCrud.get_user_auth(db, user.email) user_data = user.to_dict() user_data["auth_data"] = auth_data token_data = TokenData(**user_data).dict() token = create_token(token_data) return {'msg': "成功", 'state': 1, 'data': {'token': token, 'user_info': user_data}} @router.post("/get_user_by_department", tags=["用户接口"], summary="根据部门列表获取用户信息", response_model=UserSchemas.GetUserByDepartmentRes) def get_user_by_department(body: UserSchemas.GetUserByDepartmentReq, token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)): users = UserCrud.get_user_by_departments(db, body.departments) return UserSchemas.GetUserByDepartmentRes(users=[UserSchemas.UserInfo(**user) for user in users]) @router.post("/get_user_by_department_type", tags=["用户接口"], summary=["根据部门类型获取用户信息"], response_model=UserSchemas.GetUserByDepartmentTypeRes) def get_user_by_department_type(body: UserSchemas.GetUserByDepartmentTypeReq, token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)): users = UserCrud.get_user_by_department_type(db, body.department_type) return UserSchemas.GetUserByDepartmentTypeRes(users=[UserSchemas.UserFullInfo(**user) for user in users]) @router.post("/get_user_can_watch_department", tags=["获取用户可查看部门列表"], summary=['查询']) def get_user_can_watch_department(token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)): print(token_data) department_list = [item.to_dict() for item in UserCrud.get_department_list(db) if item.id not in [1, 2, 3]] department_dict = {item['id']: item for item in department_list} # 所有部门动态查看 if check_auth(token_data.auth_data, [2]): return {'item_list': department_list, "count": len(department_list)} can_watch_department = set() for item in token_data.department.split(","): # if int(item) in [1, 2, 3]: # can_watch_department.add(1) # can_watch_department.add(2) # can_watch_department.add(3) can_watch_department.add(int(item)) if token_data.manage_departments: for item in token_data.manage_departments.split(","): can_watch_department.add(int(item)) can_watch_department_list = [department_dict[item] for item in can_watch_department if item not in [1, 2, 3]] return {'item_list': can_watch_department_list, "count": len(can_watch_department_list)} department_crud = auto_create_crud(Department, 'department', "部门", auto_create_keys=['id'], tags=["部门职务操作"]) post_crud = auto_create_crud(Post, 'post', "职务", auto_create_keys=['id'], tags=["部门职务操作"]) department_crud.mount(router) post_crud.mount(router)