Skip to content

Commit

Permalink
Merge pull request #81 from dandi/traverse-api
Browse files Browse the repository at this point in the history
Traverse assets via Archive API
  • Loading branch information
yarikoptic authored Jul 15, 2024
2 parents eef70d9 + af6090f commit 4e1669f
Show file tree
Hide file tree
Showing 15 changed files with 450 additions and 223 deletions.
1 change: 1 addition & 0 deletions code/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ install_requires =
async_generator ~= 1.10; python_version < '3.10'
click >= 8.0
ghreq ~= 0.1
httpx ~= 0.22
hdmf
packaging
pydantic ~= 2.0
Expand Down
30 changes: 13 additions & 17 deletions code/src/healthstatus/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import anyio
import click
from packaging.version import Version
from .checker import AssetReport, Dandiset, HealthStatus
from .checker import DandisetReporter, HealthStatus
from .core import AssetPath, AssetTestResult, DandisetStatus, Outcome, TestSummary, log
from .mounts import (
AssetInDandiset,
Expand Down Expand Up @@ -103,7 +103,7 @@ def check(
for t in TESTS:
pkg_versions.update(t.prepare())
hs = HealthStatus(
backup_root=mount_point,
mount_point=mount_point,
reports_root=Path.cwd(),
dandisets=dandisets,
dandiset_jobs=dandiset_jobs,
Expand Down Expand Up @@ -134,7 +134,7 @@ def report() -> None:
assets_seen = 0
for p in Path("results").iterdir():
if re.fullmatch(r"\d{6,}", p.name) and p.is_dir():
status = DandisetStatus.from_file(p.name, p / "status.yaml")
status = DandisetStatus.from_file(p / "status.yaml")
passed, failed, timedout = status.combined_counts()
asset_qtys[Outcome.PASS] += passed
asset_qtys[Outcome.FAIL] += failed
Expand Down Expand Up @@ -211,25 +211,22 @@ def test_files(testname: str, files: tuple[Path, ...], save_results: bool) -> No
pkg_versions.update(t.prepare(minimal=t.NAME != testname))
testfunc = TESTS.get(testname)
ok = True
dandiset_cache: dict[Path, tuple[Dandiset, set[AssetPath]]] = {}
dandiset_cache: dict[Path, DandisetReporter] = {}
for f in files:
if save_results and (path := find_dandiset(Path(f))) is not None:
try:
dandiset, asset_paths = dandiset_cache[path]
reporter = dandiset_cache[path]
except KeyError:
dandiset = Dandiset(
reporter = DandisetReporter(
identifier=path.name,
path=path,
reports_root=Path.cwd(),
draft_modified=None,
reportdir=Path("results", path.name),
versions=pkg_versions,
)
asset_paths = anyio.run(dandiset.get_asset_paths)
dandiset_cache[path] = (dandiset, asset_paths)
report = AssetReport(dandiset=dandiset)
dandiset_cache[path] = reporter
ap = AssetPath(Path(f).relative_to(path).as_posix())
else:
report = None
asset_paths = None
reporter = None
ap = None
log.info("Testing %s ...", f)
r = anyio.run(testfunc.run, f)
Expand All @@ -239,16 +236,15 @@ def test_files(testname: str, files: tuple[Path, ...], save_results: bool) -> No
if r.outcome is not Outcome.PASS:
ok = False
if save_results:
assert report is not None
assert asset_paths is not None
assert reporter is not None
assert ap is not None
atr = AssetTestResult(
testname=testname,
asset_path=AssetPath(ap),
result=r,
)
report.register_test_result(atr)
report.dump(asset_paths)
reporter.register_test_result(atr)
reporter.dump()
sys.exit(0 if ok else 1)


Expand Down
119 changes: 119 additions & 0 deletions code/src/healthstatus/adandi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from __future__ import annotations
from collections.abc import AsyncGenerator
from dataclasses import InitVar, dataclass, field
from datetime import datetime
import platform
import sys
from typing import Any
from anyio.abc import AsyncResource
import httpx
from pydantic import BaseModel
from .aioutil import arequest

if sys.version_info[:2] >= (3, 10):
from contextlib import aclosing
else:
from async_generator import aclosing

USER_AGENT = "dandisets-healthstatus ({}) httpx/{} {}/{}".format(
"https://github.com/dandi/dandisets-healthstatus",
httpx.__version__,
platform.python_implementation(),
platform.python_version(),
)


@dataclass
class AsyncDandiClient(AsyncResource):
api_url: str
token: InitVar[str | None] = None
session: httpx.AsyncClient = field(init=False)

def __post_init__(self, token: str | None) -> None:
headers = {"User-Agent": USER_AGENT}
if token is not None:
headers["Authorization"] = f"token {token}"
self.session = httpx.AsyncClient(
base_url=self.api_url,
headers=headers,
follow_redirects=True,
)

async def aclose(self) -> None:
await self.session.aclose()

def get_url(self, path: str) -> str:
if path.lower().startswith(("http://", "https://")):
return path
else:
return self.api_url.rstrip("/") + "/" + path.lstrip("/")

async def get(self, path: str, **kwargs: Any) -> Any:
return (await arequest(self.session, "GET", path, **kwargs)).json()

async def paginate(
self,
path: str,
page_size: int | None = None,
params: dict | None = None,
**kwargs: Any,
) -> AsyncGenerator:
"""
Paginate through the resources at the given path: GET the path, yield
the values in the ``"results"`` key, and repeat with the URL in the
``"next"`` key until it is ``null``.
"""
if page_size is not None:
if params is None:
params = {}
params["page_size"] = page_size
r = await self.get(path, params=params, **kwargs)
while True:
for item in r["results"]:
yield item
if r.get("next"):
r = await self.get(r["next"], **kwargs)
else:
break

async def get_dandiset(self, dandiset_id: str) -> DandisetInfo:
return DandisetInfo.from_raw_response(
await self.get(f"/dandisets/{dandiset_id}/")
)

async def get_dandisets(self) -> AsyncGenerator[DandisetInfo, None]:
async with aclosing(self.paginate("/dandisets/")) as ait:
async for data in ait:
yield DandisetInfo.from_raw_response(data)

async def get_asset_paths(self, dandiset_id: str) -> AsyncGenerator[str, None]:
async with aclosing(
self.paginate(
f"/dandisets/{dandiset_id}/versions/draft/assets/",
params={"order": "created", "page_size": "1000"},
)
) as ait:
async for item in ait:
yield item["path"]


@dataclass
class DandisetInfo:
identifier: str
draft_modified: datetime

@classmethod
def from_raw_response(cls, data: dict[str, Any]) -> DandisetInfo:
resp = DandisetResponse.model_validate(data)
return cls(
identifier=resp.identifier, draft_modified=resp.draft_version.modified
)


class VersionInfo(BaseModel):
modified: datetime


class DandisetResponse(BaseModel):
identifier: str
draft_version: VersionInfo
71 changes: 69 additions & 2 deletions code/src/healthstatus/aioutil.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations
from collections.abc import AsyncGenerator, Callable
from collections.abc import AsyncGenerator, Callable, Container, Iterator
import math
import random
import ssl
import sys
from typing import Awaitable, TypeVar
from typing import Any, Awaitable, TypeVar
import anyio
from anyio.streams.memory import MemoryObjectReceiveStream
import httpx
from .core import log

if sys.version_info[:2] >= (3, 10):
from contextlib import aclosing
Expand Down Expand Up @@ -32,3 +36,66 @@ async def dowork(rec: MemoryObjectReceiveStream[T]) -> None:
async with sender, aclosing(inputs):
async for item in inputs:
await sender.send(item)


async def arequest(
client: httpx.AsyncClient,
method: str,
url: str,
retry_on: Container[int] = (),
**kwargs: Any,
) -> httpx.Response:
waits = exp_wait(attempts=15, base=2)
kwargs.setdefault("timeout", 60)
while True:
try:
r = await client.request(method, url, follow_redirects=True, **kwargs)
r.raise_for_status()
except (httpx.HTTPError, ssl.SSLError) as e:
if isinstance(e, (httpx.RequestError, ssl.SSLError)) or (
isinstance(e, httpx.HTTPStatusError)
and (
e.response.status_code >= 500 or e.response.status_code in retry_on
)
):
try:
delay = next(waits)
except StopIteration:
raise e
log.warning(
"Retrying %s request to %s in %f seconds as it raised %s: %s",
method.upper(),
url,
delay,
type(e).__name__,
str(e),
)
await anyio.sleep(delay)
continue
else:
raise
return r


def exp_wait(
base: float = 1.25,
multiplier: float = 1,
attempts: int | None = None,
jitter: float = 0.1,
) -> Iterator[float]:
"""
Returns a generator of values usable as `sleep()` times when retrying
something with exponential backoff.
:param float base:
:param float multiplier: value to multiply values by after exponentiation
:param Optional[int] attempts: how many values to yield; set to `None` to
yield forever
:param Optional[float] jitter: add +1 of that jitter ratio for the time
randomly so that wait track is unique.
:rtype: Iterator[float]
"""
n = 0
while attempts is None or n < attempts:
yield (base**n * multiplier) * (1 + (random.random() - 0.5) * jitter)
n += 1
Loading

0 comments on commit 4e1669f

Please sign in to comment.