Skip to content

Commit

Permalink
Add Logger & Example Run Script (#13)
Browse files Browse the repository at this point in the history
* add logging

* update readme with latest example script

* add requests type stubs
  • Loading branch information
bh2smith authored May 29, 2024
1 parent 1112826 commit 985c733
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 37 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ build/
__pycache__/
.vscode
*.egg-info
.mypy_cache
.mypy_cache
.venv/
27 changes: 21 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,25 @@ Greetings from the Data Platform Team! We are happy and proud to announce an MVP

```python3
import asyncio
import logging
import os

from near_lake_framework import LakeConfig, streamer, Network
from near_lake_framework.utils import fetch_latest_block

# Suppress warning logs from specific dependencies
logging.getLogger("near_lake_framework").setLevel(logging.INFO)


async def main():
network = Network.TESTNET
latest_final_block = fetch_latest_block(network=network)
config = LakeConfig(
network=Network.MAINNET,
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
start_block_height=69130938,
network,
start_block_height=latest_final_block,
# These fields must be set!
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

stream_handle, streamer_messages_queue = streamer(config)
Expand All @@ -41,8 +49,15 @@ async def main():
)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
if __name__ == "__main__":
loop = asyncio.new_event_loop()
loop.run_until_complete(main())
```

Try it yourself as follows:
```shell
pip3 install -r requirements.txt
python3 example/run.py
```

## How to use
Expand Down
Empty file added example/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions example/run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import asyncio
import logging
import os

from near_lake_framework import LakeConfig, streamer, Network
from near_lake_framework.utils import fetch_latest_block

# Enable INFO-level output from the framework's logger.
# NOTE(review): the original comment said this "suppresses warnings", but
# WARNING is a higher level than INFO — setLevel(INFO) makes the logger
# MORE verbose (INFO records now pass), it does not silence anything.
logging.getLogger("near_lake_framework").setLevel(logging.INFO)


async def main():
    """Stream NEAR Lake blocks from testnet and print each block height.

    Starts the stream at the latest final block and then consumes the
    streamer's queue forever, printing one line per received block.
    """
    net = Network.TESTNET
    start_height = fetch_latest_block(network=net)
    lake_config = LakeConfig(
        net,
        start_block_height=start_height,
        # These fields must be set!
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_key=os.environ["AWS_SECRET_ACCESS_KEY"],
    )

    _stream_handle, message_queue = streamer(lake_config)
    while True:
        message = await message_queue.get()
        height = message.block.header.height
        print(f"Received Block #{height} from Lake Framework")


if __name__ == "__main__":
    # asyncio.run() creates an event loop, runs main() to completion, then
    # cancels leftover tasks and closes the loop. The previous
    # new_event_loop()/run_until_complete() pair never closed the loop
    # (and never registered it with set_event_loop), leaking it on exit.
    asyncio.run(main())
15 changes: 10 additions & 5 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
asyncio==3.4.3
dataclasses==0.6
dataclasses-json==0.5.7
botocore==1.24.21
aiobotocore==2.3.0
asyncio>=3.4.3
dataclasses>=0.6
dataclasses-json>=0.6.6
botocore>=1.34.70
aiobotocore>=2.13.0
requests>=2.32.2

types-botocore>=1.0.2
types-aiobotocore>=2.13.0
types-requests>=2.32.0.20240523
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"dataclasses>=0.6",
"dataclasses-json>=0.5.7",
"aiobotocore>=2.3.0",
"requests>=2.32.2",
],
package_data={"near_lake_framework": ["py.typed"]},
)
20 changes: 15 additions & 5 deletions src/near_lake_framework/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,24 @@
import asyncio
import itertools
from enum import Enum

import logging
from typing import Optional

from aiobotocore.session import get_session # type: ignore
from aiobotocore.session import get_session

from near_lake_framework import near_primitives
from near_lake_framework import s3_fetchers


from dataclasses import dataclass

