dataplatform/Utils/DataBase/MongoHelperUtils.py

78 lines
2.8 KiB
Python
Raw Normal View History

2022-11-17 15:18:34 +08:00
import re
import os
import json
import pymongo
from urllib import parse
from bson import ObjectId
class MongoHelper:
    """Thin convenience wrapper around a ``pymongo.MongoClient``.

    The connection string is looked up in the ``MongoDB`` section of the
    ``DBConfig.json`` file located next to this module. Every data method
    takes the database name (``dbname``) and collection name (``sheet``)
    explicitly.
    """

    def __init__(self, param):
        """Open a client for the MongoDB entry registered under ``param``.

        param:
            type: str
            desc: key inside the ``MongoDB`` section of ``DBConfig.json``;
                selects which MongoDB database to connect to

        Raises ``KeyError`` when the section or key is missing from the
        configuration file.
        """
        config_path = os.path.abspath(os.path.dirname(__file__) + '/DBConfig.json')
        # Explicit encoding so the file is read the same way on every platform.
        with open(config_path, encoding='utf-8') as f:
            db_configs = json.load(f)
        this_mongo_cfg = db_configs['MongoDB'][param]
        parsed_mongo_config = self._escape_credentials(this_mongo_cfg)
        self.client = pymongo.MongoClient('mongodb://{}'.format(parsed_mongo_config))

    @staticmethod
    def _escape_credentials(mongo_cfg):
        """Percent-escape the user name and password of ``user:password@host``.

        The user name ends at the first ``:`` and the password at the last
        ``@``, so passwords containing ``:`` or ``@`` are handled. A string
        without a ``user:password@`` prefix is returned unchanged instead of
        failing, which allows unauthenticated connection strings.
        """
        m = re.match(r'([\s\S].*?):([\s\S].*)@([\s\S].*)', mongo_cfg)
        if m is None:
            # No credentials to escape; use the configured string as-is.
            return mongo_cfg
        return "{}:{}@{}".format(
            parse.quote_plus(m.group(1)),
            parse.quote_plus(m.group(2)),
            m.group(3),
        )

    @staticmethod
    def _projection(field):
        """Build a projection that hides ``_id`` and includes each name in ``field``.

        If ``field`` itself names ``_id``, that entry wins and ``_id`` is
        included.
        """
        return {'_id': False, **dict.fromkeys(field, 1)}

    def insert_data(self, dbname: str, sheet: str, data: dict):
        """Insert ``data`` as one document and return its ``_id`` as a string."""
        collection = self.client[dbname][sheet]
        item = collection.insert_one(data)
        return str(item.inserted_id)

    def upsert_data(self, dbname: str, sheet: str, condition: dict, data: dict):
        """Apply ``$set: data`` to the first document matching ``condition``.

        A new document is inserted when nothing matches. Returns the ``_id``
        of the updated or inserted document as a string.
        """
        collection = self.client[dbname][sheet]
        # One atomic call returns the affected document itself. Querying again
        # with `condition` afterwards would miss it whenever `data` rewrites a
        # field that `condition` filters on.
        document = collection.find_one_and_update(
            condition,
            {"$set": data},
            projection={'_id': True},
            upsert=True,
            return_document=pymongo.ReturnDocument.AFTER,
        )
        return str(document['_id'])

    def delete_data_by_id(self, dbname: str, sheet: str, _id: str):
        """Delete the document whose ``_id`` equals ``ObjectId(_id)``.

        Always returns ``True``; the result does not tell whether a document
        was actually removed.
        """
        collection = self.client[dbname][sheet]
        collection.delete_one({'_id': ObjectId(_id)})
        return True

    def find_data_by_id(self, dbname: str, sheet: str, _id: str):
        """Return the document with the given id, without its ``_id`` field.

        Returns ``None`` when no document matches.
        """
        collection = self.client[dbname][sheet]
        return collection.find_one({'_id': ObjectId(_id)}, {"_id": False})

    def update_data_by_id(self, dbname: str, sheet: str, _id: str, data: dict):
        """Apply ``$set: data`` to the document with the given id.

        Always returns ``True``, whether or not a document matched.
        """
        collection = self.client[dbname][sheet]
        collection.update_one({'_id': ObjectId(_id)}, {"$set": data})
        return True

    def find_data(self, dbname: str, sheet: str, search: dict, field: list):
        """Return the first document matching ``search``, or ``False`` if none.

        Only the keys listed in ``field`` are returned; ``_id`` is hidden
        unless ``field`` names it.
        """
        collection = self.client[dbname][sheet]
        # find_one fetches a single document instead of draining the whole cursor.
        record = collection.find_one(search, self._projection(field))
        # Compare against None: a matching document may legitimately be empty.
        return False if record is None else record

    def find_all_data(self, dbname: str, sheet: str, search: dict, field: list):
        """Return every document matching ``search`` as a list, or ``False`` if none.

        Only the keys listed in ``field`` are returned; ``_id`` is hidden
        unless ``field`` names it.
        """
        collection = self.client[dbname][sheet]
        record = list(collection.find(search, self._projection(field)))
        return record or False

    def update_data(self, dbname: str, sheet: str, search: dict, field: dict):
        """Apply ``$set: field`` to the first document matching ``search``.

        Always returns ``True``, whether or not a document matched.
        """
        collection = self.client[dbname][sheet]
        collection.update_one(search, {"$set": field})
        return True
def get_mongodb():
    """Create a MongoHelper for the ``"test"`` entry of the MongoDB configuration."""
    return MongoHelper("test")
def get_tyc_mongodb():
    """Create a MongoHelper for the ``"tyc"`` entry of the MongoDB configuration."""
    return MongoHelper("tyc")