import os
import json
import functools

import requests
from flask import request

from setting import APP_NAME, API_SECRET


def real_file_path(param):
    """Return the absolute path of a file given relative to the app root.

    The app root is the prefix of this module's directory that ends with
    the first occurrence of ``APP_NAME``. Resolving against it avoids
    problems caused by the current working directory.

    Parameters:
        param: path of the file relative to the app root. It is appended
            by plain string concatenation, so it normally needs to start
            with a path separator.

    Returns:
        path: the normalized absolute path of the file.
    """
    abs_path = os.path.abspath(os.path.dirname(__file__))
    # NOTE(review): if APP_NAME does not occur in abs_path, str.find returns
    # -1 and the prefix computed below is not a meaningful root directory.
    root_end = abs_path.find(APP_NAME) + len(APP_NAME)
    rel_path = abs_path[:root_end]
    path = os.path.abspath(rel_path + param)
    return path


def read_json_file(param):
    """Read and parse a JSON file.

    Parameters:
        param: path of the JSON file relative to the app root
            (resolved with ``real_file_path``).

    Returns:
        result: the decoded JSON content.
    """
    with open(real_file_path(param), "r", encoding='utf-8') as f:
        result = json.load(f)
    return result


def verify_token(func):
    """Decorator that checks the ``token`` request header before calling *func*.

    The wrapped function is only invoked when the ``token`` header of the
    current Flask request equals ``API_SECRET``. Otherwise a
    ``(body, 401)`` tuple is returned: one message for a wrong or missing
    token, another when reading the header raises.
    """
    @functools.wraps(func)
    def internal(*args, **kwargs):
        try:
            token = request.headers.get('token')
            if token != API_SECRET:
                return {"info": "接口密钥错误"}, 401
        except Exception:
            # Any failure while reading the request is reported as a
            # rejected request rather than propagated.
            return {"info": "请求异常"}, 401
        return func(*args, **kwargs)
    return internal


def sub_dict(param1, param2):
    """Return the subset of a dictionary restricted to the given keys.

    Parameters:
        param1: the source dictionary.
        param2: container of the keys to keep; keys not present in
            ``param1`` are ignored.

    Returns:
        A new dict holding the items of ``param1`` whose key is in ``param2``.
    """
    return {key: value for key, value in param1.items() if key in param2}


def update_process_status(rid, types, update, timeout=10):
    """Call the remote "update process data" endpoint.

    Parameters:
        rid: evaluation id.
        types: process type (comprehensive evaluation process /
            ESG evaluation process).
        update: dict with the data to be updated.
        timeout: seconds to wait for the server before giving up, so an
            unresponsive server cannot block the caller indefinitely.

    Returns:
        result: the response body returned by the endpoint, as text.

    Raises:
        requests.exceptions.RequestException: on connection errors or
            when the timeout expires.
    """
    url = "http://api.fecribd.com/etl_tfse/company/update_process_data"
    # NOTE(review): this credential is hard-coded in source; it should be
    # loaded from configuration instead of being committed with the code.
    token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
    headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
    data = json.dumps({"rid": rid, "types": types, "renew_data": update})
    response = requests.post(url, headers=headers, data=data, timeout=timeout)
    return response.text