Skip to content

Commit

Permalink
Add live load
Browse files Browse the repository at this point in the history
  • Loading branch information
tvorogme committed Mar 13, 2024
1 parent a8aac50 commit b2db6a0
Showing 1 changed file with 47 additions and 3 deletions.
50 changes: 47 additions & 3 deletions src/tonpy/blockscanner/blockscanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
sys.path.append("../src")

import requests
import os
import signal

from tonpy import LiteClient, Cell, get_block_info, BlockId, BlockIdExt, \
Address, Emulator, begin_cell, StackEntry, VmDict, CellSlice, SkipCryptoCurrency
Expand Down Expand Up @@ -360,7 +362,7 @@ class BlockScanner(Thread):
def __init__(self,
lcparams: dict,
start_from: int,
load_to: int,
load_to: int = None,
nproc: int = 20,
loglevel: int = 0,
raw_process: Callable = None,
Expand All @@ -372,7 +374,8 @@ def __init__(self,
transaction_subscriptions: "CustomSubscription" = None,
account_subscriptions: "CustomAccountSubscription" = None,
database_provider: "BaseDatabaseProvider" = None,
emulate_before_output: bool = False):
emulate_before_output: bool = False,
live_load_enable: bool = False):
"""
:param lcparams: Params for LiteClient
Expand All @@ -387,14 +390,30 @@ def __init__(self,
:param account_subscriptions: Rules to filter accounts that will be scanned
:param database_provider: TODO: Database connector to get information about accounts and states, save hashes, etc
:param emulate_before_output: If True - will emulate transaction to get actual account state on TX, default False
:param live_load_enable: If True - will load all new blocks, default False. If delta >90sec between updates - application will be killed
"""
super(BlockScanner, self).__init__()

self.only_mc_blocks = only_mc_blocks
self.lcparams = json.dumps(lcparams)
self.lc = LiteClient(**lcparams)
lcparams['timeout'] = 200

self.lc_long = LiteClient(**lcparams)
self.start_from = start_from
self.load_to = load_to
self.live_load_enable = live_load_enable

if load_to is not None:
self.load_to = load_to

if self.live_load_enable:
logger.info("Live load and load_to enabled (?)")
else:
if not self.live_load_enable:
raise ValueError(f"Provide load_to or live_load_enable")

self.load_to = self.lc.get_masterchain_info_ext().last.id.seqno

self.loglevel = loglevel
self.nproc = nproc
self.chunk_size = chunk_size
Expand Down Expand Up @@ -679,6 +698,31 @@ def run(self):
self.load_historical()
self.done = True

if self.live_load_enable:
self.load_live()

def load_live(self):
last_update = datetime.now()

while True:
try:
self.start_from = self.load_to

# Get last block seqno from MC
self.load_to = self.lc_long.wait_masterchain_seqno(self.load_to + 1, 10000).last.id.seqno

self.load_historical()
last_update = datetime.now()

except Exception as e:
delta = (datetime.now() - last_update).total_seconds()
logger.error(f"Error in live load: {e}, retry. Delta: {delta}")

if delta > 80:
logger.error(f"Fatal error, too big delta: {delta}")
self.out_queue.put(ValueError("Delta too big"))
os.kill(os.getpid(), signal.SIGKILL)


def raw_process(chunk):
out = []
Expand Down

0 comments on commit b2db6a0

Please sign in to comment.