import asyncio
import json
import re
import smtplib
from email.header import Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from fastapi import HTTPException

# NOTE(review): the commented-out fastapi_mail implementations that used to
# live in this module embedded an SMTP account password in plain text and
# have been removed. Rotate that password and load mail credentials from
# configuration instead of source code.

# Local part: ASCII letters, digits, "_" and "-"; domain must be exactly
# "fecr.com.cn". The dots are escaped so they only match a literal ".".
_EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9_-]+@fecr\.com\.cn")


def email_check(email: str):
    """Validate that *email* is an address in the ``fecr.com.cn`` domain.

    The whole string must match: a non-empty local part made of ASCII
    letters, digits, ``_`` or ``-``, followed by ``@fecr.com.cn``.

    Args:
        email: The address to validate.

    Returns:
        A ``re.Match`` object (truthy) when the address is valid,
        otherwise ``None``.
    """
    # fullmatch (rather than match with "$") also rejects a trailing newline.
    return _EMAIL_PATTERN.fullmatch(email)


def send_email(
    title: str,
    email: str,
    msg_body: str,
    email_api="http://email_api:30001",
    sender="dogbillions@163.com",
):
    """Send a mail by POSTing a JSON payload to ``<email_api>/send_mail``.

    Args:
        title: Subject of the mail.
        email: Recipient address; sent as a one-element ``recipients`` list.
        msg_body: Body text of the mail.
        email_api: Base URL of the mail service (no trailing slash).
        sender: Sender address placed in the payload.

    Returns:
        ``True`` when the service answers with HTTP 200, ``False`` otherwise.

    Exceptions raised by ``requests.post`` (for example when the 5 second
    timeout expires) are not caught here and propagate to the caller.
    """
    headers = {"Content-Type": "application/json;charset=UTF-8"}
    payload = {
        "title": title,
        "sender": sender,
        "recipients": [email],
        "msg_body": msg_body,
    }
    response = requests.post(
        url=email_api + '/send_mail',
        headers=headers,
        data=json.dumps(payload),
        timeout=5,
    )
    return response.status_code == 200
class EmailTool:
    """Send plain-text mails through an SMTP server using STARTTLS."""

    def __init__(self, sender_email, smtp_server, smtp_port, username, password):
        """Store the SMTP connection settings.

        Args:
            sender_email: Address used as the ``From`` header and envelope sender.
            smtp_server: Host name of the SMTP server.
            smtp_port: Port of the SMTP server; converted with ``int()`` so a
                string value (e.g. read from configuration) is accepted.
            username: Login name for the SMTP server.
            password: Password for the SMTP server.
        """
        self.sender_email = sender_email
        self.smtp_server = smtp_server
        self.smtp_port = int(smtp_port)
        self.username = username
        self.password = password

    def send_email(self, title, email, body):
        """Send a plain-text mail to a single recipient.

        Args:
            title: Subject of the mail (encoded as UTF-8).
            email: Recipient address.
            body: Plain-text body of the mail.

        Raises:
            HTTPException: With status code 303 when connecting, logging in
                or sending fails for any reason.
        """
        message = MIMEMultipart()
        message['From'] = Header(self.sender_email)  # sender header
        message['To'] = email
        message['Subject'] = Header(title, 'utf-8')  # mail subject
        message.attach(MIMEText(body, 'plain'))

        try:
            # The context manager issues QUIT and closes the socket even when
            # starttls/login/sendmail raises, so the connection never leaks.
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.sender_email, email, message.as_string())
        except Exception as exc:
            print(f'邮件发送失败: {str(exc)}')
            # Chain the original error so the root cause stays in tracebacks.
            raise HTTPException(status_code=303, detail='发送失败') from exc