# wd-smebiz/the_test.py
# (Repository-viewer residue kept as a comment: 205 lines, 6.7 KiB, Python;
#  revision 2023-08-02 14:24:28 +08:00)
from context.common import common_db
from mods.user.router import *
from mods.user.mods.user.models import User
from utils.sqlalchemy_common_utils import QueryParams, query_common
def run_c():
    """Run a paged ``User`` query through ``query_common`` and print each row.

    Builds a ``QueryParams`` object (include ``name``/``id``, ex_include
    ``id``, page 1, page size 10), passes it to ``query_common`` together
    with the ``User`` model, and prints ``to_full_dict(...)`` for every
    result. The database handle is always closed, even when the query or
    the printing fails.
    """
    db = common_db.get_db_i()
    try:
        q = QueryParams.parse_obj(
            {'include': ['name', 'id'], 'ex_include': ['id'], 'page': 1, 'page_size': 10}
        )
        # NOTE(review): this function originally unpacked two values, while
        # test_user_json in this file unpacks four from the same call.
        # Star-unpacking accepts either shape; confirm the real return
        # signature of query_common.
        _count, query, *_ = query_common(db, User, q)
        for u in query:
            print(u.to_full_dict(include=['name', 'id'], ex_include='id'))
    finally:
        # Release the handle on both the success and the failure path.
        db.close()
def run_d():
    """Create or update the ``other_info`` address of the user named '伍春全'.

    If the user has no ``other_info`` yet, a new related object is created
    with ``address="xxx"``; otherwise the existing object's address is set
    to ``"xcxcxcxc"``. The change is committed and the database handle is
    always closed.

    When no user with that name exists, a message is printed and nothing
    is written (previously this failed with an ``AttributeError`` on
    ``None``).
    """
    db = common_db.get_db_i()
    try:
        u = db.query(User).filter(User.name == '伍春全').first()
        if u is None:
            print("run_d: no matching user found; nothing to update")
            return
        if not u.other_info:
            # presumably resolves the target class of the ``other_info``
            # relationship via the mapper — TODO confirm
            related_cls = u.__class__.other_info.property.entity.entity
            u.other_info = related_cls(address="xxx")
        else:
            u.other_info.address = "xcxcxcxc"
        db.commit()
    finally:
        # Release the handle on both the success and the failure path.
        db.close()
# def run_a():
# db = common_db.get_db_i()
# user_update(db, UserUpdate(id='c4316ba243c442aca42e37dbe0d8f476', departments=[1, 2, 3]))
# db.close()
#
#
# def run_b():
# db = common_db.get_db_i()
# d = db.query(Department).first()
# u = db.query(User).first()
# u.departments.add(d)
# db.commit()
# db.close()
#
# run_b()
# run_a()
# run_c()
# from mods.user.comment.models import Comment
#
# print(Comment.model_config)
def test_json_manage():
    """Exercise ``JsonDataManage`` against a local MongoDB instance.

    Connects to ``127.0.0.1:27017``, calls ``load_schemas()``, converts a
    sample ``User`` JSON schema with ``json_schema_to_pydantic`` and then
    calls ``insert_data`` for ``'用户'`` with one sample record.

    The original notes described a ``[mongo]`` configuration section
    (host 127.0.0.1, port 27017, db 三方数据) together with commented-out
    credentials; the credentials are deliberately not repeated here.
    """
    from utils.json_data_utils import JsonDataManage, json_schema_to_pydantic
    from pymongo import MongoClient

    user_schema = {
        'title': 'User',
        'type': 'object',
        'properties': {
            'name': {'title': 'Name', 'type': 'string'},
            'age': {'title': 'Age', 'type': 'integer'},
        },
        'required': ['name', 'age'],
    }

    client = MongoClient(host="127.0.0.1", port=27017)
    try:
        jdm = JsonDataManage(client)
        jdm.load_schemas()
        # The return value was never used in the original; the call is kept
        # as a smoke check of the conversion.
        json_schema_to_pydantic(user_schema, 'User')
        # NOTE(review): the original kept a commented-out
        # ``jdm.add_schema("用户", <the schema above>)`` call here —
        # presumably the schema must already be registered; confirm.
        jdm.insert_data(
            '用户',
            {"name": "张三", "age": 11},
            index={"company_name": "远东"},
            check_repeated=True,
        )
    finally:
        # Close the connection so sockets are not left open.
        client.close()
def test_user_json():
    """Run a paged ``User`` query and touch ``config`` on the first result.

    Uses the same ``QueryParams`` as ``run_c`` (include ``name``/``id``,
    ex_include ``id``, page 1, page size 10). The database handle is now
    always closed; the original never closed it.
    """
    db = common_db.get_db_i()
    try:
        q = QueryParams.parse_obj(
            {'include': ['name', 'id'], 'ex_include': ['id'], 'page': 1, 'page_size': 10}
        )
        # Only the query object is needed. Star-unpacking discards the rest
        # (the original named them count, page and page_size but never used them).
        _count, query, *_ = query_common(db, User, q)
        # Bare attribute access: evaluated only to check that it does not raise.
        query[0].config
    finally:
        db.close()
# test_user_json()
# test_json_manage()
# run_d()
import re
def replace_words(content, word_list):
    """Normalize the casing of known words inside *content*.

    For every word in *word_list*, each case-insensitive occurrence that is
    not directly preceded or followed by an ASCII letter is rewritten to
    the exact spelling given in *word_list*.

    Words are treated as literal text: regex metacharacters in a word
    (``+``, ``.``, ``\\`` ...) are escaped in the pattern, and the
    replacement is inserted verbatim rather than being interpreted as a
    regex replacement template.

    Args:
        content: Text to normalize.
        word_list: Iterable of canonical spellings.

    Returns:
        The text with every matched occurrence replaced.
    """
    for word in word_list:
        pattern = rf"(?<![A-Za-z]){re.escape(word)}(?![A-Za-z])"
        # A callable replacement avoids backslash/group-reference expansion
        # that a plain replacement string would undergo.
        content = re.sub(pattern, lambda _match, w=word: w, content, flags=re.IGNORECASE)
    return content
