有27人阅读过
使用python监测文件夹变动,发送邮件通知
发布于2024/08/28 更新于2024/08/28
[ 教程仅保证更新时有效,请自行测试。]
发布于2024/08/28 更新于2024/08/28
[ 教程仅保证更新时有效,请自行测试。]
[ 教程仅保证更新时有效,请自行测试。]
代码如下:
import os import time import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # 邮件配置 email_config = { 'sender': '1111@qq.com', 'receivers': ['2222@qq.com', '3333@qq.com'], 'smtp_server': 'smtp.qq.com', 'smtp_port_ssl': 465, # SSL 端口 'smtp_port_tls': 587, # STARTTLS 端口 'password': '444444', # QQ 邮箱授权码 } class TestHandler(FileSystemEventHandler): def __init__(self, email_config): self.email_config = email_config self.last_modified_time = 0 self.last_email_time = 0 self.email_interval = 10 # 限制邮件发送间隔,单位:秒 def on_modified(self, event): current_time = time.time() if current_time - self.last_modified_time > 2: self.last_modified_time = current_time print(f"文件 {event.src_path} 已被修改") if current_time - self.last_email_time > self.email_interval: self.last_email_time = current_time self.send_email_notification(event.src_path) def send_email_notification(self, file_path): print("正在发送邮件通知...") msg = MIMEMultipart() msg['From'] = self.email_config['sender'] msg['To'] = ", ".join(self.email_config['receivers']) #msg['Subject'] = "文件变动通知" #body = f"文件 {os.path.basename(file_path)} 已被修改。" msg['Subject'] = "新采购订单通知" body = f"有新的采购订单需求,请及时查看并处理!" msg.attach(MIMEText(body, 'plain')) for attempt in range(3): try: print(f"尝试通过 SSL 端口 {self.email_config['smtp_port_ssl']} 连接...") server = smtplib.SMTP_SSL(self.email_config['smtp_server'], self.email_config['smtp_port_ssl']) server.set_debuglevel(1) server.login(self.email_config['sender'], self.email_config['password']) server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string()) print("邮件发送成功") break except Exception as e: print(f"通过 SSL 发送邮件失败,尝试第 {attempt + 1} 次:{e}") time.sleep(2) finally: try: server.quit() except Exception as e: print(f"关闭 SSL 连接时出错:{e}") if attempt == 2: print("尝试通过 STARTTLS 端口连接...") try: server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port_tls']) server.set_debuglevel(1) server.starttls() server.login(self.email_config['sender'], self.email_config['password']) server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string()) print("通过 STARTTLS 端口邮件发送成功") except Exception as e: print(f"通过 STARTTLS 发送邮件失败:{e}") finally: try: server.quit() except Exception as e: print(f"关闭 STARTTLS 连接时出错:{e}") if __name__ == "__main__": path = "assets" event_handler = TestHandler(email_config) observer = Observer() observer.schedule(event_handler, path=path, recursive=False) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join()
文章对你有帮助吗?
- 一般[0]
- 很赞[0]
- 没用[0]
- 垃圾[0]
- 无语[0]