daily/Router/UserRouter.py

296 lines
14 KiB
Python

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy.orm import Session
from Models.DepartmentModel import Department
from Models.PostModel import Post
from Models.UserModel import User
from Schemas import UserSchemas
from Crud import UserCrud, AuthCrud
from Schemas.UserSchemas import TokenData
from Utils.AuthUtils import token_data_depend, create_token, registered_depend, check_auth
from Utils.CrudUtils import auto_create_crud
from Utils.SqlAlchemyUtils import get_db
from Utils.VerifyCodeUtils import EmailVerifyCode, EmailVerifyType, PhoneVerifyCode, PhoneVerifyType
from Utils.wxAppUtils import code2Session
# Router that groups every endpoint of this module under the /api/daily/user prefix.
router = APIRouter(prefix="/api/daily/user")
# Endpoint called after the authorised login.
@router.post('/login_to_app', summary="应用登录", tags=["用户接口"], response_model=UserSchemas.LoginToAppRes)
def login_to_app(req: UserSchemas.LoginToAppReq, db: Session = Depends(get_db)):
    """Exchange a login code for an openid.

    Args:
        req: Request body carrying ``code``, which is handed to ``code2Session``.
        db: Database session (injected; not used by the current implementation).

    Returns:
        ``LoginToAppRes`` containing only the resolved ``openid``.

    Raises:
        HTTPException: status 303 when ``code2Session`` yields no openid.
    """
    session_info = code2Session(req.code)
    openid = session_info.openid
    if openid:
        # NOTE: token issuance that used to follow here is disabled; only the
        # openid is returned to the caller.
        return UserSchemas.LoginToAppRes(openid=openid)
    raise HTTPException(detail="code无效", status_code=303)
@router.post('/get_user_info', summary='获取用户信息', tags=["用户接口"], response_model=UserSchemas.GetUserInfoRes)
def get_user_info(token_data: TokenData = Depends(registered_depend), db: Session = Depends(get_db)):
    """Return the full profile of the user identified by the token.

    Args:
        token_data: Token payload resolved by ``registered_depend``; its
            ``email`` selects the user.
        db: Database session.

    Returns:
        ``GetUserInfoRes`` built from the stored profile, with ``post_list`` and
        ``department_list`` converted to schema objects and ``auth_data`` attached.

    Raises:
        HTTPException: status 403 when no profile is found, or when the profile
            has no email.
    """
    email = token_data.email
    profile = UserCrud.get_full_user_info(db, email)
    if not profile:
        raise HTTPException(detail="用户未注册", status_code=403)
    if not profile['email']:
        raise HTTPException(detail="用户未绑定邮箱", status_code=403)

    # Convert the raw dict entries into their response schemas.
    schema_by_key = (
        ('post_list', UserSchemas.PostInfo),
        ('department_list', UserSchemas.DepartmentInfo),
    )
    for key, schema in schema_by_key:
        profile[key] = [schema(**entry) for entry in profile[key]]

    profile["auth_data"] = AuthCrud.get_user_auth(db, email)
    return UserSchemas.GetUserInfoRes(**profile)
# Updates the user's email, name and department info; an email verification code is required.
@router.post('/change_user_info_and_email', tags=["用户接口"], summary='修改用户信息及email')
def change_user_info_and_email(req: UserSchemas.ChangeUserInfoReq, token_data: TokenData = Depends(token_data_depend),
                               db: Session = Depends(get_db)):
    """Update the caller's user info after verifying an email code, and issue a new token.

    Args:
        req: New user info plus ``email`` and ``email_code``.
        token_data: Payload of the caller's current token.
        db: Database session.

    Returns:
        A dict with ``msg``, ``state`` and ``data.token`` (a freshly created token).

    Raises:
        HTTPException: status 303 when the email code check fails, or when
            ``req.email`` differs from the email in the token.
    """
    # The code is checked before the ownership test, in that order.
    if not EmailVerifyCode.check_code(req.email, req.email_code, EmailVerifyType.change):
        raise HTTPException(detail="邮箱验证码错误", status_code=303)
    if req.email != token_data.email:
        raise HTTPException(detail="无权限的操作", status_code=303)

    # The change is stored with registered=True.
    changes = UserSchemas.UserInfoChange(**req.dict(), registered=True)
    updated_user = UserCrud.change_user_info(db, token_data.email, changes)
    auth_data = AuthCrud.get_user_auth(db, token_data.email)

    payload = updated_user.to_dict()
    payload["auth_data"] = auth_data
    token = create_token(TokenData(**payload).dict())
    return {'msg': "修改成功", 'state': 1, 'data': {'token': token}}
