Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

User initiated pausing #16

Merged
merged 14 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Hooks are fired in a dedicated thread for lifecycle events.

- `on_broadcast`: When a transaction is broadcasted.
- `on_finalized`: When a transaction is finalized.
- `on_pause`: When a transaction is halted by the strategy.
- `on_fault`: When a transaction reverted or another error occurred.


Expand Down
125 changes: 73 additions & 52 deletions atxm/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
TransactionFault,
Fault,
)
from atxm.tacker import _TxTracker
from atxm.tracker import _TxTracker
from atxm.strategies import (
AsyncTxStrategy,
InsufficientFundsPause,
Expand Down Expand Up @@ -48,33 +48,39 @@ class _Machine(StateMachine):
#
# State Machine:
#
# Idle <---> Busy <---> Paused
# | ^ | ^ | ^
# V | V | V |
# _ _ _
# Pause
# ^ ^
# / \
# V v
# Idle <---> Busy
# | ^ | ^
# V | V |
# _ _
#
_BUSY = State("Busy")
_IDLE = State("Idle", initial=True)
_PAUSED = State("Paused")

# - State Transitions -
_transition_to_idle = _BUSY.to(_IDLE, unless="_busy") # Busy -> Idle
_transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") # Busy -> Pause
_transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") | _IDLE.to(
_PAUSED, cond="_pause"
) # Busy/Idle -> Pause
_transition_to_idle = _BUSY.to(_IDLE, unless=["_busy", "_pause"]) | _PAUSED.to(
_IDLE, unless=["_busy", "_pause"]
) # Busy/Paused -> Idle
_transition_to_busy = _IDLE.to(_BUSY, cond="_busy") | _PAUSED.to(
_BUSY, unless="_pause"
) # Idle/Pause -> Busy
# self transitions i.e. remain in same state
_remain_busy = _BUSY.to.itself(cond="_busy", unless="_pause")
_remain_idle = _IDLE.to.itself(unless="_busy")
_remain_paused = _PAUSED.to.itself(cond="_pause")
_remain_idle = _IDLE.to.itself(unless=["_busy", "_pause"])

_cycle_state = (
_transition_to_idle
| _transition_to_paused
| _transition_to_busy
| _remain_busy
| _remain_idle
| _remain_paused
)

# internal
Expand Down Expand Up @@ -157,30 +163,17 @@ def _handle_errors(self, *args, **kwargs):
@_transition_to_paused.before
def _enter_pause_mode(self):
self.log.info("[pause] pause mode activated")
return

@_PAUSED.enter
def _process_paused(self):
# TODO what action should be taken to check if we leave the pause state?
# Perhaps a call to self.__strategize()
return

@_IDLE.enter
def _process_idle(self):
"""Return to idle mode if not already there (slow down)"""
if self._task.interval != self._IDLE_INTERVAL:
# TODO does changing the interval value actually update the LoopingCall?
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[done] returning to idle mode with "
f"{self._task.interval} second interval"
)
self._sleep()

# TODO
# 1. don't always sleep (idle for some number of cycles?)
# 2. careful sleeping - potential concurrency concerns
# 3. there is currently no difference between sleep/idle ...
self._sleep()
@_transition_to_idle.before
def _enter_idle_mode(self):
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[idle] returning to idle mode with {self._task.interval} second interval"
)

@_transition_to_busy.before
def _enter_busy_mode(self):
Expand All @@ -191,7 +184,7 @@ def _enter_busy_mode(self):
self._task.interval = max(
round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL
)
self.log.info(f"[working] cycle interval is {self._task.interval} seconds")
self.log.info(f"[working] cycle interval is now {self._task.interval} seconds")

@_BUSY.enter
def _process_busy(self):
Expand Down Expand Up @@ -234,9 +227,14 @@ def _stop(self):
self._task.stop()

def _wake(self) -> None:
if not self._task.running:
log.info("[wake] waking up")
self._start(now=True)
"""Runs the looping call immediately."""
log.info("[wake] running looping call now.")
if self._task.running:
# TODO instead of stopping/starting, can you set interval to 0
# and call reset to have looping call immediately?
self._stop()

self._start(now=True)

def _sleep(self) -> None:
if self._task.running:
Expand Down Expand Up @@ -328,19 +326,6 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx

def pause(self) -> None:
self._pause = True
self.log.warn(
f"[pause] pending transaction {self._tx_tracker.pending.txhash.hex()} has been paused."
)
hook = self._tx_tracker.pending.on_pause
if hook:
fire_hook(hook=hook, tx=self._tx_tracker.pending)

def resume(self) -> None:
self.log.info("[pause] pause lifted by strategy")
self._pause = False # resume

