Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: custom subgrounds client for polars #43

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ subgrounds.egg-info/

# apple
.DS_Store
.venv*/
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't need to do this; there should already be a .gitignore in your .venv.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was not, so I had to add it in.

Do you also mean that the .gitignore I have should not be getting pushed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The .gitignore is auto-generated if the .venv is created via Python's built-in venv module (which is also what poetry uses for generating environments).

Binary file added data/curve_swaps.parquet
Binary file not shown.
80 changes: 79 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ dash = { version = "^2.3.1", optional = true }
plotly = { version = "^5.14.1", optional = true }
httpx = { extras = ["http2"], version = "^0.24.1" }
pytest-asyncio = "^0.21.0"
polars = { version = ">=0.19.3", optional = true }
pyarrow = { version = "^13.0.0", optional = true }

[tool.poetry.extras]
dash = ["dash"]
plotly = ["plotly"]
polars = ["polars", "pyarrow"]
all = ["dash", "plotly"]

# https://python-poetry.org/docs/managing-dependencies/#dependency-groups
Expand Down
3 changes: 3 additions & 0 deletions subgrounds/contrib/polars/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Public API of the subgrounds.contrib.polars package: re-export the
# Polars-backed client so users can `from subgrounds.contrib.polars import
# PolarsSubgrounds` directly.
from .client import PolarsSubgrounds

__all__ = ["PolarsSubgrounds"]
194 changes: 194 additions & 0 deletions subgrounds/contrib/polars/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import os
import polars as pl
import warnings
from functools import cached_property
from json import JSONDecodeError
from typing import Any, Type

import httpx
from pipe import map, traverse

from subgrounds.client import SubgroundsBase
from subgrounds.contrib.polars import utils
from subgrounds.errors import GraphQLError, ServerError
from subgrounds.pagination import LegacyStrategy, PaginationStrategy
from subgrounds.query import DataRequest, DataResponse, DocumentResponse
from subgrounds.subgraph import FieldPath, Subgraph
from subgrounds.utils import default_header
from subgrounds.contrib.polars.utils import force_numeric

# Enable HTTP/2 on the underlying httpx client (requires httpx[http2] extra).
HTTP2_SUPPORT = True


class PolarsSubgrounds(SubgroundsBase):
    """Synchronous Subgrounds client whose :meth:`query_df` returns a
    Polars :class:`pl.DataFrame` instead of a pandas one.

    Request construction, schema caching, and pagination are inherited from
    :class:`SubgroundsBase`; only HTTP transport and Polars conversion live
    here.
    """

    @cached_property
    def _client(self) -> httpx.Client:
        """HTTP client, created once and cached so connections are reused."""
        return httpx.Client(http2=HTTP2_SUPPORT, timeout=self.timeout)

    def load(
        self,
        url: str,
        save_schema: bool = False,
        cache_dir: str | None = None,
        is_subgraph: bool = True,
    ) -> Subgraph:
        """Load the GraphQL schema at ``url`` and return a generated
        :class:`Subgraph` class.

        Args:
            url: The url of the API.
            save_schema: Flag indicating whether or not the schema should be
                cached to disk.
            cache_dir: Deprecated; accepted only for backward compatibility.
            is_subgraph: ``True`` if ``url`` points at a subgraph rather than
                a plain GraphQL endpoint.

        Returns:
            Subgraph: A generated class representing the subgraph and its
                entities.
        """
        if cache_dir is not None:
            warnings.warn("This will be deprecated", DeprecationWarning)

        try:
            loader = self._load(url, save_schema, is_subgraph)
            url, query = next(loader)  # if this fails, schema is loaded from cache
            data = self._fetch(url, {"query": query})
            loader.send(data)

        except StopIteration as e:
            return e.value

        # `_load` always terminates via StopIteration, so this is unreachable.
        assert False

    def load_subgraph(
        self, url: str, save_schema: bool = False, cache_dir: str | None = None
    ) -> Subgraph:
        """Performs introspection on the provided GraphQL API ``url`` to get the
        schema, stores the schema if ``save_schema`` is ``True`` and returns a
        generated class representing the subgraph with all its entities.

        Args:
            url: The url of the API.
            save_schema: Flag indicating whether or not the schema should be
                cached to disk.
            cache_dir: Deprecated; accepted only for backward compatibility.

        Returns:
            Subgraph: A generated class representing the subgraph and its entities
        """
        return self.load(url, save_schema, cache_dir, True)

    def _fetch(self, url: str, blob: dict[str, Any]) -> dict[str, Any]:
        """POST ``blob`` to ``url`` and return the GraphQL ``data`` payload.

        Raises:
            httpx.HTTPStatusError: If the server responds with a non-2xx status.
            ServerError: If the response body is not valid JSON.
            GraphQLError: If the response carries GraphQL-level errors.
        """
        resp = self._client.post(
            url, json=blob, headers=default_header(url) | self.headers
        )
        resp.raise_for_status()

        try:
            raw_data = resp.json()

        except JSONDecodeError:
            raise ServerError(
                f"Server ({url}) did not respond with proper JSON"
                f"\nDid you query a proper GraphQL endpoint?"
                f"\n\n{resp.content}"
            )

        # A missing "data" key means the server returned only errors.
        if (data := raw_data.get("data")) is None:
            raise GraphQLError(raw_data.get("errors", "Unknown Error(s) Found"))

        return data

    def execute(
        self,
        req: DataRequest,
        pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
    ) -> DataResponse:
        """Executes a :class:`DataRequest` and returns a :class:`DataResponse`.

        Args:
            req: The :class:`DataRequest` object to be executed.
            pagination_strategy: A Class implementing the :class:`PaginationStrategy`
                ``Protocol``. If ``None``, then automatic pagination is disabled.
                Defaults to :class:`LegacyStrategy`.

        Returns:
            A :class:`DataResponse` object representing the response
        """
        try:
            executor = self._execute(req, pagination_strategy)

            # Drive the pagination generator: fetch each document it yields
            # and feed the response back until it signals completion.
            doc = next(executor)
            while True:
                data = self._fetch(
                    doc.url, {"query": doc.graphql, "variables": doc.variables}
                )
                doc = executor.send(DocumentResponse(url=doc.url, data=data))

        except StopIteration as e:
            return e.value

    def query_json(
        self,
        fpaths: FieldPath | list[FieldPath],
        pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
    ) -> list[dict[str, Any]]:
        """Equivalent to
        ``Subgrounds.execute(Subgrounds.mk_request(fpaths), pagination_strategy)``.

        Args:
            fpaths: One or more :class:`FieldPath` objects
                that should be included in the request.
            pagination_strategy: A Class implementing the :class:`PaginationStrategy`
                ``Protocol``. If ``None``, then automatic pagination is disabled.
                Defaults to :class:`LegacyStrategy`.

        Returns:
            The response data
        """
        fpaths = list([fpaths] | traverse | map(FieldPath._auto_select) | traverse)
        req = self.mk_request(fpaths)
        data = self.execute(req, pagination_strategy)
        return [doc.data for doc in data.responses]

    def query_df(
        self,
        fpaths: FieldPath | list[FieldPath],
        pagination_strategy: Type[PaginationStrategy] | None = LegacyStrategy,
        parquet_name: str | None = None,
    ) -> pl.DataFrame:
        """
        Queries and converts raw GraphQL data to a Polars DataFrame.

        Args:
            fpaths: One or more FieldPath objects that should be included in
                the request.
            pagination_strategy: A class implementing the PaginationStrategy
                Protocol. If None, then automatic pagination is disabled.
                Defaults to LegacyStrategy.
            parquet_name: If given, the DataFrame is also written to
                ``data/{parquet_name}.parquet``.

        Returns:
            pl.DataFrame: A Polars DataFrame containing the queried data.
        """
        # Query raw GraphQL data
        fpaths = list([fpaths] | traverse | map(FieldPath._auto_select) | traverse)
        graphql_data = self.query_json(fpaths, pagination_strategy=pagination_strategy)

        # Only the first response document is converted; its first key holds
        # the rows for the queried entity.
        json_data_key = list(graphql_data[0].keys())[0]
        # NOTE(review): presumably coerces numeric fields so Polars schema
        # inference doesn't choke on mixed/overflowing values — confirm against
        # subgrounds.contrib.polars.utils.force_numeric.
        numeric_data = force_numeric(graphql_data[0][json_data_key])

        # Convert the JSON data to a Polars DataFrame; infer_schema_length=None
        # scans all rows so late-appearing fields are not dropped.
        graphql_df = pl.from_dicts(numeric_data, infer_schema_length=None)

        # Apply the formatting to the Polars DataFrame (nested dict/array
        # columns — see the utils module for the exact transformations).
        graphql_df = utils.format_dictionary_columns(graphql_df)
        graphql_df = utils.format_array_columns(graphql_df)

        if parquet_name is not None:
            # check if folder exists
            os.makedirs("data/", exist_ok=True)
            # write to parquet
            graphql_df.write_parquet(f"data/{parquet_name}.parquet")

        return graphql_df
