Add timeout retry
lee-cq committed Mar 3, 2024
1 parent d38ad48 commit d767018
Showing 3 changed files with 74 additions and 32 deletions.
98 changes: 68 additions & 30 deletions alist_sync/d_worker.py
@@ -15,24 +15,28 @@

from alist_sync.config import create_config
from alist_sync.common import sha1, prefix_in_threads
from alist_sync.err import WorkerError
from alist_sync.err import WorkerError, RetryError
from alist_sync.thread_pool import MyThreadPoolExecutor
from alist_sync.version import __version__

sync_config = create_config()

WorkerType = Literal["delete", "copy"]
WorkerStatus = Literal[
WorkerType = ["delete", "copy"]
# noinspection PyTypeHints
WorkerTypeModify = Literal[*WorkerType]

WorkerStatus = [
"init",
"deleted",
"back-upping",
"back-upped",
"downloading",
"uploading",
"downloaded",
"uploaded",
"copied",
"done",
"failed",
]
# noinspection PyTypeHints
WorkerStatusModify = Literal[*WorkerStatus]

logger = logging.getLogger("alist-sync.worker")
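
The constants above replace the old Literal aliases with plain lists plus Literal types derived from them, so one value set serves both static typing and the runtime membership check added to Worker.update() below. A minimal sketch of that pattern, assuming Python 3.11+ for the starred subscript; STATUSES, StatusT and set_status are illustrative names, not alist-sync identifiers:

```python
from typing import Literal

# Runtime list of allowed values, usable with plain `in` checks.
STATUSES = ["init", "done", "failed"]

# Static type derived from the same values (Python 3.11+ starred subscript).
# Type checkers may warn here because the values are not literals to them.
StatusT = Literal[*STATUSES]


def set_status(status: StatusT) -> None:
    # Runtime guard mirroring the check added to Worker.update().
    if status not in STATUSES:
        raise ValueError(f"Unknown Status: {status}, allow: {STATUSES}.")
    print("status ->", status)


set_status("done")     # accepted
# set_status("boom")   # would raise ValueError at runtime
```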

@@ -46,13 +50,13 @@ class Worker(BaseModel):
owner: str = sync_config.name
created_at: datetime.datetime = datetime.datetime.now()
done_at: datetime.datetime | None = None
type: WorkerType
type: WorkerTypeModify
need_backup: bool
backup_dir: AbsAlistPathType | None = None

source_path: AbsAlistPathType | None = None
target_path: AbsAlistPathType # Always operates only on the Target file; a delete is also applied to the Target
status: WorkerStatus = "init"
status: WorkerStatusModify = "init"
error_info: str | None = None

# 私有属性
@@ -94,6 +98,8 @@ def tmp_file(self) -> Path:
return sync_config.cache_dir.joinpath(f"download_tmp_{sha1(self.source_path)}")

def update(self, **field: Any):
if (status := field.get("status", "init")) not in WorkerStatus:
raise ValueError(f"Unknown Status: {status}, allow: {WorkerStatus}.")
if field:
if field.keys() | self.__dict__.keys() != self.__dict__.keys():
raise KeyError()
@@ -120,8 +126,6 @@ def backup(self):
_backup_target_json = self.backup_dir.joinpath(_target_name + ".json")
_old_info = _backup_file.stat().model_dump_json()

self.update(status="back-upping")

assert (
not _backup_target.exists() and not _backup_target_json.exists()
), "备份目标冲突"
@@ -152,7 +156,7 @@ def __retry(
f"Worker[{self.short_id}] Retry Error [{func.__name__}]: "
f"{type(_e)} - {_e}"
)
raise _e
raise RetryError(f"Retry {func.__name__} Error.") from _e
logger.warning(
f"Worker[{self.short_id}] {retry = } [{func.__name__}]: "
f"{type(_e)} - {_e}"
@@ -169,6 +173,9 @@ def downloader(self):
self.source_path.get_download_uri(),
follow_redirects=True,
) as _res:
logger.debug(
f"Worker[{self.short_id}] Downloading from {self.source_path}"
)
for i in _res.iter_bytes(chunk_size=1024 * 1024):
_tmp.write(i)
assert (
@@ -207,11 +214,21 @@ def uploader(self):
def copy_type(self):
"""复制任务"""
logger.debug(f"Worker[{self.short_id}] Start Copping")
if self.status not in ["downloaded", "uploaded"]:
self.__retry(
3,
(TimeoutException, AssertionError),
self.downloader,
)

self.target_path.unlink(missing_ok=True)
self.target_path.parent.mkdir(parents=True, exist_ok=True)
self.__retry(3, (TimeoutException, AssertionError), self.downloader)
self.__retry(3, (TimeoutException, AssertionError), self.uploader)
if self.status != "uploaded":
self.target_path.unlink(missing_ok=True)
self.target_path.parent.mkdir(parents=True, exist_ok=True)
self.__retry(
3,
(TimeoutException, AssertionError),
self.uploader,
)

return self.update(status="copied")

@@ -246,37 +263,58 @@ def recheck(self) -> bool:
else:
raise ValueError(f"Unknown Worker Type {self.type}.")

def run(self):
def run(self, is_retry=False):
"""启动Worker"""
logger.info(f"worker[{self.short_id}] 已经开始工作.")
self.update()
if is_retry is False:
logger.info(f"worker[{self.short_id}] 已经开始工作.")
self.update()

try:
if self.status in ["done", "failed"]:
self.update()
return
if self.need_backup and self.status in ["init"]:
self.backup()

if self.type == "copy" and self.status in ["init", "back-upped"]:
if self.type == "copy" and self.status in [
"init",
"back-upped",
"downloaded",
"uploaded",
]:
self.copy_type()

elif self.type == "delete" and self.status in ["init", "back-upped"]:
self.delete_type()

assert self.recheck()
self.update(status=f"done")
except Exception as _e:
logger.error(
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}",
exc_info=_e,
)
self.error_info = (
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n"
f"{traceback.format_exc()}"
return
except TimeoutException as _e:
if is_retry:
return self.__error_exec(_e)
time.sleep(2)
return self.__retry(
3,
(TimeoutException, AssertionError),
self.run,
True,
)
self.update(status="failed")
if sync_config.debug:
raise WorkerError from _e
except Exception as _e:
return self.__error_exec(_e)

def __error_exec(self, _e: Exception):
logger.error(
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}",
exc_info=_e,
)
self.error_info = (
f"Worker[{self.short_id}] 出现错误:: ({type(_e)}){_e}\n"
f"{traceback.format_exc()}"
)
self.update(status="failed")
if sync_config.debug:
raise WorkerError from _e


class Workers:
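
Taken together, the d_worker.py changes route each flaky network step through the private __retry helper, raise the new RetryError once the attempts are used up, and let run() restart itself one more time on TimeoutException after a two-second sleep, skipping any step whose status shows it already finished. A minimal standalone sketch of the same retry idea, using only the standard library; retry_call, the delay parameter and the logger name are illustrative and not part of the alist-sync API:

```python
import logging
import time

logger = logging.getLogger("retry-sketch")


class RetryError(Exception):
    """Raised once all retry attempts are exhausted."""


def retry_call(retries, exceptions, func, *args, delay=2, **kwargs):
    """Call func, retrying only on the whitelisted exception types.

    Mirrors the shape of Worker.__retry: warn on every failure, and once
    the attempts run out re-raise the last error wrapped in RetryError.
    """
    last_exc = None
    for attempt in range(1, retries + 1):
        try:
            return func(*args, **kwargs)
        except exceptions as exc:
            last_exc = exc
            logger.warning("attempt %d/%d of %s failed: %s",
                           attempt, retries, func.__name__, exc)
            time.sleep(delay)
    raise RetryError(f"Retry {func.__name__} Error.") from last_exc
```

A worker-style call would then read retry_call(3, (TimeoutException, AssertionError), self.downloader), which is also why copy_type() now checks self.status first: a retried run() should not download again once the status is already "downloaded" or "uploaded".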
4 changes: 4 additions & 0 deletions alist_sync/err.py
@@ -24,6 +24,10 @@ class WorkerError(AlistSyncError):
pass


class RetryError(WorkerError):
pass


class DownloaderError(WorkerError):
pass

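
Because RetryError subclasses WorkerError, existing handlers that catch WorkerError still see exhausted retries, while callers that care can treat them separately. A caller-side sketch; only the three class names mirror alist_sync/err.py, and the handle function with its print calls is illustrative:

```python
class AlistSyncError(Exception): ...
class WorkerError(AlistSyncError): ...
class RetryError(WorkerError): ...


def handle(run_worker):
    try:
        run_worker()
    except RetryError as exc:
        # Retries exhausted; the original failure is chained on __cause__.
        print("gave up after retries:", exc.__cause__)
    except WorkerError as exc:
        # Any other worker failure still lands here.
        print("worker failed:", exc)
```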
4 changes: 2 additions & 2 deletions bootstrap.sh
@@ -16,8 +16,8 @@ all_clear() {
case $1 in
install)
pip install -U pip
pip install -e .
pip install git+https://github.com/lee-cq/alist-sdk --no-cache-dir --force-reinstall
pip install -e .
;;

alist)
@@ -50,7 +50,7 @@ test)

main )
shift
python -m alist_sync $@
python -m alist_sync "$@"
;;

debugger)
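
In bootstrap.sh, the install step now runs pip install -e . after the forced reinstall of alist-sdk, and the main entry quotes "$@" so that arguments containing spaces reach python -m alist_sync as single arguments instead of being re-split by the shell.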
