import json

import pandas as pd
from sqlalchemy.orm import Session

from Utils.SqlAlchemyUtils import get_db_i
from Models.PostModel import Post
from Models.DepartmentModel import Department
from Models.UserModel import UserInfo, User
from Models.DailyModel import Daily


def _department_ids_by_name(db):
    """Return a ``{department name: department id}`` dict built from all Department rows."""
    return {dept.name: dept.id for dept in db.query(Department).all()}


def _post_ids_by_department(db):
    """Return ``{department id: {post name: post id}}`` built from all Post rows.

    If several Post rows share the same ``belong`` and ``name``, the first one
    returned by the query is the one kept.
    """
    lookup = {}
    for post in db.query(Post).all():
        lookup.setdefault(post.belong, {}).setdefault(post.name, post.id)
    return lookup


def _email_for_name(db, name):
    """Return the email of the first User row whose ``name`` matches.

    Raises:
        Exception: if no User row has that name.
    """
    user = db.query(User).filter_by(name=name).first()
    if not user:
        raise Exception(f'{name},无此姓名')
    return user.email


def user_table_to_db():
    """Import the staff roster spreadsheet into the UserInfo table.

    Reads "远东员工花名册.xlsx", resolves up to three department/post name pairs
    per row into ids, stores them as comma-joined strings on a new UserInfo
    row, and commits once after every row has been added.

    Raises:
        KeyError: if a department or post named in the sheet is not present
            in the Department / Post tables.
    """
    # Only the disabled post-seeding step below consumes this config. It is
    # still loaded so that a missing or malformed file fails exactly as before;
    # the context manager closes the handle the old bare open() leaked.
    with open("Config/Company1.json", "r", encoding="utf-8") as config_file:
        data = json.load(config_file)
    db = get_db_i()
    d_name_dic = _department_ids_by_name(db)

    # Create the post table (disabled)
    # db.query(Post).delete()
    # for d_type in data:
    #     item = data[d_type]
    #     for d in item:
    #         p_list = item[d]
    #         for p_name in p_list:
    #             d_id = d_name_dic[d]
    #             db.add(Post(belong=d_id, name=p_name))
    # db.commit()

    post_dic = _post_ids_by_department(db)

    dt = pd.read_excel("远东员工花名册.xlsx")
    for _, row in dt.iterrows():
        ds = set()
        ps = set()
        for slot in ("1", "2", "3"):
            dept_name = row[f"部门{slot}"]
            post_name = row[f"职务{slot}"]
            # An empty department cell means this slot is unused for the row.
            if pd.isna(dept_name):
                continue
            print(dept_name, post_name)
            dept_id = d_name_dic[dept_name]
            ds.add(dept_id)
            ps.add(post_dic[dept_id][post_name])

        db.add(
            UserInfo(
                email=row["邮箱"],
                name=row["姓名"],
                department=",".join(str(item) for item in ds),
                post=",".join(str(item) for item in ps),
            )
        )
    db.commit()

    # Disabled draft that collected post names per department from the sheet:
    # row = dt.loc[i]
    # if row['部门1'] not in post_dic:
    #     post_dic[[row['部门1']]] = []
    # if row['职务1'] not in post_dic[row['部门1']]:
    #     post_dic[row['部门1']].append(row['职务1'])
    #
    # if row['部门2'] not in post_dic:
    #     post_dic[[row['部门2']]] = []
    # if row['职务2'] not in post_dic[row['部门2']]:
    #     post_dic[row['部门2']].append(row['职务2'])
    #
    # if row['部门3'] not in post_dic:
    #     post_dic[[row['部门3']]] = []
    # if row['职务3'] not in post_dic[row['部门3']]:
    #     post_dic[row['部门3']].append(row['职务3'])


def load_daily():
    """Import "动态类数据.xlsx" into the Daily table.

    Each row's reporter name is resolved to a User email, and its department
    and post names are resolved to ids, before a Daily row is added. Commits
    once after every row has been added.

    Raises:
        Exception: if a reporter name has no matching User row.
        KeyError: if a department or post name is unknown.
    """
    db = get_db_i()
    data = pd.read_excel('动态类数据.xlsx')
    d_name_dic = _department_ids_by_name(db)
    post_dic = _post_ids_by_department(db)

    # The User table is not modified here, so one lookup per distinct name is enough.
    email_by_name = {}
    for _, series in data.iterrows():
        row = series.to_dict()

        reporter = row['报送人']
        if reporter not in email_by_name:
            email_by_name[reporter] = _email_for_name(db, reporter)

        title = row['标题']
        if pd.isna(title):
            title = ""

        department_id = d_name_dic[row['部门']]
        post_id = post_dic[department_id][row['职务']]
        db.add(
            Daily(
                type=row['报送类型'],
                fill_user=email_by_name[reporter],
                title=title,
                content=row['正文'],
                post=post_id,
                department=department_id,
                daily_time=row['报送时间'],
            )
        )
    db.commit()


