"""Ad-hoc debugging helpers for the user / rate modules.

Every entry point is invoked manually (the calls at module level are kept
commented out), so importing this module has no side effects beyond its
imports.
"""
from context.common import common_db
from mods.rate.utils.rate_utils import sheet_parse
from mods.user.router import *
from mods.user.mods.user.models import User
from utils.sqlalchemy_common_utils import QueryParams, query_common


def run_c():
    """Run a paged ``User`` query through ``query_common`` and print each row.

    The session is closed in ``finally`` so it is released even when the query
    or the serialisation raises.
    """
    db = common_db.get_db_i()
    try:
        # d2=Department(id=3,name='d2')
        # d = db.query(Department).filter_by(id=1).first()
        # u = User(name="xxx2", departments={d,d2})
        q = QueryParams.parse_obj({
            'include': ['name', 'id'],
            'ex_include': ['id'],
            'page': 1,
            'page_size': 10,
        })
        # NOTE(review): test_user_json() unpacks FOUR values from query_common
        # while this function unpacks two; one of the two call sites is stale.
        count, query = query_common(db, User, q)
        for u in query:
            print(u.to_full_dict(include=['name', 'id'], ex_include='id'))
            # print(u.to_dict())
    finally:
        db.close()


def run_d():
    """Create or update the ``other_info.address`` of one hard-coded user.

    Looks the user up by name, attaches a new related object when
    ``other_info`` is empty (otherwise overwrites its ``address``), commits,
    and always closes the session.
    """
    db = common_db.get_db_i()
    try:
        u = db.query(User).filter(User.name == '伍春全').first()
        if not u.other_info:
            # Presumably resolves the mapped class behind the `other_info`
            # relationship so it need not be imported here - TODO confirm.
            u.other_info = u.__class__.other_info.property.entity.entity(address="xxx")
        else:
            u.other_info.address = "xcxcxcxc"
        db.commit()
    finally:
        db.close()


# def run_a():
#     db = common_db.get_db_i()
#     user_update(db, UserUpdate(id='c4316ba243c442aca42e37dbe0d8f476', departments=[1, 2, 3]))
#     db.close()
#
#
# def run_b():
#     db = common_db.get_db_i()
#     d = db.query(Department).first()
#     u = db.query(User).first()
#     u.departments.add(d)
#     db.commit()
#     db.close()
#
# run_b()
# run_a()
# run_c()
# from mods.user.comment.models import Comment
#
# print(Comment.model_config)

def test_json_manage():
    """Exercise ``JsonDataManage`` against a local MongoDB instance.

    Connection settings this helper was written against::

        [mongo]
        host = 127.0.0.1
        port = 27017
        db = 三方数据
        ;user / ;password = <redacted - keep credentials out of source control>

    Loads the stored schemas, converts a sample JSON schema with
    ``json_schema_to_pydantic`` and inserts one sample record.
    """
    from utils.json_data_utils import JsonDataManage, json_schema_to_pydantic
    from pymongo import MongoClient

    client = MongoClient(host="127.0.0.1", port=27017)
    jdm = JsonDataManage(client)
    jdm.load_schemas()
    json_schema_to_pydantic(
        {
            'title': 'User',
            'type': 'object',
            'properties': {
                'name': {'title': 'Name', 'type': 'string'},
                'age': {'title': 'Age', 'type': 'integer'},
            },
            'required': ['name', 'age'],
        },
        'User',
    )
    # jdm.add_schema("用户", {'title': 'User', 'type': 'object',
    #                       'properties': {'name': {'title': 'Name', 'type': 'string'},
    #                                      'age': {'title': 'Age', 'type': 'integer'}},
    #                       'required': ['name', 'age']}
    #                )
    jdm.insert_data('用户', {"name": "张三", "age": 11}, index={"company_name": "远东"}, check_repeated=True)


def test_user_json():
    """Fetch the first ``User`` row via ``query_common`` and touch its ``config``.

    The session is closed in ``finally``; the original never closed it.
    """
    db = common_db.get_db_i()
    try:
        # d2=Department(id=3,name='d2')
        # d = db.query(Department).filter_by(id=1).first()
        # u = User(name="xxx2", departments={d,d2})
        q = QueryParams.parse_obj({
            'include': ['name', 'id'],
            'ex_include': ['id'],
            'page': 1,
            'page_size': 10,
        })
        # NOTE(review): run_c() unpacks only two values from query_common;
        # confirm which arity matches the current helper.
        count, query, page, page_size = query_common(db, User, q)
        # Attribute access only; the value is intentionally discarded.
        query[0].config
    finally:
        db.close()