30 changes: 30 additions & 0 deletions subgrounds/contrib/polars/test_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from subgrounds.contrib.polars.client import PolarsSubgrounds


# Smoke-test script: download recent Curve swaps and write them to parquet.
sg = PolarsSubgrounds()

# Introspect the Messari Curve Finance subgraph to generate entity classes.
subgraph = sg.load_subgraph(
    "https://api.thegraph.com/subgraphs/name/messari/curve-finance-ethereum"
)


# Partial FieldPath selecting the 500 most recent swaps on Curve.
curve_swaps = subgraph.Query.swaps(
    orderBy=subgraph.Swap.timestamp,
    orderDirection="desc",
    first=500,
)

# Concrete fields to fetch for each swap row.
selected_fields = [
    curve_swaps.timestamp,
    curve_swaps.blockNumber,
    curve_swaps.pool._select("id"),
    curve_swaps.hash,
    curve_swaps.tokenIn._select("id"),
    curve_swaps.tokenOut._select("id"),
]

df = sg.query_df(selected_fields, parquet_name="curve_swaps")

print(df)
28 changes: 28 additions & 0 deletions subgrounds/contrib/polars/test_force_numeric.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from subgrounds.contrib.polars.utils import (
force_numeric,
)

# Smoke-test for force_numeric on values that exceed the 64-bit integer range.

# Hashed field alias under which the GraphQL response nests its rows.
_DATA_KEY = "x24e88aa9a8dbf48e"

# Two sample swap records; the first `amountOut` overflows int64.
overflow_data = [
    {
        _DATA_KEY: [
            {
                "blockNumber": 18381720,
                "amountIn": 1008310,
                "amountOut": 10082717795683768903291,
                "id": "swap-0xb67a23794697275f14c0b4e3d6d73960d13a6df4e0e1d3cd2269a9bbffe59d3e-49",
                "timestamp": 1697686283,
            },
            {
                "blockNumber": 18381710,
                "amountIn": 2402410386963680919,
                "amountOut": 7683170744157548213,
                "id": "swap-0x37fede110a1267df8833ad2b0db6f85d663e6b30cd1b85757314c67e6235b5e6-69",
                "timestamp": 1697686163,
            },
        ]
    }
]

output = force_numeric(overflow_data[0][_DATA_KEY])

print(output)
Loading