570
文章
·
28663
阅读
570
文章
·
28663
阅读

有21人阅读过 使用python监测指定文件夹内容更新,发送桌面通知和邮件通知【win】
发布于2024/08/28 更新于2024/08/28
[ 教程仅保证更新时有效,请自行测试。]

代码如下:

import os
import sys
import time
import smtplib
import ctypes
import win32api
import win32gui
import win32con
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from plyer import notification

# 邮件配置
email_config = {
    'sender': '1111@qq.com',
    'receivers': ['aaa@qq.com', 'bbbb@163.com'],
    'smtp_server': 'smtp.qq.com',
    'smtp_port_ssl': 465,  # SSL 端口
    'smtp_port_tls': 587,  # STARTTLS 端口
    'password': 'exqnowhneikcxxxx',  # QQ 邮箱授权码
}

class FileChangeHandler(FileSystemEventHandler):
    def __init__(self, file_path, email_config):
        self.file_path = file_path
        self.email_config = email_config
        self.last_modified_time = 0

    def on_modified(self, event):
        if event.src_path == self.file_path:
            current_time = time.time()
            if current_time - self.last_modified_time > 2:
                self.last_modified_time = current_time
                print(f"检测到文件 {self.file_path} 已修改")
                self.notify_file_change()
                self.send_email_notification()

    def notify_file_change(self):
        print("正在发送桌面通知...")
        notification.notify(
            title="文件更新通知",
            message=f"文件 {os.path.basename(self.file_path)} 已被修改",
            timeout=5
        )

    def send_email_notification(self):
        print("正在发送邮件通知...")
        msg = MIMEMultipart()
        msg['From'] = self.email_config['sender']
        msg['To'] = ", ".join(self.email_config['receivers'])  # 将多个收件人用逗号分隔
        msg['Subject'] = "文件更新通知"
        body = f"文件 {os.path.basename(self.file_path)} 已被修改。"
        msg.attach(MIMEText(body, 'plain'))

        for attempt in range(3):
            try:
                print(f"尝试通过 SSL 端口 {self.email_config['smtp_port_ssl']} 连接...")
                server = smtplib.SMTP_SSL(self.email_config['smtp_server'], self.email_config['smtp_port_ssl'])
                server.set_debuglevel(1)
                server.login(self.email_config['sender'], self.email_config['password'])
                server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string())
                print("邮件发送成功")
                break
            except Exception as e:
                print(f"通过 SSL 发送邮件失败,尝试第 {attempt + 1} 次:{e}")
                time.sleep(2)
            finally:
                try:
                    server.quit()
                except Exception as e:
                    print(f"关闭 SSL 连接时出错:{e}")

        if attempt == 2:
            print("尝试通过 STARTTLS 端口连接...")
            try:
                server = smtplib.SMTP(self.email_config['smtp_server'], self.email_config['smtp_port_tls'])
                server.set_debuglevel(1)
                server.starttls()
                server.login(self.email_config['sender'], self.email_config['password'])
                server.sendmail(self.email_config['sender'], self.email_config['receivers'], msg.as_string())
                print("通过 STARTTLS 端口邮件发送成功")
            except Exception as e:
                print(f"通过 STARTTLS 发送邮件失败:{e}")
            finally:
                try:
                    server.quit()
                except Exception as e:
                    print(f"关闭 STARTTLS 连接时出错:{e}")

def minimize_window():
    hwnd = win32gui.GetForegroundWindow()
    win32gui.ShowWindow(hwnd, win32con.SW_MINIMIZE)

def get_application_path():
    if getattr(sys, 'frozen', False):
        return sys._MEIPASS
    else:
        return os.path.dirname(os.path.abspath(__file__))

def ensure_watch_file():
    app_path = get_application_path()
    watch_file_path = os.path.join(app_path, "watch.txt")
    
    try:
        if not os.path.exists(watch_file_path):
            with open(watch_file_path, "w", encoding="utf-8") as f:
                f.write("")  # 创建空文件
            print(f"watch.txt 文件已创建:{watch_file_path}")
            print("请将需要监控的文件路径写入 watch.txt 文件。")
        else:
            print(f"watch.txt 文件已存在:{watch_file_path}")
    except Exception as e:
        print(f"创建 watch.txt 文件失败:{e}")
        return False
    return True

def read_watch_file():
    app_path = get_application_path()
    watch_file_path = os.path.join(app_path, "watch.txt")

    try:
        with open(watch_file_path, "r", encoding="utf-8") as f:
            file_name = f.readline().strip()
        
        if not file_name:
            print("watch.txt 文件为空,请将需要监控的文件路径写入 watch.txt 文件。")
            return None
        return file_name
    except Exception as e:
        print(f"读取 watch.txt 文件失败:{e}")
        return None

def monitor_file():
    if not ensure_watch_file():
        return

    file_name = read_watch_file()
    
    if not file_name:
        return
    
    app_path = get_application_path()
    file_path = os.path.join(app_path, file_name)
    
    try:
        if not os.path.exists(file_path):
            print(f"文件 {file_name} 不存在,请确认文件路径。")
            return

        event_handler = FileChangeHandler(file_path, email_config)
        observer = Observer()
        observer.schedule(event_handler, path=app_path, recursive=False)
        observer.start()

        print("开始监控文件变动...")
        
        try:
            minimize_window()  # 最小化窗口
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("检测到键盘中断,正在停止...")
        finally:
            observer.stop()
            observer.join()
            print("监控已停止。")
    except Exception as e:
        print(f"监控文件失败:{e}")

if __name__ == "__main__":
    monitor_file()


文章对你有帮助吗?
  • 一般[0]
  • 很赞[0]
  • 没用[0]
  • 垃圾[0]
  • 无语[0]
扫一扫,手机浏览手机访问本站