# test_user_json()
# test_json_manage()
# run_d()
# Kept at its original mid-file position so it still binds after the wildcard
# import above.
import re


def replace_words(content, word_list):
    """Rewrite every occurrence of each listed word to its listed spelling.

    NOTE(review): the original regular expression and the rest of this
    function body were lost when the file was extracted (everything after the
    ``(?<`` of a lookbehind is missing). The pattern below is a reconstruction
    from the surviving comment: case-insensitive match, not embedded in a
    longer run of ASCII letters, replaced by the word exactly as it appears in
    ``word_list``. Confirm against version control before relying on it.

    :param content: text to normalise.
    :param word_list: iterable of words in their canonical form.
    :return: ``content`` with all matches replaced.
    """
    # For each word, find all matches with a regular expression and replace
    # them with the form given in the word list.
    for word in word_list:
        pattern = rf"(?<![A-Za-z]){re.escape(word)}(?![A-Za-z])"
        # A callable replacement keeps backslashes in `word` literal.
        content = re.sub(pattern, lambda _match, _canonical=word: _canonical, content, flags=re.IGNORECASE)
    return content


def extract_data_to_json(excel_path) -> dict:
    """Read the two questionnaire sheets of a workbook into nested dicts.

    NOTE(review): the original ``def`` line was lost in extraction; the name
    comes from the commented usage example below and the parameter name from
    the body. Confirm the exact signature against version control.

    :param excel_path: workbook location, passed straight to ``pd.read_excel``.
    :return: ``{"经营问卷": {question: answer-or-None},
        "财务问卷": {report: {subject: [2020, 2021, 2022 values]}, ...}}``.
        The rows labelled "是否审计" and "会计事务所" are stored as plain
        top-level values inside the "财务问卷" dict.
    """
    import pandas as pd

    # Business questionnaire: question -> answer, blank cells become None.
    business_data = pd.read_excel(excel_path, sheet_name="经营问卷")
    business_dict = {}
    for _, row in business_data.iterrows():
        answer = row["填写(文本类无则不填写)"]
        business_dict[row["问题"]] = None if pd.isna(answer) else answer

    # Financial questionnaire: group the multi-year figures per report.
    finance_data = pd.read_excel(excel_path, sheet_name="财务问卷")
    year_columns = ("2020年", "2021年", "2022年")
    finance_report_dict = {}
    report = None
    for i, item in enumerate(finance_data['报表']):
        if item in ("单位",):
            # The unit row carries no figures.
            continue
        if item in ("是否审计", "会计事务所"):
            finance_report_dict[item] = finance_data['科目'][i]
            continue
        if not pd.isna(item):
            # A non-empty cell starts a new report; blank cells below it
            # belong to the same report.
            report = item
            finance_report_dict[report] = {}
        subject = finance_data['科目'][i]
        # Skip rows without a subject (the original ran this check only AFTER
        # storing the row, so NaN ended up as a key) and rows that appear
        # before any report heading (previously a KeyError on None).
        if report is None or pd.isna(subject):
            continue
        finance_report_dict[report][subject] = [finance_data[year][i] for year in year_columns]

    data_dict = {
        "经营问卷": business_dict,
        "财务问卷": finance_report_dict
    }
    return data_dict


#
# # Use the function to extract data
# extracted_data = extract_data_to_json("填报问卷 (2).xlsx")
#
# print(extracted_data)

def sheet_parsesheet_parse():
    """Run ``sheet_parse`` on a hard-coded local workbook path."""
    sheet_parse(r"C:\Users\wcq\Desktop\北京探路者户外用品股份有限公司.xlsx")


# sheet_parsesheet_parse()

def tasd():
    """Invoke ``RateServeFlowNodeCheck.rate_post_data_check_node`` for one hard-coded id."""
    from context.common import common_db_v2
    db = common_db_v2.get_db_i()
    from mods.rate.mods.rate_serve.utils import RateServeFlowNodeCheck
    RateServeFlowNodeCheck.rate_post_data_check_node(db, 'XP23091648527424WFWOA')

# tasd()