From 985c733c18d6451fc76780573aaf83b45cdd9e31 Mon Sep 17 00:00:00 2001 From: Benjamin Smith Date: Wed, 29 May 2024 16:04:24 +0200 Subject: [PATCH] Add Logger & Example Run Script (#13) * add logging * update readme with latest example script * add reqeusts type stubs --- .gitignore | 3 +- README.md | 27 ++++++++++++---- example/__init__.py | 0 example/run.py | 33 +++++++++++++++++++ requirements.txt | 15 ++++++--- setup.py | 1 + src/near_lake_framework/__init__.py | 20 +++++++++--- src/near_lake_framework/near_primitives.py | 34 ++++++++++---------- src/near_lake_framework/s3_fetchers.py | 6 ++-- src/near_lake_framework/utils.py | 37 ++++++++++++++++++++++ 10 files changed, 139 insertions(+), 37 deletions(-) create mode 100644 example/__init__.py create mode 100644 example/run.py create mode 100644 src/near_lake_framework/utils.py diff --git a/.gitignore b/.gitignore index 597f517..fd41b82 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ build/ __pycache__/ .vscode *.egg-info -.mypy_cache \ No newline at end of file +.mypy_cache +.venv/ diff --git a/README.md b/README.md index a2470e9..c8deff5 100644 --- a/README.md +++ b/README.md @@ -20,17 +20,25 @@ Greetings from the Data Platform Team! We are happy and proud to announce an MVP ```python3 import asyncio +import logging import os from near_lake_framework import LakeConfig, streamer, Network +from near_lake_framework.utils import fetch_latest_block + +# Suppress warning logs from specific dependencies +logging.getLogger("near_lake_framework").setLevel(logging.INFO) async def main(): + network = Network.TESTNET + latest_final_block = fetch_latest_block(network=network) config = LakeConfig( - network=Network.MAINNET, - aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), - aws_secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"), - start_block_height=69130938, + network, + start_block_height=latest_final_block, + # These fields must be set! 
+        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
+        aws_secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
     )
 
     stream_handle, streamer_messages_queue = streamer(config)
@@ -41,8 +49,15 @@ async def main():
         )
 
 
