Skip to content

Commit

Permalink
9 create check cmd (#13)
Browse files Browse the repository at this point in the history
* 缓存AlistPath.stat() 以减少网络访问,
添加Worker线程至20
* BUGFIX && 添加日志
* 多线程添加忽略
* BUGFIX
* 初步完成Check命令
* 支持check命令
  • Loading branch information
lee-cq authored Mar 10, 2024
1 parent c934c81 commit 47e42b9
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 99 deletions.
10 changes: 6 additions & 4 deletions .github/workflows/alist-sync.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: Alist Sync

run-name: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}"
on:
workflow_dispatch:
inputs:
Expand All @@ -17,7 +17,7 @@ on:
default: false

env:
_ALIST_SYNC_NAME: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}"
_ALIST_SYNC_NAME: "${{github.run-name}}"
_ALIST_SYNC_DEBUG: ${{ github.event.inputs.debug }}
_ALIST_ADMIN_PASSWORD: ${{ secrets.ALIST_ADMIN_PASSWORD }}

Expand Down Expand Up @@ -67,6 +67,7 @@ jobs:

_RELOAD_STORAGE: ${{ github.event.inputs.reload_storage }}
run: |
echo RUNNER = ${_ALIST_SYNC_NAME}
# 这将会导入全部的内容包括:设置,元数据,用户,存储器。
echo $(pwd)
cat > alist-backup-config.json << EOF
Expand All @@ -76,6 +77,8 @@ jobs:
- name: Create Tunnel for Cloudflare
run: |
echo RUNNER = ${_ALIST_SYNC_NAME}
test ! -n "${{secrets.CLOUDFLARE_TUNNEL_TOKEN}}" && {
echo "CLOUDFLARE_TUNNEL_TOKEN is not set. Skip Cloudflare Tunnel Installation."
exit 0
Expand All @@ -89,6 +92,7 @@ jobs:
- name: RUN Alist Sync
run: |
echo RUNNER = ${_ALIST_SYNC_NAME}
cat > config.yaml << EOF
${{ secrets.SYNC_CONFIG }}
EOF
Expand All @@ -102,5 +106,3 @@ jobs:
- name: Debugger
if: ${{ github.event.inputs.debug == 'true' && failure() }}
uses: csexton/debugger-action@master


1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ config.yaml
alist_sync/.alist-sync-cache/
alist/
tools/alist/
logs/

# C extensions
*.so
Expand Down
15 changes: 15 additions & 0 deletions alist_sync/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ def sync(
return main()


@app.command("check")
def check(
    config_file: str = Option(None, "--config", "-c", help="配置文件路径"),
):
    """检查任务: load the sync config, then run the check flow (main_check)."""
    # Imported lazily so the CLI starts fast and avoids import cycles.
    from alist_sync.config import create_config
    from alist_sync.d_main import main_check

    if config_file:
        cfg_path = Path(config_file)
        # Silently skipped when the path does not exist (same as before).
        if cfg_path.exists():
            # resolve() already returns an absolute path; the previous
            # .resolve().absolute() chain was redundant.
            os.environ["_ALIST_SYNC_CONFIG"] = str(cfg_path.resolve())
    create_config()
    return main_check()


@app.command("get-info")
def cli_get(path: str):
""""""
Expand Down
22 changes: 11 additions & 11 deletions alist_sync/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ def timeout_input(msg, default, timeout=3):
return default


def beautify_size(speed: float):
if speed < 1024:
return f"{speed:.2f}B"
speed /= 1024
if speed < 1024:
return f"{speed:.2f}KB"
speed /= 1024
if speed < 1024:
return f"{speed:.2f}MB"
speed /= 1024
return f"{speed:.2f}GB"
def beautify_size(byte_size: float):
    """Render a byte count as a human-readable string (1024-based).

    Returns e.g. "512.00B", "1.50KB", "3.25MB"; anything at or above
    1024 MB is expressed in GB (no TB step).
    """
    for unit in ("B", "KB", "MB"):
        if byte_size < 1024:
            return f"{byte_size:.2f}{unit}"
        byte_size /= 1024
    return f"{byte_size:.2f}GB"


def transfer_speed(size, start: datetime.datetime, end: datetime.datetime) -> str:
Expand Down
131 changes: 94 additions & 37 deletions alist_sync/d_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@Date-Time : 2024/2/25 21:17
"""
import datetime
import fnmatch
import logging
import threading
Expand All @@ -14,7 +15,8 @@
from typing import Iterator
from functools import lru_cache

from alist_sdk import AlistPath
from alist_sdk import AlistPath, RawItem, AlistPathType, Item
from pydantic import BaseModel, Field

from alist_sync.config import create_config, SyncGroup
from alist_sync.d_worker import Worker
Expand All @@ -26,6 +28,21 @@
sync_config = create_config()


class SyncRawItem(BaseModel):
    """Snapshot of a remote path's stat taken at scan time.

    ``stat`` is None when the path did not exist on the remote at scan time.
    """

    path: AlistPathType | None
    # default_factory: evaluate now() once per instance. A plain
    # ``= datetime.datetime.now()`` default is evaluated once at class
    # definition time and silently shared by every instance.
    scan_time: datetime.datetime = Field(default_factory=datetime.datetime.now)
    stat: RawItem | Item | None

    def exists(self) -> bool:
        """True when the remote path existed at scan time."""
        return self.stat is not None

    def is_file(self) -> bool:
        """True when the path exists and is not a directory.

        Guarded so a missing path returns False instead of raising
        AttributeError on ``None.is_dir``.
        """
        return self.stat is not None and not self.stat.is_dir

    def is_dir(self) -> bool:
        """True when the path exists and is a directory (guarded like is_file)."""
        return self.stat is not None and self.stat.is_dir


class Checker:
def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Queue):
self.sync_group: SyncGroup = sync_group
Expand All @@ -34,6 +51,7 @@ def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Que

self.conflict: set = set()
self.pool = MyThreadPoolExecutor(10)
self.stat_sq = threading.Semaphore(4)
self.main_thread = threading.Thread(
target=self.main,
name=f"checker_main[{self.sync_group.name}-{self.__class__.__name__}]",
Expand All @@ -52,7 +70,38 @@ def split_path(self, path: AlistPath) -> tuple[AlistPath, str]:
    def get_backup_dir(self, path: AlistPath) -> AlistPath:
        """Return the backup directory under *path*'s sync root (root / backup_dir)."""
        return self.split_path(path)[0].joinpath(self.sync_group.backup_dir)

def checker(self, source_path: AlistPath, target_path: AlistPath) -> "Worker|None":
def create_worker(self, type_: str, source_path: AlistPath, target_path: AlistPath):
return Worker(
type=type_,
group_name=self.sync_group.name,
need_backup=self.sync_group.need_backup,
backup_dir=self.get_backup_dir(source_path),
relative_path=self.split_path(source_path)[1],
source_path=source_path,
target_path=target_path,
)

    # Count of cache-miss stat fetches actually performed (debug logging only).
    _stat_get_times = 0

    @lru_cache(40_000)
    def get_stat(self, path: AlistPath) -> SyncRawItem:
        """Fetch the remote stat of *path*, wrapped in a SyncRawItem (LRU-cached).

        NOTE(review): lru_cache on an instance method keys on (self, path) and
        keeps ``self`` alive for the cache's lifetime — acceptable only if
        Checker instances are few and long-lived; confirm.
        """
        # BUGFIX: throttle the request rate, not just run them concurrently —
        # stat_sq (Semaphore(4)) caps in-flight stat requests at 4.
        with self.stat_sq:
            self._stat_get_times += 1
            logger.debug("get_stat: %s, times: %d", path, self._stat_get_times)
            try:
                # Lists the parent directory and picks this entry; returns None
                # (via .get) when the entry is absent.
                stat = path.client.dict_files_items(
                    path.parent.as_posix(), cache_empty=True
                ).get(path.name)
            except FileNotFoundError:
                # Parent directory itself is missing -> path does not exist.
                stat = None
            return SyncRawItem(path=path, stat=stat)

    def checker(
        self,
        source_stat: SyncRawItem,
        target_stat: SyncRawItem,
    ) -> "Worker|None":
        """Compare source vs target stats; return a Worker to schedule, or None.

        Abstract: subclasses (e.g. CheckerCopy) implement the actual policy.
        """
        raise NotImplementedError

def ignore(self, relative_path) -> bool:
Expand All @@ -64,19 +113,21 @@ def ignore(self, relative_path) -> bool:

def checker_every_dir(self, path) -> Iterator[Worker | None]:
_sync_dir, _relative_path = self.split_path(path)
# if self.ignore(_relative_path):
# return
logger.debug(f"Checking [{_relative_path}] in {self.sync_group.group}")
for _sd in self.sync_group.group:
_sd: AlistPath
if _sd == _sync_dir:
continue
target_path = _sd.joinpath(_relative_path)
yield self.checker(path, target_path)
yield self.checker(self.get_stat(path), self.get_stat(target_path))

def _t_checker(self, path):
for _c in self.checker_every_dir(path):
if _c:
self.worker_queue.put(_c)
try:
for _c in self.checker_every_dir(path):
if _c:
self.worker_queue.put(_c)
except Exception as _e:
logger.error("Checker Error: ", exc_info=_e)

def main(self):
""""""
Expand All @@ -94,8 +145,14 @@ def main(self):

try:
_started = True
self._t_checker(self.scaner_queue.get(timeout=3))
path = self.scaner_queue.get(timeout=3)
self.pool.submit(self._t_checker, path)
except Empty:
logger.debug(
"Checker Size: %s, %d",
self.sync_group.name,
self.pool.work_qsize(),
)
if _started:
continue
logger.info(
Expand All @@ -113,44 +170,44 @@ class CheckerCopy(Checker):
""""""

def checker(
self,
source_path: AlistPath,
target_path: AlistPath,
self, source_stat: SyncRawItem, target_stat: SyncRawItem
) -> "Worker|None":
if not target_path.exists():
if not target_stat.exists():
logger.info(
f"Checked: [COPY] {source_path.as_uri()} -> {target_path.as_uri()}"
f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}"
)
return Worker(
type="copy",
need_backup=False,
source_path=source_path,
target_path=target_path,
return self.create_worker(
type_="copy",
source_path=source_stat.path,
target_path=target_stat.path,
)
logger.info(f"Checked: [JUMP] {source_path.as_uri()}")

logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}")
return None


class CheckerMirror(Checker):
    """Mirror checker: group[0] is intended to be the master copy.

    The mirror rules are currently disabled — checker() always returns None,
    so this checker schedules no work.
    """

    def checker(
        self, source_stat: SyncRawItem, target_stat: SyncRawItem
    ) -> "Worker|None":
        """Disabled: always return None (no Worker is scheduled)."""
        # NOTE(review): the disabled code below compared target_stat (a
        # SyncRawItem) to _main (an AlistPath); that equality can never hold,
        # so the rules need a rewrite against SyncRawItem before re-enabling.
        # _main = self.sync_group.group[0]
        # # If target is the master storage, target missing while source
        # # exists: delete the source (with optional backup).
        # if target_stat == _main and not target_stat.exists() and source_stat.exists():
        #     return Worker(
        #         type="delete",
        #         need_backup=self.sync_group.need_backup,
        #         backup_dir=self.get_backup_dir(source_stat.path),
        #         target_path=source_stat.path,
        #     )
        # # Target missing anywhere else: copy source -> target.
        # if not target_stat.exists():
        #     return Worker(
        #         type="copy",
        #         need_backup=False,
        #         source_path=source_stat.path,
        #         target_path=target_stat.path,
        #     )
        return None


Expand Down
Loading

0 comments on commit 47e42b9

Please sign in to comment.