用户模块修改

This commit is contained in:
王思川 2022-01-25 16:48:42 +08:00
parent 03f6f040e8
commit e91ca7ff8e
24 changed files with 377 additions and 390 deletions

View File

View File

@ -3,6 +3,36 @@ import pymongo
client = pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format('root', 'sromitdTW569kC#M', '116.63.130.34', 27018))
def delete_data(param1, param2, param3):
"""
根据查询条件删除一条文档
Parameters:
param1: str 数据库
param2: str 数据集
param3: obj 查询条件
Returns:
res: True 执行完成
"""
collection = client[param1][param2]
collection.delete_one(param3)
return True
def find_data(param1, param2, param3):
"""
根据查询条件查询符合条件的所有数据
Parameters:
param1: str 数据库
param2: str 数据集
param3: obj 查询条件
Returns:
res: list 符合条件的数据
"""
collection = client[param1][param2]
res = list(collection.find(param3, {'_id': False}))
return res
def update_user_info_by_cid(cid, data):
"""
更新用户信息

View File

@ -1,7 +1,7 @@
from flask import Blueprint, request
from TestManage.TestObj import TestObj
from common.security.APIAuth import api_secret
from common.APIAuth import api_secret
test_route = Blueprint('test', __name__)

View File

@ -2,8 +2,7 @@ import json
import requests
from TestManage.db import update_user_info_by_cid, find_user_info_by_cid
from common.db import delete_data, find_data
from TestManage.db import update_user_info_by_cid, find_user_info_by_cid, delete_data, find_data
def delete_rid_cascade_scripts(rid):
@ -20,11 +19,11 @@ def delete_rid_cascade_scripts(rid):
res: True 执行结果
"""
delete_data('tfse', '评价', '报告数据', {"评价ID": rid})
delete_data('tfse', '评价', '评价结果', {"评价ID": rid})
delete_data('tfse', '评价', '综合评价填报', {"评价ID": rid})
delete_data('tfse', '评价', '评价记录', {"评价ID": rid})
delete_data('tfse', '评价', '风险数据', {"评价ID": rid})
delete_data('评价', '报告数据', {"评价ID": rid})
delete_data('评价', '评价结果', {"评价ID": rid})
delete_data('评价', '综合评价填报', {"评价ID": rid})
delete_data('评价', '评价记录', {"评价ID": rid})
delete_data('评价', '风险数据', {"评价ID": rid})
return True
@ -37,14 +36,14 @@ def delete_cid_cascade_scripts(cid):
Returns:
res: True 执行结果
"""
delete_data('tfse', '用户', '用户信息', {"企业ID": cid})
delete_data('tfse', '企业', '指标明细', {"企业ID": cid})
delete_data('tfse', '企业', '综合评价分析', {"企业ID": cid})
delete_data('tfse', '企业', '公司基本信息', {"企业ID": cid})
delete_data('tfse', '企业', '财务要素分析', {"企业ID": cid})
delete_data('tfse', '企业', '财务要素分析', {"企业ID": cid})
delete_data('用户', '用户信息', {"企业ID": cid})
delete_data('企业', '指标明细', {"企业ID": cid})
delete_data('企业', '综合评价分析', {"企业ID": cid})
delete_data('企业', '公司基本信息', {"企业ID": cid})
delete_data('企业', '财务要素分析', {"企业ID": cid})
delete_data('企业', '财务要素分析', {"企业ID": cid})
rids = find_data('tfse', '评价', '评价记录', {"企业ID": cid})
rids = find_data('评价', '评价记录', {"企业ID": cid})
if rids:
for rid in rids:
delete_rid_cascade_scripts(rid['评价ID'])

4
app.py
View File

@ -4,7 +4,7 @@ from flask_cors import *
from TestManage.routes import test_route
from company.routes import company_route
from rating.routes import rating_route
from Routes.user_routes import user_route
from user.user_routes import user_route
from macro.routes import macro_route
from file.routes import file_route
@ -18,7 +18,7 @@ app.register_blueprint(user_route, url_prefix='/admin/user')
app.register_blueprint(company_route, url_prefix='/admin/company')
app.register_blueprint(rating_route, url_prefix='/admin/rating')
app.register_blueprint(macro_route, url_prefix='/admin/macro')
app.register_blueprint(file_route, url_prefix='/admin/file')
app.register_blueprint(file_route, url_prefix='/admin/static')
if __name__ == '__main__':
app.run()

21
common/APIAuth.py Normal file
View File

@ -0,0 +1,21 @@
import functools
from flask import request
def api_secret(func):
"""
校验接口请求密钥
"""
secret = "EZgo9ykxrYuBMYnYmmKIh" # 接口密钥
@functools.wraps(func)
def internal(*args, **kwargs):
try:
token = request.headers.get('secret')
if token != secret:
return {"info": "接口密钥错误"}, 401
except Exception:
return {"info": "请求异常"}, 401
return func(*args, **kwargs)
return internal

View File

@ -1,91 +0,0 @@
import pymongo
clients = {
"tfse": pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format('root', 'sromitdTW569kC#M', '116.63.130.34', 27018)),
"tfse_admin": pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format('root', 'RYIHrqml#LSW6#!*', '116.63.130.34', 27020)),
}
def find_data(param1, param2, param3, param4):
"""
根据查询条件查询符合条件的所有数据
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
Returns:
res: list 符合条件的数据
"""
collection = clients[param1][param2][param3]
res = list(collection.find(param4, {'_id': False}))
return res
def delete_data(param1, param2, param3, param4):
"""
根据查询条件删除一条文档
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
Returns:
res: True 执行完成
"""
collection = clients[param1][param2][param3]
collection.delete_one(param4)
return True
def insert_data(param1, param2, param3, param4):
"""
插入一条数据
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 插入数据
Returns:
True 执行成功
False 执行失败
"""
try:
collection = clients[param1][param2][param3]
collection.insert_one(param4)
return True
except Exception:
return False
def update_data(param1, param2, param3, param4, param5):
"""
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
param5: obj 插入数据
Returns:
-
"""
collection = clients[param1][param2][param3]
collection.update_one(param4, param5)
return True
def update_data_upsert(param1, param2, param3, param4, param5):
"""
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
param5: obj 插入数据
Returns:
-
"""
collection = clients[param1][param2][param3]
collection.update_one(param4, {"$set": param5}, upsert=True)
return True

View File

@ -1,95 +0,0 @@
import functools
from flask import request
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
from common.db import find_data
TOKEN_KEY = "P0eHym@&CbaLzWkq"
def api_secret(func):
"""
校验接口请求密钥
"""
secret = "EZgo9ykxrYuBMYnYmmKIh" # 接口密钥
@functools.wraps(func)
def internal(*args, **kwargs):
try:
token = request.headers.get('secret')
if token != secret:
return {"info": "接口密钥错误"}, 401
except Exception:
return {"info": "请求异常"}, 401
return func(*args, **kwargs)
return internal
def api_verification_code(func):
"""
检查验证码
"""
v_client = "tfse_admin" # 校验数据库服务
v_database = "用户" # 校验数据库名称
v_collection = "验证记录" # 校验数据表名称
@functools.wraps(func)
def internal(*args, **kwargs):
try:
email = request.json['email']
vcode = request.json['vcode']
res = find_data(v_client, v_database, v_collection, {"email": email})
if len(res) == 0:
return {"info": "验证码错误"}, 200
if res[0]['vcode'] == vcode:
pass
else:
return {"info": "验证码错误"}, 200
except Exception:
return {"info": "请求异常"}, 200
return func(*args, **kwargs)
return internal
def verify_token(func):
"""
校验token
Returns:
返回token被解析后的值
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
s = Serializer(TOKEN_KEY)
token = s.loads(request.headers.get('token'))
except TypeError:
return {"info": "参数错误"}, 401
except KeyError:
return {"info": "参数错误"}, 401
except BadSignature:
return {"info": "token错误"}, 401
except SignatureExpired:
return {"info": "token过期"}, 401
return func(*args, **kwargs, token=token)
return internal
def create_token(param):
"""
创建token
Parameters:
param: 传入参数用于创建token
Returns:
token: 用户访问令牌
"""
token_expiration = 14400
s = Serializer(TOKEN_KEY, expires_in=token_expiration)
token = '' + s.dumps(param).decode('ascii')
return token

