编写小程序openid获取接口

This commit is contained in:
wcq 2023-02-28 16:28:48 +08:00
parent 74dac4e961
commit 3c734dae85
10 changed files with 185 additions and 33 deletions

View File

@ -1,19 +1,28 @@
from sqlalchemy import Column, String, Boolean, Enum, Text, DateTime, func, Integer
from sqlalchemy import Column, String, Boolean, Enum, Text, DateTime, func, Integer, ForeignKey
from sqlalchemy.orm import relationship
from Utils.SqlAlchemyUtils import Base
from Schemas.DailySchemas import DailyTypeEnum
class Daily(Base):
__tablename__ = "USER_TABLE"
openid = Column(String(255), primary_key=True, comment="用户OpenID")
type = Column(Enum(''))
email = Column(String(64), unique=True, index=True, comment="邮箱")
name = Column(String(32), comment="用户名")
department = Column(Text, comment="部门")
post = Column(Text, comment="职务")
disable = Column(Boolean, default=False, comment="禁用")
auth_data = Column(Text, comment="权限数据")
__tablename__ = "daily"
id = Column(Integer, primary_key=True, comment="日报ID")
type = Column(Enum(DailyTypeEnum, values_callable=lambda x: [e.value for e in x]), nullable=False)
fill_user = Column(String(128), comment="填报人openid")
# fill_user = Column(String(128), ForeignKey('user.openid'), comment="填报人openid")
# user = relationship('User', uselist=False, backref='user')
name = Column(String(128), comment="填报人名称")
department = Column(String(128), comment="填报人部门")
post = Column(String(128), comment="填报人职务")
content = Column(Text, comment="填报内容")
daily_time = Column(DateTime, server_default=func.now(), comment='日报时间')
create_time = Column(DateTime, server_default=func.now(), comment='创建时间')
update_time = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='修改时间')
def to_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
data = {c.name: getattr(self, c.name) for c in self.__table__.columns}
# user = getattr(self, 'user')
# data['name'] = user.name
# data['department'] = user.department
# data['post'] = user.post
return data

View File

@ -1,9 +1,11 @@
from sqlalchemy import Column, String, Boolean, Enum, Text, DateTime, func, Integer
from sqlalchemy.orm import relationship
from Utils.SqlAlchemyUtils import Base
class User(Base):
__tablename__ = "USER_TABLE"
__tablename__ = "user"
openid = Column(String(255), primary_key=True, comment="用户OpenID")
email = Column(String(64), unique=True, index=True, comment="邮箱")
name = Column(String(32), comment="用户名")
@ -16,4 +18,6 @@ class User(Base):
update_time = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='修改时间')
def to_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}

View File

@ -0,0 +1,14 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from Utils.AuthUtils import token_data_depend
from Models.DailyModel import Daily
from Utils.CrudUtils import auto_create_crud
router = APIRouter(
tags=["日报"],
prefix="/api/daily/daily",
dependencies=[Depends(token_data_depend)]
)
crud = auto_create_crud(Daily, 'daily', "日报", auto_create_keys=['create_time', 'update_time', 'id'])
crud.mount(router)

View File