@router.post('/change_user_info', tags=["用户接口"], summary='修改用户信息')
def change_user_info(req: UserSchemas.ChangeUserInfoReq, token_data: TokenData = Depends(token_data_depend),
                     db: Session = Depends(get_db)):
    """Update the caller's user info without an email verification step.

    Args:
        req: New user info; ``req.email`` must equal the email in the token.
        token_data: Payload of the caller's current token.
        db: Database session.

    Returns:
        A dict with ``msg`` and ``state``.

    Raises:
        HTTPException: status 303 when ``req.email`` differs from the token email.
    """
    if req.email == token_data.email:
        changes = UserSchemas.UserInfoChange(**req.dict())
        UserCrud.change_user_info(db, token_data.email, changes)
        return {'msg': "修改成功", 'state': 1}
    raise HTTPException(detail="无权限的操作", status_code=303)
@router.post('/bind_email', tags=["用户接口"], summary='邮箱登录')
def bind_email(req: UserSchemas.BindEmailReq,
               db: Session = Depends(get_db)):
    """Log in by email and verification code, creating the user when absent.

    Args:
        req: Request body with ``email`` and ``email_code``. Spaces are removed
            from the email before it is used.
        db: Database session.

    Returns:
        A dict with ``msg``, ``state`` and ``data`` holding the new ``token``
        and ``user_info`` (the user dict including ``auth_data``).

    Raises:
        HTTPException: status 303 when the email code check fails.
    """
    email = req.email.replace(" ", "")
    if not EmailVerifyCode.check_code(email, req.email_code, EmailVerifyType.change):
        raise HTTPException(detail="邮箱验证码错误", status_code=303)

    # NOTE: an earlier check that rejected an email mismatch for an
    # already-bound account is disabled.
    user = UserCrud.get_user_info(db, email)
    if not user:
        user = UserCrud.create_user(db, email)
    if not user.registered:
        UserCrud.update_user_info_from_email(db, email)

    auth_data = AuthCrud.get_user_auth(db, email)
    user_data = user.to_dict()
    user_data["auth_data"] = auth_data
    # Pass a plain dict to create_token, matching the other login endpoints
    # in this module.
    token = create_token(TokenData(**user_data).dict())
    return {'msg': "成功", 'state': 1, 'data': {'token': token, 'user_info': user_data}}
@router.post("/get_phone_verify_code", tags=["用户接口"], summary="获取短信验证码")
def get_phone_verify_code(body: UserSchemas.GetPhoneVerifyCodeReq, db=Depends(get_db)):
    """Send a login verification code to a phone number known to the system.

    Args:
        body: Request body carrying ``phone``.
        db: Database session.

    Returns:
        A dict with ``msg`` and ``state`` once the code has been sent.

    Raises:
        HTTPException: status 303 when no user has this phone number; status 403
            when sending fails with a non-HTTP error. An ``HTTPException``
            raised by the sender is passed through unchanged.
    """
    phone = body.phone
    if not UserCrud.get_user_info_by_phone(db, phone):
        raise HTTPException(detail="手机号未录入系统", status_code=303)

    try:
        PhoneVerifyCode.send_code(phone, PhoneVerifyType.login)
    except HTTPException:
        raise
    except Exception as send_error:
        print(send_error)
        raise HTTPException(detail="验证码发送失败", status_code=403)

    return {"msg": "验证码已发送至手机,请查看", "state": 1}
