file-receive-backend/utils/email_utils.py

119 lines
3.9 KiB
Python

import asyncio
import json
import requests
import re
from fastapi import HTTPException
# from fastapi_mail import FastMail, MessageSchema, ConnectionConfig, MessageType
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
def email_check(email: str):
    """Validate that *email* is an address on the ``fecr.com.cn`` domain.

    The local part must consist of one or more ASCII letters, digits,
    underscores or hyphens, followed by exactly ``@fecr.com.cn``.

    Args:
        email: The address to validate.

    Returns:
        A ``re.Match`` object (truthy) when the address is valid,
        otherwise ``None``.
    """
    # The dots are escaped so they match a literal "." only; an unescaped
    # "." would accept any character (e.g. "user@fecrXcomYcn").
    # fullmatch anchors both ends and, unlike "$" with re.match, does not
    # tolerate a trailing newline.
    pattern = r"[a-zA-Z0-9_-]+@fecr\.com\.cn"
    return re.fullmatch(pattern, email)
# class EmailTool:
# def __init__(self, conf_dic):
# """
# MAIL_USERNAME="dogbillions@163.com",
# MAIL_PASSWORD="RMRMGULXYJHRMATM",
# MAIL_FROM="dogbillions@163.com",
# MAIL_PORT=conf_dic.get('MAIL_PORT', 25),
# MAIL_SERVER="smtp.163.com",
# MAIL_FROM_NAME="维德团队",
# """
# conf_dic['MAIL_PORT'] = int(conf_dic.get('MAIL_PORT', 25))
# conf = ConnectionConfig(
# **conf_dic,
# MAIL_STARTTLS=False,
# MAIL_SSL_TLS=False,
# USE_CREDENTIALS=True,
# VALIDATE_CERTS=True,
# # MAIL_PORT=int(conf_dic.get('MAIL_PORT', 25))
# )
# self.conf = conf
# self.fm = FastMail(conf)
#
# def send_email(self, title: str, email: str, msg_body: str):
# print(self.conf.MAIL_FROM)
# message = MessageSchema(
# subject=title,
# body=f"{msg_body}",
# charset='utf-8',
# subtype=MessageType.plain, recipients=[email])
# try:
# asyncio.run(self.fm.send_message(message))
# except Exception as e:
# print(e.with_traceback(None))
# raise HTTPException(status_code=303, detail='邮件发送失败')
#
def send_email(title: str, email: str, msg_body: str, email_api="http://email_api:30001",
               sender="dogbillions@163.com"):
    """Ask the HTTP mail service to send a message to one recipient.

    A JSON document containing the title, sender, a single-element
    recipient list and the message body is POSTed to
    ``email_api + '/send_mail'`` with a 5-second timeout.

    Args:
        title: Subject of the message.
        email: Recipient address.
        msg_body: Body text of the message.
        email_api: Base URL of the mail service.
        sender: Sender address placed in the request payload.

    Returns:
        ``True`` when the service answers with HTTP status 200,
        otherwise ``False``. Exceptions raised by ``requests.post`` are
        not caught here.
    """
    request_headers = {"Content-Type": "application/json;charset=UTF-8"}
    payload = json.dumps(
        {
            "title": title,
            "sender": sender,
            "recipients": [email],
            "msg_body": msg_body,
        }
    )
    endpoint = email_api + '/send_mail'
    response = requests.post(
        url=endpoint,
        headers=request_headers,
        data=payload,
        timeout=5,
    )
    return response.status_code == 200
# conf = ConnectionConfig(
# MAIL_USERNAME="dogbillions@163.com",
# MAIL_PASSWORD="RMRMGULXYJHRMATM",
# MAIL_FROM="dogbillions@163.com",
# MAIL_PORT=25,
# MAIL_SERVER="smtp.163.com",
# MAIL_FROM_NAME="维德团队",
# MAIL_STARTTLS=True,
# MAIL_SSL_TLS=False,
# USE_CREDENTIALS=True,
# VALIDATE_CERTS=True)
#
# fm = FastMail(conf)
#
#
# def send_mail(title: str, email: str, msg_body: str, sender="维德团队"):
# message = MessageSchema(
# subject=title,
# body=f"<p>{msg_body}</p>",
# subtype=MessageType.html, recipients=[email])
# asyncio.run(fm.send_message(message))
class EmailTool:
    """Send plain-text e-mail through an SMTP server using STARTTLS."""

    def __init__(self, sender_email, smtp_server, smtp_port, username, password):
        """Store the SMTP connection settings.

        Args:
            sender_email: Address used in the ``From`` header and as the
                envelope sender.
            smtp_server: Host name of the SMTP server.
            smtp_port: Port of the SMTP server; converted with ``int()``,
                so a numeric string is accepted.
            username: Login name passed to the SMTP server.
            password: Password passed to the SMTP server.
        """
        self.sender_email = sender_email
        self.smtp_server = smtp_server
        self.smtp_port = int(smtp_port)
        self.username = username
        self.password = password

    def send_email(self, title: str, email: str, body: str) -> None:
        """Send a plain-text message to a single recipient.

        Args:
            title: Subject of the message (encoded as UTF-8).
            email: Recipient address.
            body: Plain-text body of the message.

        Raises:
            HTTPException: With status code 303 when connecting,
                negotiating TLS, logging in or sending fails. The
                original exception is attached as ``__cause__``.
        """
        message = MIMEMultipart()
        # Sender header, wrapped in Header so it is formatted correctly.
        message['From'] = Header(self.sender_email)
        message['To'] = email
        # Subject is encoded as UTF-8 so non-ASCII titles are transmitted safely.
        message['Subject'] = Header(title, 'utf-8')
        message.attach(MIMEText(body, 'plain'))

        try:
            # The context manager issues QUIT and closes the socket even when
            # starttls/login/sendmail raises, so the connection cannot leak.
            with smtplib.SMTP(self.smtp_server, self.smtp_port) as server:
                server.starttls()
                server.login(self.username, self.password)
                server.sendmail(self.sender_email, email, message.as_string())
        except Exception as e:
            print(f'邮件发送失败: {str(e)}')
            raise HTTPException(status_code=303, detail='发送失败') from e