570
文章
·
28755
阅读
570
文章
·
28755
阅读

有27人阅读过 使用python监测文件夹变动,发送邮件通知
发布于2024/08/28 更新于2024/08/28
[ 教程仅保证更新时有效,请自行测试。]

代码如下:

import os
import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

# 邮件配置
email_config = {
    'sender': '1111@qq.com',
    'receivers': ['2222@qq.com', '3333@qq.com'],
    'smtp_server': 'smtp.qq.com',
    'smtp_port_ssl': 465,  # SSL 端口
    'smtp_port_tls': 587,  # STARTTLS 端口
    'password': '444444',  # QQ 邮箱授权码
}

class TestHandler(FileSystemEventHandler):
    def __init__(self, email_config):
        self.email_config = email_config
        self.last_modified_time = 0
        self.last_email_time = 0
        self.email_interval = 10  # 限制邮件发送间隔,单位:秒

    def on_modified(self, event):
        current_time = time.time()
        if current_time - self.last_modified_time > 2:
            self.last_modified_time = current_time
            print(f"文件 {event.src_path} 已被修改")
            if current_time - self.last_email_time > self.email_interval:
                self.last_email_time = current_time
                self.send_email_notification(event.src_path)

    def send_email_notification(self, file_path):
        print("正在发送邮件通知...")
        msg = MIMEMultipart()
        msg['From'] = self.email_config['sender']
        msg['To'] = ", ".join(self.email_config['receivers'])
        #msg['Subject'] = "文件变动通知"
        #body = f"文件 {os.path.basename(file_path)} 已被修改。"
        msg['Subject'] = "新采购订单通知"
        body = f"有新的采购订单需求,请及时查看并处理!"
        msg.attach(MIMEText(body, 'plain'))

        for attempt in range(3):
            try:
                print(f"尝试通过 SSL 端口 {self.email_config['smtp_port_ssl']} 连接...")
                server = smtplib.SMTP_SSL(self.email_config['smtp_server'], self.email_config['smtp_port_ssl'])
                server.set_debuglevel(1)
                server.login(self.email_config['sender'], self.email_config['password'])
                server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string())
                print("邮件发送成功")
                break
            except Exception as e:
                print(f"通过 SSL 发送邮件失败,尝试第 {attempt + 1} 次:{e}")
                time.sleep(2)
            finally:
                try:
                    server.quit()
                except Exception as e:
                    print(f"关闭 SSL 连接时出错:{e}")

        if attempt == 2:
            print("尝试通过 STARTTLS 端口连接...")
            try:
                server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port_tls'])
                server.set_debuglevel(1)
                server.starttls()
                server.login(self.email_config['sender'], self.email_config['password'])
                server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string())
                print("通过 STARTTLS 端口邮件发送成功")
            except Exception as e:
                print(f"通过 STARTTLS 发送邮件失败:{e}")
            finally:
                try:
                    server.quit()
                except Exception as e:
                    print(f"关闭 STARTTLS 连接时出错:{e}")

if __name__ == "__main__":
    path = "assets"
    event_handler = TestHandler(email_config)
    observer = Observer()
    observer.schedule(event_handler, path=path, recursive=False)
    observer.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    
    observer.join()


文章对你有帮助吗?
  • 一般[0]
  • 很赞[0]
  • 没用[0]
  • 垃圾[0]
  • 无语[0]
扫一扫,手机浏览手机访问本站