Skip to content

Commit

Permalink
Merge pull request #16 from derekpierre/user-pausing
Browse files Browse the repository at this point in the history
User initiated pausing
  • Loading branch information
KPrasch authored Mar 4, 2024
2 parents 1356c3a + 7a98fe6 commit ee41fa7
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 110 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ Hooks are fired in a dedicated thread for lifecycle events.

- `on_broadcast`: When a transaction is broadcasted.
- `on_finalized`: When a transaction is finalized.
- `on_pause`: When a transaction is halted by the strategy.
- `on_fault`: When a transaction reverted or another error occurred.


Expand Down
125 changes: 73 additions & 52 deletions atxm/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
TransactionFault,
Fault,
)
from atxm.tacker import _TxTracker
from atxm.tracker import _TxTracker
from atxm.strategies import (
AsyncTxStrategy,
InsufficientFundsPause,
Expand Down Expand Up @@ -48,33 +48,39 @@ class _Machine(StateMachine):
#
# State Machine:
#
# Idle <---> Busy <---> Paused
# | ^ | ^ | ^
# V | V | V |
# _ _ _
# Pause
# ^ ^
# / \
# V v
# Idle <---> Busy
# | ^ | ^
# V | V |
# _ _
#
_BUSY = State("Busy")
_IDLE = State("Idle", initial=True)
_PAUSED = State("Paused")

# - State Transitions -
_transition_to_idle = _BUSY.to(_IDLE, unless="_busy") # Busy -> Idle
_transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") # Busy -> Pause
_transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") | _IDLE.to(
_PAUSED, cond="_pause"
) # Busy/Idle -> Pause
_transition_to_idle = _BUSY.to(_IDLE, unless=["_busy", "_pause"]) | _PAUSED.to(
_IDLE, unless=["_busy", "_pause"]
) # Busy/Paused -> Idle
_transition_to_busy = _IDLE.to(_BUSY, cond="_busy") | _PAUSED.to(
_BUSY, unless="_pause"
) # Idle/Pause -> Busy
# self transitions i.e. remain in same state
_remain_busy = _BUSY.to.itself(cond="_busy", unless="_pause")
_remain_idle = _IDLE.to.itself(unless="_busy")
_remain_paused = _PAUSED.to.itself(cond="_pause")
_remain_idle = _IDLE.to.itself(unless=["_busy", "_pause"])

_cycle_state = (
_transition_to_idle
| _transition_to_paused
| _transition_to_busy
| _remain_busy
| _remain_idle
| _remain_paused
)

# internal
Expand Down Expand Up @@ -157,30 +163,17 @@ def _handle_errors(self, *args, **kwargs):
@_transition_to_paused.before
def _enter_pause_mode(self):
self.log.info("[pause] pause mode activated")
return

@_PAUSED.enter
def _process_paused(self):
# TODO what action should be taken to check if we leave the pause state?
# Perhaps a call to self.__strategize()
return

@_IDLE.enter
def _process_idle(self):
"""Return to idle mode if not already there (slow down)"""
if self._task.interval != self._IDLE_INTERVAL:
# TODO does changing the interval value actually update the LoopingCall?
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[done] returning to idle mode with "
f"{self._task.interval} second interval"
)
self._sleep()

# TODO
# 1. don't always sleep (idle for some number of cycles?)
# 2. careful sleeping - potential concurrency concerns
# 3. there is currently no difference between sleep/idle ...
self._sleep()
@_transition_to_idle.before
def _enter_idle_mode(self):
self._task.interval = self._IDLE_INTERVAL
self.log.info(
f"[idle] returning to idle mode with {self._task.interval} second interval"
)

@_transition_to_busy.before
def _enter_busy_mode(self):
Expand All @@ -191,7 +184,7 @@ def _enter_busy_mode(self):
self._task.interval = max(
round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL
)
self.log.info(f"[working] cycle interval is {self._task.interval} seconds")
self.log.info(f"[working] cycle interval is now {self._task.interval} seconds")

@_BUSY.enter
def _process_busy(self):
Expand Down Expand Up @@ -234,9 +227,14 @@ def _stop(self):
self._task.stop()

def _wake(self) -> None:
if not self._task.running:
log.info("[wake] waking up")
self._start(now=True)
"""Runs the looping call immediately."""
log.info("[wake] running looping call now.")
if self._task.running:
# TODO instead of stopping/starting, can you set interval to 0
# and call reset to have looping call immediately?
self._stop()

self._start(now=True)

def _sleep(self) -> None:
if self._task.running:
Expand Down Expand Up @@ -328,19 +326,6 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx

def pause(self) -> None:
self._pause = True
self.log.warn(
f"[pause] pending transaction {self._tx_tracker.pending.txhash.hex()} has been paused."
)
hook = self._tx_tracker.pending.on_pause
if hook:
fire_hook(hook=hook, tx=self._tx_tracker.pending)

def resume(self) -> None:
self.log.info("[pause] pause lifted by strategy")
self._pause = False # resume