-loop = asyncio.get_event_loop()
-loop.run_until_complete(main())
+if __name__ == "__main__":
+    loop = asyncio.new_event_loop()
+    loop.run_until_complete(main())
+```
+
+Try it yourself as follows:
+```shell
+pip3 install -r requirements.txt
+python3 example/run.py
 ```
 
 ## How to use
diff --git a/example/__init__.py b/example/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/example/run.py b/example/run.py
new file mode 100644
index 0000000..137a3a4
--- /dev/null
+++ b/example/run.py
@@ -0,0 +1,33 @@
+import asyncio
+import logging
+import os
+
+from near_lake_framework import LakeConfig, streamer, Network
+from near_lake_framework.utils import fetch_latest_block
+
+# Enable INFO-level logs from the near_lake_framework logger
+logging.getLogger("near_lake_framework").setLevel(logging.INFO)
+
+
+async def main():
+    network = Network.TESTNET
+    latest_final_block = fetch_latest_block(network=network)
+    config = LakeConfig(
+        network,
+        start_block_height=latest_final_block,
+        # These fields must be set!
+ aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"], + aws_secret_key=os.environ["AWS_SECRET_ACCESS_KEY"], + ) + + stream_handle, streamer_messages_queue = streamer(config) + while True: + streamer_message = await streamer_messages_queue.get() + print( + f"Received Block #{streamer_message.block.header.height} from Lake Framework" + ) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + loop.run_until_complete(main()) diff --git a/requirements.txt b/requirements.txt index af8450c..7f13e0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,10 @@ -asyncio==3.4.3 -dataclasses==0.6 -dataclasses-json==0.5.7 -botocore==1.24.21 -aiobotocore==2.3.0 +asyncio>=3.4.3 +dataclasses>=0.6 +dataclasses-json>=0.6.6 +botocore>=1.34.70 +aiobotocore>=2.13.0 +requests>=2.32.2 + +types-botocore>=1.0.2 +types-aiobotocore>=2.13.0 +types-requests>=2.32.0.20240523 diff --git a/setup.py b/setup.py index cfbab25..47ef68b 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ "dataclasses>=0.6", "dataclasses-json>=0.5.7", "aiobotocore>=2.3.0", + "requests>=2.32.2", ], package_data={"near_lake_framework": ["py.typed"]}, ) diff --git a/src/near_lake_framework/__init__.py b/src/near_lake_framework/__init__.py index e12e49b..3752c79 100644 --- a/src/near_lake_framework/__init__.py +++ b/src/near_lake_framework/__init__.py @@ -2,9 +2,11 @@ import asyncio import itertools from enum import Enum + +import logging from typing import Optional -from aiobotocore.session import get_session # type: ignore +from aiobotocore.session import get_session from near_lake_framework import near_primitives from near_lake_framework import s3_fetchers @@ -12,6 +14,12 @@ from dataclasses import dataclass +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s %(message)s" +) +logger = logging.getLogger(__name__) + class Network(Enum): MAINNET = "mainnet" @@ -81,12 +89,14 @@ async def start(config: LakeConfig, streamer_messages_queue: 
asyncio.Queue): ) if not block_heights_prefixes: - print("No new blocks on S3, retry in 2s...") + logger.info("No new blocks on S3, retry in 2s...") await asyncio.sleep(2) continue - print("Received {} blocks from S3".format(len(block_heights_prefixes))) + logger.info( + "Received {} blocks from S3".format(len(block_heights_prefixes)) + ) pending_block_heights = iter(block_heights_prefixes) streamer_messages_futures = [] @@ -109,8 +119,8 @@ async def start(config: LakeConfig, streamer_messages_queue: asyncio.Queue): and last_processed_block_hash != streamer_message.block.header.prev_hash ): - print( - "`prev_hash` does not match, refetching the data from S3 in 200ms", + logger.warning( + "`prev_hash` does not match, re-fetching the data from S3 in 200ms", last_processed_block_hash, streamer_message.block.header.prev_hash, ) diff --git a/src/near_lake_framework/near_primitives.py b/src/near_lake_framework/near_primitives.py index bf36077..4b87900 100644 --- a/src/near_lake_framework/near_primitives.py +++ b/src/near_lake_framework/near_primitives.py @@ -1,4 +1,4 @@ -from typing import Any, List, Optional +from typing import Any, Optional from dataclasses import dataclass, field from dataclasses_json import DataClassJsonMixin, config, mm @@ -36,8 +36,8 @@ class BlockHeader(DataClassJsonMixin): metadata=config(mm_field=mm.fields.Integer(as_string=True)) ) random_value: CryptoHash - validator_proposals: List[Any] - chunk_mask: List[bool] + validator_proposals: list[Any] + chunk_mask: list[bool] gas_price: int = field(metadata=config(mm_field=mm.fields.Integer(as_string=True))) block_ordinal: Optional[int] rent_paid: int = field(metadata=config(mm_field=mm.fields.Integer(as_string=True))) @@ -47,13 +47,13 @@ class BlockHeader(DataClassJsonMixin): total_supply: int = field( metadata=config(mm_field=mm.fields.Integer(as_string=True)) ) - challenges_result: List[Any] + challenges_result: list[Any] last_final_block: CryptoHash last_ds_final_block: CryptoHash next_bp_hash: 
CryptoHash block_merkle_root: CryptoHash epoch_sync_data_hash: Optional[CryptoHash] - approvals: List[Optional[str]] + approvals: list[Optional[str]] signature: str latest_protocol_version: int height: BlockHeight = field(metadata=config(mm_field=BlockHeightField())) @@ -84,7 +84,7 @@ class ChunkHeader(DataClassJsonMixin): ) outgoing_receipts_root: CryptoHash tx_root: CryptoHash - validator_proposals: List[Any] + validator_proposals: list[Any] signature: str @@ -92,7 +92,7 @@ class ChunkHeader(DataClassJsonMixin): class Block(DataClassJsonMixin): author: AccountId header: BlockHeader - chunks: List[ChunkHeader] + chunks: list[ChunkHeader] @dataclass @@ -113,13 +113,13 @@ class CostGasUsed(DataClassJsonMixin): @dataclass class ExecutionMetadata(DataClassJsonMixin): version: int - gas_profile: Optional[List[CostGasUsed]] + gas_profile: Optional[list[CostGasUsed]] @dataclass class ExecutionOutcome(DataClassJsonMixin): - logs: List[str] - receipt_ids: List[CryptoHash] + logs: list[str] + receipt_ids: list[CryptoHash] gas_burnt: int tokens_burnt: int = field( metadata=config(mm_field=mm.fields.Integer(as_string=True)) @@ -131,7 +131,7 @@ class ExecutionOutcome(DataClassJsonMixin): @dataclass class ExecutionOutcomeWithId(DataClassJsonMixin): - proof: List[Any] + proof: list[Any] block_hash: CryptoHash id: CryptoHash outcome: ExecutionOutcome @@ -149,7 +149,7 @@ class SignedTransaction(DataClassJsonMixin): public_key: str nonce: int receiver_id: AccountId - actions: List[Any] + actions: list[Any] signature: str hash: CryptoHash @@ -170,19 +170,19 @@ class IndexerTransactionWithOutcome(DataClassJsonMixin): class IndexerChunk(DataClassJsonMixin): author: AccountId header: ChunkHeader - transactions: List[IndexerTransactionWithOutcome] - receipts: List[Receipt] + transactions: list[IndexerTransactionWithOutcome] + receipts: list[Receipt] @dataclass class IndexerShard(DataClassJsonMixin): shard_id: int chunk: Optional[IndexerChunk] - receipt_execution_outcomes: 
List[IndexerExecutionOutcomeWithReceipt] - state_changes: List[Any] + receipt_execution_outcomes: list[IndexerExecutionOutcomeWithReceipt] + state_changes: list[Any] @dataclass class StreamerMessage(DataClassJsonMixin): block: Block - shards: List[IndexerShard] + shards: list[IndexerShard] diff --git a/src/near_lake_framework/s3_fetchers.py b/src/near_lake_framework/s3_fetchers.py index a23a6be..fce1ec3 100644 --- a/src/near_lake_framework/s3_fetchers.py +++ b/src/near_lake_framework/s3_fetchers.py @@ -1,10 +1,10 @@ import asyncio import logging -from typing import List import traceback from botocore.exceptions import ClientError from near_lake_framework import near_primitives +from near_lake_framework.near_primitives import IndexerShard async def list_blocks( @@ -12,7 +12,7 @@ async def list_blocks( s3_bucket_name: str, start_from_block_height: near_primitives.BlockHeight, number_of_blocks_requested: int, -) -> List[near_primitives.BlockHeight]: +) -> list[near_primitives.BlockHeight]: response = await s3_client.list_objects_v2( Bucket=s3_bucket_name, Delimiter="/", @@ -47,7 +47,7 @@ async def fetch_streamer_message( ] shards = await asyncio.gather(*shards_fetching) - return near_primitives.StreamerMessage(block, shards) + return near_primitives.StreamerMessage(block, list(shards)) async def fetch_shard_or_retry( diff --git a/src/near_lake_framework/utils.py b/src/near_lake_framework/utils.py new file mode 100644 index 0000000..7ef34b7 --- /dev/null +++ b/src/near_lake_framework/utils.py @@ -0,0 +1,37 @@ +import json + +import requests + +from near_lake_framework import Network, near_primitives + + +def fetch_latest_block( + network: Network = Network.MAINNET, timeout: int = 10 +) -> near_primitives.BlockHeight: + """ + Define the RPC endpoint for the NEAR network + """ + url = f"https://rpc.{network}.near.org" + + # Define the payload for fetching the latest block + payload = json.dumps( + { + "jsonrpc": "2.0", + "id": "dontcare", + "method": "block", + "params": 
{"finality": "final"},
+        }
+    )
+
+    # Define the headers for the HTTP request
+    headers = {"Content-Type": "application/json"}
+
+    # Send the HTTP request to the NEAR RPC endpoint
+    response = requests.request(
+        "POST", url, headers=headers, data=payload, timeout=timeout
+    )
+
+    # Parse the JSON response to get the latest final block height
+    latest_final_block: int = response.json()["result"]["header"]["height"]
+
+    return latest_final_block