监控指定后缀的文件
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import shutil
import time
# 配置日志基本设置
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# 日志文件路径
log_file_path = 'log/output.log'
# 确保日志目录存在
log_directory = os.path.dirname(log_file_path)
if not os.path.exists(log_directory):
os.makedirs(log_directory)
# 创建TimedRotatingFileHandler,每天轮换日志文件
handler = TimedRotatingFileHandler(log_file_path, encoding='utf-8', when="midnight", interval=1, backupCount=30)
# 使用日志格式器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 为日志记录系统添加handler
logging.getLogger().addHandler(handler)
try:
from watchdog.observers import Observer
from watchdog.events import *
except ImportError:
logging.info("错误:未找到watchdog模块。")
logging.info("请通过以下命令安装:pip install watchdog")
sys.exit(1) # 退出程序,返回非0状态码表示错误
# 处理器
class FileEventHandler(PatternMatchingEventHandler):
def __init__(self, base_path, save_path):
# 设置匹配模式
# 关键参数说明:
# patterns:一个字符串列表,定义要监控的文件模式,例如 ['*.txt', '*.csv']。
# ignore_patterns:一个字符串列表,定义要忽略的文件模式。
# ignore_directories:设置为 True 可以忽略目录事件。
# case_sensitive:设置为 True 则进行大小写敏感匹配。
super().__init__(patterns=['*.dat', '*.jpg', '*.png', '*.mp3', '*.mp4'], ignore_directories=True, case_sensitive=False)
self.base_path = base_path # 监控的根目录
self.save_path = save_path # 保存的根目录
def is_file_ready(self, file_path):
"""检查文件是否准备就绪(未被其他进程占用)"""
try:
# 尝试以读写方式打开文件,如果文件被占用,则会失败
with open(file_path, 'a+b'):
return True
except IOError:
return False
def on_moved(self, event):
if event.is_directory:
logging.info("directory moved from {0} to {1}".format(event.src_path, event.dest_path))
else:
logging.info("file moved from {0} to {1}".format(event.src_path, event.dest_path))
localtime = time.strftime("%Y-%m-%d_%H_%M_%S",time.localtime(time.time()))
i = 1
while i <= 5:
i += 1
# os.system("copy \"{0}\" \"F:\image\{1}_moved_{2}\"".format(event.dest_path, localtime, os.path.basename(event.dest_path)))
# command = "copy \"{0}\" \"{1}\moved{2}\"".format(event.dest_path, self.save_path, os.path.relpath(event.dest_path, self.base_path))
target_path = "{0}\moved{1}".format(self.save_path, os.path.relpath(event.dest_path, self.base_path))
# 确保目标目录存在
target_dir = os.path.dirname(target_path)
os.makedirs(target_dir, exist_ok=True)
if os.path.exists(event.dest_path) and self.is_file_ready(event.dest_path):
try:
# logging.info(command)
# os.system(command)
# 使用shutil.copy进行拷贝
shutil.copy2(event.dest_path, target_path)
break
except Exception as e:
logging.info(f"发生错误:{e}")
break
else:
logging.info(f"{event.dest_path} 文件不存在或被占用")
time.sleep(0.5)
# if event.src_path.endswith(".dat") and os.access(event.src_path, os.R_OK):
# #logging.info(event.src_path.endswith(".png"))
# logging.info("File[{0}] is accessible to read".format(os.path.basename(event.src_path)))
# os.system("copy \"{0}\" \"F:\image\{1}\"".format(event.src_path, os.path.basename(event.src_path)))
def on_created(self, event):
if event.is_directory:
logging.info("directory created:{0}".format(event.src_path))
else:
logging.info("file created:{0}".format(event.src_path))
def on_deleted(self, event):
if event.is_directory:
logging.info("directory deleted:{0}".format(event.src_path))
else:
logging.info("file deleted:{0}".format(event.src_path))
def on_modified(self, event):
if event.is_directory:
logging.info("directory modified:{0}".format(event.src_path))
else:
logging.info("file modified:{0}".format(event.src_path))
localtime = time.strftime("%Y-%m-%d_%H_%M_%S",time.localtime(time.time()))
i = 1
while i <= 5:
i += 1
# os.system("copy \"{0}\" \"F:\image\{1}_modified_{2}\"".format(event.src_path, localtime, os.path.basename(event.src_path)))
# command = "copy \"{0}\" \"{1}\modified{2}\"".format(event.src_path, self.save_path, os.path.relpath(event.src_path, self.base_path))
target_path = "{0}\modified{1}".format(self.save_path, os.path.relpath(event.src_path, self.base_path))
# 确保目标目录存在
target_dir = os.path.dirname(target_path)
os.makedirs(target_dir, exist_ok=True)
if os.path.exists(event.src_path) and self.is_file_ready(event.src_path):
try:
# logging.info(command)
# os.system(command)
# 使用shutil.copy进行拷贝
shutil.copy2(event.src_path, target_path)
break
except Exception as e:
logging.info(f"发生错误:{e}")
break
else:
logging.info(f"{event.src_path} 文件不存在或被占用")
time.sleep(0.5)
# shutil.copyfile(event.src_path, "F:\image\{0}".format(os.path.basename(event.src_path)))
# if event.src_path.endswith(".dat") and os.access(event.src_path, os.R_OK):
# #logging.info(event.src_path.endswith(".png"))
# logging.info("File[{0}] is accessible to read".format(os.path.basename(event.src_path)))
# os.system("copy \"{0}\" \"F:\image\{1}\"".format(event.src_path, os.path.basename(event.src_path)))
if __name__ == '__main__':
observer = Observer()
# 监控的目录
path_to_watch = r"D:\WeChat Files\wxid_\FileStorage\MsgAttach"
# 保存的目录
path_to_save = r"F:\image\MsgAttach"
# 处理器初始化时候这里可以传些参进去
event_handler = FileEventHandler(path_to_watch, path_to_save)
# 监控的文件夹
observer.schedule(event_handler, path_to_watch, True)
path_to_watch1 = r"D:\xwechat_files\wxid_\msg"
path_to_save1 = r"F:\image\msg"
event_handler1 = FileEventHandler(path_to_watch1, path_to_save1)
observer.schedule(event_handler1, path_to_watch1, True)
observer.start()
observer.join()