@router.post('/login_by_phone', tags=["用户接口"], summary='手机号登录')
def login_by_phone(req: UserSchemas.LoginByPhoneReq,
                   db: Session = Depends(get_db)):
    """Log a user in with a phone number and its verification code.

    Args:
        req: Request body with ``phone`` and ``code``. Spaces are removed from
            the phone number before it is used.
        db: Database session.

    Returns:
        A dict with ``msg``, ``state`` and ``data`` holding the new ``token``
        and ``user_info`` (the user dict including ``auth_data``).

    Raises:
        HTTPException: status 303 when the code is wrong, the phone number is
            unknown, or the account is disabled.
    """
    phone = req.phone.replace(" ", "")
    if not PhoneVerifyCode.check_code(phone, req.code, PhoneVerifyType.login):
        raise HTTPException(detail="验证码错误", status_code=303)

    user = UserCrud.get_user_info_by_phone(db, phone)
    if not user:
        raise HTTPException(detail="手机号未录入系统", status_code=303)
    if user.disable:
        raise HTTPException(detail="账号已停用", status_code=303)

    auth_data = AuthCrud.get_user_auth(db, user.email)
    user_info = user.to_dict()
    user_info["auth_data"] = auth_data
    token = create_token(TokenData(**user_info).dict())
    return {'msg': "成功", 'state': 1, 'data': {'token': token, 'user_info': user_info}}
@router.post("/get_email_verify_code", tags=["用户接口"], summary="获取邮箱验证码")
def get_email_verify_code(body: UserSchemas.EmailSendReqBody, db: Session = Depends(get_db)):
    """Send a "change" verification code to an email known to the system.

    Args:
        body: Request body carrying ``email``.
        db: Database session.

    Returns:
        A dict with ``msg`` and ``state`` once the code has been sent.

    Raises:
        HTTPException: status 303 when the email is unknown or the account is
            disabled; status 403 when any other error occurs during lookup or
            sending.
    """
    try:
        email = body.email
        account = UserCrud.get_user_info(db, email)
        # NOTE: an earlier check that rejected an email mismatch for an
        # already-bound account is disabled.
        if not account:
            raise HTTPException(detail="邮箱未录入系统", status_code=303)
        if account.disable:
            raise HTTPException(detail="账号已停用", status_code=303)
        EmailVerifyCode.send_change_code(email)
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as send_error:
        print(send_error)
        raise HTTPException(detail="邮箱验证码发送失败", status_code=403)

    return {"msg": "验证码已发送至邮箱,请查看", "state": 1}
@router.post("/refresh_token", tags=["用户接口"], summary="重新获取token")
def refresh_token(token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)):
    """Issue a fresh token built from the user's current stored data.

    Args:
        token_data: Payload of the caller's current token; its ``email``
            selects the user.
        db: Database session.

    Returns:
        A dict with the single key ``token``.

    Raises:
        HTTPException: status 303 when the user no longer exists or the account
            is disabled.
    """
    email = token_data.email
    user = UserCrud.get_user_info(db, email)
    # The token may outlive the user record; reject instead of failing on a
    # None attribute access.
    if not user:
        raise HTTPException(detail="账号未录入系统", status_code=303)
    if user.disable:
        raise HTTPException(detail="账号已停用", status_code=303)

    user_data = user.to_dict()
    user_data["auth_data"] = AuthCrud.get_user_auth(db, email)
    # Pass a plain dict to create_token, matching the login endpoints in this
    # module.
    token = create_token(TokenData(**user_data).dict())
    return {'token': token}
class GetLoginVerifyCodeReq(BaseModel):
    """Request body of the ``/get_login_verify_code`` endpoint."""

    # Login identifier; the endpoint matches it against User.email first,
    # then against User.phone.
    account: str
@router.post("/get_login_verify_code", tags=["用户接口"], summary="获取登录验证码")
def get_login_verify_code(body: GetLoginVerifyCodeReq, db: Session = Depends(get_db)):
    """Send a login verification code to an account given as email or phone.

    The account is looked up as an email first and as a phone number second;
    the code is sent through the channel that matched.

    Args:
        body: Request body carrying ``account``.
        db: Database session.

    Returns:
        A dict with ``msg`` (naming the channel used) and ``state``.

    Raises:
        HTTPException: status 303 when the account matches no user; status 403
            when any other error occurs during lookup or sending.
    """
    try:
        account = body.account
        if db.query(User).filter(User.email == account).first():
            account_type = 'email'
        elif db.query(User).filter(User.phone == account).first():
            account_type = "phone"
        else:
            raise HTTPException(detail="账号未录入系统", status_code=303)

        if account_type == 'email':
            EmailVerifyCode.send_login_code(account)
        else:
            PhoneVerifyCode.send_code(account, PhoneVerifyType.login)
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as send_error:
        print(send_error)
        raise HTTPException(detail="验证码发送失败", status_code=403)

    return {"msg": f"验证码已发送至{'邮箱' if account_type == 'email' else '手机'},请查看", "state": 1}
