diff --git a/alist_sync/d_worker.py b/alist_sync/d_worker.py index c14035a..b531088 100644 --- a/alist_sync/d_worker.py +++ b/alist_sync/d_worker.py @@ -15,24 +15,28 @@ from alist_sync.config import create_config from alist_sync.common import sha1, prefix_in_threads -from alist_sync.err import WorkerError +from alist_sync.err import WorkerError, RetryError from alist_sync.thread_pool import MyThreadPoolExecutor from alist_sync.version import __version__ sync_config = create_config() -WorkerType = Literal["delete", "copy"] -WorkerStatus = Literal[ +WorkerType = ["delete", "copy"] +# noinspection PyTypeHints +WorkerTypeModify = Literal[*WorkerType] + +WorkerStatus = [ "init", "deleted", - "back-upping", "back-upped", - "downloading", - "uploading", + "downloaded", + "uploaded", "copied", "done", "failed", ] +# noinspection PyTypeHints +WorkerStatusModify = Literal[*WorkerTypeModify] logger = logging.getLogger("alist-sync.worker") @@ -46,13 +50,13 @@ class Worker(BaseModel): owner: str = sync_config.name created_at: datetime.datetime = datetime.datetime.now() done_at: datetime.datetime | None = None - type: WorkerType + type: WorkerTypeModify need_backup: bool backup_dir: AbsAlistPathType | None = None source_path: AbsAlistPathType | None = None target_path: AbsAlistPathType # 永远只操作Target文件,删除也是作为Target - status: WorkerStatus = "init" + status: WorkerStatusModify = "init" error_info: str | None = None # 私有属性 @@ -94,6 +98,8 @@ def tmp_file(self) -> Path: return sync_config.cache_dir.joinpath(f"download_tmp_{sha1(self.source_path)}") def update(self, **field: Any): + if (status := field.get("status", "init")) not in WorkerStatus: + raise ValueError(f"Unknown Status: {status}, allow: {WorkerStatus}.") if field: if field.keys() | self.__dict__.keys() != self.__dict__.keys(): raise KeyError() @@ -120,8 +126,6 @@ def backup(self): _backup_target_json = self.backup_dir.joinpath(_target_name + ".json") _old_info = _backup_file.stat().model_dump_json() - self.update(status="back-upping") - assert ( not _backup_target.exists() and not _backup_target_json.exists() ), "备份目标冲突" @@ -152,7 +156,7 @@ def __retry( f"Worker[{self.short_id}] Retry Error [{func.__name__}]: " f"{type(_e)} - {_e}" ) - raise _e + raise RetryError(f"Retry {func.__name__} Error.") from _e logger.warning( f"Worker[{self.short_id}] {retry = } [{func.__name__}]: " f"{type(_e)} - {_e}" @@ -169,6 +173,9 @@ def downloader(self): self.source_path.get_download_uri(), follow_redirects=True, ) as _res: + logger.debug( + f"Worker[{self.short_id}] Downloading from {self.source_path}" + ) for i in _res.iter_bytes(chunk_size=1024 * 1024): _tmp.write(i) assert ( @@ -207,11 +214,21 @@ def uploader(self): def copy_type(self): """复制任务""" logger.debug(f"Worker[{self.short_id}] Start Copping") + if self.status not in ["downloaded", "uploaded"]: + self.__retry( + 3, + (TimeoutException, AssertionError), + self.downloader, + ) - self.target_path.unlink(missing_ok=True) - self.target_path.parent.mkdir(parents=True, exist_ok=True) - self.__retry(3, (TimeoutException, AssertionError), self.downloader) - self.__retry(3, (TimeoutException, AssertionError), self.uploader) + if self.status != "uploaded": + self.target_path.unlink(missing_ok=True) + self.target_path.parent.mkdir(parents=True, exist_ok=True) + self.__retry( + 3, + (TimeoutException, AssertionError), + self.uploader, + ) return self.update(status="copied") @@ -246,18 +263,25 @@ def recheck(self) -> bool: else: raise ValueError(f"Unknown Worker Type {self.type}.") - def run(self): + def run(self, is_retry=False): """启动Worker""" - logger.info(f"worker[{self.short_id}] 已经开始工作.") - self.update() + if is_retry is False: + logger.info(f"worker[{self.short_id}] 已经开始工作.") + self.update() try: if self.status in ["done", "failed"]: + self.update() return if self.need_backup and self.status in ["init"]: self.backup() - if self.type == "copy" and self.status in ["init", "back-upped"]: + if self.type == "copy" and self.status in [ + "init", + "back-upped", + "downloaded", + "uploaded", + ]: self.copy_type() elif self.type == "delete" and self.status in ["init", "back-upped"]: @@ -265,18 +289,32 @@ def run(self): assert self.recheck() self.update(status=f"done") - except Exception as _e: - logger.error( - f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}", - exc_info=_e, - ) - self.error_info = ( - f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n" - f"{traceback.format_exc()}" + return + except TimeoutException as _e: + if is_retry: + return self.__error_exec(_e) + time.sleep(2) + return self.__retry( + 3, + (TimeoutException, AssertionError), + self.run, + True, ) - self.update(status="failed") - if sync_config.debug: - raise WorkerError from _e + except Exception as _e: + return self.__error_exec(_e) + + def __error_exec(self, _e: Exception): + logger.error( + f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}", + exc_info=_e, + ) + self.error_info = ( + f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n" + f"{traceback.format_exc()}" + ) + self.update(status="failed") + if sync_config.debug: + raise WorkerError from _e class Workers: diff --git a/alist_sync/err.py b/alist_sync/err.py index c27a156..843ea36 100644 --- a/alist_sync/err.py +++ b/alist_sync/err.py @@ -24,6 +24,10 @@ class WorkerError(AlistSyncError): pass +class RetryError(WorkerError): + pass + + class DownloaderError(WorkerError): pass diff --git a/bootstrap.sh b/bootstrap.sh index 60fee96..598a9e9 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -16,8 +16,8 @@ all_clear() { case $1 in install) pip install -U pip - pip install -e . pip install git+https://github.com/lee-cq/alist-sdk --no-cache-dir --force-reinstall + pip install -e . ;; alist) @@ -50,7 +50,7 @@ test) main ) shift - python -m alist_sync $@ + python -m alist_sync "$@" ;; debugger)