# Library code must not call logging.basicConfig() at import time: doing so
# installs a root handler as a side effect of `import near_lake_framework`
# and silently overrides the host application's own logging configuration.
# Per the Python Logging HOWTO ("Configuring Logging for a Library"), a
# library should attach only a NullHandler and let applications decide how
# (and whether) its records are emitted.
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())


class Network(Enum):
MAINNET = "mainnet"
Expand Down Expand Up @@ -81,12 +89,14 @@ async def start(config: LakeConfig, streamer_messages_queue: asyncio.Queue):
)

if not block_heights_prefixes:
print("No new blocks on S3, retry in 2s...")
logger.info("No new blocks on S3, retry in 2s...")

await asyncio.sleep(2)
continue

print("Received {} blocks from S3".format(len(block_heights_prefixes)))
logger.info(
"Received {} blocks from S3".format(len(block_heights_prefixes))
)

pending_block_heights = iter(block_heights_prefixes)
streamer_messages_futures = []
Expand All @@ -109,8 +119,8 @@ async def start(config: LakeConfig, streamer_messages_queue: asyncio.Queue):
and last_processed_block_hash
!= streamer_message.block.header.prev_hash
):
print(
"`prev_hash` does not match, refetching the data from S3 in 200ms",
logger.warning(
"`prev_hash` does not match, re-fetching the data from S3 in 200ms",
last_processed_block_hash,
streamer_message.block.header.prev_hash,
)
Expand Down
34 changes: 17 additions & 17 deletions src/near_lake_framework/near_primitives.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Any, List, Optional
from typing import Any, Optional

from dataclasses import dataclass, field
from dataclasses_json import DataClassJsonMixin, config, mm
Expand Down Expand Up @@ -36,8 +36,8 @@ class BlockHeader(DataClassJsonMixin):
metadata=config(mm_field=mm.fields.Integer(as_string=True))
)
random_value: CryptoHash
validator_proposals: List[Any]
chunk_mask: List[bool]
validator_proposals: list[Any]
chunk_mask: list[bool]
gas_price: int = field(metadata=config(mm_field=mm.fields.Integer(as_string=True)))
block_ordinal: Optional[int]
rent_paid: int = field(metadata=config(mm_field=mm.fields.Integer(as_string=True)))
Expand All @@ -47,13 +47,13 @@ class BlockHeader(DataClassJsonMixin):
total_supply: int = field(
metadata=config(mm_field=mm.fields.Integer(as_string=True))
)
challenges_result: List[Any]
challenges_result: list[Any]
last_final_block: CryptoHash
last_ds_final_block: CryptoHash
next_bp_hash: CryptoHash
block_merkle_root: CryptoHash
epoch_sync_data_hash: Optional[CryptoHash]
approvals: List[Optional[str]]
approvals: list[Optional[str]]
signature: str
latest_protocol_version: int
height: BlockHeight = field(metadata=config(mm_field=BlockHeightField()))
Expand Down Expand Up @@ -84,15 +84,15 @@ class ChunkHeader(DataClassJsonMixin):
)
outgoing_receipts_root: CryptoHash
tx_root: CryptoHash
validator_proposals: List[Any]
validator_proposals: list[Any]
signature: str


@dataclass
class Block(DataClassJsonMixin):
author: AccountId
header: BlockHeader
chunks: List[ChunkHeader]
chunks: list[ChunkHeader]


@dataclass
Expand All @@ -113,13 +113,13 @@ class CostGasUsed(DataClassJsonMixin):
@dataclass
class ExecutionMetadata(DataClassJsonMixin):
version: int
gas_profile: Optional[List[CostGasUsed]]
gas_profile: Optional[list[CostGasUsed]]