class LoginByVerifyCode(BaseModel):
    """Request body of the ``/login_by_verify_code`` endpoint."""

    # Login identifier: a phone number or an email address.
    account: str
    # Verification code to be checked for that account.
    code: str
@router.post('/login_by_verify_code', tags=["用户接口"], summary='验证码登录')
def login_by_verify_code(req: LoginByVerifyCode,
                         db: Session = Depends(get_db)):
    """Log a user in with an account (phone or email) and a verification code.

    The code is checked as a phone login code first and as an email login code
    second; the check that succeeds decides which column the user is looked up by.

    Args:
        req: Request body with ``account`` and ``code``.
        db: Database session.

    Returns:
        A dict with ``msg``, ``state`` and ``data`` holding the new ``token``
        and ``user_info`` (the user dict including ``auth_data``).

    Raises:
        HTTPException: status 303 when the code matches neither channel, the
            account matches no user, or the account is disabled.
    """
    account = req.account
    if PhoneVerifyCode.check_code(account, req.code, PhoneVerifyType.login):
        lookup_column = User.phone
    elif EmailVerifyCode.check_code(account, req.code, EmailVerifyType.login):
        lookup_column = User.email
    else:
        raise HTTPException(detail="验证码错误", status_code=303)

    user = db.query(User).filter(lookup_column == account).first()
    if not user:
        raise HTTPException(detail="账号未录入系统", status_code=303)
    # Disabled accounts must not obtain a token, as in login_by_phone and
    # refresh_token.
    if user.disable:
        raise HTTPException(detail="账号已停用", status_code=303)

    auth_data = AuthCrud.get_user_auth(db, user.email)
    user_data = user.to_dict()
    user_data["auth_data"] = auth_data
    token = create_token(TokenData(**user_data).dict())
    return {'msg': "成功", 'state': 1, 'data': {'token': token, 'user_info': user_data}}
@router.post("/get_user_by_department", tags=["用户接口"], summary="根据部门列表获取用户信息",
             response_model=UserSchemas.GetUserByDepartmentRes)
def get_user_by_department(body: UserSchemas.GetUserByDepartmentReq, token_data: TokenData = Depends(token_data_depend),
                           db: Session = Depends(get_db)):
    """Return the users that belong to the requested departments.

    Args:
        body: Request body carrying ``departments``.
        token_data: Token payload; only required so that the caller is authenticated.
        db: Database session.

    Returns:
        ``GetUserByDepartmentRes`` whose ``users`` are ``UserInfo`` objects.
    """
    rows = UserCrud.get_user_by_departments(db, body.departments)
    user_infos = [UserSchemas.UserInfo(**row) for row in rows]
    return UserSchemas.GetUserByDepartmentRes(users=user_infos)
# ``summary`` must be a string; it was previously passed as a one-element list.
@router.post("/get_user_by_department_type", tags=["用户接口"], summary="根据部门类型获取用户信息",
             response_model=UserSchemas.GetUserByDepartmentTypeRes)
def get_user_by_department_type(body: UserSchemas.GetUserByDepartmentTypeReq,
                                token_data: TokenData = Depends(token_data_depend),
                                db: Session = Depends(get_db)):
    """Return the users whose department has the requested type.

    Args:
        body: Request body carrying ``department_type``.
        token_data: Token payload; only required so that the caller is authenticated.
        db: Database session.

    Returns:
        ``GetUserByDepartmentTypeRes`` whose ``users`` are ``UserFullInfo`` objects.
    """
    users = UserCrud.get_user_by_department_type(db, body.department_type)
    return UserSchemas.GetUserByDepartmentTypeRes(users=[UserSchemas.UserFullInfo(**user) for user in users])