View File

@ -1,75 +0,0 @@
import os
import base64
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
def get_key(**kwargs):
"""
获取密钥
Parameters:
key_file str 密钥文件名称
Returns:
key 密钥
"""
key_file = kwargs['key_file'] # 密钥文件
key_path = os.path.abspath(os.path.abspath(os.path.dirname(__file__)) + '/keys/' + key_file) # 密钥文件绝对路径
with open(key_path) as f:
data = f.read()
key = RSA.importKey(data)
return key
def encrypt_data(**kwargs):
"""
用公钥加密
Parameters:
msg str 待加密信息
Returns:
加密后结果
"""
msg = kwargs['msg'] # 待加密信息
public_key = get_key(key_file='rsa_public_key.pem') # 获取密钥
cipher = PKCS1_cipher.new(public_key)
encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8"))))
return encrypt_text.decode('utf-8')
def decrypt_data(**kwargs):
"""
用私钥解密
Parameters:
encrypt_msg str 加密信息
Returns:
执行正确 解密后结果
执行错误 False
"""
try:
encrypt_msg = kwargs['encrypt_msg'] # 加密信息
private_key = get_key(key_file='rsa_private_key.pem') # 获取密钥
cipher = PKCS1_cipher.new(private_key)
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
return back_text.decode('utf-8')
except Exception:
return False
def check_data(**kwargs):
"""
对比数据和加密数据
Parameters:
msg str 数据
encrypt_msg str 加密数据
Returns:
True 对比正确
False 对比错误
"""
msg = kwargs['msg']
decrypt_msg = decrypt_data(encrypt_msg=kwargs['encrypt_msg'])
if decrypt_msg is False:
return False
elif msg == decrypt_msg:
return True
else:
return False