def load_user_data(db: Session):
    """Rebuild the Post and User tables from "远东员工花名册(1).xlsx".

    All existing Post rows are deleted and replaced by the distinct
    (department, post) pairs found in the sheet; then all existing User rows
    are deleted and replaced by one row per sheet line. Department, post and
    managed-department references are stored as comma-joined id strings.

    Args:
        db: session used for every query, delete and insert; it is committed
            after the posts are written and again after the users are written.

    Raises:
        KeyError: if the sheet names a department missing from the Department table.
    """
    dt = pd.read_excel("远东员工花名册(1).xlsx")
    dp_id_dic = _department_ids_by_name(db)
    # Blank cells become "" so the truthiness checks below treat them as absent.
    dt = dt.fillna(value="")

    user_list = []
    post_list = []
    for _, row in dt.iterrows():
        dept_post_pairs = [
            (row["部门"], row["职务"]),
            (row["部门2"], row["职务2"]),
        ]
        managed = row["所看部门"]
        user_info = {
            "email": row["邮箱"],
            "name": row["姓名"],
            "manage_departments": managed.split(',') if managed else [],
            "phone": row["移动电话"],
            "department": [dept for dept, _ in dept_post_pairs if dept],
            "post": [pair for pair in dept_post_pairs if pair[0]],
        }
        post_list.extend(user_info["post"])
        user_list.append(user_info)

    post_list.sort(key=lambda pair: dp_id_dic[pair[0]])

    db.query(Post).delete()
    # Tuples are used as de-duplication keys: a "dept,post" joined string would
    # collide whenever a name itself contains a comma.
    seen_pairs = set()
    for dept_name, post_name in post_list:
        if (dept_name, post_name) in seen_pairs:
            continue
        seen_pairs.add((dept_name, post_name))
        db.add(Post(name=post_name, belong=dp_id_dic[dept_name]))
    db.commit()

    # Re-read the posts to pick up the ids assigned on insert.
    dp_post_dic = _post_ids_by_department(db)

    db.query(User).delete()
    for user_info in user_list:
        phone = user_info['phone']
        new_user = User()
        new_user.email = user_info['email']
        new_user.name = user_info['name']
        new_user.manage_departments = ",".join(
            str(dp_id_dic[dept_name]) for dept_name in user_info['manage_departments']
        )
        new_user.post = ",".join(
            str(dp_post_dic[dp_id_dic[dept_name]][post_name])
            for dept_name, post_name in user_info['post']
        )
        new_user.department = ",".join(
            str(dp_id_dic[dept_name]) for dept_name in user_info['department']
        )
        # int() drops the fractional part before the number is stored as text.
        new_user.phone = str(int(phone)) if phone else None
        db.add(new_user)
    db.commit()


def load_daily_data(db: Session, file):
    """Import '董监高日报' entries from a spreadsheet into the Daily table.

    A row is skipped when a Daily with the same type, content, date and user
    already exists. A new entry copies ``department`` and ``post`` from an
    existing Daily of the same type and user. Commits once after every row has
    been processed.

    Args:
        db: session used for the lookups and inserts.
        file: passed straight to ``pandas.read_excel``.

    Raises:
        Exception: if a name has no matching User row, or if the user has no
            existing Daily of this type to copy department and post from.
    """
    daily_type = '董监高日报'
    dt = pd.read_excel(file)
    for _, row in dt.iterrows():
        name = row['姓名']
        daily_time = row['日期']
        content = row['内容']

        email = _email_for_name(db, name)

        already_stored = db.query(Daily).filter_by(
            type=daily_type, content=content, daily_time=daily_time, fill_user=email
        ).first()
        if already_stored:
            continue

        old_item = db.query(Daily).filter_by(type=daily_type, fill_user=email).first()
        if old_item is None:
            # Without a template there is nowhere to copy department/post from;
            # fail with a clear message instead of an AttributeError on None.
            raise Exception(f'{name},无历史董监高日报记录,无法确定部门和职务')

        new_item = Daily()
        new_item.fill_user = email
        new_item.department = old_item.department
        new_item.post = old_item.post
        new_item.content = content
        new_item.daily_time = daily_time
        new_item.type = daily_type
        db.add(new_item)
        # Flush so the duplicate check on later rows sees this entry even when
        # the session was created with autoflush disabled.
        db.flush()
    db.commit()