# ``summary`` must be a string; it was previously passed as a one-element list.
@router.post("/change_user_daily_fill_notice", tags=["用户接口"], summary="修改用户日报填报提醒")
def change_user_daily_fill_notice(body: UserSchemas.ChangeUserDailyFillNoticeReq,
                                  token_data: TokenData = Depends(token_data_depend),
                                  db: Session = Depends(get_db)):
    """Store the caller's daily-report fill reminder setting.

    Args:
        body: Request body carrying ``daily_fill_notice``.
        token_data: Token payload; its ``email`` selects the user to update.
        db: Database session.

    Returns:
        A dict with ``msg`` and ``state``.
    """
    UserCrud.change_user_daily_fill_notice(db, token_data.email, body.daily_fill_notice)
    return {"msg": "修改成功", "state": 1}
@router.post("/set_user_openid", tags=["用户接口"], summary="设置用户openid")
def set_user_openid(body: UserSchemas.SetUserOpenidReq,
                    token_data: TokenData = Depends(token_data_depend),
                    db: Session = Depends(get_db)):
    """Resolve an openid from a login code and store it for the caller.

    Args:
        body: Request body carrying ``code``, which is handed to ``code2Session``.
        token_data: Token payload; its ``email`` selects the user to update.
        db: Database session.

    Returns:
        A dict with ``msg`` and ``state``.

    Raises:
        HTTPException: status 303 when ``code2Session`` yields no openid.
    """
    session_info = code2Session(body.code)
    openid = session_info.openid
    if openid:
        UserCrud.set_user_openid(db, token_data.email, openid)
        return {"msg": "修改成功", "state": 1}
    raise HTTPException(detail="code无效", status_code=303)
# Departments that are never offered in the "can watch" list.
_HIDDEN_DEPARTMENT_IDS = frozenset({1, 2, 3})
_HIDDEN_DEPARTMENT_NAMES = frozenset({"四川远东"})


def _parse_department_ids(raw):
    """Parse a comma-separated id string such as "4,5" into a set of ints.

    ``None``, an empty string and empty segments yield no ids.
    """
    if not raw:
        return set()
    return {int(part) for part in raw.split(",") if part.strip()}


# ``summary`` must be a string; it was previously passed as a one-element list.
@router.post("/get_user_can_watch_department", tags=["获取用户可查看部门列表"], summary='查询')
def get_user_can_watch_department(token_data: TokenData = Depends(token_data_depend), db: Session = Depends(get_db)):
    """List the departments the caller is allowed to view.

    Callers for whom ``check_auth(token_data.auth_data, [2])`` succeeds get
    every visible department. Everyone else gets the departments named in the
    token's ``department`` and ``manage_departments`` fields.

    Args:
        token_data: Payload of the caller's token.
        db: Database session.

    Returns:
        A dict with ``item_list`` (department dicts) and ``count``.
    """
    department_list = [
        department.to_dict()
        for department in UserCrud.get_department_list(db)
        if department.id not in _HIDDEN_DEPARTMENT_IDS and department.name not in _HIDDEN_DEPARTMENT_NAMES
    ]

    # Auth id 2: per the original comment, permission to view all departments.
    if check_auth(token_data.auth_data, [2]):
        return {'item_list': department_list, "count": len(department_list)}

    can_watch_ids = _parse_department_ids(token_data.department)
    can_watch_ids |= _parse_department_ids(token_data.manage_departments)

    # Skip ids that are hidden or no longer exist instead of raising KeyError.
    department_by_id = {department['id']: department for department in department_list}
    can_watch_department_list = [
        department_by_id[department_id]
        for department_id in can_watch_ids
        if department_id in department_by_id
    ]
    return {'item_list': can_watch_department_list, "count": len(can_watch_department_list)}
# Helper-built CRUD objects for the Department and Post models, mounted on this
# module's router (presumably adding generated CRUD endpoints; confirm in
# Utils.CrudUtils).
department_crud = auto_create_crud(
    Department,
    'department',
    "部门",
    auto_create_keys=['id'],
    tags=["部门职务操作"],
)
post_crud = auto_create_crud(
    Post,
    'post',
    "职务",
    auto_create_keys=['id'],
    tags=["部门职务操作"],
)

department_crud.mount(router)
post_crud.mount(router)