View File

@ -1,44 +0,0 @@
import os
import json
from setting import APP_NAME
def real_file_path(param):
"""
返回文件真实路径
避免因为文件路径问题带来的麻烦
Parameters:
param: 文件相对路径
Returns:
path: 文件真实路径
"""
abs_path = os.path.abspath(os.path.dirname(__file__))
rel_path = abs_path[:abs_path.find(APP_NAME) + len(APP_NAME)]
path = os.path.abspath(rel_path + param)
return path
def read_json_file(param):
"""
读取json文件
Parameters:
param: json文件相对路径
Returns:
result: json内容
"""
with open(real_file_path(param), "r", encoding='utf-8') as f:
result = json.load(f)
return result
def sub_dict(param1, param2):
"""
获取字典的子集
Parameters:
param1: 原字典
param2: 子集字段
Returns:
子集
"""
return dict((key, value) for key, value in param1.items() if key in param2)

View File

@ -1,5 +1,5 @@
from flask import Blueprint, request
from common.security.APIAuth import api_secret
from common.APIAuth import api_secret
from company.Company import Company
company_route = Blueprint('company', __name__)

View File

@ -1,6 +1,6 @@
from flask import Blueprint, request, Response
from common.security.APIAuth import api_secret
from common.APIAuth import api_secret
from file.TfseFile import TfseFile
file_route = Blueprint('file', __name__)

View File

@ -1,5 +1,5 @@
from flask import Blueprint, request
from common.security.APIAuth import api_secret
from common.APIAuth import api_secret
from macro.Macro import Macro
macro_route = Blueprint('macro', __name__)

View File

@ -1,5 +1,5 @@
from flask import Blueprint, request
from common.security.APIAuth import api_secret
from common.APIAuth import api_secret
from rating.Rating import Rating
rating_route = Blueprint('rating', __name__)

View File