@dataclass
class ExecutionOutcome(DataClassJsonMixin):
logs: List[str]
receipt_ids: List[CryptoHash]
logs: list[str]
receipt_ids: list[CryptoHash]
gas_burnt: int
tokens_burnt: int = field(
metadata=config(mm_field=mm.fields.Integer(as_string=True))
Expand All @@ -131,7 +131,7 @@ class ExecutionOutcome(DataClassJsonMixin):

@dataclass
class ExecutionOutcomeWithId(DataClassJsonMixin):
proof: List[Any]
proof: list[Any]
block_hash: CryptoHash
id: CryptoHash
outcome: ExecutionOutcome
Expand All @@ -149,7 +149,7 @@ class SignedTransaction(DataClassJsonMixin):
public_key: str
nonce: int
receiver_id: AccountId
actions: List[Any]
actions: list[Any]
signature: str
hash: CryptoHash

Expand All @@ -170,19 +170,19 @@ class IndexerTransactionWithOutcome(DataClassJsonMixin):
class IndexerChunk(DataClassJsonMixin):
author: AccountId
header: ChunkHeader
transactions: List[IndexerTransactionWithOutcome]
receipts: List[Receipt]
transactions: list[IndexerTransactionWithOutcome]
receipts: list[Receipt]


@dataclass
class IndexerShard(DataClassJsonMixin):
shard_id: int
chunk: Optional[IndexerChunk]
receipt_execution_outcomes: List[IndexerExecutionOutcomeWithReceipt]
state_changes: List[Any]
receipt_execution_outcomes: list[IndexerExecutionOutcomeWithReceipt]
state_changes: list[Any]


@dataclass
class StreamerMessage(DataClassJsonMixin):
block: Block
shards: List[IndexerShard]
shards: list[IndexerShard]
6 changes: 3 additions & 3 deletions src/near_lake_framework/s3_fetchers.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import asyncio
import logging
from typing import List
import traceback
from botocore.exceptions import ClientError

from near_lake_framework import near_primitives
from near_lake_framework.near_primitives import IndexerShard


async def list_blocks(
s3_client,
s3_bucket_name: str,
start_from_block_height: near_primitives.BlockHeight,
number_of_blocks_requested: int,
) -> List[near_primitives.BlockHeight]:
) -> list[near_primitives.BlockHeight]:
response = await s3_client.list_objects_v2(
Bucket=s3_bucket_name,
Delimiter="/",
Expand Down Expand Up @@ -47,7 +47,7 @@ async def fetch_streamer_message(
]
shards = await asyncio.gather(*shards_fetching)

return near_primitives.StreamerMessage(block, shards)
return near_primitives.StreamerMessage(block, list(shards))


async def fetch_shard_or_retry(
Expand Down
37 changes: 37 additions & 0 deletions src/near_lake_framework/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import json

import requests

from near_lake_framework import Network, near_primitives


def fetch_latest_block(
    network: Network = Network.MAINNET, timeout: int = 10
) -> near_primitives.BlockHeight:
    """Return the height of the latest final block via the NEAR JSON-RPC API.

    Args:
        network: Which NEAR network's public RPC endpoint to query.
        timeout: HTTP timeout in seconds for the RPC request.

    Returns:
        The block height of the most recent block with "final" finality.

    Raises:
        requests.HTTPError: If the RPC endpoint responds with an error status.
        KeyError: If the JSON response lacks the expected result structure.
    """
    # Use the enum's value explicitly: a plain Enum member stringifies as
    # "Network.MAINNET", which would build a bogus hostname unless Network
    # overrides __str__ (not visible here) — `.value` is correct either way.
    url = f"https://rpc.{network.value}.near.org"

    # JSON-RPC request body asking for the latest block with "final" finality.
    payload = {
        "jsonrpc": "2.0",
        "id": "dontcare",
        "method": "block",
        "params": {"finality": "final"},
    }

    # `json=` serializes the payload and sets the Content-Type header for us.
    response = requests.post(url, json=payload, timeout=timeout)
    # Fail loudly on HTTP errors rather than raising an opaque KeyError below.
    response.raise_for_status()

    # Extract the final block height from the JSON-RPC result envelope.
    latest_final_block: int = response.json()["result"]["header"]["height"]

    return latest_final_block

0 comments on commit 985c733

Please sign in to comment.