import os import threading import traceback from watchdog.events import FileSystemEventHandler from watchdog.observers import Observer from watchdog.observers.polling import PollingObserver import log from app.conf import ModuleConf from app.helper import DbHelper from config import RMT_MEDIAEXT, Config from app.filetransfer import FileTransfer from app.utils.commons import singleton from app.utils import PathUtils, ExceptionUtils from app.utils.types import SyncType, OsType lock = threading.Lock() class FileMonitorHandler(FileSystemEventHandler): """ 目录监控响应类 """ def __init__(self, monpath, sync, **kwargs): super(FileMonitorHandler, self).__init__(**kwargs) self._watch_path = monpath self.sync = sync def on_created(self, event): self.sync.file_change_handler(event, "创建", event.src_path) def on_moved(self, event): self.sync.file_change_handler(event, "移动", event.dest_path) """ def on_modified(self, event): self.sync.file_change_handler(event, "修改", event.src_path) """ @singleton class Sync(object): filetransfer = None dbhelper = None sync_dir_config = {} _observer = [] _sync_paths = [] _sync_sys = OsType.LINUX _synced_files = [] _need_sync_paths = {} def __init__(self): self.init_config() def init_config(self): self.dbhelper = DbHelper() self.filetransfer = FileTransfer() sync = Config().get_config('sync') sync_paths = self.dbhelper.get_config_sync_paths() if sync and sync_paths: if sync.get('nas_sys') == "windows": self._sync_sys = OsType.WINDOWS self._sync_paths = sync_paths self.init_sync_dirs() def init_sync_dirs(self): """ 初始化监控文件配置 """ self.sync_dir_config = {} if self._sync_paths: for sync_item in self._sync_paths: if not sync_item: continue # ID sync_id = sync_item.ID # 启用标志 enabled = True if sync_item.ENABLED else False # 仅硬链接标志 only_link = False if sync_item.RENAME else True # 转移方式 path_syncmode = ModuleConf.RMT_MODES.get(sync_item.MODE) # 源目录|目的目录|未知目录 monpath = sync_item.SOURCE target_path = sync_item.DEST unknown_path = sync_item.UNKNOWN if target_path and unknown_path: log.info("【Sync】读取到监控目录:%s,目的目录:%s,未识别目录:%s,转移方式:%s" % ( monpath, target_path, unknown_path, path_syncmode.value)) elif target_path: log.info( "【Sync】读取到监控目录:%s,目的目录:%s,转移方式:%s" % (monpath, target_path, path_syncmode.value)) else: log.info("【Sync】读取到监控目录:%s,转移方式:%s" % (monpath, path_syncmode.value)) if not enabled: log.info("【Sync】%s 不进行监控和同步:手动关闭" % monpath) continue if only_link: log.info("【Sync】%s 不进行识别和重命名" % monpath) if target_path and not os.path.exists(target_path): log.info("【Sync】目的目录不存在,正在创建:%s" % target_path) os.makedirs(target_path) if unknown_path and not os.path.exists(unknown_path): log.info("【Sync】未识别目录不存在,正在创建:%s" % unknown_path) os.makedirs(unknown_path) # 登记关系 if os.path.exists(monpath): self.sync_dir_config[monpath] = { 'id': sync_id, 'target': target_path, 'unknown': unknown_path, 'onlylink': only_link, 'syncmod': path_syncmode } else: log.error("【Sync】%s 目录不存在!" % monpath) def get_sync_dirs(self): """ 返回所有的同步监控目录 """ if not self.sync_dir_config: return [] return [os.path.normpath(key) for key in self.sync_dir_config.keys()] def file_change_handler(self, event, text, event_path): """ 处理文件变化 :param event: 事件 :param text: 事件描述 :param event_path: 事件文件路径 """ if not event.is_directory: # 文件发生变化 try: if not os.path.exists(event_path): return log.debug("【Sync】文件%s:%s" % (text, event_path)) # 判断是否处理过了 need_handler_flag = False try: lock.acquire() if event_path not in self._synced_files: self._synced_files.append(event_path) need_handler_flag = True finally: lock.release() if not need_handler_flag: log.debug("【Sync】文件已处理过:%s" % event_path) return # 不是监控目录下的文件不处理 is_monitor_file = False for tpath in self.sync_dir_config.keys(): if PathUtils.is_path_in_path(tpath, event_path): is_monitor_file = True break if not is_monitor_file: return # 目的目录的子文件不处理 for tpath in self.sync_dir_config.values(): if not tpath: continue if PathUtils.is_path_in_path(tpath.get('target'), event_path): return if PathUtils.is_path_in_path(tpath.get('unknown'), event_path): return # 媒体库目录及子目录不处理 if self.filetransfer.is_target_dir_path(event_path): return # 回收站及隐藏的文件不处理 if PathUtils.is_invalid_path(event_path): return # 上级目录 from_dir = os.path.dirname(event_path) # 找到是哪个监控目录下的 monitor_dir = event_path is_root_path = False for m_path in self.sync_dir_config.keys(): if PathUtils.is_path_in_path(m_path, event_path): monitor_dir = m_path if os.path.normpath(m_path) == os.path.normpath(from_dir): is_root_path = True # 查找目的目录 target_dirs = self.sync_dir_config.get(monitor_dir) target_path = target_dirs.get('target') unknown_path = target_dirs.get('unknown') onlylink = target_dirs.get('onlylink') sync_mode = target_dirs.get('syncmod') # 只做硬链接,不做识别重命名 if onlylink: if self.dbhelper.is_sync_in_history(event_path, target_path): return log.info("【Sync】开始同步 %s" % event_path) ret, msg = self.filetransfer.link_sync_file(src_path=monitor_dir, in_file=event_path, target_dir=target_path, sync_transfer_mode=sync_mode) if ret != 0: log.warn("【Sync】%s 同步失败,错误码:%s" % (event_path, ret)) elif not msg: self.dbhelper.insert_sync_history(event_path, monitor_dir, target_path) log.info("【Sync】%s 同步完成" % event_path) # 识别转移 else: # 不是媒体文件不处理 name = os.path.basename(event_path) if not name: return if name.lower() != "index.bdmv": ext = os.path.splitext(name)[-1] if ext.lower() not in RMT_MEDIAEXT: return # 监控根目录下的文件发生变化时直接发走 if is_root_path: ret, ret_msg = self.filetransfer.transfer_media(in_from=SyncType.MON, in_path=event_path, target_dir=target_path, unknown_dir=unknown_path, rmt_mode=sync_mode) if not ret: log.warn("【Sync】%s 转移失败:%s" % (event_path, ret_msg)) else: try: lock.acquire() if self._need_sync_paths.get(from_dir): files = self._need_sync_paths[from_dir].get('files') if not files: files = [event_path] else: if event_path not in files: files.append(event_path) else: return self._need_sync_paths[from_dir].update({'files': files}) else: self._need_sync_paths[from_dir] = {'target': target_path, 'unknown': unknown_path, 'syncmod': sync_mode, 'files': [event_path]} finally: lock.release() except Exception as e: ExceptionUtils.exception_traceback(e) log.error("【Sync】发生错误:%s - %s" % (str(e), traceback.format_exc())) def transfer_mon_files(self): """ 批量转移文件,由定时服务定期调用执行 """ try: lock.acquire() finished_paths = [] for path in list(self._need_sync_paths): if not PathUtils.is_invalid_path(path) and os.path.exists(path): log.info("【Sync】开始转移监控目录文件...") target_info = self._need_sync_paths.get(path) bluray_dir = PathUtils.get_bluray_dir(path) if not bluray_dir: src_path = path files = target_info.get('files') else: src_path = bluray_dir files = [] if src_path not in finished_paths: finished_paths.append(src_path) else: continue target_path = target_info.get('target') unknown_path = target_info.get('unknown') sync_mode = target_info.get('syncmod') # 判断是否根目录 is_root_path = False for m_path in self.sync_dir_config.keys(): if os.path.normpath(m_path) == os.path.normpath(src_path): is_root_path = True ret, ret_msg = self.filetransfer.transfer_media(in_from=SyncType.MON, in_path=src_path, files=files, target_dir=target_path, unknown_dir=unknown_path, rmt_mode=sync_mode, root_path=is_root_path) if not ret: log.warn("【Sync】%s转移失败:%s" % (path, ret_msg)) self._need_sync_paths.pop(path) finally: lock.release() def run_service(self): """ 启动监控服务 """ self._observer = [] for monpath in self.sync_dir_config.keys(): if monpath and os.path.exists(monpath): try: if self._sync_sys == OsType.WINDOWS: # 考虑到windows的docker需要直接指定才能生效(修改配置文件为windows) observer = PollingObserver(timeout=10) else: # 内部处理系统操作类型选择最优解 observer = Observer(timeout=10) self._observer.append(observer) observer.schedule(FileMonitorHandler(monpath, self), path=monpath, recursive=True) observer.daemon = True observer.start() log.info("%s 的监控服务启动" % monpath) except Exception as e: ExceptionUtils.exception_traceback(e) log.error("%s 启动目录监控失败:%s" % (monpath, str(e))) def stop_service(self): """ 关闭监控服务 """ if self._observer: for observer in self._observer: observer.stop() self._observer = [] def transfer_all_sync(self, sid=None): """ 全量转移Sync目录下的文件,WEB界面点击目录同步时获发 """ for monpath, target_dirs in self.sync_dir_config.items(): if not monpath: continue if sid and sid != target_dirs.get('id'): continue target_path = target_dirs.get('target') unknown_path = target_dirs.get('unknown') onlylink = target_dirs.get('onlylink') sync_mode = target_dirs.get('syncmod') # 只做硬链接,不做识别重命名 if onlylink: for link_file in PathUtils.get_dir_files(monpath): if self.dbhelper.is_sync_in_history(link_file, target_path): continue log.info("【Sync】开始同步 %s" % link_file) ret, msg = self.filetransfer.link_sync_file(src_path=monpath, in_file=link_file, target_dir=target_path, sync_transfer_mode=sync_mode) if ret != 0: log.warn("【Sync】%s 同步失败,错误码:%s" % (link_file, ret)) elif not msg: self.dbhelper.insert_sync_history(link_file, monpath, target_path) log.info("【Sync】%s 同步完成" % link_file) else: for path in PathUtils.get_dir_level1_medias(monpath, RMT_MEDIAEXT): if PathUtils.is_invalid_path(path): continue ret, ret_msg = self.filetransfer.transfer_media(in_from=SyncType.MON, in_path=path, target_dir=target_path, unknown_dir=unknown_path, rmt_mode=sync_mode) if not ret: log.error("【Sync】%s 处理失败:%s" % (monpath, ret_msg)) def run_monitor(): """ 启动监控 """ try: Sync().run_service() except Exception as err: ExceptionUtils.exception_traceback(err) log.error("启动目录同步服务失败:%s" % str(err)) def stop_monitor(): """ 停止监控 """ try: Sync().stop_service() except Exception as err: ExceptionUtils.exception_traceback(err) log.error("停止目录同步服务失败:%s" % str(err)) def restart_monitor(): """ 重启监控 """ stop_monitor() run_monitor()