@ -18,6 +18,7 @@ router = APIRouter(
# 授权登录后调用的接口,
@router.post('/login_to_app', response_model=UserSchemas.LoginToAppRes)
def login_to_app(req: UserSchemas.LoginToAppReq, db: Session = Depends(get_db)):
user = UserCrud.get_user_info(db, req.openid)
if not user:
user = UserCrud.create_user(db, req.openid)
@ -32,7 +33,7 @@ def get_user_info(token_data: TokenData = Depends(token_data_depend), db: Sessio
# 修改用户的邮箱、姓名、部门信息,需要进行邮箱验证码验证
@router.post('/change_user_info', response_model=UserSchemas.ChangeUserInfoRes)
@router.post('/change_user_info')
def change_user_info(req: UserSchemas.ChangeUserInfoReq, token_data: TokenData = Depends(token_data_depend),
db: Session = Depends(get_db)):
checked = EmailVerifyCode.check_code(req.email, req.email_code, EmailVerifyType.change)

View File

@ -0,0 +1,11 @@
from enum import Enum, unique
from pydantic import BaseModel
@unique
class DailyTypeEnum(Enum):
enum01 = "董监高日报"
enum02 = "部门子公司日报"
enum03 = "监管和同业动态"
enum04 = "行业信息专题分析"

View File

@ -1,3 +1,5 @@
from typing import Union
from pydantic import BaseModel
@ -11,7 +13,7 @@ class LoginToAppRes(BaseModel):
class GetUserInfoReq(BaseModel):
openid: str
code: str
class GetUserInfoRes(BaseModel):
@ -42,3 +44,5 @@ class ChangeUserInfoReq(UserInfoChange):
class EmailSendReqBody(BaseModel):
email: str = "xxxx@fecr.com.cn"

View File

@ -1,11 +1,14 @@
from typing import TypeVar, Generic, Any, List, get_args, Union, Optional, Tuple
from sqlalchemy import func
from sqlalchemy.orm import Session
from datetime import datetime
from datetime import datetime, date
from Utils.CommonUtils import get_sqlalchemy_model_fields
from Utils.SqlAlchemyUtils import Base, get_db
from pydantic import BaseModel, create_model
from fastapi import APIRouter, Depends
from pydantic.generics import GenericModel
from Schemas.DailySchemas import DailyTypeEnum
class QueryBase(BaseModel):
@ -39,7 +42,11 @@ class CRUDBase(Generic[DbModelType, ModelType, IdSchemaType, CreateSchemaType, U
id_schema_type: IdSchemaType,
create_schema_type: CreateSchemaType,
update_schema_type: UpdateSchemaType,
query_schema_type: QuerySchemaType, name: str, chinese_name: str = ''):
query_schema_type: QuerySchemaType,
name: str,
chinese_name: str = '',
array_keys=[],
):
self.db_model = db_model
self.name = name
self.model_type = model_type
@ -48,6 +55,8 @@ class CRUDBase(Generic[DbModelType, ModelType, IdSchemaType, CreateSchemaType, U
self.update_schema_type = update_schema_type
self.query_schema_type = query_schema_type
self.summary_name = chinese_name if chinese_name else self.name
self.query_func = None
self.array_keys = array_keys
def mount(self, router: APIRouter):
for key in self.__dir__():
@ -145,6 +154,17 @@ class CRUDBase(Generic[DbModelType, ModelType, IdSchemaType, CreateSchemaType, U
query = db.query(self.db_model)
for key, value in params_dict.items():
if key not in ['page', 'page_size'] and value is not None:
# 在存储的数组值内查询 如存的 1,2,3,4 查询时则使用的 [1,2]这样的数据来查
if key in self.array_keys:
for item in value:
query = query.filter(func.find_in_set(str(item), getattr(self.db_model, key)))
continue
# 如果执行query_func后有返回值即使用了key,value则跳过后面操作
if self.query_func:
query_temp = self.query_func(query, key, value)
if query_temp:
query = query_temp
continue
if type(value) == str:
query = query.filter(getattr(self.db_model, key).like(f'%{value}%'))
if type(value) in [int, float, bool]:
@ -157,10 +177,10 @@ class CRUDBase(Generic[DbModelType, ModelType, IdSchemaType, CreateSchemaType, U
query = query.filter(getattr(self.db_model, key) <= datetime.fromtimestamp(value[1]))
count = query.count()
page=None
page_size=None
page = None
page_size = None
if 'page' in params_dict:
page=params_dict['page']
page = params_dict['page']
if 'page_size' in params_dict:
page_size = params_dict['page_size']
if page is not None and page_size is not None:
@ -171,13 +191,15 @@ class CRUDBase(Generic[DbModelType, ModelType, IdSchemaType, CreateSchemaType, U
return db.query(self.db_model).get(item_id)
def create_crud_model(db_model: Base, name, auto_create_keys=['id'], index_key='id'):
def create_crud_model(db_model: Base, name, auto_create_keys=['id'], index_key='id',
array_keys=[]):
"""
创建CRUD所需模型
:param db_model: 数据库模型
:param name: 名称
:param auto_create_keys: 自动生成的keys
:param index_key: 索引key
:param array_keys: 用id数组存储数据的key的列表
:return:
"""
fields_dict = get_sqlalchemy_model_fields(db_model)
@ -194,28 +216,41 @@ def create_crud_model(db_model: Base, name, auto_create_keys=['id'], index_key='
eval(f'Optional[{item[0].__name__}]', globals()), item[1]) for key, item in
fields_dict.items() if key not in auto_create_keys})
query_fields_dic = {}
for key, item in fields_dict.items():
print(item[0].__name__)
if key in array_keys:
query_fields_dic[key] = (
eval(f'Optional[List[Union[int,str]]]', globals(), locals()), None)
else:
if item[0].__name__ == 'datetime':
query_fields_dic[key] = (
eval(f'Optional[List[Optional[int]]]', globals(), locals()), None)
else:
query_fields_dic[key] = (eval(f'Optional[{item[0].__name__}]', globals()), None)
print(query_fields_dic)
model_query = create_model(f"{name}Query", __config__=config,
**query_fields_dic)
return [model, model_id, model_create, model_update, model_query]
def auto_create_crud(db_model: Base, name, chinese_name="", auto_create_keys=['id'], index_key='id'):
def auto_create_crud(db_model: Base, name, chinese_name="", auto_create_keys=['id'], index_key='id',
array_keys=[]):
"""
自动创建CRUD类
:param db_model:
:param name:
:param chinese_name:
:param auto_create_keys: 自动生成的字段key
:param index_key:
:param array_keys: 由id数组组成的字符串key列表
:return:
"""
[model, model_id, model_create, model_update, model_query] = create_crud_model(db_model, name,
auto_create_keys, index_key)
auto_create_keys, index_key,
array_keys=array_keys)
class Crud(CRUDBase[db_model, model, model_id, model_create, model_update, model_query]):
pass
crud = Crud(db_model, model, model_id, model_create, model_update, model_query, name, chinese_name)
crud = Crud(db_model, model, model_id, model_create, model_update, model_query, name, chinese_name,
array_keys=array_keys)
return crud

View File

@ -4,15 +4,15 @@ from sqlalchemy.orm import sessionmaker
Base = declarative_base()
user = ""
password = ""
host = ""
db = ''
user = "root"
password = "12345"
host = "127.0.0.1"
db = 'daily'
def get_engine():
engine = create_engine(
f"mysql+pymysql://{'user'}:{'password'}@{'host'}/{'db'}?charset=utf8mb4")
f"mysql+pymysql://{user}:{password}@{host}/{db}?charset=utf8mb4")
return engine
@ -26,6 +26,13 @@ def get_db() -> sessionmaker:
db.close()
def get_db_i() -> sessionmaker:
engine = get_engine()
Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
db = Session()
return db
# def get_db():
# engine = create_engine("sqlite:///./data.db", connect_args={"check_same_thread": False})
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

23
Utils/wxAppUtils.py Normal file
View File

@ -0,0 +1,23 @@
import requests as rq
from pydantic import BaseModel
from typing import Union
appid = ""
secret = ""
class WxAppCode2SessionRes(BaseModel):
session_key: Union[str, None]
unionid: Union[str, None]
errmsg: Union[str, None]
openid: Union[str, None]
errcode: Union[int, None]
# 通过code获取用户唯一标识
def code2Session(code: str) -> WxAppCode2SessionRes:
res = rq.get("https://api.weixin.qq.com/sns/jscode2session",
{"appid": appid, "secret": secret, "js_code": code, "grant_type": "authorization_code"})
data = res.json()
res = WxAppCode2SessionRes(**data)
return res

44
main.py Normal file
View File

@ -0,0 +1,44 @@
import os
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
from Models.DailyModel import Daily
from Router import UserRouter, DailyRouter
from Utils.SqlAlchemyUtils import init_database, get_db_i
from fastapi.staticfiles import StaticFiles
init_database()
app = FastAPI(
title="日报小程序",
description="",
version="v1.0.0"
)
if not os.path.exists('static_data'):
os.mkdir('static_data')
app.mount("/static_data", StaticFiles(directory="static_data"), name="static_data")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(UserRouter.router)
app.include_router(DailyRouter.router)
def test():
from Utils.SqlAlchemyUtils import get_db
db: Session = get_db_i()
c = db.query(Daily).all()
e = 2
db.close()
test()
uvicorn.run(app=app, port=8006)