import re import os import json import gridfs import pymongo from urllib import parse from bson import ObjectId from gridfs import GridFS from Implements.Certificate.CertificatePathImpl import get_gen_certificate_path from Implements.Report.ReportPathImpl import get_gen_report_path class MongoHelper: def __init__(self, param): """ param: type:str desc: 选择连接哪个MongoDB数据库 """ with open(os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')) as f: db_configs = json.load(f) this_mongo_cfg = db_configs['MongoDB'][param] m = re.match('([\s\S].*?):([\s\S].*)@([\s\S].*)', this_mongo_cfg) parsed_mongo_config = "{}:{}@{}".format(parse.quote_plus(m.group(1)), parse.quote_plus(m.group(2)), m.group(3)) self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config)) def find_single_column(self, param1, param2, param3, param4): """ 查询符合条件的第一条数据的某个指定字段值 param2: str 数据表 param3: dict 查询条件 param4: str 一个指定查询字段 return: type: None or dict desc: 查询结果为空,返回None; 查询结果正常,返回查询结果的第一条数据; """ collection = self.client[param1][param2] column = {**{'_id': False}, **{param4: 1}} record = list(collection.find(param3, column)) return None if record == [] else record[0][param4] def find_single_data(self, param1, param2, param3, param4): """ 查询符合条件的第一条数据 param1: str 数据库 param2: str 数据表 param3: dict 查询条件 param4: list 查询字段 return: type: bool or dict desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据; """ collection = self.client[param1][param2] columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} record = list(collection.find(param3, columns)) return False if record == [] else record[0] def find_single_data_with_single_sort(self, param1, param2, param3, param4, param5): """ 查询符合条件的第一条数据,按单个排序条件返回 param1: str 数据库 param2: str 数据表 param3: dict 查询条件 param4: list 查询字段 param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回 return: type: bool or dict desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据; """ collection = self.client[param1][param2] columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(1)) return False if record == [] else record[0] def find_all_data(self, param1, param2, param3, param4): """ 查询符合条件的所有数据 param1: str 数据库 param2: str 数据表 param3: dict 查询条件 param4: list 查询字段 return: type: list desc: 查询结果 """ collection = self.client[param1][param2] columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} record = list(collection.find(param3, columns)) return record def find_all_data_with_count(self, param1, param2, param3): """ 查询所有符合条件的数据,并返回统计数量 param1: str 数据库 param2: str 数据表 param3: str 查询条件 return: int 符合条件的数据数量 """ collection = self.client[param1][param2] num = collection.find(param3).count() return num def find_all_data_with_single_sort(self, param1, param2, param3, param4, param5): """ 查询符合条件的数据,按单个排序条件返回 param1: str 数据库 param2: str 数据表 param3: dict 查询条件 param4: list 查询字段 param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回 return: type: bool or dict desc: 查询结果为空,返回False; 查询结果正常,返回查询结果的第一条数据; """ collection = self.client[param1][param2] columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0])) return False if record == [] else record def find_data_with_aggregate(self, param1, param2, param3): """ 根据聚合条件查询 param1: str 数据库 param2: str 数据集 param3: type: list desc: 聚合条件 demo: [{'$match':{'price':{'$gte':50}}}, {'$group': {'_id': "$fName", 'count': {'$sum': 1}}}] """ collection = self.client[param1][param2] data = list(collection.aggregate(param3)) return data def find_data_by_page_with_sort(self, param1, param2, param3, param4, param5, param6, param7): """ 根据聚合翻页查询,且按照需求字段排序返回 param1: str 数据库 param2: str 数据集 param3: dict 查询条件 param4: list 显示字段 param5: dict 排序条件 例如 {"name": 1} 表示按照name字段正序返回 param6: int 即 page_size 每页数据条数 param7: int 即 page_no 当前页码 """ collection = self.client[param1][param2] columns = {**{'_id': False}, **dict(zip(param4, [1] * len(param4)))} page_size = int(param6) page_no = int(param7) skip_num = page_size * (page_no - 1) record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(page_size).skip(skip_num)) return False if record == [] else record def insert_single_data(self, param1, param2, param3): """ 插入一条数据 param1: str 数据库 param2: str 数据集 param3: obj 插入数据 return: None """ collection = self.client[param1][param2] collection.insert_one(param3) def upsert_single_data(self, param1, param2, param3, param4): """ 插入单条数据 param1: str 数据库 param2: str 数据表 param3: dict 查询条件 param4: dict 更新或新插入的数据 return: None """ collection = self.client[param1][param2] collection.update_one(param3, {"$set": param4}, upsert=True) def update_single_data(self, param1, param2, param3, param4): """ 插入单条数据 param1: str 数据库 param2: str 数据表 param3: dict 查询条件 param4: dict 更新或新插入的数据 return: None """ collection = self.client[param1][param2] collection.update_one(param3, {"$set": param4}) def delete_single_data(self, param1, param2, param3): """ 根据查询条件删除一条文档 param1: str 数据库 param2: str 数据集 param3: obj 查询条件 return: None """ collection = self.client[param1][param2] collection.delete_one(param3) return True def upsert_file(self, param1, param2, param3): """ 根据名称保存该企业报告 param1: str 数据库 param2: str 数据集 param3: str 文件名称 Returns: pdf_id: str 文件id """ try: path = get_gen_report_path(name=param3) database = param1 collection = param2 fs = GridFS(self.client[database], collection) content = open(path, 'rb').read() pdf_id = fs.put(content, content_type='application/pdf', filename=param3) return str(pdf_id) except Exception: return False def upsert_certificate_file(self, param1, param2, param3): """ 根据名称保存该企业报告 param1: str 数据库 param2: str 数据集 param3: str 文件名称 Returns: pdf_id: str 文件id """ try: path = get_gen_certificate_path(name=param3) database = param1 collection = param2 fs = GridFS(self.client[database], collection) content = open(path, 'rb').read() pdf_id = fs.put(content, content_type='application/pdf', filename=param3) return str(pdf_id) except Exception: return False def find_file(self, param1, param2, param3): """ 读取一个文件 param1: str 数据库 param2: str 存储桶 param3: str 文件id return: type: binary? desc: 二进制文件流 """ try: # 实例化一个文件存储器 gfs = GridFS(self.client[param1], collection=param2) # 二进制读取文件 data_stream = gfs.get(ObjectId(param3)).read() # 返回文件二进制流 return data_stream except gridfs.errors.NoFile: return False def delete_file(self, param1, param2, param3): """ 根据id删除文件 param1: str 数据库 param2: str 存储桶 param3: str 文件fid """ fs = GridFS(self.client[param1], param2) fs.delete(ObjectId(param3))