def __strategize(self) -> Optional[PendingTx]:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
Expand All @@ -351,8 +336,7 @@ def __strategize(self) -> Optional[PendingTx]:
try:
params = strategy.execute(pending=_active_copy)
except Wait as e:
log.info(f"[pause] strategy {strategy.__class__} signalled pause: {e}")
self.pause()
log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}")
return
except TransactionFault as e:
self._tx_tracker.fault(
Expand All @@ -366,9 +350,6 @@ def __strategize(self) -> Optional[PendingTx]:
# keep the parameters as they are.
_active_copy.params.update(params)

if self._pause:
self.resume()

# (!) retry the transaction with the new parameters
retry_params = TxParams(_active_copy.params)
_names = " -> ".join(s.name for s in self.strategies)
Expand Down Expand Up @@ -417,3 +398,43 @@ def __monitor_finalized(self) -> None:
self.log.info(
f"[monitor] transaction {tx.txhash.hex()} has {confirmations} confirmations"
)

#
# Exposed functions
#
def pause(self) -> None:
"""
Pause the machine's tx processing loop; no txs are processed until unpaused (resume()).
"""
if not self._pause:
self._pause = True
self.log.info("[pause] pause mode requested")
self._cycle_state() # force a move to PAUSED state (don't wait for next iteration)

def resume(self) -> None:
"""Unpauses (resumes) the machine's tx processing loop."""
if self._pause:
self._pause = False
self.log.info("[pause] pause mode deactivated")
self._wake()

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
previously_busy_or_paused = self._busy or self._pause

if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not previously_busy_or_paused:
self._wake()

return tx
17 changes: 0 additions & 17 deletions atxm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,6 @@ def faults(self) -> List[AsyncTx]:
"""Return a set of faulted transactions."""
return list(self._tx_tracker.faulty)

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
if signer.address not in self.signers:
self.signers[signer.address] = signer
tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not self._task.running:
self._wake()
return tx

def queue_transactions(
self, params: List[TxParams], signer: LocalAccount, *args, **kwargs
) -> List[FutureTx]:
Expand Down
2 changes: 0 additions & 2 deletions atxm/tacker.py → atxm/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def _queue(
info: Dict[str, str] = None,
on_broadcast: Optional[Callable] = None,
on_finalized: Optional[Callable] = None,
on_pause: Optional[Callable] = None,
on_fault: Optional[Callable] = None,
) -> FutureTx:
"""Queue a new transaction for broadcast and subsequent tracking."""
Expand All @@ -213,7 +212,6 @@ def _queue(
# configure hooks
tx.on_broadcast = on_broadcast
tx.on_finalized = on_finalized
tx.on_pause = on_pause
tx.on_fault = on_fault

self.__queue.append(tx)
Expand Down
1 change: 0 additions & 1 deletion atxm/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class AsyncTx(ABC):
on_finalized: Optional[Callable[["FinalizedTx"], None]] = field(
default=None, init=False
)
on_pause: Optional[Callable[[PendingTx], None]] = field(default=None, init=False)
on_fault: Optional[Callable] = field(default=None, init=False)

def __repr__(self):
Expand Down
5 changes: 4 additions & 1 deletion atxm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]:
# If it is equals 0 the transaction was reverted by EVM.
# https://web3py.readthedocs.io/en/stable/web3.eth.html#web3.eth.Eth.get_transaction_receipt
log.warn(
f"Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}"
f"[reverted] Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}"
)
raise TransactionReverted(receipt)

Expand Down Expand Up @@ -159,4 +159,7 @@ def _make_tx_params(data: TxData) -> TxParams:
params["type"] = "0x02"
params["maxFeePerGas"] = data["maxFeePerGas"]
params["maxPriorityFeePerGas"] = data["maxPriorityFeePerGas"]
else:
raise ValueError(f"unrecognized tx data: {data}")

return params
6 changes: 4 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def machine(w3):
clock = Clock()
_machine = AutomaticTxMachine(w3=w3)
_machine._task.clock = clock
return _machine
yield _machine

_machine.stop()


@pytest.fixture
Expand All @@ -77,7 +79,7 @@ def interval(machine):
return 1


@pytest.fixture(autouse=True)
@pytest.fixture
def mock_wake_sleep(machine, mocker):
wake = mocker.patch.object(machine, "_wake")
sleep = mocker.patch.object(machine, "_sleep")
Expand Down
10 changes: 9 additions & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@


@pytest_twisted.inlineCallbacks
def test_machine(account, w3, legacy_transaction, eip1559_transaction, machine, clock):
def test_machine(
account,
w3,
legacy_transaction,
eip1559_transaction,
machine,
clock,
mock_wake_sleep,
):
assert not machine.busy
async_txs = machine.queue_transactions(
params=[legacy_transaction, eip1559_transaction],
Expand Down
Loading
Loading