first commit

This commit is contained in:
王思川 2022-10-20 16:29:54 +08:00
parent 1777f2e385
commit e674aa10cc
19 changed files with 413 additions and 92 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
.idea
*.pyc

View File

@ -1,92 +0,0 @@
# User
用户接口
## Getting started
To make it easy for you to get started with GitLab, here's a list of recommended next steps.
Already a pro? Just edit this README.md and make it your own. Want to make it easy? [Use the template at the bottom](#editing-this-readme)!
## Add your files
- [ ] [Create](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#create-a-file) or [upload](https://docs.gitlab.com/ee/user/project/repository/web_editor.html#upload-a-file) files
- [ ] [Add files using the command line](https://docs.gitlab.com/ee/gitlab-basics/add-file.html#add-a-file-using-the-command-line) or push an existing Git repository with the following command:
```
cd existing_repo
git remote add origin http://gitlab.fecribd.com/root/user.git
git branch -M main
git push -uf origin main
```
## Integrate with your tools
- [ ] [Set up project integrations](http://gitlab.fecribd.com/root/user/-/settings/integrations)
## Collaborate with your team
- [ ] [Invite team members and collaborators](https://docs.gitlab.com/ee/user/project/members/)
- [ ] [Create a new merge request](https://docs.gitlab.com/ee/user/project/merge_requests/creating_merge_requests.html)
- [ ] [Automatically close issues from merge requests](https://docs.gitlab.com/ee/user/project/issues/managing_issues.html#closing-issues-automatically)
- [ ] [Enable merge request approvals](https://docs.gitlab.com/ee/user/project/merge_requests/approvals/)
- [ ] [Automatically merge when pipeline succeeds](https://docs.gitlab.com/ee/user/project/merge_requests/merge_when_pipeline_succeeds.html)
## Test and Deploy
Use the built-in continuous integration in GitLab.
- [ ] [Get started with GitLab CI/CD](https://docs.gitlab.com/ee/ci/quick_start/index.html)
- [ ] [Analyze your code for known vulnerabilities with Static Application Security Testing(SAST)](https://docs.gitlab.com/ee/user/application_security/sast/)
- [ ] [Deploy to Kubernetes, Amazon EC2, or Amazon ECS using Auto Deploy](https://docs.gitlab.com/ee/topics/autodevops/requirements.html)
- [ ] [Use pull-based deployments for improved Kubernetes management](https://docs.gitlab.com/ee/user/clusters/agent/)
- [ ] [Set up protected environments](https://docs.gitlab.com/ee/ci/environments/protected_environments.html)
***
# Editing this README
When you're ready to make this README your own, just edit this file and use the handy template below (or feel free to structure it however you want - this is just a starting point!). Thank you to [makeareadme.com](https://www.makeareadme.com/) for this template.
## Suggestions for a good README
Every project is different, so consider which of these sections apply to yours. The sections used in the template are suggestions for most open source projects. Also keep in mind that while a README can be too long and detailed, too long is better than too short. If you think your README is too long, consider utilizing another form of documentation rather than cutting out information.
## Name
Choose a self-explaining name for your project.
## Description
Let people know what your project can do specifically. Provide context and add a link to any reference visitors might be unfamiliar with. A list of Features or a Background subsection can also be added here. If there are alternatives to your project, this is a good place to list differentiating factors.
## Badges
On some READMEs, you may see small images that convey metadata, such as whether or not all the tests are passing for the project. You can use Shields to add some to your README. Many services also have instructions for adding a badge.
## Visuals
Depending on what you are making, it can be a good idea to include screenshots or even a video (you'll frequently see GIFs rather than actual videos). Tools like ttygif can help, but check out Asciinema for a more sophisticated method.
## Installation
Within a particular ecosystem, there may be a common way of installing things, such as using Yarn, NuGet, or Homebrew. However, consider the possibility that whoever is reading your README is a novice and would like more guidance. Listing specific steps helps remove ambiguity and gets people to using your project as quickly as possible. If it only runs in a specific context like a particular programming language version or operating system or has dependencies that have to be installed manually, also add a Requirements subsection.
## Usage
Use examples liberally, and show the expected output if you can. It's helpful to have inline the smallest example of usage that you can demonstrate, while providing links to more sophisticated examples if they are too long to reasonably include in the README.
## Support
Tell people where they can go to for help. It can be any combination of an issue tracker, a chat room, an email address, etc.
## Roadmap
If you have ideas for releases in the future, it is a good idea to list them in the README.
## Contributing
State if you are open to contributions and what your requirements are for accepting them.
For people who want to make changes to your project, it's helpful to have some documentation on how to get started. Perhaps there is a script that they should run or some environment variables that they need to set. Make these steps explicit. These instructions could also be useful to your future self.
You can also document commands to lint the code or run tests. These steps help to ensure high code quality and reduce the likelihood that the changes inadvertently break something. Having instructions for running tests is especially helpful if it requires external setup, such as starting a Selenium server for testing in a browser.
## Authors and acknowledgment
Show your appreciation to those who have contributed to the project.
## License
For open source projects, say how it is licensed.
## Project status
If you have run out of energy or time for your project, put a note at the top of the README saying that development has slowed down or stopped completely. Someone may choose to fork your project or volunteer to step in as a maintainer or owner, allowing your project to keep going. You can also make an explicit request for maintainers.

0
User/__init__.py Normal file
View File

52
User/crud.py Normal file
View File

@ -0,0 +1,52 @@
from sqlalchemy import or_
from werkzeug.security import generate_password_hash
from sqlalchemy.orm import Session
from . import models, schemas
def get_user_by_id(db: Session, uid: int):
return db.query(models.User).filter(models.User.id == uid).first()
def get_user_by_email(db: Session, email: str):
return db.query(models.User).filter(models.User.email == email).first()
def get_users(db: Session, body: schemas.UserSearch, skip, limit):
return db.query(models.User).filter(
or_(models.User.id == body.id, body.id is None),
or_(models.User.email == body.email, body.email is None),
or_(models.User.name.like("%" + "" if body.name is None else body.name + "%"), body.name is None),
or_(models.User.role == body.role, body.role is None),
or_(models.User.depart == body.depart, body.depart is None),
or_(models.User.is_active == body.is_active, body.is_active is None),
).offset(skip).limit(limit).all()
def create_user(db: Session, body: schemas.UserCreate):
body.passwd = generate_password_hash(body.passwd)
item = models.User(**body.dict())
db.add(item)
db.commit()
db.refresh(item)
return item
def delete_user(db: Session, _id: int):
db.query(models.User).filter_by(id=_id).delete()
db.commit()
return True
def edit_user(db: Session, uid: int, body: schemas.UserEdit):
db.query(models.User).filter_by(id=uid).update(body.dict())
db.commit()
return True
def reset_user_pwd(db: Session, uid: int, passwd: schemas.PasswdRegex):
passwd = generate_password_hash(passwd)
db.query(models.User).filter_by(id=uid).update({"passwd": passwd})
db.commit()
return True

10
User/database.py Normal file
View File

@ -0,0 +1,10 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# MYSQL
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@localhost/wr_user_admin?charset=utf8mb4"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

15
User/models.py Normal file
View File

@ -0,0 +1,15 @@
from sqlalchemy import Column, Integer, String, Boolean
from .database import Base
class User(Base):
__tablename__ = "user"
id = Column(Integer, autoincrement=True, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True)
passwd = Column(String(255))
name = Column(String(255))
role = Column(String(255))
depart = Column(String(255))
is_active = Column(Boolean, default=True)

50
User/router_user.py Normal file
View File

@ -0,0 +1,50 @@
from datetime import timedelta
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from werkzeug.security import check_password_hash
from User import crud, models, schemas
from User.database import SessionLocal, engine
from Utils.Authentication import Config, TokenUtil
models.Base.metadata.create_all(bind=engine)
router = APIRouter()
def get_db():
try:
db = SessionLocal()
yield db
finally:
db.close()
@router.post("/login", summary="登录")
def login(body: schemas.UserLogin, db: Session = Depends(get_db)):
data = crud.get_user_by_email(db, email=body.email)
if not data:
return {"info": "User Not Existed"}
if not check_password_hash(data.passwd, body.passwd):
return {"info": "Password Incorrect"}
user = {
"uid": data.id,
"email": data.email,
"name": data.name,
"role": data.role,
"depart": data.depart,
"is_active": data.is_active
}
access_token_expires = timedelta(minutes=Config.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = TokenUtil.create_token(
data=user, expires_delta=access_token_expires
)
user.update({"token": access_token})
return user

74
User/router_user_admin.py Normal file
View File

@ -0,0 +1,74 @@
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from User import crud, models, schemas
from User.database import SessionLocal, engine
from Utils.AccessControl.AccessUtil import access_interseptor
models.Base.metadata.create_all(bind=engine)
router = APIRouter(
tags=["用户管理"],
prefix="/api/user/admin",
dependencies=[
Depends(access_interseptor)
]
)
def get_db():
try:
db = SessionLocal()
yield db
finally:
db.close()
@router.post("/create/", summary="新建用户", response_model=schemas.InfoBase)
def create_user(body: schemas.UserCreate, db: Session = Depends(get_db)):
item = crud.get_user_by_email(db, email=body.email)
if item:
raise HTTPException(status_code=400, detail="Email already registered")
crud.create_user(db=db, body=body)
return {"info": "Success"}
@router.post("/delete/", summary="删除用户")
def delete_user(uid: int, db: Session = Depends(get_db)):
crud.delete_user(db=db, _id=uid)
return {"info": "Success"}
@router.post("/edit", summary="编辑用户")
def user_edit(uid: int, body: schemas.UserEdit, db: Session = Depends(get_db)):
data = crud.get_user_by_id(db, uid)
if not data:
raise HTTPException(status_code=400, detail="User Not found")
crud.edit_user(db=db, body=body, uid=uid)
return {"info": "Success"}
@router.post("/reset_pwd", summary="重置用户密码")
def user_reset_pwd(uid: int, db: Session = Depends(get_db)):
data = crud.get_user_by_id(db, uid)
if not data:
raise HTTPException(status_code=400, detail="User Not found")
passwd = "Fecr1988"
crud.reset_user_pwd(db=db, passwd=passwd, uid=uid)
return {"info": "Success"}
@router.post("/search", summary="查询用户", response_model=List[schemas.User])
def search_users(body: schemas.UserSearch, skip: int = 0, limit: int = 20, db: Session = Depends(get_db)):
users = crud.get_users(db, body=body, skip=skip, limit=limit)
return users
@router.post("/detail", summary="用户详情", response_model=schemas.User)
def user_detail(uid: int, db: Session = Depends(get_db)):
data = crud.get_user_by_id(db, uid=uid)
if data is None:
raise HTTPException(status_code=404, detail="User Not found")
return data

81
User/schemas.py Normal file
View File

@ -0,0 +1,81 @@
import pydantic
from typing import Union, Optional
from enum import Enum
from pydantic import BaseModel
EMailRegex = pydantic.constr(regex="^[a-zA-Z0-9_-]+@fecr.com.cn$")
PasswdRegex = pydantic.constr(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)[\s\S]{8,16}$")
class InfoBase(BaseModel):
class InfoEnum(Enum):
enum1 = "Success"
enum2 = "Failed"
info: InfoEnum
class DepartEnum(Enum):
enum1 = "数字化部"
class RoleEnum(Enum):
enum1 = "guest"
enum2 = "user"
enum3 = "admin"
class UserBase(BaseModel):
email: EMailRegex = "xxxx@fecr.com.cn"
name: str = "用户"
role: RoleEnum = "guest"
depart: Union[DepartEnum, None] = None
class Config:
use_enum_values = True
class UserCreate(UserBase):
passwd: PasswdRegex = "8-16字符长度至少1个大写字母、1个小写字母、1个数字"
class UserResetPwd(BaseModel):
passwd: PasswdRegex = "8-16字符长度至少1个大写字母、1个小写字母、1个数字"
class UserEdit(BaseModel):
email: Optional[EMailRegex] = "xxxx@fecr.com.cn"
name: Optional[str] = "用户"
role: Optional[RoleEnum] = "guest"
depart: Optional[Union[DepartEnum, None]]
is_active: Optional[bool]
class Config:
use_enum_values = True
class UserLogin(BaseModel):
email: EMailRegex = "xxxx@fecr.com.cn"
passwd: PasswdRegex = "8-16字符长度至少1个大写字母、1个小写字母、1个数字"
# verify: str
class UserSearch(BaseModel):
id: Optional[int] = None
email: Optional[str] = None
name: Optional[str] = None
role: Optional[str] = None
depart: Optional[str] = None
is_active: Optional[bool] = None
class User(UserBase):
id: int
is_active: bool
class Config:
orm_mode = True

View File

@ -0,0 +1,27 @@
import os
from typing import Optional
import casbin
from fastapi import HTTPException, status, Request, Header
from Utils.Authentication.TokenUtil import decode_token
e = casbin.Enforcer(
os.getcwd() + r"\Utils\AccessControl\model.conf",
os.getcwd() + r"\Utils\AccessControl\policy.csv"
)
def access_interseptor(request: Request, token: Optional[str] = Header(...)):
sub = decode_token(token).get("role")
obj = request.url.__str__().split(request.base_url.__str__()[:-1])[-1].split("?")[0]
act = request.method
if not e.enforce(sub, obj, act):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="No Access",
headers={"WWW-Authenticate": "Bearer"},
)
raise credentials_exception

View File

View File

@ -0,0 +1,15 @@
# Request definition
[request_definition]
r = sub, obj, act
# Policy definition
[policy_definition]
p = sub, obj, act
# Policy effect
[policy_effect]
e = some(where (p.eft == allow))
# Matchers
[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

View File

@ -0,0 +1,8 @@
p, admin, /api/user/admin/create, POST
p, admin, /api/user/admin/delete, POST
p, admin, /api/user/admin/edit, POST
p, admin, /api/user/admin/search, POST
p, admin, /api/user/admin/detail, POST
p, user, /api/user/detail, POST
p, user, /api/user/change_pwd, POST
1 p admin /api/user/admin/create POST
2 p admin /api/user/admin/delete POST
3 p admin /api/user/admin/edit POST
4 p admin /api/user/admin/search POST
5 p admin /api/user/admin/detail POST
6 p user /api/user/detail POST
7 p user /api/user/change_pwd POST

View File

@ -0,0 +1,3 @@
SECRET_KEY = "HpGXrdwbL73ZPgQC"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 30

View File

@ -0,0 +1,45 @@
import jwt
from jwt import PyJWTError
from datetime import datetime, timedelta
from fastapi import HTTPException, status, Header
from Utils.Authentication import Config
def create_token(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy() # 复制data副本
# 设置过期时间
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire}) # 更新过期时间
# Token编码
encoded_jwt = jwt.encode(to_encode, Config.SECRET_KEY, algorithm=Config.ALGORITHM)
return encoded_jwt
def decode_token(token: str):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid Token",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, Config.SECRET_KEY, algorithms=[Config.ALGORITHM])
except PyJWTError:
raise credentials_exception
return payload
async def get_token_header(authorization: str = Header(...)):
"""
获取Token并验证
:param authorization:
:return: uid
"""
token = authorization.split(' ')[-1] # 获取token
openid = decode_token(token) # 验证token
if not openid:
raise HTTPException(status_code=400, detail="无效Token")

View File

0
Utils/__init__.py Normal file
View File

22
main.py Normal file
View File

@ -0,0 +1,22 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from User import router_user_admin, router_user
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(
router_user.router,
tags=["用户"],
prefix="/api/user"
)
app.include_router(router_user_admin.router)

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
fastapi~=0.85.0
pydantic~=1.10.2
SQLAlchemy~=1.4.41
PyMySQL~=1.0.2
uvicorn
Werkzeug~=2.2.2
PyJWT~=2.5.0
requests~=2.28.1
casbin~=1.17.1