@ -0,0 +1,10 @@
{
"tfse_admin": {
"user": "root",
"pwd": "RYIHrqml#LSW6#!*",
"host": "116.63.130.34",
"port": "27020"
}
}

View File

@ -1,9 +1,8 @@
{
"admin": ["Board", "Manage", "Model", "Test", "Setting"],
"member": {
"developer": ["Board", "Manage", "Model", "Test", "Setting"],
"analysts": ["Board", "Manage", "Model"],
"manager": ["Board", "Manage", "Model"]
},
"developer": ["Board", "Manage", "Model", "Test", "Setting"],
"analysts": ["Board", "Manage", "Model"],
"manager": ["Board", "Manage", "Model"],
"guest": ["Board"]
}

View File

@ -0,0 +1,93 @@
import os
import json
import pymongo
with open(os.path.abspath(os.path.dirname(__file__)+'/static/db_setting.json')) as f:
db = json.load(f)
tfse_admin = db['tfse_admin']
clients = {
"tfse_admin": pymongo.MongoClient('mongodb://{}:{}@{}:{}'.format(tfse_admin['user'], tfse_admin['pwd'], tfse_admin['host'], tfse_admin['port']))
}
def insert_data(param1, param2, param3, param4):
"""
插入一条数据
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 插入数据
Returns:
True 执行成功
False 执行失败
"""
collection = clients[param1][param2][param3]
collection.insert_one(param4)
return True
def delete_data(param1, param2, param3, param4):
"""
根据查询条件删除一条文档
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
Returns:
res: True 执行完成
"""
collection = clients[param1][param2][param3]
collection.delete_one(param4)
return True
def update_data(param1, param2, param3, param4, param5):
"""
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
param5: obj 插入数据
Returns:
-
"""
collection = clients[param1][param2][param3]
collection.update_one(param4, param5)
return True
def update_data_upsert(param1, param2, param3, param4, param5):
"""
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
param5: obj 插入数据
Returns:
-
"""
collection = clients[param1][param2][param3]
collection.update_one(param4, {"$set": param5}, upsert=True)
return True
def find_data(param1, param2, param3, param4):
"""
根据查询条件查询符合条件的所有数据
Parameters:
param1: str 服务器
param2: str 数据库
param3: str 数据集
param4: obj 查询条件
Returns:
res: list 符合条件的数据
"""
collection = clients[param1][param2][param3]
records = list(collection.find(param4, {'_id': False}))
return records

View File

