feat: custom subgrounds client for polars #43
Closed
14 commits
ad9d683 feat: custom subgrounds client for polars (Evan-Kim2028)
cc69f88 fix: polars_utils import path, update test (Evan-Kim2028)
daed16c fix: adjust naming + TODOs (0xMochan)
a0dfbcb feat: polars `query_df()` (Evan-Kim2028)
8f69ceb feat: test `query_df()` (Evan-Kim2028)
bd221af feat: remove unused dependencies (Evan-Kim2028)
1aa273a feat: refactor names, comments, and style (Evan-Kim2028)
5551dbb feat: add functional test for polars utils (Evan-Kim2028)
2ed42f5 fix: poetry lock (Evan-Kim2028)
5e32880 chore: update query_df() function style (Evan-Kim2028)
67a0bdb feat: add download param to `query_df()` (Evan-Kim2028)
ee1740d chore: refactor, cleanup comments (Evan-Kim2028)
dd7bd38 feat: convert float to int (Evan-Kim2028)
ee3a5b0 feat: test for int overflow (Evan-Kim2028)
@@ -15,3 +15,4 @@ subgrounds.egg-info/

# apple
.DS_Store
.venv*/
Binary file not shown.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
@@ -0,0 +1,3 @@
from .client import PolarsSubgrounds

__all__ = ["PolarsSubgrounds"]
@@ -0,0 +1,194 @@
import os
import polars as pl
import warnings
from functools import cached_property
from json import JSONDecodeError
from typing import Any, Type

import httpx
from pipe import map, traverse

from subgrounds.client import SubgroundsBase
from subgrounds.contrib.polars import utils
from subgrounds.errors import GraphQLError, ServerError
from subgrounds.pagination import LegacyStrategy, PaginationStrategy
from subgrounds.query import DataRequest, DataResponse, DocumentResponse
from subgrounds.subgraph import FieldPath, Subgraph
from subgrounds.utils import default_header
from subgrounds.contrib.polars.utils import force_numeric

HTTP2_SUPPORT = True


class PolarsSubgrounds(SubgroundsBase):
    """Subgrounds client that can return query results as Polars DataFrames."""

    @cached_property
    def _client(self):
        """Cached client"""

        return httpx.Client(http2=HTTP2_SUPPORT, timeout=self.timeout)

    def load(
        self,
        url: str,
        save_schema: bool = False,
        cache_dir: str | None = None,
        is_subgraph: bool = True,
    ) -> Subgraph:
        if cache_dir is not None:
            warnings.warn("This will be deprecated", DeprecationWarning)

        try:
            loader = self._load(url, save_schema, is_subgraph)
            url, query = next(loader)  # if this fails, schema is loaded from cache
            data = self._fetch(url, {"query": query})
            loader.send(data)

        except StopIteration as e:
            return e.value

        assert False

    def load_subgraph(
        self, url: str, save_schema: bool = False, cache_dir: str | None = None
    ) -> Subgraph:
        """Performs introspection on the provided GraphQL API ``url`` to get the
        schema, stores the schema if ``save_schema`` is ``True`` and returns a
        generated class representing the subgraph with all its entities.

        Args:
          url: The url of the API.
          save_schema: Flag indicating whether or not the schema should be cached to
            disk.

        Returns:
          Subgraph: A generated class representing the subgraph and its entities
        """

        return self.load(url, save_schema, cache_dir, True)

    def _fetch(self, url: str, blob: dict[str, Any]) -> dict[str, Any]:
        resp = self._client.post(
            url, json=blob, headers=default_header(url) | self.headers
        )
        resp.raise_for_status()

        try:
            raw_data = resp.json()

        except JSONDecodeError:
            raise ServerError(
                f"Server ({url}) did not respond with proper JSON"
                f"\nDid you query a proper GraphQL endpoint?"
                f"\n\n{resp.content}"
            )

        if (data := raw_data.get("data")) is None:
            raise GraphQLError(raw_data.get("errors", "Unknown Error(s) Found"))

        return data

    def execute(
        self,
        req: DataRequest,
        pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
    ) -> DataResponse:
        """Executes a :class:`DataRequest` and returns a :class:`DataResponse`.

        Args:
          req: The :class:`DataRequest` object to be executed.
          pagination_strategy: A Class implementing the :class:`PaginationStrategy`
            ``Protocol``. If ``None``, then automatic pagination is disabled.
            Defaults to :class:`LegacyStrategy`.

        Returns:
          A :class:`DataResponse` object representing the response
        """

        try:
            executor = self._execute(req, pagination_strategy)

            doc = next(executor)
            while True:
                data = self._fetch(
                    doc.url, {"query": doc.graphql, "variables": doc.variables}
                )
                doc = executor.send(DocumentResponse(url=doc.url, data=data))

        except StopIteration as e:
            return e.value

    def query_json(
        self,
        fpaths: FieldPath | list[FieldPath],
        pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
    ) -> list[dict[str, Any]]:
        """Equivalent to
        ``Subgrounds.execute(Subgrounds.mk_request(fpaths), pagination_strategy)``.

        Args:
          fpaths: One or more :class:`FieldPath` objects
            that should be included in the request.
          pagination_strategy: A Class implementing the :class:`PaginationStrategy`
            ``Protocol``. If ``None``, then automatic pagination is disabled.
            Defaults to :class:`LegacyStrategy`.

        Returns:
          The response data
        """

        fpaths = list([fpaths] | traverse | map(FieldPath._auto_select) | traverse)
        req = self.mk_request(fpaths)
        data = self.execute(req, pagination_strategy)
        return [doc.data for doc in data.responses]

    def query_df(
        self,
        fpaths: FieldPath | list[FieldPath],
        pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
        parquet_name: str | None = None,
    ) -> pl.DataFrame:
        """
        Queries and converts raw GraphQL data to a Polars DataFrame.

        Args:
            fpaths (FieldPath or list[FieldPath]): One or more FieldPath objects that
                should be included in the request.
            pagination_strategy (Type[PaginationStrategy] or None, optional):
                A class implementing the PaginationStrategy Protocol. If None, then automatic
                pagination is disabled. Defaults to LegacyStrategy.
            parquet_name (str, optional): The name of the parquet file to write to.
        Returns:
            pl.DataFrame: A Polars DataFrame containing the queried data.
        """

        # Query raw GraphQL data
        fpaths = list([fpaths] | traverse | map(FieldPath._auto_select) | traverse)
        graphql_data = self.query_json(fpaths, pagination_strategy=pagination_strategy)

        # Get the first key of the first JSON object. This is the key that contains the data.
        json_data_key = list(graphql_data[0].keys())[0]
        numeric_data = force_numeric(graphql_data[0][json_data_key])

        # Convert the JSON data to a Polars DataFrame
        # graphql_df = pl.from_dicts(
        #     graphql_data[0][json_data_key], infer_schema_length=None
        # )

        graphql_df = pl.from_dicts(numeric_data, infer_schema_length=None)

        # Apply the formatting to the Polars DataFrame - can I apply this pre-emptively?
        graphql_df = utils.format_dictionary_columns(graphql_df)
        graphql_df = utils.format_array_columns(graphql_df)

        match parquet_name:
            case None:
                pass
            case _:
                # check if folder exists
                os.makedirs("data/", exist_ok=True)
                # write to parquet
                graphql_df.write_parquet(f"data/{parquet_name}.parquet")

        return graphql_df
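Both `load()` and `execute()` in this file drive a generator obtained from the base class: the generator yields a request, the client performs the HTTP call, sends the response back in, and reads the final result from `StopIteration.value`. Below is a minimal, standard-library-only sketch of that control flow. The names `paginate`, `fake_fetch`, and `run` are hypothetical and invented for illustration; they are not part of subgrounds, and the real generators yield richer objects.

```python
from typing import Any, Callable, Generator


# Hypothetical stand-in for the request generator; not the subgrounds API.
def paginate(pages: int) -> Generator[dict[str, Any], dict[str, Any], list[Any]]:
    """Yield one request per page, receive each response, return all rows."""
    rows: list[Any] = []
    for page in range(pages):
        # Hand a request description to the driver and wait for its response.
        response = yield {"page": page}
        rows.extend(response["rows"])
    return rows  # surfaces to the driver as StopIteration.value


def fake_fetch(request: dict[str, Any]) -> dict[str, Any]:
    """Pretend network call (assumption): returns two rows per page."""
    page = request["page"]
    return {"rows": [page * 2, page * 2 + 1]}


def run(gen: Generator, fetch: Callable[[dict[str, Any]], dict[str, Any]]) -> Any:
    """Drive the generator with the same next/send/StopIteration loop."""
    try:
        request = next(gen)
        while True:
            request = gen.send(fetch(request))
    except StopIteration as e:
        return e.value


result = run(paginate(3), fake_fetch)
print(result)
```

Keeping the I/O in the driver means the same generator logic could be reused by a client with a different transport, which appears to be why this client only overrides the fetching side.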
@@ -0,0 +1,30 @@
from subgrounds.contrib.polars.client import PolarsSubgrounds


sg = PolarsSubgrounds()

subgraph = sg.load_subgraph(
    "https://api.thegraph.com/subgraphs/name/messari/curve-finance-ethereum"
)


# Partial FieldPath selecting the 500 most recent swaps on Curve
curve_swaps = subgraph.Query.swaps(
    orderBy=subgraph.Swap.timestamp,
    orderDirection="desc",
    first=500,
)

df = sg.query_df(
    [
        curve_swaps.timestamp,
        curve_swaps.blockNumber,
        curve_swaps.pool._select("id"),
        curve_swaps.hash,
        curve_swaps.tokenIn._select("id"),
        curve_swaps.tokenOut._select("id"),
    ],
    parquet_name="curve_swaps",
)

print(df)
@@ -0,0 +1,28 @@
from subgrounds.contrib.polars.utils import (
    force_numeric,
)

overflow_data = [
    {
        "x24e88aa9a8dbf48e": [
            {
                "blockNumber": 18381720,
                "amountIn": 1008310,
                "amountOut": 10082717795683768903291,
                "id": "swap-0xb67a23794697275f14c0b4e3d6d73960d13a6df4e0e1d3cd2269a9bbffe59d3e-49",
                "timestamp": 1697686283,
            },
            {
                "blockNumber": 18381710,
                "amountIn": 2402410386963680919,
                "amountOut": 7683170744157548213,
                "id": "swap-0x37fede110a1267df8833ad2b0db6f85d663e6b30cd1b85757314c67e6235b5e6-69",
                "timestamp": 1697686163,
            },
        ]
    }
]

output = force_numeric(overflow_data[0]["x24e88aa9a8dbf48e"])

print(output)
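The body of `force_numeric` is not shown in this diff, so its behavior can only be guessed from the test data, where one `amountOut` value exceeds the signed 64-bit range. The sketch below shows one possible approach under that assumption: any column containing an integer that does not fit in int64 is converted to float as a whole, so the column has a single type. The name `coerce_overflowing_ints` is hypothetical and this is not the PR's implementation.

```python
from typing import Any

INT64_MIN = -(2**63)
INT64_MAX = 2**63 - 1


def _is_plain_int(value: Any) -> bool:
    # bool is a subclass of int, so exclude it explicitly.
    return isinstance(value, int) and not isinstance(value, bool)


# Hypothetical helper; an assumption about what an overflow guard could do.
def coerce_overflowing_ints(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return copies of `rows` with int64-overflowing columns cast to float."""
    # Columns in which at least one value cannot be stored as int64.
    wide_columns = {
        key
        for row in rows
        for key, value in row.items()
        if _is_plain_int(value) and not INT64_MIN <= value <= INT64_MAX
    }
    return [
        {
            key: float(value) if key in wide_columns and _is_plain_int(value) else value
            for key, value in row.items()
        }
        for row in rows
    ]


rows = [
    {"amountIn": 1008310, "amountOut": 10082717795683768903291},
    {"amountIn": 2402410386963680919, "amountOut": 7683170744157548213},
]
converted = coerce_overflowing_ints(rows)
print(converted)
```

Casting to float trades exactness for representability: values this large lose their low-order digits, so a string or decimal column may be preferable where exact token amounts matter.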
We shouldn't need to do this; there should already be a `.gitignore` in your `.venv`.
There was not, so I had to add it in. Do you also mean that the `.gitignore` I have should not be getting pushed?
The `.gitignore` is auto-generated if the `.venv` is created via Python's built-in `venv` module (which is also what `poetry` uses for generating environments).
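Whether an environment directory already ignores itself can be checked directly rather than assumed. The helper below is a small sketch, with the hypothetical name `env_ignores_itself`, that assumes the tool writes a catch-all `*` pattern into a `.gitignore` inside the environment; whether that file exists depends on the tool and version that created the environment. The demonstration uses a temporary directory instead of a real virtual environment.

```python
import tempfile
from pathlib import Path


# Hypothetical helper for illustration; not part of subgrounds or poetry.
def env_ignores_itself(env_dir) -> bool:
    """Return True if `env_dir` has a .gitignore containing a lone `*` line."""
    gitignore = Path(env_dir) / ".gitignore"
    if not gitignore.is_file():
        return False
    lines = (line.strip() for line in gitignore.read_text().splitlines())
    return any(line == "*" for line in lines)


with tempfile.TemporaryDirectory() as tmp:
    env = Path(tmp) / ".venv"
    env.mkdir()
    before = env_ignores_itself(env)  # no .gitignore yet
    (env / ".gitignore").write_text("# created by a tool\n*\n")
    after = env_ignores_itself(env)  # catch-all pattern now present

print(before, after)
```

If the check returns `False` for a real environment, adding a pattern such as `.venv*/` to the repository's own `.gitignore`, as this PR does, has the same effect.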