From a314ffe4d97f59a4937f351a39217ff009940f45 Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Mon, 4 Mar 2024 16:31:04 +0800 Subject: [PATCH 1/7] =?UTF-8?q?=E7=BC=93=E5=AD=98AlistPath.stat()=20?= =?UTF-8?q?=E4=BB=A5=E5=87=8F=E5=B0=91=E7=BD=91=E7=BB=9C=E7=BD=91=E7=BB=9C?= =?UTF-8?q?=E8=AE=BF=E9=97=AE,=20=E6=B7=BB=E5=8A=A0Worker=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E8=87=B320?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/alist-sync.yaml | 6 +-- alist_sync/d_checker.py | 76 ++++++++++++++++++++----------- alist_sync/d_worker.py | 2 +- bootstrap.sh | 2 +- 4 files changed, 53 insertions(+), 33 deletions(-) diff --git a/.github/workflows/alist-sync.yaml b/.github/workflows/alist-sync.yaml index 96ac574..ded03d2 100644 --- a/.github/workflows/alist-sync.yaml +++ b/.github/workflows/alist-sync.yaml @@ -1,5 +1,5 @@ name: Alist Sync - +run-name: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}" on: workflow_dispatch: inputs: @@ -17,7 +17,7 @@ on: default: false env: - _ALIST_SYNC_NAME: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}" + _ALIST_SYNC_NAME: "${{github.run-name}}" _ALIST_SYNC_DEBUG: ${{ github.event.inputs.debug }} _ALIST_ADMIN_PASSWORD: ${{ secrets.ALIST_ADMIN_PASSWORD }} @@ -102,5 +102,3 @@ jobs: - name: Debugger if: ${{ github.event.inputs.debug == 'true' && failure() }} uses: csexton/debugger-action@master - - diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index 144388d..bc58b75 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -14,7 +14,8 @@ from typing import Iterator from functools import lru_cache -from alist_sdk import AlistPath +from alist_sdk import AlistPath, RawItem, AlistPathType +from pydantic import BaseModel from alist_sync.config import create_config, SyncGroup from alist_sync.d_worker import Worker @@ -26,6 +27,14 @@ sync_config = create_config() +class SyncRawItem(BaseModel): + path: AlistPathType | None + stat: RawItem + + def exists(self): + return self.stat is not None + + class Checker: def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Queue): self.sync_group: SyncGroup = sync_group @@ -52,7 +61,20 @@ def split_path(self, path: AlistPath) -> tuple[AlistPath, str]: def get_backup_dir(self, path) -> AlistPath: return self.split_path(path)[0].joinpath(self.sync_group.backup_dir) - def checker(self, source_path: AlistPath, target_path: AlistPath) -> "Worker|None": + @lru_cache(40_000) + def get_stat(self, path: AlistPath) -> SyncRawItem: + try: + stat = path.re_stat() + except FileNotFoundError: + stat = None + + return SyncRawItem(path=path, stat=stat) + + def checker( + self, + source_stat: SyncRawItem, + target_stat: SyncRawItem, + ) -> "Worker|None": raise NotImplementedError def ignore(self, relative_path) -> bool: @@ -113,44 +135,44 @@ class CheckerCopy(Checker): """""" def checker( - self, - source_path: AlistPath, - target_path: AlistPath, + self, source_stat: SyncRawItem, target_stat: SyncRawItem ) -> "Worker|None": - if not target_path.exists(): + if not target_stat.exists(): logger.info( - f"Checked: [COPY] {source_path.as_uri()} -> {target_path.as_uri()}" + f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}" ) return Worker( type="copy", need_backup=False, - source_path=source_path, - target_path=target_path, + source_path=source_stat.path, + target_path=target_stat.path, ) - logger.info(f"Checked: [JUMP] {source_path.as_uri()}") + logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}") return None class CheckerMirror(Checker): """""" - def checker(self, source_path: AlistPath, target_path: AlistPath) -> "Worker|None": - _main = self.sync_group.group[0] - # target如果是主存储器 - 且target不存在,source存在,删除source - if target_path == _main and not target_path.exists() and source_path.exists(): - return Worker( - type="delete", - need_backup=self.sync_group.need_backup, - backup_dir=self.get_backup_dir(source_path), - target_path=source_path, - ) - if not target_path.exists(): - return Worker( - type="copy", - need_backup=False, - source_path=source_path, - target_path=target_path, - ) + def checker( + self, source_stat: SyncRawItem, target_stat: SyncRawItem + ) -> "Worker|None": + # _main = self.sync_group.group[0] + # # target如果是主存储器 - 且target不存在,source存在,删除source + # if target_stat == _main and not target_stat.exists() and source_stat.exists(): + # return Worker( + # type="delete", + # need_backup=self.sync_group.need_backup, + # backup_dir=self.get_backup_dir(source_stat.path), + # target_path=source_stat.path, + # ) + # if not target_stat.exists(): + # return Worker( + # type="copy", + # need_backup=False, + # source_path=source_stat.path, + # target_path=target_stat.path, + # ) return None diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index acea0a2..1de7224 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -336,7 +336,7 @@ def __error_exec(self, _e: Exception): class Workers: def __init__(self): self.thread_pool = MyThreadPoolExecutor( - 5, + 20, "worker_", ) diff --git a/bootstrap.sh b/bootstrap.sh index 598a9e9..ffa91cf 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -16,7 +16,7 @@ all_clear() { case $1 in install) pip install -U pip - pip install git+https://github.com/lee-cq/alist-sdk --no-cache-dir --force-reinstall + pip install -U git+https://github.com/lee-cq/alist-sdk --no-cache-dir --force-reinstall pip install -e . ;; From f78dbbe61be34d2c6750e2d42530d44afb7aeae8 Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Mon, 4 Mar 2024 17:28:56 +0800 Subject: [PATCH 2/7] =?UTF-8?q?BUGFIX=20&&=20=E6=B7=BB=E5=8A=A0=E6=97=A5?= =?UTF-8?q?=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/alist-sync.yaml | 4 ++++ alist_sync/d_checker.py | 9 ++++++++- alist_sync/d_main.py | 2 +- alist_sync/d_worker.py | 10 ++++++---- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/.github/workflows/alist-sync.yaml b/.github/workflows/alist-sync.yaml index ded03d2..75614b2 100644 --- a/.github/workflows/alist-sync.yaml +++ b/.github/workflows/alist-sync.yaml @@ -67,6 +67,7 @@ jobs: _RELOAD_STORAGE: ${{ github.event.inputs.reload_storage }} run: | + echo RUNNER = ${_ALIST_SYNC_NAME} # 这将会导入全部的内容包括:设置,元数据,用户,存储器。 echo $(pwd) cat > alist-backup-config.json << EOF @@ -76,6 +77,8 @@ jobs: - name: Create Tunnel for Cloudflare run: | + echo RUNNER = ${_ALIST_SYNC_NAME} + test ! -n "${{secrets.CLOUDFLARE_TUNNEL_TOKEN}}" && { echo "CLOUDFLARE_TUNNEL_TOKEN is not set. Skip Cloudflare Tunnel Installation." exit 0 @@ -89,6 +92,7 @@ jobs: - name: RUN Alist Sync run: | + echo RUNNER = ${_ALIST_SYNC_NAME} cat > config.yaml << EOF ${{ secrets.SYNC_CONFIG }} EOF diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index bc58b75..224beab 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -34,6 +34,12 @@ class SyncRawItem(BaseModel): def exists(self): return self.stat is not None + def is_file(self): + return not self.stat.is_dir + + def is_dir(self): + return self.stat.is_dir + class Checker: def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Queue): @@ -116,7 +122,8 @@ def main(self): try: _started = True - self._t_checker(self.scaner_queue.get(timeout=3)) + path = self.scaner_queue.get(timeout=3) + self.pool.submit(self._t_checker, path) except Empty: if _started: continue diff --git a/alist_sync/d_main.py b/alist_sync/d_main.py index 8c6a755..4eba618 100644 --- a/alist_sync/d_main.py +++ b/alist_sync/d_main.py @@ -36,7 +36,7 @@ def _scaner(_url: AlistPath, _s_num): try: for item in _url.iterdir(): if item.is_file(): - logger.debug(f"find file: {item}") + logger.debug(f"Find File: {item}") _queue.put(item) elif item.is_dir(): pool.submit(_scaner, item, _s_num) diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index 1de7224..7ee052e 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -1,4 +1,3 @@ - import atexit import datetime import logging @@ -70,12 +69,15 @@ class Worker(BaseModel): "excludes": { "workers", "collection", + "tmp_file" }, } def __init__(self, **data: Any): super().__init__(**data) - logger.info(f"Worker[{self.short_id}] Created: {self.__repr__()}") + logger.info( + f"Worker[{self.short_id}] Created: " f"{self.model_dump_json(indent=2)}" + ) def __repr__(self): return f" {self.target_path}>" @@ -102,6 +104,7 @@ def id(self) -> str: def short_id(self) -> str: return self.id[:8] + @computed_field() @property def tmp_file(self) -> Path: return sync_config.cache_dir.joinpath(f"download_tmp_{sha1(self.source_path)}") @@ -162,7 +165,6 @@ def __retry( *args, **kwargs, ): - while retry > 0: try: return func(*args, **kwargs) @@ -385,7 +387,7 @@ def run(self, queue: Queue): ) self.thread_pool.shutdown(wait=True, cancel_futures=False) logger.info(f"循环线程退出 - {threading.current_thread().name}") - break + return try: _started = True From e808e16f1c528fc81f973d4881c1f53487812bbf Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Mon, 4 Mar 2024 17:44:36 +0800 Subject: [PATCH 3/7] =?UTF-8?q?=E5=A4=9A=E7=BA=BF=E7=A8=8B=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=BF=BD=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- alist_sync/d_main.py | 54 ++++++++++++++++++++++++-------------------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/alist_sync/d_main.py b/alist_sync/d_main.py index 4eba618..ef64446 100644 --- a/alist_sync/d_main.py +++ b/alist_sync/d_main.py @@ -2,6 +2,7 @@ """ """ +import fnmatch import logging import threading import time @@ -28,9 +29,33 @@ def login_alist(server: AlistServer): logger.info("Login: %s[%s] Success.", _c.base_url, _c.login_username) -def scaner(url: AlistPath, _queue): +def _make_ignore(_sync_group): + @lru_cache(64) + def split_path(_sync_group, path: AlistPath) -> tuple[AlistPath, str]: + """将Path切割为sync_dir和相对路径""" + for sr in _sync_group.group: + try: + return sr, path.relative_to(sr) + except ValueError: + pass + raise ValueError() + + def __ignore(relative_path) -> bool: + _, relative_path = split_path(_sync_group, relative_path) + for _i in _sync_group.blacklist: + if fnmatch.fnmatchcase(relative_path, _i): + logger.debug("Ignore: %s, [matched: %s]", relative_path, _i) + return True + return False + + return __ignore + + +def scaner(url: AlistPath, _queue, i_func: Callable[[str | AlistPath], bool] = None): def _scaner(_url: AlistPath, _s_num): """ """ + if i_func is not None and i_func(url): + return _s_num.append(1) logger.debug(f"Scaner: {_url}") try: @@ -71,12 +96,13 @@ def checker(sync_group: SyncGroup, _queue_worker: Queue) -> threading.Thread | N _ct = get_checker(sync_group.type)(sync_group, _queue_scaner, _queue_worker).start() _sign = ["copy"] + __ignore = _make_ignore(sync_group) if sync_group.type in _sign: logger.debug(f"Copy 只需要扫描 {sync_group.group[0].as_uri() = }") - scaner(sync_group.group[0], _queue_scaner) + scaner(sync_group.group[0], _queue_scaner, __ignore) else: for uri in sync_group.group: - scaner(uri, _queue_scaner) + scaner(uri, _queue_scaner, __ignore) return _ct @@ -94,31 +120,9 @@ def main(): def main_debug(): """""" - import fnmatch _tw = Workers() - def _make_ignore(_sync_group): - @lru_cache(64) - def split_path(_sync_group, path: AlistPath) -> tuple[AlistPath, str]: - """将Path切割为sync_dir和相对路径""" - for sr in _sync_group.group: - try: - return sr, path.relative_to(sr) - except ValueError: - pass - raise ValueError() - - def __ignore(relative_path) -> bool: - _, relative_path = split_path(_sync_group, relative_path) - for _i in _sync_group.blacklist: - if fnmatch.fnmatchcase(relative_path, _i): - logger.debug("Ignore: %s, [matched: %s]", relative_path, _i) - return True - return False - - return __ignore - def iter_file(url, i_func: Callable[[str], bool] | None = None): if i_func is not None and i_func(url): return From 71f8c3b78e683a1147f6d9e0607e9f8240bdea55 Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Mon, 4 Mar 2024 17:57:56 +0800 Subject: [PATCH 4/7] BUGFIX --- alist_sync/d_checker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index 224beab..83ac61b 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -99,7 +99,7 @@ def checker_every_dir(self, path) -> Iterator[Worker | None]: if _sd == _sync_dir: continue target_path = _sd.joinpath(_relative_path) - yield self.checker(path, target_path) + yield self.checker(self.get_stat(path), self.get_stat(target_path)) def _t_checker(self, path): for _c in self.checker_every_dir(path): From 1dff72887550522ba21743ce4886a39481259c2f Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Mon, 4 Mar 2024 18:21:12 +0800 Subject: [PATCH 5/7] BUGFIX --- alist_sync/d_checker.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index 83ac61b..6848ae5 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -92,8 +92,7 @@ def ignore(self, relative_path) -> bool: def checker_every_dir(self, path) -> Iterator[Worker | None]: _sync_dir, _relative_path = self.split_path(path) - # if self.ignore(_relative_path): - # return + logger.debug(f"Checking [{_relative_path}] in {self.sync_group.group}") for _sd in self.sync_group.group: _sd: AlistPath if _sd == _sync_dir: @@ -102,9 +101,12 @@ def checker_every_dir(self, path) -> Iterator[Worker | None]: yield self.checker(self.get_stat(path), self.get_stat(target_path)) def _t_checker(self, path): - for _c in self.checker_every_dir(path): - if _c: - self.worker_queue.put(_c) + try: + for _c in self.checker_every_dir(path): + if _c: + self.worker_queue.put(_c) + except Exception as _e: + logger.error("Checker Error: ", exc_info=_e) def main(self): """""" From 890c7e22393d0894327d64eaec5e2322406be8ba Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Tue, 5 Mar 2024 00:10:46 +0800 Subject: [PATCH 6/7] =?UTF-8?q?=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90Check?= =?UTF-8?q?=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- alist_sync/__main__.py | 15 +++++++++++ alist_sync/d_checker.py | 35 +++++++++++++++++++------- alist_sync/d_main.py | 56 ++++++++++++++++++++++++++++++++++++++--- alist_sync/d_worker.py | 9 +++---- 4 files changed, 97 insertions(+), 18 deletions(-) diff --git a/alist_sync/__main__.py b/alist_sync/__main__.py index c50c33c..c5117c1 100644 --- a/alist_sync/__main__.py +++ b/alist_sync/__main__.py @@ -64,6 +64,21 @@ def sync( return main() +@app.command("check") +def check( + config_file: str = Option(None, "--config", "-c", help="配置文件路径"), +): + """检查任务""" + from alist_sync.config import create_config + from alist_sync.d_main import main_check + + if config_file and Path(config_file).exists(): + os.environ["ALIST_SYNC_CONFIG"] = str(Path(config_file).resolve().absolute()) + os.environ["_ALIST_SYNC_CONFIG"] = str(Path(config_file).resolve().absolute()) + create_config() + return main_check() + + @app.command("get-info") def cli_get(path: str): """""" diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index 6848ae5..0e45a63 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -49,6 +49,7 @@ def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Que self.conflict: set = set() self.pool = MyThreadPoolExecutor(10) + self.stat_sq = threading.Semaphore(3) self.main_thread = threading.Thread( target=self.main, name=f"checker_main[{self.sync_group.name}-{self.__class__.__name__}]", @@ -67,14 +68,30 @@ def split_path(self, path: AlistPath) -> tuple[AlistPath, str]: def get_backup_dir(self, path) -> AlistPath: return self.split_path(path)[0].joinpath(self.sync_group.backup_dir) + def create_worker(self, type_: str, source_path: AlistPath, target_path: AlistPath): + return Worker( + type=type_, + group_name=self.sync_group.name, + need_backup=self.sync_group.need_backup, + backup_dir=self.get_backup_dir(source_path), + relative_path=self.split_path(source_path)[1], + source_path=source_path, + target_path=target_path, + ) + + _stat_get_times = 0 + @lru_cache(40_000) def get_stat(self, path: AlistPath) -> SyncRawItem: - try: - stat = path.re_stat() - except FileNotFoundError: - stat = None - - return SyncRawItem(path=path, stat=stat) + # BUGFIX 控制QPS而不时并发 + with self.stat_sq: + self._stat_get_times += 1 + logger.debug("get_stat: %s, times: %d", path, self._stat_get_times) + try: + stat = path.stat() + except FileNotFoundError: + stat = None + return SyncRawItem(path=path, stat=stat) def checker( self, @@ -150,12 +167,12 @@ def checker( logger.info( f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}" ) - return Worker( - type="copy", - need_backup=False, + return self.create_worker( + type_="copy", source_path=source_stat.path, target_path=target_stat.path, ) + logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}") return None diff --git a/alist_sync/d_main.py b/alist_sync/d_main.py index ef64446..23b756e 100644 --- a/alist_sync/d_main.py +++ b/alist_sync/d_main.py @@ -2,6 +2,7 @@ """ """ +import collections import fnmatch import logging import threading @@ -35,7 +36,10 @@ def split_path(_sync_group, path: AlistPath) -> tuple[AlistPath, str]: """将Path切割为sync_dir和相对路径""" for sr in _sync_group.group: try: - return sr, path.relative_to(sr) + re_path = path.relative_to(sr).rstrip("./") + if re_path.startswith("./"): + re_path = re_path[2:] + return sr, re_path except ValueError: pass raise ValueError() @@ -54,7 +58,7 @@ def __ignore(relative_path) -> bool: def scaner(url: AlistPath, _queue, i_func: Callable[[str | AlistPath], bool] = None): def _scaner(_url: AlistPath, _s_num): """ """ - if i_func is not None and i_func(url): + if i_func is not None and i_func(_url): return _s_num.append(1) logger.debug(f"Scaner: {_url}") @@ -65,10 +69,9 @@ def _scaner(_url: AlistPath, _s_num): _queue.put(item) elif item.is_dir(): pool.submit(_scaner, item, _s_num) + _s_num.pop() except alist_sdk.AlistError: pass - except Exception: - _s_num.pop() assert url.exists(), f"目录不存在{url.as_uri()}" @@ -118,6 +121,51 @@ def main(): _tw.join() +def main_check(): + def _checker(_queue_worker: Queue): + """检查队列""" + while True: + try: + worker = _queue_worker.get() + if worker is None: + break + rest[worker.group_name][worker.relative_path] = ( + worker.type, + worker.source_path.as_uri().replace(worker.relative_path), + worker.target_path.as_uri().replace(worker.relative_path), + worker.file_size, + ) + except Exception as e: + logger.error(e) + + sync_config.daemon = False + queue_worker = Queue() + rest = collections.defaultdict(dict) + _tc = threading.Thread(target=_checker, args=(queue_worker,)) + _tc.start() + for sync_group in sync_config.sync_groups: + checker(sync_group, queue_worker) + queue_worker.put(None) + _tc.join() + + from rich.table import Table + + table = Table(title="Sync Info") + table.add_column("Group") + table.add_column("Type") + table.add_column("Path") + table.add_column("Source") + table.add_column("Target") + table.add_column("Size") + + for group, g_items in rest.items(): + g_items = sorted(g_items.items(), key=lambda x: x[1][1]) + for relation_path, values in g_items: + # values: type, source, target, size + table.add_row(relation_path, *values) + table.add_row(end_section=True) + + def main_debug(): """""" diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index 7ee052e..57844fc 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -49,12 +49,15 @@ # noinspection PyTypeHints class Worker(BaseModel): owner: str = sync_config.name + group_name: str = None + created_at: datetime.datetime = datetime.datetime.now() done_at: datetime.datetime | None = None type: WorkerTypeModify need_backup: bool backup_dir: AbsAlistPathType | None = None + relative_path: str | None = None source_path: AbsAlistPathType | None = None target_path: AbsAlistPathType # 永远只操作Target文件,删除也是作为Target status: WorkerStatusModify = "init" @@ -66,11 +69,7 @@ class Worker(BaseModel): model_config = { "arbitrary_types_allowed": True, - "excludes": { - "workers", - "collection", - "tmp_file" - }, + "excludes": {"workers", "collection", "tmp_file"}, } def __init__(self, **data: Any): From ca93b03754767d34bbd28df5a96a711524024dac Mon Sep 17 00:00:00 2001 From: LeeCQ Date: Sun, 10 Mar 2024 22:02:35 +0800 Subject: [PATCH 7/7] =?UTF-8?q?=E6=94=AF=E6=8C=81check=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + alist_sync/common.py | 22 +++++++++++----------- alist_sync/d_checker.py | 17 +++++++++++++---- alist_sync/d_main.py | 36 +++++++++++++++++++++++++++++------- alist_sync/d_worker.py | 23 +++++++++-------------- bootstrap.sh | 6 ++++++ 6 files changed, 69 insertions(+), 36 deletions(-) diff --git a/.gitignore b/.gitignore index 77eb48e..2a116b2 100644 --- a/.gitignore +++ b/.gitignore @@ -13,6 +13,7 @@ config.yaml alist_sync/.alist-sync-cache/ alist/ tools/alist/ +logs/ # C extensions *.so diff --git a/alist_sync/common.py b/alist_sync/common.py index 55301ac..fba9fbb 100644 --- a/alist_sync/common.py +++ b/alist_sync/common.py @@ -95,17 +95,17 @@ def timeout_input(msg, default, timeout=3): return default -def beautify_size(speed: float): - if speed < 1024: - return f"{speed:.2f}B" - speed /= 1024 - if speed < 1024: - return f"{speed:.2f}KB" - speed /= 1024 - if speed < 1024: - return f"{speed:.2f}MB" - speed /= 1024 - return f"{speed:.2f}GB" +def beautify_size(byte_size: float): + if byte_size < 1024: + return f"{byte_size:.2f}B" + byte_size /= 1024 + if byte_size < 1024: + return f"{byte_size:.2f}KB" + byte_size /= 1024 + if byte_size < 1024: + return f"{byte_size:.2f}MB" + byte_size /= 1024 + return f"{byte_size:.2f}GB" def transfer_speed(size, start: datetime.datetime, end: datetime.datetime) -> str: diff --git a/alist_sync/d_checker.py b/alist_sync/d_checker.py index 0e45a63..975f169 100644 --- a/alist_sync/d_checker.py +++ b/alist_sync/d_checker.py @@ -6,6 +6,7 @@ @Date-Time : 2024/2/25 21:17 """ +import datetime import fnmatch import logging import threading @@ -14,7 +15,7 @@ from typing import Iterator from functools import lru_cache -from alist_sdk import AlistPath, RawItem, AlistPathType +from alist_sdk import AlistPath, RawItem, AlistPathType, Item from pydantic import BaseModel from alist_sync.config import create_config, SyncGroup @@ -29,7 +30,8 @@ class SyncRawItem(BaseModel): path: AlistPathType | None - stat: RawItem + scan_time: datetime.datetime = datetime.datetime.now() + stat: RawItem | Item | None def exists(self): return self.stat is not None @@ -49,7 +51,7 @@ def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Que self.conflict: set = set() self.pool = MyThreadPoolExecutor(10) - self.stat_sq = threading.Semaphore(3) + self.stat_sq = threading.Semaphore(4) self.main_thread = threading.Thread( target=self.main, name=f"checker_main[{self.sync_group.name}-{self.__class__.__name__}]", @@ -88,7 +90,9 @@ def get_stat(self, path: AlistPath) -> SyncRawItem: self._stat_get_times += 1 logger.debug("get_stat: %s, times: %d", path, self._stat_get_times) try: - stat = path.stat() + stat = path.client.dict_files_items( + path.parent.as_posix(), cache_empty=True + ).get(path.name) except FileNotFoundError: stat = None return SyncRawItem(path=path, stat=stat) @@ -144,6 +148,11 @@ def main(self): path = self.scaner_queue.get(timeout=3) self.pool.submit(self._t_checker, path) except Empty: + logger.debug( + "Checker Size: %s, %d", + self.sync_group.name, + self.pool.work_qsize(), + ) if _started: continue logger.info( diff --git a/alist_sync/d_main.py b/alist_sync/d_main.py index 23b756e..2f87181 100644 --- a/alist_sync/d_main.py +++ b/alist_sync/d_main.py @@ -18,6 +18,7 @@ from alist_sync.d_worker import Workers from alist_sync.thread_pool import MyThreadPoolExecutor from alist_sync.config import SyncGroup, create_config, AlistServer +from alist_sync.common import beautify_size, all_thread_name from alist_sync.d_checker import get_checker sync_config = create_config() @@ -60,26 +61,36 @@ def _scaner(_url: AlistPath, _s_num): """ """ if i_func is not None and i_func(_url): return - _s_num.append(1) logger.debug(f"Scaner: {_url}") try: + _s_num.append(1) for item in _url.iterdir(): if item.is_file(): logger.debug(f"Find File: {item}") _queue.put(item) elif item.is_dir(): pool.submit(_scaner, item, _s_num) - _s_num.pop() except alist_sdk.AlistError: pass + except Exception as _e: + logger.error("Scaner Error: %s", _e, exc_info=_e) + finally: + _s_num.pop() assert url.exists(), f"目录不存在{url.as_uri()}" s_sum = [] with MyThreadPoolExecutor(5, thread_name_prefix=f"scaner_{url.as_uri()}") as pool: pool.submit(_scaner, url, s_sum) + time.sleep(5) while s_sum: time.sleep(2) + logger.debug( + "Scaner Size: %s, %d, Threads[%s]", + url, + len(s_sum), + all_thread_name(), + ) def checker(sync_group: SyncGroup, _queue_worker: Queue) -> threading.Thread | None: @@ -124,6 +135,7 @@ def main(): def main_check(): def _checker(_queue_worker: Queue): """检查队列""" + total_size = 0 while True: try: worker = _queue_worker.get() @@ -131,12 +143,13 @@ def _checker(_queue_worker: Queue): break rest[worker.group_name][worker.relative_path] = ( worker.type, - worker.source_path.as_uri().replace(worker.relative_path), - worker.target_path.as_uri().replace(worker.relative_path), - worker.file_size, + worker.source_path.as_uri().replace(worker.relative_path, ""), + worker.target_path.as_uri().replace(worker.relative_path, ""), + beautify_size(worker.file_size), ) + total_size += worker.file_size except Exception as e: - logger.error(e) + logger.error(f"Main Checker Error: {e}", exc_info=e) sync_config.daemon = False queue_worker = Queue() @@ -144,10 +157,12 @@ def _checker(_queue_worker: Queue): _tc = threading.Thread(target=_checker, args=(queue_worker,)) _tc.start() for sync_group in sync_config.sync_groups: - checker(sync_group, queue_worker) + cc = checker(sync_group, queue_worker) + cc.join() queue_worker.put(None) _tc.join() + from rich.console import Console from rich.table import Table table = Table(title="Sync Info") @@ -158,6 +173,7 @@ def _checker(_queue_worker: Queue): table.add_column("Target") table.add_column("Size") + logger.info("Creating Table...") for group, g_items in rest.items(): g_items = sorted(g_items.items(), key=lambda x: x[1][1]) for relation_path, values in g_items: @@ -165,6 +181,12 @@ def _checker(_queue_worker: Queue): table.add_row(relation_path, *values) table.add_row(end_section=True) + console = Console(record=True, width=180) + time.sleep(2) + console.print(table) + console.save_html("sync_info.html", clear=False) + console.save_svg("sync_info.svg") + def main_debug(): """""" diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index 57844fc..954f5fa 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -7,7 +7,6 @@ from pathlib import Path from queue import Queue, Empty from typing import Literal, Any, Type -from typing_extensions import Unpack from pydantic import BaseModel, computed_field, Field from pymongo.collection import Collection @@ -22,11 +21,11 @@ sync_config = create_config() -WorkerType = ["delete", "copy"] -# noinspection PyTypeHints -WorkerTypeModify = Literal[Unpack[WorkerType]] +WorkerType = ("delete", "copy") +# noinspection PyTypeHints,PyCompatibility +WorkerTypeModify = Literal[*WorkerType] -WorkerStatus = [ +WorkerStatus = ( "init", "deleted", "back-upped", @@ -35,9 +34,10 @@ "copied", "done", "failed", -] -# noinspection PyTypeHints -WorkerStatusModify = Literal[Unpack[WorkerTypeModify]] +) + +# noinspection PyTypeHints,PyCompatibility +WorkerStatusModify = Literal[*WorkerStatus] logger = logging.getLogger("alist-sync.worker") @@ -81,12 +81,6 @@ def __init__(self, **data: Any): def __repr__(self): return f" {self.target_path}>" - def __del__(self): - try: - self.tmp_file.unlink(missing_ok=True) - finally: - pass - @computed_field() @property def file_size(self) -> int | None: @@ -127,6 +121,7 @@ def update(self, **field: Any): f"平均传输速度: " f"{transfer_speed(self.file_size, self.done_at, self.created_at)}" ) + self.tmp_file.unlink(missing_ok=True) return sync_config.handle.delete_worker(self.id) return sync_config.handle.update_worker(self, *field.keys()) diff --git a/bootstrap.sh b/bootstrap.sh index ffa91cf..c651cbf 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -39,6 +39,12 @@ clear) all_clear ;; +clear-log ) + rm -rf logs/* + rm -rf alist/data/log/* + ./bootstrap.sh alist restart + ;; + test) whereis pytest || pip install pytest clear