def __strategize(self) -> Optional[PendingTx]:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
Expand All @@ -351,8 +336,7 @@ def __strategize(self) -> Optional[PendingTx]:
try:
params = strategy.execute(pending=_active_copy)
except Wait as e:
log.info(f"[pause] strategy {strategy.__class__} signalled pause: {e}")
self.pause()
log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}")
return
except TransactionFault as e:
self._tx_tracker.fault(
Expand All @@ -366,9 +350,6 @@ def __strategize(self) -> Optional[PendingTx]:
# keep the parameters as they are.
_active_copy.params.update(params)

if self._pause:
self.resume()

# (!) retry the transaction with the new parameters
retry_params = TxParams(_active_copy.params)
_names = " -> ".join(s.name for s in self.strategies)
Expand Down Expand Up @@ -417,3 +398,43 @@ def __monitor_finalized(self) -> None:
self.log.info(
f"[monitor] transaction {tx.txhash.hex()} has {confirmations} confirmations"
)

#
# Exposed functions
#
def pause(self) -> None:
"""
Pause the machine's tx processing loop; no txs are processed until unpaused (resume()).
"""
if not self._pause:
self._pause = True
self.log.info("[pause] pause mode requested")
self._cycle_state() # force a move to PAUSED state (don't wait for next iteration)

def resume(self) -> None:
"""Unpauses (resumes) the machine's tx processing loop."""
if self._pause:
self._pause = False
self.log.info("[pause] pause mode deactivated")
self._wake()

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
previously_busy_or_paused = self._busy or self._pause

if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not previously_busy_or_paused:
self._wake()

return tx
17 changes: 0 additions & 17 deletions atxm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,6 @@ def faults(self) -> List[AsyncTx]:
"""Return a set of faulted transactions."""
return list(self._tx_tracker.faulty)

def queue_transaction(
self, params: TxParams, signer: LocalAccount, *args, **kwargs
) -> FutureTx:
"""
Queue a new transaction for broadcast and subsequent tracking.
Optionally provide a dictionary of additional string data
to log during the transaction's lifecycle for identification.
"""
if signer.address not in self.signers:
self.signers[signer.address] = signer
tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
if not self._task.running:
self._wake()
return tx

def queue_transactions(
self, params: List[TxParams], signer: LocalAccount, *args, **kwargs
) -> List[FutureTx]:
Expand Down
2 changes: 0 additions & 2 deletions atxm/tacker.py → atxm/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def _queue(
info: Dict[str, str] = None,
on_broadcast: Optional[Callable] = None,
on_finalized: Optional[Callable] = None,
on_pause: Optional[Callable] = None,
on_fault: Optional[Callable] = None,
) -> FutureTx:
"""Queue a new transaction for broadcast and subsequent tracking."""
Expand All @@ -213,7 +212,6 @@ def _queue(
# configure hooks
tx.on_broadcast = on_broadcast
tx.on_finalized = on_finalized
tx.on_pause = on_pause
tx.on_fault = on_fault

self.__queue.append(tx)
Expand Down
1 change: 0 additions & 1 deletion atxm/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ class AsyncTx(ABC):
on_finalized: Optional[Callable[["FinalizedTx"], None]] = field(
default=None, init=False
)
on_pause: Optional[Callable[[PendingTx], None]] = field(default=None, init=False)
on_fault: Optional[Callable] = field(default=None, init=False)

def __repr__(self):
Expand Down
5 changes: 4 additions & 1 deletion atxm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]:
# If it is equals 0 the transaction was reverted by EVM.
# https://web3py.readthedocs.io/en/stable/web3.eth.html#web3.eth.Eth.get_transaction_receipt
log.warn(
f"Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}"
f"[reverted] Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}"
)
raise TransactionReverted(receipt)

Expand Down Expand Up @@ -159,4 +159,7 @@ def _make_tx_params(data: TxData) -> TxParams:
params["type"] = "0x02"
params["maxFeePerGas"] = data["maxFeePerGas"]
params["maxPriorityFeePerGas"] = data["maxPriorityFeePerGas"]
else:
raise ValueError(f"unrecognized tx data: {data}")

return params
6 changes: 4 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def machine(w3):
clock = Clock()
_machine = AutomaticTxMachine(w3=w3)
_machine._task.clock = clock
return _machine
yield _machine

_machine.stop()


@pytest.fixture
Expand All @@ -77,7 +79,7 @@ def interval(machine):
return 1


@pytest.fixture(autouse=True)
@pytest.fixture
def mock_wake_sleep(machine, mocker):
wake = mocker.patch.object(machine, "_wake")
sleep = mocker.patch.object(machine, "_sleep")
Expand Down
10 changes: 9 additions & 1 deletion tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@


@pytest_twisted.inlineCallbacks
def test_machine(account, w3, legacy_transaction, eip1559_transaction, machine, clock):
def test_machine(
account,
w3,
legacy_transaction,
eip1559_transaction,
machine,
clock,
mock_wake_sleep,
):
assert not machine.busy
async_txs = machine.queue_transactions(
params=[legacy_transaction, eip1559_transaction],
Expand Down
Loading

0 comments on commit ee41fa7

Please sign in to comment.