diff --git a/.github/workflows/alist-sync.yaml b/.github/workflows/alist-sync.yaml
index 96ac574..75614b2 100644
--- a/.github/workflows/alist-sync.yaml
+++ b/.github/workflows/alist-sync.yaml
@@ -1,5 +1,5 @@
 name: Alist Sync
-
+run-name: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}"
 on:
   workflow_dispatch:
     inputs:
@@ -67,6 +67,7 @@ jobs:
         _RELOAD_STORAGE: ${{ github.event.inputs.reload_storage }}

       run: |
+        echo RUNNER = ${_ALIST_SYNC_NAME}
        # This imports everything, including: settings, metadata, users, and storages.
         echo $(pwd)
         cat > alist-backup-config.json << EOF
@@ -76,6 +77,8 @@ jobs:

      - name: Create Tunnel for Cloudflare
        run: |
+          echo RUNNER = ${_ALIST_SYNC_NAME}
+
          test ! -n "${{secrets.CLOUDFLARE_TUNNEL_TOKEN}}" && {
            echo "CLOUDFLARE_TUNNEL_TOKEN is not set. Skip Cloudflare Tunnel Installation."
            exit 0
@@ -89,6 +92,7 @@ jobs:

      - name: RUN Alist Sync
        run: |
+          echo RUNNER = ${_ALIST_SYNC_NAME}
          cat > config.yaml << EOF
          ${{ secrets.SYNC_CONFIG }}
          EOF
@@ -102,5 +106,3 @@ jobs:
      - name: Debugger
        if: ${{ github.event.inputs.debug == 'true' && failure() }}
        uses: csexton/debugger-action@master
-
-
diff --git a/.gitignore b/.gitignore
index 77eb48e..2a116b2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,7 @@ config.yaml
 alist_sync/.alist-sync-cache/
 alist/
 tools/alist/
+logs/

 # C extensions
 *.so
diff --git a/alist_sync/__main__.py b/alist_sync/__main__.py
index c50c33c..c5117c1 100644
--- a/alist_sync/__main__.py
+++ b/alist_sync/__main__.py
@@ -64,6 +64,21 @@ def sync(
     return main()


+@app.command("check")
+def check(
+    config_file: str = Option(None, "--config", "-c", help="Path to the config file"),
+):
+    """Check pending sync tasks without executing them."""
+    from alist_sync.config import create_config
+    from alist_sync.d_main import main_check
+
+    if config_file and Path(config_file).exists():
+        os.environ["ALIST_SYNC_CONFIG"] = str(Path(config_file).resolve().absolute())
+        os.environ["_ALIST_SYNC_CONFIG"] = str(Path(config_file).resolve().absolute())
+        create_config()
+    return main_check()
+
+
 @app.command("get-info")
 def cli_get(path: str):
     """"""
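The new `check` subcommand above only builds a report, so it is easy to smoke-test in-process. A minimal sketch using Typer's test runner (assumes the package is installed and a `config.yaml` exists in the working directory; not part of this patch):

```python
# Drive the new "check" subcommand through Typer's test runner.
from typer.testing import CliRunner

from alist_sync.__main__ import app

runner = CliRunner()
result = runner.invoke(app, ["check", "--config", "config.yaml"])
print(result.exit_code)  # 0 on success
print(result.output)     # whatever main_check() rendered to stdout
```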
diff --git a/alist_sync/common.py b/alist_sync/common.py
index 55301ac..fba9fbb 100644
--- a/alist_sync/common.py
+++ b/alist_sync/common.py
@@ -95,17 +95,17 @@ def timeout_input(msg, default, timeout=3):
     return default


-def beautify_size(speed: float):
-    if speed < 1024:
-        return f"{speed:.2f}B"
-    speed /= 1024
-    if speed < 1024:
-        return f"{speed:.2f}KB"
-    speed /= 1024
-    if speed < 1024:
-        return f"{speed:.2f}MB"
-    speed /= 1024
-    return f"{speed:.2f}GB"
+def beautify_size(byte_size: float):
+    if byte_size < 1024:
+        return f"{byte_size:.2f}B"
+    byte_size /= 1024
+    if byte_size < 1024:
+        return f"{byte_size:.2f}KB"
+    byte_size /= 1024
+    if byte_size < 1024:
+        return f"{byte_size:.2f}MB"
+    byte_size /= 1024
+    return f"{byte_size:.2f}GB"


 def transfer_speed(size, start: datetime.datetime, end: datetime.datetime) -> str:
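The renamed `beautify_size` divides by 1024 until the value fits the unit, so its behavior is easy to pin down. A few illustrative expectations (not from the project's test suite):

```python
from alist_sync.common import beautify_size

assert beautify_size(512) == "512.00B"
assert beautify_size(2048) == "2.00KB"
assert beautify_size(3 * 1024**2) == "3.00MB"
assert beautify_size(5 * 1024**3) == "5.00GB"
```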
"Worker|None": - if not target_path.exists(): + if not target_stat.exists(): logger.info( - f"Checked: [COPY] {source_path.as_uri()} -> {target_path.as_uri()}" + f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}" ) - return Worker( - type="copy", - need_backup=False, - source_path=source_path, - target_path=target_path, + return self.create_worker( + type_="copy", + source_path=source_stat.path, + target_path=target_stat.path, ) - logger.info(f"Checked: [JUMP] {source_path.as_uri()}") + + logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}") return None class CheckerMirror(Checker): """""" - def checker(self, source_path: AlistPath, target_path: AlistPath) -> "Worker|None": - _main = self.sync_group.group[0] - # target如果是主存储器 - 且target不存在,source存在,删除source - if target_path == _main and not target_path.exists() and source_path.exists(): - return Worker( - type="delete", - need_backup=self.sync_group.need_backup, - backup_dir=self.get_backup_dir(source_path), - target_path=source_path, - ) - if not target_path.exists(): - return Worker( - type="copy", - need_backup=False, - source_path=source_path, - target_path=target_path, - ) + def checker( + self, source_stat: SyncRawItem, target_stat: SyncRawItem + ) -> "Worker|None": + # _main = self.sync_group.group[0] + # # target如果是主存储器 - 且target不存在,source存在,删除source + # if target_stat == _main and not target_stat.exists() and source_stat.exists(): + # return Worker( + # type="delete", + # need_backup=self.sync_group.need_backup, + # backup_dir=self.get_backup_dir(source_stat.path), + # target_path=source_stat.path, + # ) + # if not target_stat.exists(): + # return Worker( + # type="copy", + # need_backup=False, + # source_path=source_stat.path, + # target_path=target_stat.path, + # ) return None diff --git a/alist_sync/d_main.py b/alist_sync/d_main.py index 8c6a755..2f87181 100644 --- a/alist_sync/d_main.py +++ b/alist_sync/d_main.py @@ -2,6 +2,8 @@ """ """ +import collections +import fnmatch import logging import threading import time @@ -16,6 +18,7 @@ from alist_sync.d_worker import Workers from alist_sync.thread_pool import MyThreadPoolExecutor from alist_sync.config import SyncGroup, create_config, AlistServer +from alist_sync.common import beautify_size, all_thread_name from alist_sync.d_checker import get_checker sync_config = create_config() @@ -28,21 +31,50 @@ def login_alist(server: AlistServer): logger.info("Login: %s[%s] Success.", _c.base_url, _c.login_username) -def scaner(url: AlistPath, _queue): +def _make_ignore(_sync_group): + @lru_cache(64) + def split_path(_sync_group, path: AlistPath) -> tuple[AlistPath, str]: + """将Path切割为sync_dir和相对路径""" + for sr in _sync_group.group: + try: + re_path = path.relative_to(sr).rstrip("./") + if re_path.startswith("./"): + re_path = re_path[2:] + return sr, re_path + except ValueError: + pass + raise ValueError() + + def __ignore(relative_path) -> bool: + _, relative_path = split_path(_sync_group, relative_path) + for _i in _sync_group.blacklist: + if fnmatch.fnmatchcase(relative_path, _i): + logger.debug("Ignore: %s, [matched: %s]", relative_path, _i) + return True + return False + + return __ignore + + +def scaner(url: AlistPath, _queue, i_func: Callable[[str | AlistPath], bool] = None): def _scaner(_url: AlistPath, _s_num): """ """ - _s_num.append(1) + if i_func is not None and i_func(_url): + return logger.debug(f"Scaner: {_url}") try: + _s_num.append(1) for item in _url.iterdir(): if item.is_file(): - logger.debug(f"find file: {item}") + logger.debug(f"Find 
diff --git a/alist_sync/d_main.py b/alist_sync/d_main.py
index 8c6a755..2f87181 100644
--- a/alist_sync/d_main.py
+++ b/alist_sync/d_main.py
@@ -2,6 +2,8 @@
 """ """

+import collections
+import fnmatch
 import logging
 import threading
 import time
@@ -16,6 +18,7 @@ from alist_sync.d_worker import Workers
 from alist_sync.thread_pool import MyThreadPoolExecutor
 from alist_sync.config import SyncGroup, create_config, AlistServer
+from alist_sync.common import beautify_size, all_thread_name
 from alist_sync.d_checker import get_checker

 sync_config = create_config()
@@ -28,21 +31,50 @@ def login_alist(server: AlistServer):
     logger.info("Login: %s[%s] Success.", _c.base_url, _c.login_username)


-def scaner(url: AlistPath, _queue):
+def _make_ignore(_sync_group):
+    @lru_cache(64)
+    def split_path(_sync_group, path: AlistPath) -> tuple[AlistPath, str]:
+        """Split a path into its sync_dir and the relative path."""
+        for sr in _sync_group.group:
+            try:
+                re_path = path.relative_to(sr).rstrip("./")
+                if re_path.startswith("./"):
+                    re_path = re_path[2:]
+                return sr, re_path
+            except ValueError:
+                pass
+        raise ValueError()
+
+    def __ignore(relative_path) -> bool:
+        _, relative_path = split_path(_sync_group, relative_path)
+        for _i in _sync_group.blacklist:
+            if fnmatch.fnmatchcase(relative_path, _i):
+                logger.debug("Ignore: %s, [matched: %s]", relative_path, _i)
+                return True
+        return False
+
+    return __ignore
+
+
+def scaner(url: AlistPath, _queue, i_func: Callable[[str | AlistPath], bool] = None):
     def _scaner(_url: AlistPath, _s_num):
         """ """
-        _s_num.append(1)
+        if i_func is not None and i_func(_url):
+            return
         logger.debug(f"Scaner: {_url}")
         try:
+            _s_num.append(1)
             for item in _url.iterdir():
                 if item.is_file():
-                    logger.debug(f"find file: {item}")
+                    logger.debug(f"Find File: {item}")
                     _queue.put(item)
                 elif item.is_dir():
                     pool.submit(_scaner, item, _s_num)
         except alist_sdk.AlistError:
             pass
-        except Exception:
+        except Exception as _e:
+            logger.error("Scaner Error: %s", _e, exc_info=_e)
+        finally:
             _s_num.pop()

     assert url.exists(), f"Directory does not exist: {url.as_uri()}"
@@ -50,8 +82,15 @@ def _scaner(_url: AlistPath, _s_num):
     s_sum = []
     with MyThreadPoolExecutor(5, thread_name_prefix=f"scaner_{url.as_uri()}") as pool:
         pool.submit(_scaner, url, s_sum)
+        time.sleep(5)
         while s_sum:
             time.sleep(2)
+            logger.debug(
+                "Scaner Size: %s, %d, Threads[%s]",
+                url,
+                len(s_sum),
+                all_thread_name(),
+            )


 def checker(sync_group: SyncGroup, _queue_worker: Queue) -> threading.Thread | None:
@@ -71,12 +110,13 @@ def checker(sync_group: SyncGroup, _queue_worker: Queue) -> threading.Thread | N
     _ct = get_checker(sync_group.type)(sync_group, _queue_scaner, _queue_worker).start()

     _sign = ["copy"]
+    __ignore = _make_ignore(sync_group)
     if sync_group.type in _sign:
         logger.debug(f"Copy only needs to scan {sync_group.group[0].as_uri() = }")
-        scaner(sync_group.group[0], _queue_scaner)
+        scaner(sync_group.group[0], _queue_scaner, __ignore)
     else:
         for uri in sync_group.group:
-            scaner(uri, _queue_scaner)
+            scaner(uri, _queue_scaner, __ignore)

     return _ct
@@ -92,33 +132,67 @@ def main():
         _tw.join()


+def main_check():
+    def _checker(_queue_worker: Queue):
+        """Drain the worker queue and collect report rows."""
+        total_size = 0
+        while True:
+            try:
+                worker = _queue_worker.get()
+                if worker is None:
+                    break
+                rest[worker.group_name][worker.relative_path] = (
+                    worker.type,
+                    worker.source_path.as_uri().replace(worker.relative_path, ""),
+                    worker.target_path.as_uri().replace(worker.relative_path, ""),
+                    beautify_size(worker.file_size),
+                )
+                total_size += worker.file_size
+            except Exception as e:
+                logger.error(f"Main Checker Error: {e}", exc_info=e)
+
+    sync_config.daemon = False
+    queue_worker = Queue()
+    rest = collections.defaultdict(dict)
+    _tc = threading.Thread(target=_checker, args=(queue_worker,))
+    _tc.start()
+    for sync_group in sync_config.sync_groups:
+        cc = checker(sync_group, queue_worker)
+        cc.join()
+    queue_worker.put(None)
+    _tc.join()
+
+    from rich.console import Console
+    from rich.table import Table
+
+    table = Table(title="Sync Info")
+    table.add_column("Group")
+    table.add_column("Type")
+    table.add_column("Path")
+    table.add_column("Source")
+    table.add_column("Target")
+    table.add_column("Size")
+
+    logger.info("Creating Table...")
+    for group, g_items in rest.items():
+        g_items = sorted(g_items.items(), key=lambda x: x[1][1])
+        for relation_path, values in g_items:
+            # values: type, source, target, size
+            _type, _source, _target, _size = values
+            table.add_row(group, _type, relation_path, _source, _target, _size)
+        table.add_row(end_section=True)
+
+    console = Console(record=True, width=180)
+    time.sleep(2)
+    console.print(table)
+    console.save_html("sync_info.html", clear=False)
+    console.save_svg("sync_info.svg")
+
+
 def main_debug():
     """"""
-    import fnmatch

     _tw = Workers()

-    def _make_ignore(_sync_group):
-        @lru_cache(64)
-        def split_path(_sync_group, path: AlistPath) -> tuple[AlistPath, str]:
-            """Split a path into its sync_dir and the relative path."""
-            for sr in _sync_group.group:
-                try:
-                    return sr, path.relative_to(sr)
-                except ValueError:
-                    pass
-            raise ValueError()
-
-        def __ignore(relative_path) -> bool:
-            _, relative_path = split_path(_sync_group, relative_path)
-            for _i in _sync_group.blacklist:
-                if fnmatch.fnmatchcase(relative_path, _i):
-                    logger.debug("Ignore: %s, [matched: %s]", relative_path, _i)
-                    return True
-            return False
-
-        return __ignore
-
     def iter_file(url, i_func: Callable[[str], bool] | None = None):
         if i_func is not None and i_func(url):
             return
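The reworked `scaner` above counts in-flight directories in a shared list (`append(1)` on entry, `pop()` in the `finally`), and the caller polls `while s_sum`; the added `time.sleep(5)` covers the window in which a subdirectory has been submitted to the pool but has not yet appended its own sentinel. The same drain-polling pattern in isolation, over a local tree (illustrative sketch only, not project code):

```python
import time
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

outstanding = []  # one sentinel per directory currently being scanned


def scan(dir_: Path, pool: ThreadPoolExecutor) -> None:
    outstanding.append(1)
    try:
        for child in dir_.iterdir():
            if child.is_dir():
                pool.submit(scan, child, pool)  # child pushes its own sentinel
    finally:
        outstanding.pop()  # runs even if iterdir() raises


with ThreadPoolExecutor(5) as pool:
    pool.submit(scan, Path("."), pool)
    time.sleep(1)  # give the root task time to register its sentinel
    while outstanding:  # drains once every submitted directory has finished
        time.sleep(0.5)
```

Note that the gap between `pool.submit()` and the child's `append()` exists at every level of the recursion; the sleeps only make an early exit unlikely, which is the same trade-off the patch itself makes.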
diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py
index d443ad7..fa4f7dc 100644
--- a/alist_sync/d_worker.py
+++ b/alist_sync/d_worker.py
@@ -21,11 +21,12 @@
 sync_config = create_config()

-WorkerType = ["delete", "copy"]
+WorkerType = ("delete", "copy")
+
 # noinspection PyTypeHints,PyCompatibility
 WorkerTypeModify = Literal[*WorkerType]

-WorkerStatus = [
+WorkerStatus = (
     "init",
     "deleted",
     "back-upped",
@@ -34,9 +35,9 @@
     "copied",
     "done",
     "failed",
-]
+)
 # noinspection PyTypeHints,PyCompatibility
-WorkerStatusModify = Literal[*WorkerTypeModify]
+WorkerStatusModify = Literal[*WorkerStatus]

 logger = logging.getLogger("alist-sync.worker")
@@ -48,12 +49,15 @@
 # noinspection PyTypeHints
 class Worker(BaseModel):
     owner: str = sync_config.name
+    group_name: str | None = None
+
     created_at: datetime.datetime = datetime.datetime.now()
     done_at: datetime.datetime | None = None

     type: WorkerTypeModify
     need_backup: bool
     backup_dir: AbsAlistPathType | None = None
+    relative_path: str | None = None

     source_path: AbsAlistPathType | None = None
     target_path: AbsAlistPathType  # Only ever operates on the target file; a delete targets it as well
     status: WorkerStatusModify = "init"
@@ -65,25 +69,18 @@ class Worker(BaseModel):

     model_config = {
         "arbitrary_types_allowed": True,
-        "excludes": {
-            "workers",
-            "collection",
-        },
+        "excludes": {"workers", "collection", "tmp_file"},
     }

     def __init__(self, **data: Any):
         super().__init__(**data)
-        logger.info(f"Worker[{self.short_id}] Created: {self.__repr__()}")
+        logger.info(
+            f"Worker[{self.short_id}] Created: " f"{self.model_dump_json(indent=2)}"
+        )

     def __repr__(self):
         return f"<Worker[{self.short_id}]: {self.type} -> {self.target_path}>"

-    def __del__(self):
-        try:
-            self.tmp_file.unlink(missing_ok=True)
-        finally:
-            pass
-
     @computed_field()
     @property
     def file_size(self) -> int | None:
@@ -100,6 +97,7 @@ def id(self) -> str:
     def short_id(self) -> str:
         return self.id[:8]

+    @computed_field()
     @property
     def tmp_file(self) -> Path:
         return sync_config.cache_dir.joinpath(f"download_tmp_{sha1(self.source_path)}")
@@ -123,6 +121,7 @@ def update(self, **field: Any):
                 f"Average transfer speed: "
                 f"{transfer_speed(self.file_size, self.done_at, self.created_at)}"
             )
+            self.tmp_file.unlink(missing_ok=True)
             return sync_config.handle.delete_worker(self.id)
         return sync_config.handle.update_worker(self, *field.keys())
@@ -160,7 +159,6 @@ def __retry(
         *args,
         **kwargs,
     ):
-
         while retry > 0:
             try:
                 return func(*args, **kwargs)
@@ -334,7 +332,7 @@ def __error_exec(self, _e: Exception):
 class Workers:
     def __init__(self):
         self.thread_pool = MyThreadPoolExecutor(
-            5,
+            20,
             "worker_",
         )
@@ -383,7 +381,7 @@ def run(self, queue: Queue):
                 )
                 self.thread_pool.shutdown(wait=True, cancel_futures=False)
                 logger.info(f"Loop thread exiting - {threading.current_thread().name}")
-                break
+                return

             try:
                 _started = True
diff --git a/bootstrap.sh b/bootstrap.sh
index 598a9e9..c651cbf 100755
--- a/bootstrap.sh
+++ b/bootstrap.sh
@@ -16,7 +16,7 @@ all_clear() {
 case $1 in
 install)
   pip install -U pip
-  pip install git+https://github.com/lee-cq/alist-sdk --no-cache-dir --force-reinstall
+  pip install -U git+https://github.com/lee-cq/alist-sdk --no-cache-dir --force-reinstall
   pip install -e .
   ;;

@@ -39,6 +39,12 @@ clear)
   all_clear
   ;;

+clear-log)
+  rm -rf logs/*
+  rm -rf alist/data/log/*
+  ./bootstrap.sh alist restart
+  ;;
+
 test)
   whereis pytest || pip install pytest
   clear
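A closing note on `d_worker.py`: `WorkerTypeModify = Literal[*WorkerType]` uses star-unpacking inside a subscript, which the parser only accepts from Python 3.11 on (hence the `PyCompatibility` suppressions). At runtime, subscripting with the tuple itself appears to behave the same way, since `obj[(a, b)]` is the same call as `obj[a, b]`; a sketch (static type checkers may still insist on explicitly enumerated literals):

```python
from typing import Literal

WorkerType = ("delete", "copy")

# Python 3.11+ spelling, as in the patch:
#   WorkerTypeModify = Literal[*WorkerType]

# Runtime-equivalent spelling that also parses on older interpreters:
WorkerTypeModify = Literal[WorkerType]

print(WorkerTypeModify)  # typing.Literal['delete', 'copy']
```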