Skip to content

Commit

Permalink
v0.1.1
Browse files Browse the repository at this point in the history
- Added retry helper
- Flows return the last step scope now
  • Loading branch information
monoxgas committed Mar 19, 2024
1 parent 61cbb16 commit f0fd561
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 8 deletions.
4 changes: 2 additions & 2 deletions marque/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from marque.flow import Flow
from marque.helpers import repeat
from marque.helpers import repeat, retry
from marque.scope import Scope
from marque.storage import JsonStorage, MemoryStorage, PolarsStorage

__all__ = ["Flow", "Scope", "repeat", "MemoryStorage", "PolarsStorage", "JsonStorage"]
__all__ = ["Flow", "Scope", "repeat", "retry", "MemoryStorage", "PolarsStorage", "JsonStorage"]

from loguru import logger

Expand Down
14 changes: 10 additions & 4 deletions marque/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def recall(self, name: str) -> dict[str, t.Any]:
def log(self, msg: str, level: LogLevelLiteral = "info") -> "Flow":
if self.current is None:
raise RuntimeError("Cannot log() outside of a running step.")
logger.log(level.upper(), msg)
self.current.logs.append((level, msg))
return self

Expand Down Expand Up @@ -254,10 +255,13 @@ def fail_fast(self) -> "Flow":
self.ignore_errors = False
return self

def __call__(self) -> None:
def __call__(self) -> Scope | None:
if self.state != "pending":
raise RuntimeError(f"Flow is already in '{self.state}' state.")

if len(self.steps) == 0:
raise RuntimeError("No steps were added to the flow.")

configure_logging(self.log_level)

logger.success("")
Expand Down Expand Up @@ -288,9 +292,6 @@ def _log(message: str) -> None:
else:
raise

for lvl, log in self.current.logs:
logger.log(lvl.upper(), f" |: {log}")

self.current.scope.duration = datetime.now() - start
logger.info(f" |- in {format_timedelta(self.current.scope.duration)}")
logger.info("")
Expand All @@ -299,9 +300,14 @@ def _log(message: str) -> None:
self.step_idx += 1
self.group_idx += 1

if self.current is None:
raise RuntimeError("No steps were executed in the flow.")

self.storage.flush()
self.state = "finished"

logger.success("")
logger.success(f"Finished flow '{self.name}' / '{self.run}'")
logger.success("")

return self.current.scope
30 changes: 29 additions & 1 deletion marque/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
from marque.flow import StepFunc
import functools
import traceback

from marque.flow import Flow, StepFunc


def repeat(step: StepFunc | list[StepFunc], times: int) -> list[StepFunc]:
steps = step if isinstance(step, list) else [step]
return steps * times


# TODO: We should probably integrate this better with the flow
# management and handle storing errored scopes for inspection
# if we need them.


def retry(step: StepFunc, max_times: int) -> StepFunc:
@functools.wraps(step)
def _retry(flow: Flow) -> None:
max_times_str = "inf" if max_times == -1 else str(max_times)
i = 0
while max_times == -1 or i < max_times:
try:
flow.log(f" |: Attempting {step.__name__} [{i+1}/{max_times_str}]")
return step(flow)
except Exception:
if max_times != -1 and i == max_times - 1:
flow.error(f" |: Exceeded max attempts ({max_times}) for {step.__name__}")
raise
flow.error(f" |: {traceback.format_exc()}")
pass
i += 1

return _retry
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "marque"
version = "0.1.0"
version = "0.1.1"
description = "Minimal workflows"
authors = ["Nick Landers <monoxgas@gmail.com>"]
readme = "README.md"
Expand Down

0 comments on commit f0fd561

Please sign in to comment.