# 测试函数
# content = "使用Absasd和abs还有Gep和GEP"
# word_list = ["ABs", "GEP"]
# new_content = replace_words(content, word_list)
# print(new_content)
import inspect
def test_code_make():
    """Generate code for the three ``uir`` company models.

    Calls ``make_code_from_model`` once per model with the
    ``crud_without_model`` template directory and ``mods/uir`` as the
    second argument, in the order CompanyIndex, CompanyPlusProperty,
    CompanyBaseInfo.
    """
    from mods.uir.models import CompanyIndex, CompanyPlusProperty, CompanyBaseInfo
    from utils.code_make_utils.make_from_model import make_code_from_model

    # Earlier experiment, kept for reference:
    # make_code_from_model("utils/code_make_utils/templates/crud", "temp", User, "用户")
    template_dir = "utils/code_make_utils/templates/crud_without_model"
    targets = (
        (CompanyIndex, "企业索引"),
        (CompanyPlusProperty, "企业属性"),
        (CompanyBaseInfo, "企业工商信息"),
    )
    for model_cls, label in targets:
        make_code_from_model(template_dir, "mods/uir", model_cls, label)
def test_code_make2():
    """Generate the ``uir`` module from the three company models in one call.

    Passes ``"uir"``, ``"mods"`` and a list of ``(model, label)`` pairs to
    ``make_mod_from_models``.
    """
    from mods.uir.models import CompanyIndex, CompanyPlusProperty, CompanyBaseInfo
    from utils.code_make_utils.make_from_model import make_mod_from_models

    labelled_models = [
        (CompanyIndex, "企业索引"),
        (CompanyPlusProperty, "企业属性"),
        (CompanyBaseInfo, "企业工商信息"),
    ]
    make_mod_from_models("uir", "mods", labelled_models)
# (viewer revision marker: 2023-08-11 14:12:51 +08:00)
# test_code_make2()
# (viewer revision marker: 2023-08-02 14:24:28 +08:00)
def test_make_table_model():
    """Call ``make_table_model`` for each of the three ``uir`` company models.

    The models are processed in the order CompanyIndex,
    CompanyPlusProperty, CompanyBaseInfo, each with its Chinese label.
    """
    from mods.uir.models import CompanyIndex, CompanyPlusProperty, CompanyBaseInfo
    # Only make_table_model is used here; the unused make_mod_from_models
    # import and a stray revision-timestamp line were removed from the body.
    from utils.code_make_utils.make_from_model import make_table_model

    make_table_model(CompanyIndex, "企业索引")
    make_table_model(CompanyPlusProperty, "企业属性")
    make_table_model(CompanyBaseInfo, "企业工商信息")
# test_make_table_model()
def extract_data_to_json(excel_path: str, year_columns=("2020年", "2021年", "2022年")) -> dict:
    """Read the questionnaire workbook and return its content as nested dicts.

    Two sheets are read from *excel_path*:

    * ``经营问卷`` — each row becomes ``问题 -> 填写(文本类无则不填写)``;
      missing answers are stored as ``None``.
    * ``财务问卷`` — rows are grouped by the most recent non-empty value in
      the ``报表`` column. Each row contributes
      ``科目 -> [value for each column in year_columns]``. Rows whose
      ``报表`` is ``单位`` are ignored; rows whose ``报表`` is ``是否审计``
      or ``会计事务所`` are stored at the top level with the ``科目`` cell
      as their value.

    Rows with an empty ``科目`` cell are skipped instead of being stored
    under a NaN key, and rows that appear before any report header are
    skipped instead of raising ``KeyError``.

    Args:
        excel_path: Path of the workbook.
        year_columns: Names of the per-year value columns in the
            ``财务问卷`` sheet, in output order.

    Returns:
        ``{"经营问卷": {...}, "财务问卷": {...}}``.
    """
    import pandas as pd

    answer_column = "填写(文本类无则不填写)"

    # --- business questionnaire -------------------------------------------
    business_data = pd.read_excel(excel_path, sheet_name="经营问卷")
    business_dict = {}
    for _, row in business_data.iterrows():
        answer = row[answer_column]
        business_dict[row["问题"]] = None if pd.isna(answer) else answer

    # --- finance questionnaire --------------------------------------------
    finance_data = pd.read_excel(excel_path, sheet_name="财务问卷")
    subjects = finance_data['科目']
    year_series = [finance_data[column] for column in year_columns]

    finance_report_dict = {}
    report = None  # name of the report the current rows belong to
    for i, item in enumerate(finance_data['报表']):
        subject = subjects[i]
        if not pd.isna(item):
            if item == "单位":
                continue
            if item in ("是否审计", "会计事务所"):
                # Metadata rows: the value lives in the subject column.
                finance_report_dict[item] = subject
                continue
            # Any other non-empty cell starts a new report section.
            report = item
            finance_report_dict[report] = {}
        # Skip rows with no subject (e.g. header-only rows) and rows that
        # precede the first report header.
        if report is None or pd.isna(subject):
            continue
        finance_report_dict[report][subject] = [series[i] for series in year_series]

    return {
        "经营问卷": business_dict,
        "财务问卷": finance_report_dict,
    }
# Use the function to extract data.
# NOTE: these statements are at module top level, so they run whenever this
# file is executed or imported. The workbook name is a relative path.
extracted_data = extract_data_to_json("填报问卷 (2).xlsx")
print(extracted_data)