diff --git a/marque/__init__.py b/marque/__init__.py index 772f42f..fb460d5 100644 --- a/marque/__init__.py +++ b/marque/__init__.py @@ -1,9 +1,9 @@ from marque.flow import Flow -from marque.helpers import repeat +from marque.helpers import repeat, retry from marque.scope import Scope from marque.storage import JsonStorage, MemoryStorage, PolarsStorage -__all__ = ["Flow", "Scope", "repeat", "MemoryStorage", "PolarsStorage", "JsonStorage"] +__all__ = ["Flow", "Scope", "repeat", "retry", "MemoryStorage", "PolarsStorage", "JsonStorage"] from loguru import logger diff --git a/marque/flow.py b/marque/flow.py index 37f93a8..d0a9572 100644 --- a/marque/flow.py +++ b/marque/flow.py @@ -199,6 +199,7 @@ def recall(self, name: str) -> dict[str, t.Any]: def log(self, msg: str, level: LogLevelLiteral = "info") -> "Flow": if self.current is None: raise RuntimeError("Cannot log() outside of a running step.") + logger.log(level.upper(), msg) self.current.logs.append((level, msg)) return self @@ -254,10 +255,13 @@ def fail_fast(self) -> "Flow": self.ignore_errors = False return self - def __call__(self) -> None: + def __call__(self) -> Scope | None: if self.state != "pending": raise RuntimeError(f"Flow is already in '{self.state}' state.") + if len(self.steps) == 0: + raise RuntimeError("No steps were added to the flow.") + configure_logging(self.log_level) logger.success("") @@ -288,9 +292,6 @@ def _log(message: str) -> None: else: raise - for lvl, log in self.current.logs: - logger.log(lvl.upper(), f" |: {log}") - self.current.scope.duration = datetime.now() - start logger.info(f" |- in {format_timedelta(self.current.scope.duration)}") logger.info("") @@ -299,9 +300,14 @@ def _log(message: str) -> None: self.step_idx += 1 self.group_idx += 1 + if self.current is None: + raise RuntimeError("No steps were executed in the flow.") + self.storage.flush() self.state = "finished" logger.success("") logger.success(f"Finished flow '{self.name}' / '{self.run}'") logger.success("") + + return self.current.scope diff --git a/marque/helpers.py b/marque/helpers.py index dbdadd7..cc2db1a 100644 --- a/marque/helpers.py +++ b/marque/helpers.py @@ -1,6 +1,34 @@ -from marque.flow import StepFunc +import functools +import traceback + +from marque.flow import Flow, StepFunc def repeat(step: StepFunc | list[StepFunc], times: int) -> list[StepFunc]: steps = step if isinstance(step, list) else [step] return steps * times + + +# TODO: We should probably integrate this better with the flow +# management and handle storing errored scopes for inspection +# if we need them. + + +def retry(step: StepFunc, max_times: int) -> StepFunc: + @functools.wraps(step) + def _retry(flow: Flow) -> None: + max_times_str = "inf" if max_times == -1 else str(max_times) + i = 0 + while max_times == -1 or i < max_times: + try: + flow.log(f" |: Attempting {step.__name__} [{i+1}/{max_times_str}]") + return step(flow) + except Exception: + if max_times != -1 and i == max_times - 1: + flow.error(f" |: Exceeded max attempts ({max_times}) for {step.__name__}") + raise + flow.error(f" |: {traceback.format_exc()}") + pass + i += 1 + + return _retry diff --git a/pyproject.toml b/pyproject.toml index 7dd5594..43da0fa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "marque" -version = "0.1.0" +version = "0.1.1" description = "Minimal workflows" authors = ["Nick Landers "] readme = "README.md"