daily/Utils/DataToDbUtils.py

190 lines
6.9 KiB
Python
Raw Normal View History

2023-03-02 09:27:05 +08:00
import pandas as pd
2023-04-04 17:08:38 +08:00
from sqlalchemy.orm import Session
2023-03-02 09:27:05 +08:00
from Utils.SqlAlchemyUtils import get_db_i
from Models.PostModel import Post
from Models.DepartmentModel import Department
2023-04-04 17:08:38 +08:00
from Models.UserModel import UserInfo, User
2023-03-07 16:42:34 +08:00
from Models.DailyModel import Daily
2023-03-02 09:27:05 +08:00
import json
def user_table_to_db():
data = json.load(open("Config/Company1.json", "r", encoding="utf-8"))
db = get_db_i()
d_name_dic = {item.name: item.id for item in db.query(Department).all()}
d_id_dic = {item.id: item.name for item in db.query(Department).all()}
# 创建职务表
# db.query(Post).delete()
# for d_type in data:
# item = data[d_type]
# for d in item:
# p_list = item[d]
# for p_name in p_list:
# d_id = d_name_dic[d]
# db.add(Post(belong=d_id, name=p_name))
# db.commit()
items = [item for item in db.query(Post).all()]
post_dic = {}
for item in db.query(Post).all():
if item.belong not in post_dic:
post_dic[item.belong] = {}
if item.name not in post_dic[item.belong]:
post_dic[item.belong][item.name] = item.id
dt = pd.read_excel("远东员工花名册.xlsx")
line_count = dt.shape[0]
for i in range(line_count):
row = dt.loc[i]
email = row["邮箱"]
name = row["姓名"]
d1 = row["部门1"]
d2 = row["部门2"]
d3 = row["部门3"]
p1 = row["职务1"]
p2 = row["职务2"]
p3 = row["职务3"]
ds = set()
ps = set()
if not pd.isna(d1):
print(d1, p1)
ds.add(d_name_dic[d1])
ps.add(post_dic[d_name_dic[d1]][p1])
if not pd.isna(d2):
print(d2, p2)
ds.add(d_name_dic[d2])
ps.add(post_dic[d_name_dic[d2]][p2])
if not pd.isna(d3):
print(d3, p3)
ds.add(d_name_dic[d3])
ps.add(post_dic[d_name_dic[d3]][p3])
d_t = ",".join([str(item) for item in ds])
p_t = ",".join([str(item) for item in ps])
user = UserInfo(email=email, name=name, department=d_t, post=p_t)
db.add(user)
db.commit()
# row = dt.loc[i]
# if row['部门1'] not in post_dic:
# post_dic[[row['部门1']]] = []
# if row['职务1'] not in post_dic[row['部门1']]:
# post_dic[row['部门1']].append(row['职务1'])
#
# if row['部门2'] not in post_dic:
# post_dic[[row['部门2']]] = []
# if row['职务2'] not in post_dic[row['部门2']]:
# post_dic[row['部门2']].append(row['职务2'])
#
# if row['部门3'] not in post_dic:
# post_dic[[row['部门3']]] = []
# if row['职务3'] not in post_dic[row['部门3']]:
# post_dic[row['部门3']].append(row['职务3'])
2023-03-07 16:42:34 +08:00
def load_daily():
2023-04-04 17:08:38 +08:00
db = get_db_i()
data = pd.read_excel('动态类数据.xlsx')
length = data.shape[0]
2023-03-07 16:42:34 +08:00
d_name_dic = {item.name: item.id for item in db.query(Department).all()}
post_dic = {}
for item in db.query(Post).all():
if item.belong not in post_dic:
post_dic[item.belong] = {}
if item.name not in post_dic[item.belong]:
post_dic[item.belong][item.name] = item.id
for i in range(length):
2023-04-04 17:08:38 +08:00
row = data.loc[i].to_dict()
报送类型 = row['报送类型']
报送人 = row['报送人']
报送时间 = row['报送时间']
标题 = row['标题']
标题 = "" if pd.isna(标题) else 标题
正文 = row['正文']
部门 = row['部门']
职务 = row['职务']
email = db.query(User).filter_by(name=报送人).first().email
department_id = d_name_dic[部门]
post_id = post_dic[department_id][职务]
db.add(Daily(type=报送类型, fill_user=email, title=标题, content=正文, post=post_id, department=department_id,
daily_time=报送时间))
db.commit()
def load_user_data(db: Session):
dt = pd.read_excel("远东员工花名册(1).xlsx")
line_count = dt.shape[0]
user_list = []
post_list = []
dp_id_dic = {item.name: item.id for item in db.query(Department).all()}
dt = dt.fillna(value="")
for i in range(line_count):
row = dt.loc[i]
部门1 = row["部门"]
部门2 = row["部门2"]
职务1 = row["职务"]
职务2 = row["职务2"]
user_info = {
"email": row["邮箱"],
"name": row["姓名"],
"manage_departments": row["所看部门"].split(',') if row["所看部门"] else [],
"phone": row["移动电话"],
"department": [item for item in [部门1, 部门2] if item],
"post": [item for item in [[部门1, 职务1], [部门2, 职务2]] if item[0]]
}
post_list.extend(user_info["post"])
user_list.append(user_info)
2023-04-04 17:11:10 +08:00
post_list.sort(key=lambda x: dp_id_dic[x[0]])
db.query(Post).delete()
key_dic = set()
for post in post_list:
key = f"{post[0]},{post[1]}"
if key not in key_dic:
key_dic.add(key)
new_post = Post(name=post[1], belong=dp_id_dic[post[0]])
db.add(new_post)
db.commit()
2023-04-04 17:08:38 +08:00
dp_post_dic = {}
for item in db.query(Post).all():
if not dp_post_dic.get(item.belong):
dp_post_dic[item.belong] = {}
dp_post_dic[item.belong][item.name] = item.id
db.query(User).delete()
for user_info in user_list:
new_user = User()
new_user.email = user_info['email']
new_user.name = user_info['name']
new_user.manage_departments = ",".join([str(dp_id_dic[item]) for item in user_info['manage_departments']])
new_user.post = ",".join([str(dp_post_dic[dp_id_dic[item[0]]][item[1]]) for item in user_info['post']])
new_user.department = ",".join([str(dp_id_dic[item]) for item in user_info['department']])
new_user.phone = str(int(user_info['phone'])) if user_info['phone'] else None
db.add(new_user)
db.commit()
2023-04-20 15:34:30 +08:00
def load_daily_data(db: Session, file):
dt = pd.read_excel(file)
line_count = dt.shape[0]
for i in range(line_count):
row = dt.loc[i]
name = row['姓名']
daily_time = row['日期']
content = row['内容']
user = db.query(User).filter_by(name=name).first()
if not user:
raise Exception(f'{name},无此姓名')
if db.query(Daily).filter_by(type='董监高日报', content=content, daily_time=daily_time,
fill_user=user.email).first():
continue
else:
old_item = db.query(Daily).filter_by(type='董监高日报', fill_user=user.email).first()
new_item = Daily()
new_item.fill_user = user.email
new_item.department = old_item.department
new_item.post = old_item.post
new_item.content = content
new_item.daily_time = daily_time
new_item.type = '董监高日报'
db.add(new_item)
db.commit()