@ -1,18 +1,16 @@
import json
import os
import time
import random
import requests
from werkzeug.security import generate_password_hash, check_password_hash
from common.db import insert_data, find_data, update_data_upsert
from common.utils import read_json_file
from common.security.APIAuth import create_token
from user.user_utils import check_mail_fmt, check_pwd_fmt
from common.security.rsa import decrypt_data
from user.user_db import insert_data, find_data, update_data_upsert, delete_data
from user.user_utils import check_mail_fmt, check_pwd_fmt, create_token, decrypt_data
def create_user_process(email, name, pwd, role, duty):
def create_user_impl(email, name, pwd, role):
"""
创建新用户流程
Parameters:
@ -20,7 +18,6 @@ def create_user_process(email, name, pwd, role, duty):
name str 姓名
pwd str 密码
role str 角色
duty str 职责
Returns:
执行成功 bool True
执行失败 str 异常信息
@ -29,14 +26,10 @@ def create_user_process(email, name, pwd, role, duty):
"""
参数检查
"""
roles = ['admin', 'member', 'guest']
roles = ['admin', 'developer', 'analysts', 'manager', 'guest']
if role not in roles:
return "用户角色异常"
duties = ['developer', 'analysts', 'manager', None]
if duty not in duties:
return "成员职责异常"
if not check_mail_fmt(email):
return "邮箱格式错误"
@ -79,7 +72,7 @@ def create_user_process(email, name, pwd, role, duty):
return gen_id()
def process_main():
def start_impl():
"""
执行流程
"""
@ -94,17 +87,16 @@ def create_user_process(email, name, pwd, role, duty):
user['pwd'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['status'] = 'normal'
user['role'] = role
user['duty'] = duty
user['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
insert_data("tfse_admin", "用户", "用户信息", user)
return True
return process_main()
return start_impl()
def login_process(email, pwd):
def login_impl(email, pwd, vcode):
user_info = find_data('tfse_admin', '用户', '用户信息', {"email": email})
def check_email():
@ -114,6 +106,22 @@ def login_process(email, pwd):
return "邮箱格式错误"
return True
def check_vcode():
records = find_data("tfse_admin", "用户", "验证记录", {"email": email})
if len(records) == 0:
return "无验证信息"
if records[0]['vcode'] != vcode:
return "验证码错误"
if time.time() - records[0]['timestamp'] > 300:
return "验证码过期"
delete_data("tfse_admin", "用户", "验证记录", {"email": email})
return True
def check_pwd():
hash_pwd = user_info[0]['pwd']
try:
@ -121,38 +129,50 @@ def login_process(email, pwd):
return "密码错误"
except Exception:
return "密码错误"
return True
def check_menus():
def make_menus():
role = user_info[0]['role']
duty = user_info[0]['duty']
duties = read_json_file('/user/static/menus.json')
if role == 'admin':
return duties[role]
elif role == 'member':
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
if role == 'member':
return duties[role][duty]
else:
return duties[role]
def gen_res_data():
def make_session_id():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
update_data_upsert("tfse_admin", "用户", "token记录", {"UID": user_info[0]['UID']}, {"session_id": session_id})
return session_id
def make_result_data():
res_data = dict()
res_data['token'] = create_token({"UID": user_info[0]['UID']})
res_data['menus'] = check_menus()
res_data['token'] = create_token(make_session_id())
res_data['menus'] = make_menus()
return res_data
def main_process():
res = check_email()
if res is not True:
return res
def start_impl():
result = check_email()
if result is not True:
return result
res = check_pwd()
if res is not True:
return res
result = check_pwd()
if result is not True:
return result
return gen_res_data()
result = check_vcode()
if result is not True:
return result
return main_process()
return make_result_data()
return start_impl()
def send_vcode_to_user_process(email):
@ -182,7 +202,7 @@ def send_vcode_to_user_process(email):
update_data_upsert('tfse_admin', '用户', '验证记录', {"email": email}, {"vcode": vcode, "timestamp": timestamp})
return True
def main_process():
def start_process():
res = check_param()
if res is not True:
return res
@ -193,4 +213,4 @@ def send_vcode_to_user_process(email):
return True
return main_process()
return start_process()

View File

@ -1,4 +1,4 @@
from user.user_impl import create_user_process, login_process, send_vcode_to_user_process
from user.user_impl import create_user_impl, login_impl, send_vcode_to_user_process
class User:
@ -19,7 +19,6 @@ class User:
self.name = None
self.pwd = None
self.role = None
self.duty = None
class UserManage(User):
@ -34,7 +33,7 @@ class UserManage(User):
创建成功 bool 返回True
创建失败 str 返回异常信息
"""
return create_user_process(self.email, self.name, self.pwd, self.role, self.duty)
return create_user_impl(self.email, self.name, self.pwd, self.role)
# 禁用用户
def disable_user(self):
@ -67,11 +66,12 @@ class UserOperation(User):
super().__init__()
self.token = None
self.block = None
self.vcode = None
self.session_id = None
# 登录
def login(self):
"""
用户登录
"""
return login_process(self.email, self.pwd)
return login_impl(self.email, self.pwd, self.vcode)

View File

