From 9935080ff0e744f7f6b437855017ea90fb2131a8 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 09:35:31 -0500 Subject: [PATCH 01/14] Rename module from tacker -> tracker. --- atxm/machine.py | 2 +- atxm/{tacker.py => tracker.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename atxm/{tacker.py => tracker.py} (100%) diff --git a/atxm/machine.py b/atxm/machine.py index e7c3488..cc77640 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -15,7 +15,7 @@ TransactionFault, Fault, ) -from atxm.tacker import _TxTracker +from atxm.tracker import _TxTracker from atxm.strategies import ( AsyncTxStrategy, InsufficientFundsPause, diff --git a/atxm/tacker.py b/atxm/tracker.py similarity index 100% rename from atxm/tacker.py rename to atxm/tracker.py From 17829a7625bc25ac697ff618d7ebe79c700c318f Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 09:36:57 -0500 Subject: [PATCH 02/14] Clean up logging. --- atxm/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/atxm/utils.py b/atxm/utils.py index 067dc8e..0a210a1 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -71,7 +71,7 @@ def _get_receipt(w3: Web3, pending_tx: PendingTx) -> Optional[TxReceipt]: # If it is equals 0 the transaction was reverted by EVM. # https://web3py.readthedocs.io/en/stable/web3.eth.html#web3.eth.Eth.get_transaction_receipt log.warn( - f"Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}" + f"[reverted] Transaction {txdata['hash'].hex()} was reverted by EVM with status {status}" ) raise TransactionReverted(receipt) From b71b1106ad86d60a116ac64736f6d54587c82deb Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 09:37:37 -0500 Subject: [PATCH 03/14] Raise ValueError if tx type unrecognized. Ensures that we always handling the different types of txs appropriately. --- atxm/utils.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/atxm/utils.py b/atxm/utils.py index 0a210a1..ef4c0d3 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -159,4 +159,7 @@ def _make_tx_params(data: TxData) -> TxParams: params["type"] = "0x02" params["maxFeePerGas"] = data["maxFeePerGas"] params["maxPriorityFeePerGas"] = data["maxPriorityFeePerGas"] + else: + raise ValueError(f"unrecognized tx data: {data}") + return params From da297a3f9f5f91a403e0663471d9713890d8d34f Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:22:36 -0500 Subject: [PATCH 04/14] Remove the use of on_pause hook - pauses are user-controlled so no use in providing a callback to alert the user about pausing. Move queue_transaction to _Machine instead - more applicable there since internals of _Machine are used. Move exposed functions to their own section in _Machine. --- README.md | 1 - atxm/machine.py | 51 ++++++++++++++++++++++++++++++++++++------------- atxm/main.py | 17 ----------------- atxm/tracker.py | 2 -- atxm/tx.py | 1 - 5 files changed, 38 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index b2b4e39..fee30a9 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,6 @@ Hooks are fired in a dedicated thread for lifecycle events. - `on_broadcast`: When a transaction is broadcasted. - `on_finalized`: When a transaction is finalized. -- `on_pause`: When a transaction is halted by the strategy. - `on_fault`: When a transaction reverted or another error occurred. diff --git a/atxm/machine.py b/atxm/machine.py index cc77640..1276ddf 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -328,19 +328,6 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]: fire_hook(hook=tx.on_broadcast, tx=pending_tx) return pending_tx - def pause(self) -> None: - self._pause = True - self.log.warn( - f"[pause] pending transaction {self._tx_tracker.pending.txhash.hex()} has been paused." - ) - hook = self._tx_tracker.pending.on_pause - if hook: - fire_hook(hook=hook, tx=self._tx_tracker.pending) - - def resume(self) -> None: - self.log.info("[pause] pause lifted by strategy") - self._pause = False # resume - def __strategize(self) -> Optional[PendingTx]: """Retry the currently tracked pending transaction with the configured strategy.""" if not self._tx_tracker.pending: @@ -417,3 +404,41 @@ def __monitor_finalized(self) -> None: self.log.info( f"[monitor] transaction {tx.txhash.hex()} has {confirmations} confirmations" ) + + # + # Exposed functions + # + def pause(self) -> None: + """ + Pause the machine's tx processing loop; no txs are processed until unpaused (resume()). + """ + self._pause = True + self.log.info("[pause] pause mode requested") + + def resume(self) -> None: + """Unpauses (resumes) the machine's tx processing loop.""" + if self._pause: + self._pause = False + self.log.info("[pause] pause mode deactivated") + self._wake() + + def queue_transaction( + self, params: TxParams, signer: LocalAccount, *args, **kwargs + ) -> FutureTx: + """ + Queue a new transaction for broadcast and subsequent tracking. + Optionally provide a dictionary of additional string data + to log during the transaction's lifecycle for identification. + """ + previously_busy = self._busy + + if signer.address not in self.signers: + self.signers[signer.address] = signer + + tx = self._tx_tracker._queue( + _from=signer.address, params=params, *args, **kwargs + ) + if not previously_busy: + self._wake() + + return tx diff --git a/atxm/main.py b/atxm/main.py index ba79acf..b6eccd5 100644 --- a/atxm/main.py +++ b/atxm/main.py @@ -57,23 +57,6 @@ def faults(self) -> List[AsyncTx]: """Return a set of faulted transactions.""" return list(self._tx_tracker.faulty) - def queue_transaction( - self, params: TxParams, signer: LocalAccount, *args, **kwargs - ) -> FutureTx: - """ - Queue a new transaction for broadcast and subsequent tracking. - Optionally provide a dictionary of additional string data - to log during the transaction's lifecycle for identification. - """ - if signer.address not in self.signers: - self.signers[signer.address] = signer - tx = self._tx_tracker._queue( - _from=signer.address, params=params, *args, **kwargs - ) - if not self._task.running: - self._wake() - return tx - def queue_transactions( self, params: List[TxParams], signer: LocalAccount, *args, **kwargs ) -> List[FutureTx]: diff --git a/atxm/tracker.py b/atxm/tracker.py index 94cc7d4..2d71e85 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -199,7 +199,6 @@ def _queue( info: Dict[str, str] = None, on_broadcast: Optional[Callable] = None, on_finalized: Optional[Callable] = None, - on_pause: Optional[Callable] = None, on_fault: Optional[Callable] = None, ) -> FutureTx: """Queue a new transaction for broadcast and subsequent tracking.""" @@ -213,7 +212,6 @@ def _queue( # configure hooks tx.on_broadcast = on_broadcast tx.on_finalized = on_finalized - tx.on_pause = on_pause tx.on_fault = on_fault self.__queue.append(tx) diff --git a/atxm/tx.py b/atxm/tx.py index 7b6b4ce..a262dc4 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -23,7 +23,6 @@ class AsyncTx(ABC): on_finalized: Optional[Callable[["FinalizedTx"], None]] = field( default=None, init=False ) - on_pause: Optional[Callable[[PendingTx], None]] = field(default=None, init=False) on_fault: Optional[Callable] = field(default=None, init=False) def __repr__(self): From 766074096de4d91358f4181225cdb9f284382e88 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:25:06 -0500 Subject: [PATCH 05/14] Strategy raising Wait should not cause pause, but rather simply skip any processing for this execution round of the machine; reasses during next execution round. --- atxm/machine.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 1276ddf..e0256bb 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -338,8 +338,7 @@ def __strategize(self) -> Optional[PendingTx]: try: params = strategy.execute(pending=_active_copy) except Wait as e: - log.info(f"[pause] strategy {strategy.__class__} signalled pause: {e}") - self.pause() + log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}") return except TransactionFault as e: self._tx_tracker.fault( From 887c589a9c7aa235a541fcf5437a9d50b2eefbf7 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:27:32 -0500 Subject: [PATCH 06/14] Pause can be activated from either Busy or Idle states. Pause will cause the looping task to stop (sleep), and resume will cause the looping task to start (wake). Idle state itself does not need processing, just setting of the interval when it is transitioned into. --- atxm/machine.py | 51 +++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index e0256bb..03271fa 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -48,25 +48,32 @@ class _Machine(StateMachine): # # State Machine: # - # Idle <---> Busy <---> Paused - # | ^ | ^ | ^ - # V | V | V | - # _ _ _ + # Pause + # ^ ^ + # / \ + # V v + # Idle <---> Busy + # | ^ | ^ + # V | V | + # _ _ # _BUSY = State("Busy") _IDLE = State("Idle", initial=True) _PAUSED = State("Paused") # - State Transitions - - _transition_to_idle = _BUSY.to(_IDLE, unless="_busy") # Busy -> Idle - _transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") # Busy -> Pause + _transition_to_paused = _BUSY.to(_PAUSED, cond="_pause") | _IDLE.to( + _PAUSED, cond="_pause" + ) # Busy/Idle -> Pause + _transition_to_idle = _BUSY.to(_IDLE, unless=["_busy", "_pause"]) | _PAUSED.to( + _IDLE, unless=["_busy", "_pause"] + ) # Busy/Paused -> Idle _transition_to_busy = _IDLE.to(_BUSY, cond="_busy") | _PAUSED.to( _BUSY, unless="_pause" ) # Idle/Pause -> Busy # self transitions i.e. remain in same state _remain_busy = _BUSY.to.itself(cond="_busy", unless="_pause") - _remain_idle = _IDLE.to.itself(unless="_busy") - _remain_paused = _PAUSED.to.itself(cond="_pause") + _remain_idle = _IDLE.to.itself(unless=["_busy", "_pause"]) _cycle_state = ( _transition_to_idle @@ -74,7 +81,6 @@ class _Machine(StateMachine): | _transition_to_busy | _remain_busy | _remain_idle - | _remain_paused ) # internal @@ -157,30 +163,17 @@ def _handle_errors(self, *args, **kwargs): @_transition_to_paused.before def _enter_pause_mode(self): self.log.info("[pause] pause mode activated") - return @_PAUSED.enter def _process_paused(self): - # TODO what action should be taken to check if we leave the pause state? - # Perhaps a call to self.__strategize() - return - - @_IDLE.enter - def _process_idle(self): - """Return to idle mode if not already there (slow down)""" - if self._task.interval != self._IDLE_INTERVAL: - # TODO does changing the interval value actually update the LoopingCall? - self._task.interval = self._IDLE_INTERVAL - self.log.info( - f"[done] returning to idle mode with " - f"{self._task.interval} second interval" - ) + self._sleep() - # TODO - # 1. don't always sleep (idle for some number of cycles?) - # 2. careful sleeping - potential concurrency concerns - # 3. there is currently no difference between sleep/idle ... - self._sleep() + @_transition_to_idle.before + def _enter_idle_mode(self): + self._task.interval = self._IDLE_INTERVAL + self.log.info( + f"[idle] returning to idle mode with {self._task.interval} second interval" + ) @_transition_to_busy.before def _enter_busy_mode(self): From ea62ab650edbabaaeba240f6a71d1051d139b8a9 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:29:32 -0500 Subject: [PATCH 07/14] Wake should immediately run the looping call - either we are idle and a tx got queued, OR we are in pause and need to be woken up. --- atxm/machine.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 03271fa..6f53ab5 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -184,7 +184,7 @@ def _enter_busy_mode(self): self._task.interval = max( round(average_block_time * self._BLOCK_INTERVAL), self._MIN_INTERVAL ) - self.log.info(f"[working] cycle interval is {self._task.interval} seconds") + self.log.info(f"[working] cycle interval is now {self._task.interval} seconds") @_BUSY.enter def _process_busy(self): @@ -227,9 +227,14 @@ def _stop(self): self._task.stop() def _wake(self) -> None: - if not self._task.running: - log.info("[wake] waking up") - self._start(now=True) + """Runs the looping call immediately.""" + log.info("[reset] running looping call now.") + if self._task.running: + # TODO instead of stopping/starting, can you set interval to 0 + # and call reset to have looping call immediately? + self._stop() + + self._start(now=True) def _sleep(self) -> None: if self._task.running: From c29c9d44dffffc5e0cf454d19a09bd7d2f82aa45 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 14:56:57 -0500 Subject: [PATCH 08/14] Improved testing of machine. --- tests/conftest.py | 4 ++- tests/test_machine.py | 83 +++++++++++++++++++++++++++---------------- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index afea01e..27d7ae3 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -64,7 +64,9 @@ def machine(w3): clock = Clock() _machine = AutomaticTxMachine(w3=w3) _machine._task.clock = clock - return _machine + yield _machine + + _machine.stop() @pytest.fixture diff --git a/tests/test_machine.py b/tests/test_machine.py index 3cd4d3c..c696385 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -1,8 +1,5 @@ import pytest - import pytest_twisted -from twisted.internet import reactor -from twisted.internet.task import deferLater from atxm.tx import FutureTx, PendingTx @@ -14,14 +11,15 @@ def rpc_spy(mocker, w3): @pytest_twisted.inlineCallbacks -def test_no_rpc_calls_when_idle(machine, state_observer, clock, rpc_spy): +def test_no_rpc_calls_when_idle(clock, machine, state_observer, rpc_spy): assert machine.current_state == machine._IDLE assert not machine.busy assert len(machine.queued) == 0 machine.start(now=True) - yield clock.advance(machine._task.interval * 3) - machine.stop() + for i in range(3): + yield clock.advance(1) + assert machine.current_state == machine._IDLE # Verify that no RPC calls were made assert rpc_spy.call_count == 0 @@ -32,11 +30,12 @@ def test_no_rpc_calls_when_idle(machine, state_observer, clock, rpc_spy): assert machine.current_state == machine._IDLE assert len(state_observer.transitions) == 0 # remained idle + machine.stop() + def test_queue( machine, state_observer, - clock, rpc_spy, account, eip1559_transaction, @@ -76,8 +75,16 @@ def test_queue( @pytest_twisted.inlineCallbacks def test_broadcast( - machine, state_observer, clock, eip1559_transaction, account, mocker + clock, + machine, + state_observer, + eip1559_transaction, + account, + mocker, + mock_wake_sleep, ): + wake, _ = mock_wake_sleep + assert machine.current_state == machine._IDLE assert not machine.busy @@ -90,14 +97,16 @@ def test_broadcast( info={"message": "something wonderful is happening..."}, ) + assert wake.call_count == 1 + # There is one queued transaction assert len(machine.queued) == 1 - # distort the time-space continuum machine.start(now=True) while machine.pending is None: yield clock.advance(1) - machine.stop() + + assert machine.current_state == machine._BUSY # The transaction is no longer queued assert len(machine.queued) == 0 @@ -110,20 +119,21 @@ def test_broadcast( assert atx.txhash # wait for the hook to be called - yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 # tx only broadcasted and not finalized, so we are still busy assert machine.current_state == machine._BUSY + assert len(state_observer.transitions) == 1 assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) @pytest_twisted.inlineCallbacks def test_finalize( + chain, + clock, machine, state_observer, - clock, eip1559_transaction, account, mock_wake_sleep, @@ -142,16 +152,19 @@ def test_finalize( # There is one queued transaction assert len(machine.queued) == 1 - # advance time to broadcast the transaction machine.start(now=True) + + # advance to broadcast the transaction while machine.pending is None: yield clock.advance(1) - machine.stop() + + assert machine.current_state == machine._BUSY + assert machine.pending == atx - # advance time to finalize the transaction - machine.start(now=True) + # advance to finalize the transaction while machine.pending: + yield chain.mine(1) yield clock.advance(1) # The transaction is no longer pending @@ -166,8 +179,6 @@ def test_finalize( assert atx.final assert atx.receipt - # wait for the hook to be called - yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 yield clock.advance(1) @@ -195,16 +206,20 @@ def test_follow( signer=account, ) + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + while not machine.finalized: yield clock.advance(1) assert atx.final is True while len(machine.finalized) > 0: - yield clock.advance(1) yield chain.mine(1) - - machine.stop() + yield clock.advance(1) assert len(machine.finalized) == 0 assert len(machine.queued) == 0 @@ -213,16 +228,14 @@ def test_follow( assert not machine.busy - # wait for the hook to be called - yield deferLater(reactor, 0.2, lambda: None) - assert sleep.call_count == 1 - assert machine.current_state == machine._IDLE assert len(state_observer.transitions) == 2 assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) assert state_observer.transitions[1] == (machine._BUSY, machine._IDLE) + machine.stop() + def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, account): assert machine.current_state == machine._IDLE @@ -232,6 +245,19 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac # no change in state assert machine.current_state == machine._IDLE + # idle -> pause + machine.pause() + machine._cycle() + assert machine.current_state == machine._PAUSED + assert machine._pause + + # resume after pausing + machine.resume() + machine._cycle() + assert machine.current_state == machine._IDLE + assert not machine._pause + assert not machine.busy + atx = machine.queue_transaction( params=eip1559_transaction, signer=account, @@ -242,17 +268,12 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac assert machine.current_state == machine._BUSY assert machine.busy - # pause + # busy -> pause machine.pause() machine._cycle() assert machine.current_state == machine._PAUSED assert machine._pause - for i in range(3): - machine._cycle() - # no change in state - assert machine.current_state == machine._PAUSED - # resume after pausing machine.resume() machine._cycle() From 03b49ddcfacd5eaff101b1f4481d96630976504a Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 15:05:16 -0500 Subject: [PATCH 09/14] Wait for hook to be called. --- tests/test_machine.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tests/test_machine.py b/tests/test_machine.py index c696385..19fc036 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -1,5 +1,8 @@ import pytest + import pytest_twisted +from twisted.internet import reactor +from twisted.internet.task import deferLater from atxm.tx import FutureTx, PendingTx @@ -119,6 +122,7 @@ def test_broadcast( assert atx.txhash # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 # tx only broadcasted and not finalized, so we are still busy @@ -127,6 +131,8 @@ def test_broadcast( assert len(state_observer.transitions) == 1 assert state_observer.transitions[0] == (machine._IDLE, machine._BUSY) + machine.stop() + @pytest_twisted.inlineCallbacks def test_finalize( @@ -179,6 +185,8 @@ def test_finalize( assert atx.final assert atx.receipt + # wait for the hook to be called + yield deferLater(reactor, 0.2, lambda: None) assert hook.call_count == 1 yield clock.advance(1) From 31267f582a0d26e8047f8dbfa09a2955e8c13a7d Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 15:45:32 -0500 Subject: [PATCH 10/14] Add test that calls pause while already paused, and resume when not paused; all is still fine. --- tests/test_machine.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/test_machine.py b/tests/test_machine.py index 19fc036..01f3e0a 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -245,7 +245,7 @@ def test_follow( machine.stop() -def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, account): +def test_simple_state_transitions(chain, machine, eip1559_transaction, account): assert machine.current_state == machine._IDLE for i in range(3): @@ -257,13 +257,18 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac machine.pause() machine._cycle() assert machine.current_state == machine._PAUSED - assert machine._pause + assert machine.paused + + # calling pause has no effect if already paused + assert machine.paused + for i in range(3): + machine.pause() # resume after pausing machine.resume() machine._cycle() assert machine.current_state == machine._IDLE - assert not machine._pause + assert not machine.paused assert not machine.busy atx = machine.queue_transaction( @@ -280,13 +285,13 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac machine.pause() machine._cycle() assert machine.current_state == machine._PAUSED - assert machine._pause + assert machine.paused # resume after pausing machine.resume() machine._cycle() assert machine.current_state == machine._BUSY - assert not machine._pause + assert not machine.paused # finalize tx while machine.busy: @@ -298,3 +303,9 @@ def test_simple_state_transitions(chain, machine, clock, eip1559_transaction, ac # transition to idle machine._cycle() assert machine.current_state == machine._IDLE + + # resume has no effect if not paused + assert not machine.paused + for i in range(3): + machine.resume() + assert machine.current_state == machine._IDLE From cb7f893c563a58373a5abf2f67964ea33283e6a8 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Wed, 28 Feb 2024 16:32:11 -0500 Subject: [PATCH 11/14] Don't calll _wake if previous state was busy/pause after queueing a tx. Add tests for _wake. --- atxm/machine.py | 4 +- tests/conftest.py | 2 +- tests/test_api.py | 10 +++- tests/test_machine.py | 129 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 137 insertions(+), 8 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 6f53ab5..1d2a8f3 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -427,7 +427,7 @@ def queue_transaction( Optionally provide a dictionary of additional string data to log during the transaction's lifecycle for identification. """ - previously_busy = self._busy + previously_busy_or_paused = self._busy or self._pause if signer.address not in self.signers: self.signers[signer.address] = signer @@ -435,7 +435,7 @@ def queue_transaction( tx = self._tx_tracker._queue( _from=signer.address, params=params, *args, **kwargs ) - if not previously_busy: + if not previously_busy_or_paused: self._wake() return tx diff --git a/tests/conftest.py b/tests/conftest.py index 27d7ae3..2b8c43d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -79,7 +79,7 @@ def interval(machine): return 1 -@pytest.fixture(autouse=True) +@pytest.fixture def mock_wake_sleep(machine, mocker): wake = mocker.patch.object(machine, "_wake") sleep = mocker.patch.object(machine, "_sleep") diff --git a/tests/test_api.py b/tests/test_api.py index 3d18f69..f0489ac 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -4,7 +4,15 @@ @pytest_twisted.inlineCallbacks -def test_machine(account, w3, legacy_transaction, eip1559_transaction, machine, clock): +def test_machine( + account, + w3, + legacy_transaction, + eip1559_transaction, + machine, + clock, + mock_wake_sleep, +): assert not machine.busy async_txs = machine.queue_transactions( params=[legacy_transaction, eip1559_transaction], diff --git a/tests/test_machine.py b/tests/test_machine.py index 01f3e0a..09e936c 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -44,7 +44,7 @@ def test_queue( eip1559_transaction, mock_wake_sleep, ): - wake, sleep = mock_wake_sleep + wake, _ = mock_wake_sleep # The machine is idle assert machine.current_state == machine._IDLE @@ -76,6 +76,127 @@ def test_queue( assert len(state_observer.transitions) == 0 # nothing actually executed +def test_wake_after_queuing_when_idle_and_not_already_running( + machine, + eip1559_transaction, + account, + mocker, +): + assert machine.current_state == machine._IDLE + assert not machine.busy + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + assert not machine.running + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert stop_spy.call_count == 0, "no task to stop" + assert start_spy.call_count == 1, "task started" + + assert machine.running + assert machine.current_state == machine._BUSY + + machine.stop() + + +def test_wake_after_queuing_when_idle_and_already_running( + machine, + eip1559_transaction, + account, + mocker, +): + machine.start(now=True) + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + assert machine.current_state == machine._IDLE + assert not machine.busy + + assert machine.running + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert stop_spy.call_count == 1, "task stopped" + assert start_spy.call_count == 1, "task started" + + assert machine.running + assert machine.current_state == machine._BUSY + + machine.stop() + + +def test_wake_no_call_after_queuing_when_already_busy( + machine, + eip1559_transaction, + account, + mock_wake_sleep, +): + wake, _ = mock_wake_sleep + + assert machine.current_state == machine._IDLE + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 + + machine._cycle() + assert machine.current_state == machine._BUSY + + # Queue another transaction while busy + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 1 # remains unchanged + + +def test_wake_no_call_after_queuing_when_already_paused( + machine, + eip1559_transaction, + account, + mock_wake_sleep, +): + wake, sleep = mock_wake_sleep + + assert machine.current_state == machine._IDLE + + machine.pause() + machine._cycle() + assert machine.paused + assert machine.current_state == machine._PAUSED + + assert sleep.call_count == 1 + + # Queue a transaction + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + info={"message": "something wonderful is happening..."}, + ) + + assert wake.call_count == 0 + + @pytest_twisted.inlineCallbacks def test_broadcast( clock, @@ -204,8 +325,6 @@ def test_finalize( def test_follow( chain, machine, state_observer, clock, eip1559_transaction, account, mock_wake_sleep ): - wake, sleep = mock_wake_sleep - machine.start() assert machine.current_state == machine._IDLE @@ -245,7 +364,9 @@ def test_follow( machine.stop() -def test_simple_state_transitions(chain, machine, eip1559_transaction, account): +def test_simple_state_transitions( + chain, machine, eip1559_transaction, account, mock_wake_sleep +): assert machine.current_state == machine._IDLE for i in range(3): From 96e1435146802d43e3a380614089a5561c4eae02 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 09:01:54 -0500 Subject: [PATCH 12/14] Force a move to paused state instead of waiting for next iteration. --- atxm/machine.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index 1d2a8f3..b4dcf8d 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -228,7 +228,7 @@ def _stop(self): def _wake(self) -> None: """Runs the looping call immediately.""" - log.info("[reset] running looping call now.") + log.info("[wake] running looping call now.") if self._task.running: # TODO instead of stopping/starting, can you set interval to 0 # and call reset to have looping call immediately? @@ -409,8 +409,10 @@ def pause(self) -> None: """ Pause the machine's tx processing loop; no txs are processed until unpaused (resume()). """ - self._pause = True - self.log.info("[pause] pause mode requested") + if not self._pause: + self._pause = True + self.log.info("[pause] pause mode requested") + self._cycle_state() # force a move to PAUSED state (don't wait for next iteration) def resume(self) -> None: """Unpauses (resumes) the machine's tx processing loop.""" From 554040529fdf00ed3c313fd6d9006d0884676f10 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 09:02:46 -0500 Subject: [PATCH 13/14] Improve testing around pausing/waking given that pause now forces a move to the PAUSED state instead of waiting until next iteration of looping call. --- tests/test_machine.py | 82 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 77 insertions(+), 5 deletions(-) diff --git a/tests/test_machine.py b/tests/test_machine.py index 09e936c..76bf4f3 100644 --- a/tests/test_machine.py +++ b/tests/test_machine.py @@ -181,7 +181,6 @@ def test_wake_no_call_after_queuing_when_already_paused( assert machine.current_state == machine._IDLE machine.pause() - machine._cycle() assert machine.paused assert machine.current_state == machine._PAUSED @@ -364,6 +363,75 @@ def test_follow( machine.stop() +@pytest_twisted.inlineCallbacks +def test_pause_when_idle(clock, machine, mocker): + machine.start() + assert machine.current_state == machine._IDLE + assert machine.running + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + machine.pause() + yield clock.advance(1) + + assert machine.current_state == machine._PAUSED + + assert not machine.running + + assert stop_spy.call_count == 1, "task stopped since paused" + assert start_spy.call_count == 0 + + machine.resume() + + assert stop_spy.call_count == 1 + assert start_spy.call_count == 1, "machine restarted" + + assert machine.current_state == machine._IDLE + assert machine.running + machine.stop() + + +@pytest_twisted.inlineCallbacks +def test_pause_when_busy(clock, machine, eip1559_transaction, account, mocker): + machine.start() + assert machine.current_state == machine._IDLE + assert machine.running + + _ = machine.queue_transaction( + params=eip1559_transaction, + signer=account, + ) + + # advance to broadcast the transaction + while machine.pending is None: + yield clock.advance(1) + + assert machine.current_state == machine._BUSY + + stop_spy = mocker.spy(machine._task, "stop") + start_spy = mocker.spy(machine._task, "start") + + machine.pause() + yield clock.advance(1) + + assert machine.current_state == machine._PAUSED + + assert not machine.running + + assert stop_spy.call_count == 1, "task stopped since paused" + assert start_spy.call_count == 0 + + machine.resume() + + assert stop_spy.call_count == 1 + assert start_spy.call_count == 1, "machine restarted" + + assert machine.current_state == machine._BUSY + assert machine.running + machine.stop() + + def test_simple_state_transitions( chain, machine, eip1559_transaction, account, mock_wake_sleep ): @@ -376,7 +444,6 @@ def test_simple_state_transitions( # idle -> pause machine.pause() - machine._cycle() assert machine.current_state == machine._PAUSED assert machine.paused @@ -387,7 +454,7 @@ def test_simple_state_transitions( # resume after pausing machine.resume() - machine._cycle() + machine._cycle() # wake doesn't do anything because mocked assert machine.current_state == machine._IDLE assert not machine.paused assert not machine.busy @@ -404,13 +471,12 @@ def test_simple_state_transitions( # busy -> pause machine.pause() - machine._cycle() assert machine.current_state == machine._PAUSED assert machine.paused # resume after pausing machine.resume() - machine._cycle() + machine._cycle() # wake doesn't do anything because mocked assert machine.current_state == machine._BUSY assert not machine.paused @@ -426,7 +492,13 @@ def test_simple_state_transitions( assert machine.current_state == machine._IDLE # resume has no effect if not paused + wake, sleep = mock_wake_sleep + wake_call_count = wake.call_count + sleep_call_count = sleep.call_count + assert not machine.paused for i in range(3): machine.resume() assert machine.current_state == machine._IDLE + assert wake.call_count == wake_call_count, "wake call count remains unchanged" + assert sleep.call_count == sleep_call_count, "wake call count remains unchanged" From 7a98fe64e7cb2c840679860a0819598ea474e0d0 Mon Sep 17 00:00:00 2001 From: derekpierre Date: Thu, 29 Feb 2024 09:18:58 -0500 Subject: [PATCH 14/14] Don't unpause based on strategies - pauses are explicitly user-directed. --- atxm/machine.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/atxm/machine.py b/atxm/machine.py index b4dcf8d..1c1aca9 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -350,9 +350,6 @@ def __strategize(self) -> Optional[PendingTx]: # keep the parameters as they are. _active_copy.params.update(params) - if self._pause: - self.resume() - # (!) retry the transaction with the new parameters retry_params = TxParams(_active_copy.params) _names = " -> ".join(s.name for s in self.strategies)