
Commit

Adds caching
To reduce the number of requests we send to ERDDAP, I've added a
caching layer around outgoing HTTP requests for JSON and CSV documents.
I will be improving this in the future.
lukecampbell committed Dec 21, 2022
1 parent efaf741 commit a3fe46d
Showing 7 changed files with 296 additions and 33 deletions.
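
As a rough sketch of how the new CacheStore is meant to be used (the server URL and cache period below are illustrative, not taken from this commit):

    from intake_erddap.cache import CacheStore

    # Hypothetical usage: cache ERDDAP responses locally for ten minutes.
    store = CacheStore(cache_period=600)  # seconds; defaults to 500 when omitted

    # The first call downloads the response and writes it gzipped into the cache
    # directory; later calls within the cache period read the local copy instead.
    url = "https://erddap.example.org/erddap/search/index.csv?searchFor=temperature"
    df = store.read_csv(url)

    # Remove cached files older than the configured period.
    store.clear_cache(store.cache_period)

ERDDAPCatalog does essentially this internally: it constructs a CacheStore on initialization and clears entries older than cache_period.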
1 change: 1 addition & 0 deletions environment.yml
@@ -8,6 +8,7 @@ dependencies:
   - pandas
   - erddapy
   - panel
+  - appdirs
   - intake
   - intake-xarray>=0.6.1
   - pip
101 changes: 101 additions & 0 deletions intake_erddap/cache.py
@@ -0,0 +1,101 @@
"""Caching support."""
import gzip
import json
import pandas as pd
import time
from typing import Optional, Union, Any, Type
import appdirs
import hashlib
import requests
from pathlib import Path


class CacheStore:
"""A caching mechanism to store HTTP responses in a local cache."""

def __init__(self, cache_dir: Optional[Path] = None, http_client: Optional[Type] = None, cache_period: Optional[Union[int, float]] = None):
self.cache_dir: Path = cache_dir or Path(appdirs.user_cache_dir("intake-erddap", "axds"))
self.http_client = http_client or requests
self.cache_period = cache_period or 500.

if not self.cache_dir.exists():
self.cache_dir.mkdir(parents=True, exist_ok=True)

@staticmethod
def hash_url(url: str) -> str:
"""Returns the hash of the URL"""
return hashlib.sha256(url.encode("utf-8")).hexdigest()

def cache_file(self, url: str) -> Path:
"""Return the path to the cache file."""
checksum = self.hash_url(url)
filename = self.cache_dir / f'{checksum}.gz'
return filename

def cache_response(self, url: str, *args, **kwargs):
"""Write the content of the HTTP response to a gzipped cached file."""
filename = self.cache_file(url)
with gzip.open(filename, "wb") as f:
resp = self.http_client.get(url, *args, **kwargs)
resp.raise_for_status()
f.write(resp.content)

def read_csv(self, url: str, pandas_kwargs: Optional[dict] = None, http_kwargs: Optional[dict] = None) -> pd.DataFrame:
"""Return a pandas data frame read from source or cache."""
pandas_kwargs = pandas_kwargs or {}
http_kwargs = http_kwargs or {}
pth = self.cache_file(url)
now = time.time()
allowed_mtime = now - self.cache_period
if pth.exists():
if pth.stat().st_mtime < allowed_mtime:
print("Cache MISS")
self.cache_response(url, **http_kwargs)
else:
print("Cache HIT")
else:
print("Cache MISS")
self.cache_response(url, **http_kwargs)

with gzip.open(pth) as f:
return pd.read_csv(f, **pandas_kwargs)

def read_json(self, url: str, http_kwargs: Optional[dict] = None) -> Any:
http_kwargs = http_kwargs or {}
pth = self.cache_file(url)
now = time.time()
allowed_mtime = now - self.cache_period
if pth.exists():
if pth.stat().st_mtime < allowed_mtime:
print("Cache MISS")
self.cache_response(url, **http_kwargs)
else:
print("Cache HIT")
else:
print("Cache MISS")
self.cache_response(url, **http_kwargs)

with gzip.open(pth) as f:
return json.load(f)

def clear_cache(self, mtime: Optional[Union[int, float]] = None):
"""Removes all cached files."""
if self.cache_dir.exists():
if mtime is None:
self._clear_cache()
else:
self._clear_cache_mtime(mtime)

def _clear_cache(self):
"""Removes all cached files."""
for cache_file in self.cache_dir.glob('*.gz'):
cache_file.unlink()

def _clear_cache_mtime(self, age: Union[int, float]):
"""Removes cached files older than ``age`` seconds."""
current_time = time.time()
cutoff = current_time - age
for cache_file in self.cache_dir.glob('*.gz'):
mtime = cache_file.stat().st_mtime
if mtime <= cutoff:
cache_file.unlink()
12 changes: 9 additions & 3 deletions intake_erddap/erddap_cat.py
@@ -22,6 +22,8 @@
 from intake.catalog.base import Catalog
 from intake.catalog.local import LocalCatalogEntry
 
+from intake_erddap.cache import CacheStore
+
 from . import utils
 from .erddap import GridDAPSource, TableDAPSource
 from .utils import match_key_to_category
@@ -141,6 +143,7 @@ def __init__(
         self._query_type = query_type
         self.server = server
         self.search_url = None
+        self.cache_store = CacheStore()
 
         if kwargs_search is not None:
             checks = [
@@ -214,19 +217,22 @@ def __init__(
             category, key = category_search
             # Currently just take first match, but there could be more than one.
             self.kwargs_search[category] = match_key_to_category(
-                self.server, key, category
+                self.server, key, category, cache_store=self.cache_store
             )[0]
 
         metadata = metadata or {}
         metadata["kwargs_search"] = self.kwargs_search
 
+        # Clear the cache of old stale data on initialization
+        self.cache_store.clear_cache(self.cache_store.cache_period)
+
         super(ERDDAPCatalog, self).__init__(metadata=metadata, **kwargs)
 
     def _load_df(self) -> pd.DataFrame:
         frames = []
         for url in self.get_search_urls():
             try:
-                df = pd.read_csv(url)
+                df = self.cache_store.read_csv(url)
             except HTTPError as e:
                 if e.code == 404:
                     log.warning(f"search {url} returned HTTP 404")
@@ -253,7 +259,7 @@ def _load_metadata(self) -> Mapping[str, dict]:
         """Returns all of the dataset metadata available from allDatasets API."""
         if self._dataset_metadata is None:
             self._dataset_metadata = utils.get_erddap_metadata(
-                self.server, self.kwargs_search
+                self.server, self.kwargs_search, cache_store=self.cache_store
             )
         return self._dataset_metadata

28 changes: 21 additions & 7 deletions intake_erddap/utils.py
@@ -12,6 +12,8 @@
 
 from pandas import DataFrame
 
+from intake_erddap.cache import CacheStore
+
 
 log = getLogger("intake-erddap")
 
@@ -29,6 +31,7 @@ def get_project_version() -> str:
 def return_category_options(
     server: str,
     category: str = "standard_name",
+    cache_store: Optional[CacheStore] = None,
 ) -> DataFrame:
     """Find category options for ERDDAP server.
@@ -39,6 +42,9 @@ def return_category_options(
     category : str, optional
         ERDDAP category for filtering results. Default is "standard_name" but another good option is
         "variableName".
+    cache_store : CacheStore
+        The cache store to use for caching responses. If one is provided it will
+        be used instead of making the requests directly.
 
     Returns
     -------
@@ -47,18 +53,18 @@ def return_category_options(
         the link for search results for searching for a given category value.
     """
 
-    df = pd.read_csv(
-        f"{server}/categorize/{category}/index.csv?page=1&itemsPerPage=100000"
-    )
-
-    return df
+    url = f"{server}/categorize/{category}/index.csv?page=1&itemsPerPage=100000"
+    if cache_store is not None:
+        return cache_store.read_csv(url)
+    return pd.read_csv(url)
 
 
 def match_key_to_category(
     server: str,
     key: str,
     category: str = "standard_name",
     criteria: Optional[dict] = None,
+    cache_store: Optional[CacheStore] = None,
 ) -> list:
     """Find category values for server and return match to key.
@@ -75,14 +81,17 @@ def match_key_to_category(
     criteria : dict, optional
         Criteria to use to map from variable to attributes describing the variable. If user has
         defined custom_criteria, this will be used by default.
+    cache_store : CacheStore
+        The cache store to use for caching responses. If one is provided it will
+        be used instead of making the requests directly.
 
     Returns
     -------
     list
         Values from category results that match key, according to the custom criteria.
     """
 
-    df = return_category_options(server, category)
+    df = return_category_options(server, category, cache_store=cache_store)
     matching_category_value = cfp.match_criteria_key(
         df["Category"].values, key, criteria=criteria
     )
@@ -98,7 +107,10 @@ def as_a_list(value: Any) -> list:
 
 
 def get_erddap_metadata(
-    server: str, constraints: Mapping[str, Any], http_client: Any = None
+    server: str,
+    constraints: Mapping[str, Any],
+    http_client: Any = None,
+    cache_store: Optional[CacheStore] = None,
 ) -> Mapping[str, dict]:
     """Return a map for all the dataset metadata."""
     if http_client is None:
@@ -123,6 +135,8 @@ def get_erddap_metadata(
     url = f"{server}/tabledap/allDatasets.json?" + quote_plus(",".join(fields))
     if constraints_query:
         url += "&" + urlencode(constraints_query)
+    if cache_store:  # pragma: no cover
+        return parse_erddap_tabledap_response(cache_store.read_json(url))
     resp = http_client.get(url)
     resp.raise_for_status()
     return parse_erddap_tabledap_response(resp.json())
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
+appdirs
 erddapy
 intake
 intake-xarray
140 changes: 140 additions & 0 deletions tests/test_cache.py
@@ -0,0 +1,140 @@
#!/usr/bin/env pytest
"""Unit tests for caching support."""
import time
import gzip
from pathlib import Path
import hashlib
from unittest import mock
from intake_erddap import cache
import pytest
import tempfile
import shutil
import os


@pytest.fixture
def tempdir():
    tempdir = tempfile.mkdtemp()
    yield tempdir
    if os.path.exists(tempdir):
        shutil.rmtree(tempdir)


@mock.patch('appdirs.user_cache_dir')
def test_cache_file(user_cache_dir_mock, tempdir):
    user_cache_dir_mock.return_value = tempdir
    url = "http://kevinbacon.invalid/erddap/advanced?blahbah"
    store = cache.CacheStore()
    filepath = store.cache_file(url)
    assert filepath.parent == Path(tempdir)
    sha = cache.CacheStore.hash_url(url)
    assert filepath.name == f'{sha}.gz'


@mock.patch('requests.get')
@mock.patch('appdirs.user_cache_dir')
def test_cache_csv(user_cache_dir_mock, http_get_mock, tempdir):
    user_cache_dir_mock.return_value = tempdir
    resp = mock.Mock()
    resp.content = b'blahblah'
    http_get_mock.return_value = resp
    url = "http://kevinbacon.invalid/erddap/advanced?blahbah"
    store = cache.CacheStore()
    store.cache_response(url)
    sha = store.hash_url(url)
    target = (Path(tempdir) / f'{sha}.gz')
    assert target.exists()
    http_get_mock.assert_called_with(url)
    with gzip.open(target, "rt", encoding="utf-8") as f:
        buf = f.read()
    assert buf == "blahblah"


@mock.patch('requests.get')
@mock.patch('appdirs.user_cache_dir')
def test_clearing_cache(user_cache_dir_mock, http_get_mock, tempdir):
    user_cache_dir_mock.return_value = tempdir
    resp = mock.Mock()
    resp.content = b'blahblah'
    http_get_mock.return_value = resp
    url = "http://kevinbacon.invalid/erddap/advanced?blahbah"
    store = cache.CacheStore()
    store.cache_response(url)
    sha = store.hash_url(url)
    target = (Path(tempdir) / f'{sha}.gz')

    store.clear_cache()
    assert not target.exists()
    store.cache_response(url)
    assert target.exists()

    # Clear cached files older than 100 s. The file we just created is brand new so should remain.
    store.clear_cache(100)
    assert target.exists()

    # Now change the mtime of the file to be 500 s old
    now = time.time()
    os.utime(target, (now - 500, now - 500))
    store.clear_cache(100)
    assert not target.exists()


@mock.patch('appdirs.user_cache_dir')
def test_cache_no_dir(user_cache_dir_mock, tempdir):
    """Tests that the cache store will create the cache dir if it doesn't exist."""
    user_cache_dir_mock.return_value = tempdir
    tempdir = Path(tempdir)
    tempdir.rmdir()
    assert not tempdir.exists()
    cache.CacheStore()
    assert tempdir.exists()


@mock.patch('requests.get')
@mock.patch('appdirs.user_cache_dir')
def test_cache_read_csv(user_cache_dir_mock, http_get_mock, tempdir):
    user_cache_dir_mock.return_value = tempdir
    resp = mock.Mock()
    http_get_mock.return_value = resp
    resp.content = b"col_a,col_b\n1,blue\n2,red\n"
    store = cache.CacheStore()
    url = "http://blah.invalid/erddap/search?q=bacon+egg+and+cheese"
    df = store.read_csv(url)
    assert len(df) == 2
    filepath = store.cache_file(url)
    with gzip.open(filepath, "wb") as f:
        f.write(b"col_a,col_b\n3,green\n4,yellow\n")
    df = store.read_csv(url)
    assert df['col_a'].tolist() == [3, 4]
    assert df['col_b'].tolist() == ["green", "yellow"]

    # Force a cache miss
    now = time.time()
    os.utime(filepath, (now - 1000, now - 1000))
    df = store.read_csv(url)
    assert df['col_a'].tolist() == [1, 2]
    assert df['col_b'].tolist() == ["blue", "red"]


@mock.patch('requests.get')
@mock.patch('appdirs.user_cache_dir')
def test_cache_read_json(user_cache_dir_mock, http_get_mock, tempdir):
    user_cache_dir_mock.return_value = tempdir
    resp = mock.Mock()
    http_get_mock.return_value = resp
    resp.content = b'{"key":"value", "example": "blah"}'
    store = cache.CacheStore()
    url = "http://blah.invalid/erddap/search?q=bacon+egg+and+cheese"
    data = store.read_json(url)
    assert data == {'key': 'value', 'example': 'blah'}
    filepath = store.cache_file(url)
    with gzip.open(filepath, "wb") as f:
        f.write(b'{"different": "is different"}')
    data = store.read_json(url)
    assert data["different"] == "is different"

    # Force a cache miss
    now = time.time()
    os.utime(filepath, (now - 1000, now - 1000))
    data = store.read_json(url)
    assert data == {'key': 'value', 'example': 'blah'}