@ -1,32 +1,32 @@
from flask import Blueprint, request
from common.security.APIAuth import api_secret, api_verification_code, verify_token
from user.user_obj import UserManage, UserOperation
from user.user_utils import verify_token, authority_scope
user_route = Blueprint('user', __name__)
@user_route.route('/create_user', methods=['POST'])
@verify_token
@authority_scope(['admin'])
def create_user_route(**kwargs):
"""
新建用户
"""
req = request.json
um = UserManage()
um.email = request.json['email']
um.name = request.json['name']
um.pwd = request.json['pwd']
um.role = request.json['role']
um.duty = request.json['duty']
res = um.create_user()
if res is True:
um.email, um.name, um.pwd, um.role = req['email'], req['name'], req['pwd'], req['role']
result = um.create_user()
if result is True:
return {"info": "新建用户成功"}, 200
else:
return {"info": res}, 200
return {"info": result}, 200
@user_route.route('/send_vcode_to_user', methods=['POST'])
@api_secret
def send_vcode_to_user_route():
"""
发送验证码
@ -41,16 +41,17 @@ def send_vcode_to_user_route():
@user_route.route('/login', methods=['POST'])
@api_verification_code
def login_route():
"""
登录
"""
req = request.json
uo = UserOperation()
uo.email = request.json['email']
uo.pwd = request.json['pwd']
res = uo.login()
if type(res) == dict:
return {"info": "登录成功", "result": res}, 200
uo.email, uo.pwd, uo.vcode = req['email'], req['pwd'], req['vcode']
result = uo.login()
if type(result) == dict:
return {"info": "登录成功", "result": result}, 200
else:
return {"info": res}, 200
return {"info": result}, 200

View File

@ -1,6 +1,18 @@
import os
import re
import base64
import functools
from common.security.rsa import decrypt_data
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
from flask import request
from itsdangerous import Serializer
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
from user.user_db import find_data
TOKEN_KEY = "P0eHym@&CbaLzWkq"
def check_mail_fmt(email):
@ -34,3 +46,110 @@ def check_pwd_fmt(pwd):
case = (len(password) >= 8) and (re.match(regex, password) is not None)
result = True if case else False
return result
def create_token(param):
"""
创建token
Parameters:
param: 传入参数用于创建token
Returns:
token: 用户访问令牌
"""
token_expiration = 14400
s = Serializer(TOKEN_KEY, expires_in=token_expiration)
token = '' + s.dumps(param).decode('ascii')
return token
def verify_token(func):
"""
校验token
Returns:
返回token被解析后的值
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
s = Serializer(TOKEN_KEY)
session_id = s.loads(request.headers.get('token'))
records = find_data("tfse_admin", "用户", "token记录", {"session_id": session_id})
if not records:
return {"info": "提示: 账号已在别处登录"}, 401
uid = records[0]['UID']
except TypeError:
return {"info": "验证参数错误"}, 401
except KeyError:
return {"info": "验证参数错误"}, 401
except BadSignature:
return {"info": "token错误"}, 401
except SignatureExpired:
return {"info": "token过期"}, 401
return func(*args, **kwargs, uid=uid)
return internal
def authority_scope(scope):
def decorate(func):
def internal(*args, ** kwargs):
records = find_data("tfse_admin", "用户", "用户信息", {"UID": kwargs['uid']})
if not records:
return {"info": "提示: 不存在该用户"}, 401
if records[0]['status'] != "normal":
return {"info": "提示: 该账户已被禁用"}, 401
if records[0]['role'] not in scope:
return {"info": "提示: 该账户没有权限"}, 401
return func(*args, ** kwargs)
return internal
return decorate
def encrypt_data(**kwargs):
"""
用公钥加密
Parameters:
msg str 待加密信息
Returns:
加密后结果
"""
msg = kwargs['msg'] # 待加密信息
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_public_key.pem')) as f:
data = f.read()
public_key = RSA.importKey(data)
cipher = PKCS1_cipher.new(public_key)
encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8"))))
return encrypt_text.decode('utf-8')
def decrypt_data(**kwargs):
"""
用私钥解密
Parameters:
encrypt_msg str 加密信息
Returns:
执行正确 解密后结果
执行错误 False
"""
try:
encrypt_msg = kwargs['encrypt_msg'] # 加密信息
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_private_key.pem')) as f:
data = f.read()
private_key = RSA.importKey(data)
cipher = PKCS1_cipher.new(private_key)
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
return back_text.decode('utf-8')
except Exception:
return False