Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

9 create check cmd #13

Merged
merged 8 commits into from
Mar 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions .github/workflows/alist-sync.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: Alist Sync

run-name: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}"
on:
workflow_dispatch:
inputs:
Expand All @@ -17,7 +17,7 @@ on:
default: false

env:
_ALIST_SYNC_NAME: "action-${{github.actor}}-${{github.run_id}}-${{github.run_number}}"
_ALIST_SYNC_NAME: "${{github.run-name}}"
_ALIST_SYNC_DEBUG: ${{ github.event.inputs.debug }}
_ALIST_ADMIN_PASSWORD: ${{ secrets.ALIST_ADMIN_PASSWORD }}

Expand Down Expand Up @@ -67,6 +67,7 @@ jobs:

_RELOAD_STORAGE: ${{ github.event.inputs.reload_storage }}
run: |
echo RUNNER = ${_ALIST_SYNC_NAME}
# 这将会导入全部的内容包括:设置,元数据,用户,存储器。
echo $(pwd)
cat > alist-backup-config.json << EOF
Expand All @@ -76,6 +77,8 @@ jobs:

- name: Create Tunnel for Cloudflare
run: |
echo RUNNER = ${_ALIST_SYNC_NAME}

test ! -n "${{secrets.CLOUDFLARE_TUNNEL_TOKEN}}" && {
echo "CLOUDFLARE_TUNNEL_TOKEN is not set. Skip Cloudflare Tunnel Installation."
exit 0
Expand All @@ -89,6 +92,7 @@ jobs:

- name: RUN Alist Sync
run: |
echo RUNNER = ${_ALIST_SYNC_NAME}
cat > config.yaml << EOF
${{ secrets.SYNC_CONFIG }}
EOF
Expand All @@ -102,5 +106,3 @@ jobs:
- name: Debugger
if: ${{ github.event.inputs.debug == 'true' && failure() }}
uses: csexton/debugger-action@master


1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ config.yaml
alist_sync/.alist-sync-cache/
alist/
tools/alist/
logs/

# C extensions
*.so
Expand Down
15 changes: 15 additions & 0 deletions alist_sync/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,21 @@ def sync(
return main()


@app.command("check")
def check(
config_file: str = Option(None, "--config", "-c", help="配置文件路径"),
):
"""检查任务"""
from alist_sync.config import create_config
from alist_sync.d_main import main_check

if config_file and Path(config_file).exists():
os.environ["ALIST_SYNC_CONFIG"] = str(Path(config_file).resolve().absolute())
os.environ["_ALIST_SYNC_CONFIG"] = str(Path(config_file).resolve().absolute())
create_config()
return main_check()


@app.command("get-info")
def cli_get(path: str):
""""""
Expand Down
22 changes: 11 additions & 11 deletions alist_sync/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,17 @@ def timeout_input(msg, default, timeout=3):
return default


def beautify_size(speed: float):
if speed < 1024:
return f"{speed:.2f}B"
speed /= 1024
if speed < 1024:
return f"{speed:.2f}KB"
speed /= 1024
if speed < 1024:
return f"{speed:.2f}MB"
speed /= 1024
return f"{speed:.2f}GB"
def beautify_size(byte_size: float):
if byte_size < 1024:
return f"{byte_size:.2f}B"
byte_size /= 1024
if byte_size < 1024:
return f"{byte_size:.2f}KB"
byte_size /= 1024
if byte_size < 1024:
return f"{byte_size:.2f}MB"
byte_size /= 1024
return f"{byte_size:.2f}GB"


def transfer_speed(size, start: datetime.datetime, end: datetime.datetime) -> str:
Expand Down
131 changes: 94 additions & 37 deletions alist_sync/d_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@Date-Time : 2024/2/25 21:17

"""
import datetime
import fnmatch
import logging
import threading
Expand All @@ -14,7 +15,8 @@
from typing import Iterator
from functools import lru_cache

from alist_sdk import AlistPath
from alist_sdk import AlistPath, RawItem, AlistPathType, Item
from pydantic import BaseModel

from alist_sync.config import create_config, SyncGroup
from alist_sync.d_worker import Worker
Expand All @@ -26,6 +28,21 @@
sync_config = create_config()


class SyncRawItem(BaseModel):
path: AlistPathType | None
scan_time: datetime.datetime = datetime.datetime.now()
stat: RawItem | Item | None

def exists(self):
return self.stat is not None

def is_file(self):
return not self.stat.is_dir

def is_dir(self):
return self.stat.is_dir


class Checker:
def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Queue):
self.sync_group: SyncGroup = sync_group
Expand All @@ -34,6 +51,7 @@ def __init__(self, sync_group: SyncGroup, scaner_queue: Queue, worker_queue: Que

self.conflict: set = set()
self.pool = MyThreadPoolExecutor(10)
self.stat_sq = threading.Semaphore(4)
self.main_thread = threading.Thread(
target=self.main,
name=f"checker_main[{self.sync_group.name}-{self.__class__.__name__}]",
Expand All @@ -52,7 +70,38 @@ def split_path(self, path: AlistPath) -> tuple[AlistPath, str]:
def get_backup_dir(self, path) -> AlistPath:
return self.split_path(path)[0].joinpath(self.sync_group.backup_dir)

def checker(self, source_path: AlistPath, target_path: AlistPath) -> "Worker|None":
def create_worker(self, type_: str, source_path: AlistPath, target_path: AlistPath):
return Worker(
type=type_,
group_name=self.sync_group.name,
need_backup=self.sync_group.need_backup,
backup_dir=self.get_backup_dir(source_path),
relative_path=self.split_path(source_path)[1],
source_path=source_path,
target_path=target_path,
)

_stat_get_times = 0

@lru_cache(40_000)
def get_stat(self, path: AlistPath) -> SyncRawItem:
# BUGFIX 控制QPS而不时并发
with self.stat_sq:
self._stat_get_times += 1
logger.debug("get_stat: %s, times: %d", path, self._stat_get_times)
try:
stat = path.client.dict_files_items(
path.parent.as_posix(), cache_empty=True
).get(path.name)
except FileNotFoundError:
stat = None
return SyncRawItem(path=path, stat=stat)

def checker(
self,
source_stat: SyncRawItem,
target_stat: SyncRawItem,
) -> "Worker|None":
raise NotImplementedError

def ignore(self, relative_path) -> bool:
Expand All @@ -64,19 +113,21 @@ def ignore(self, relative_path) -> bool:

def checker_every_dir(self, path) -> Iterator[Worker | None]:
_sync_dir, _relative_path = self.split_path(path)
# if self.ignore(_relative_path):
# return
logger.debug(f"Checking [{_relative_path}] in {self.sync_group.group}")
for _sd in self.sync_group.group:
_sd: AlistPath
if _sd == _sync_dir:
continue
target_path = _sd.joinpath(_relative_path)
yield self.checker(path, target_path)
yield self.checker(self.get_stat(path), self.get_stat(target_path))

def _t_checker(self, path):
for _c in self.checker_every_dir(path):
if _c:
self.worker_queue.put(_c)
try:
for _c in self.checker_every_dir(path):
if _c:
self.worker_queue.put(_c)
except Exception as _e:
logger.error("Checker Error: ", exc_info=_e)

def main(self):
""""""
Expand All @@ -94,8 +145,14 @@ def main(self):

try:
_started = True
self._t_checker(self.scaner_queue.get(timeout=3))
path = self.scaner_queue.get(timeout=3)
self.pool.submit(self._t_checker, path)
except Empty:
logger.debug(
"Checker Size: %s, %d",
self.sync_group.name,
self.pool.work_qsize(),
)
if _started:
continue
logger.info(
Expand All @@ -113,44 +170,44 @@ class CheckerCopy(Checker):
""""""

def checker(
self,
source_path: AlistPath,
target_path: AlistPath,
self, source_stat: SyncRawItem, target_stat: SyncRawItem
) -> "Worker|None":
if not target_path.exists():
if not target_stat.exists():
logger.info(
f"Checked: [COPY] {source_path.as_uri()} -> {target_path.as_uri()}"
f"Checked: [COPY] {source_stat.path.as_uri()} -> {target_stat.path.as_uri()}"
)
return Worker(
type="copy",
need_backup=False,
source_path=source_path,
target_path=target_path,
return self.create_worker(
type_="copy",
source_path=source_stat.path,
target_path=target_stat.path,
)
logger.info(f"Checked: [JUMP] {source_path.as_uri()}")

logger.info(f"Checked: [JUMP] {source_stat.path.as_uri()}")
return None


class CheckerMirror(Checker):
""""""

def checker(self, source_path: AlistPath, target_path: AlistPath) -> "Worker|None":
_main = self.sync_group.group[0]
# target如果是主存储器 - 且target不存在,source存在,删除source
if target_path == _main and not target_path.exists() and source_path.exists():
return Worker(
type="delete",
need_backup=self.sync_group.need_backup,
backup_dir=self.get_backup_dir(source_path),
target_path=source_path,
)
if not target_path.exists():
return Worker(
type="copy",
need_backup=False,
source_path=source_path,
target_path=target_path,
)
def checker(
self, source_stat: SyncRawItem, target_stat: SyncRawItem
) -> "Worker|None":
# _main = self.sync_group.group[0]
# # target如果是主存储器 - 且target不存在,source存在,删除source
# if target_stat == _main and not target_stat.exists() and source_stat.exists():
# return Worker(
# type="delete",
# need_backup=self.sync_group.need_backup,
# backup_dir=self.get_backup_dir(source_stat.path),
# target_path=source_stat.path,
# )
# if not target_stat.exists():
# return Worker(
# type="copy",
# need_backup=False,
# source_path=source_stat.path,
# target_path=target_stat.path,
# )
return None


Expand Down
Loading
Loading