From b2db6a0f22e86cc0b9eecb2775ea968cea23ab01 Mon Sep 17 00:00:00 2001 From: Andrey Tvorozhkov Date: Wed, 13 Mar 2024 17:15:54 +0300 Subject: [PATCH] Add live load --- src/tonpy/blockscanner/blockscanner.py | 50 ++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/src/tonpy/blockscanner/blockscanner.py b/src/tonpy/blockscanner/blockscanner.py index 5782949..1b7f5fa 100644 --- a/src/tonpy/blockscanner/blockscanner.py +++ b/src/tonpy/blockscanner/blockscanner.py @@ -9,6 +9,8 @@ sys.path.append("../src") import requests +import os +import signal from tonpy import LiteClient, Cell, get_block_info, BlockId, BlockIdExt, \ Address, Emulator, begin_cell, StackEntry, VmDict, CellSlice, SkipCryptoCurrency @@ -360,7 +362,7 @@ class BlockScanner(Thread): def __init__(self, lcparams: dict, start_from: int, - load_to: int, + load_to: int = None, nproc: int = 20, loglevel: int = 0, raw_process: Callable = None, @@ -372,7 +374,8 @@ def __init__(self, transaction_subscriptions: "CustomSubscription" = None, account_subscriptions: "CustomAccountSubscription" = None, database_provider: "BaseDatabaseProvider" = None, - emulate_before_output: bool = False): + emulate_before_output: bool = False, + live_load_enable: bool = False): """ :param lcparams: Params for LiteClient @@ -387,14 +390,30 @@ def __init__(self, :param account_subscriptions: Rules to filter accounts that will be scanned :param database_provider: TODO: Database connector to get information about accounts and states, save hashes, etc :param emulate_before_output: If True - will emulate transaction to get actual account state on TX, default False + :param live_load_enable: If True - will load all new blocks, default False. If delta >90sec between updates - application will be killed """ super(BlockScanner, self).__init__() self.only_mc_blocks = only_mc_blocks self.lcparams = json.dumps(lcparams) self.lc = LiteClient(**lcparams) + lcparams['timeout'] = 200 + + self.lc_long = LiteClient(**lcparams) self.start_from = start_from - self.load_to = load_to + self.live_load_enable = live_load_enable + + if load_to is not None: + self.load_to = load_to + + if self.live_load_enable: + logger.info("Live load and load_to enabled (?)") + else: + if not self.live_load_enable: + raise ValueError(f"Provide load_to or live_load_enable") + + self.load_to = self.lc.get_masterchain_info_ext().last.id.seqno + self.loglevel = loglevel self.nproc = nproc self.chunk_size = chunk_size @@ -679,6 +698,31 @@ def run(self): self.load_historical() self.done = True + if self.live_load_enable: + self.load_live() + + def load_live(self): + last_update = datetime.now() + + while True: + try: + self.start_from = self.load_to + + # Get last block seqno from MC + self.load_to = self.lc_long.wait_masterchain_seqno(self.load_to + 1, 10000).last.id.seqno + + self.load_historical() + last_update = datetime.now() + + except Exception as e: + delta = (datetime.now() - last_update).total_seconds() + logger.error(f"Error in live load: {e}, retry. Delta: {delta}") + + if delta > 80: + logger.error(f"Fatal error, too big delta: {delta}") + self.out_queue.put(ValueError("Delta too big")) + os.kill(os.getpid(), signal.SIGKILL) + def raw_process(chunk): out = []