From a3fe46d339c8f218ed73544218d083401377660f Mon Sep 17 00:00:00 2001
From: Luke Campbell
Date: Wed, 21 Dec 2022 15:06:51 -0500
Subject: [PATCH] Adds caching

To reduce the number of requests we send to ERDDAP, I've added a
caching layer around outgoing HTTP requests for JSON and CSV
documents. I will be improving this in the future.
---
 environment.yml             |   1 +
 intake_erddap/cache.py      | 101 ++++++++++++++++++++++++++
 intake_erddap/erddap_cat.py |  12 +++-
 intake_erddap/utils.py      |  28 ++++++--
 requirements.txt            |   1 +
 tests/test_cache.py         | 140 ++++++++++++++++++++++++++++++++++++
 tests/test_erddap_cat.py    |  46 ++++++------
 7 files changed, 296 insertions(+), 33 deletions(-)
 create mode 100644 intake_erddap/cache.py
 create mode 100644 tests/test_cache.py

diff --git a/environment.yml b/environment.yml
index bd52282..9cce980 100644
--- a/environment.yml
+++ b/environment.yml
@@ -8,6 +8,7 @@ dependencies:
   - pandas
   - erddapy
   - panel
+  - appdirs
   - intake
   - intake-xarray>=0.6.1
   - pip
diff --git a/intake_erddap/cache.py b/intake_erddap/cache.py
new file mode 100644
index 0000000..564f11a
--- /dev/null
+++ b/intake_erddap/cache.py
@@ -0,0 +1,101 @@
+"""Caching support."""
+import gzip
+import json
+import pandas as pd
+import time
+from typing import Optional, Union, Any, Type
+import appdirs
+import hashlib
+import requests
+from pathlib import Path
+
+
+class CacheStore:
+    """A caching mechanism to store HTTP responses in a local cache."""
+
+    def __init__(self, cache_dir: Optional[Path] = None, http_client: Optional[Type] = None, cache_period: Optional[Union[int, float]] = None):
+        self.cache_dir: Path = cache_dir or Path(appdirs.user_cache_dir("intake-erddap", "axds"))
+        self.http_client = http_client or requests
+        self.cache_period = cache_period or 500.
+
+        if not self.cache_dir.exists():
+            self.cache_dir.mkdir(parents=True, exist_ok=True)
+
+    @staticmethod
+    def hash_url(url: str) -> str:
+        """Return the hash of the URL."""
+        return hashlib.sha256(url.encode("utf-8")).hexdigest()
+
+    def cache_file(self, url: str) -> Path:
+        """Return the path to the cache file."""
+        checksum = self.hash_url(url)
+        filename = self.cache_dir / f'{checksum}.gz'
+        return filename
+
+    def cache_response(self, url: str, *args, **kwargs):
+        """Write the content of the HTTP response to a gzipped cached file."""
+        filename = self.cache_file(url)
+        resp = self.http_client.get(url, *args, **kwargs)
+        resp.raise_for_status()
+        with gzip.open(filename, "wb") as f:
+            f.write(resp.content)
+
+    def read_csv(self, url: str, pandas_kwargs: Optional[dict] = None, http_kwargs: Optional[dict] = None) -> pd.DataFrame:
+        """Return a pandas data frame read from source or cache."""
+        pandas_kwargs = pandas_kwargs or {}
+        http_kwargs = http_kwargs or {}
+        pth = self.cache_file(url)
+        now = time.time()
+        allowed_mtime = now - self.cache_period
+        if pth.exists():
+            if pth.stat().st_mtime < allowed_mtime:
+                print("Cache MISS")
+                self.cache_response(url, **http_kwargs)
+            else:
+                print("Cache HIT")
+        else:
+            print("Cache MISS")
+            self.cache_response(url, **http_kwargs)
+
+        with gzip.open(pth) as f:
+            return pd.read_csv(f, **pandas_kwargs)
+
+    def read_json(self, url: str, http_kwargs: Optional[dict] = None) -> Any:
+        http_kwargs = http_kwargs or {}
+        pth = self.cache_file(url)
+        now = time.time()
+        allowed_mtime = now - self.cache_period
+        if pth.exists():
+            if pth.stat().st_mtime < allowed_mtime:
+                print("Cache MISS")
+                self.cache_response(url, **http_kwargs)
+            else:
+                print("Cache HIT")
+        else:
+            print("Cache MISS")
+            self.cache_response(url, **http_kwargs)
+
+        with gzip.open(pth) as f:
+            return json.load(f)
+
+    def clear_cache(self, mtime: Optional[Union[int, float]] = None):
+        """Remove cached files, or only those older than ``mtime`` seconds if given."""
+        if self.cache_dir.exists():
+            if mtime is None:
+                self._clear_cache()
+            else:
+                self._clear_cache_mtime(mtime)
+
+    def _clear_cache(self):
+        """Removes all cached files."""
+        for cache_file in self.cache_dir.glob('*.gz'):
+            cache_file.unlink()
+
+    def _clear_cache_mtime(self, age: Union[int, float]):
+        """Removes cached files older than ``age`` seconds."""
+        current_time = time.time()
+        cutoff = current_time - age
+        for cache_file in self.cache_dir.glob('*.gz'):
+            mtime = cache_file.stat().st_mtime
+            if mtime <= cutoff:
+                cache_file.unlink()
diff --git a/intake_erddap/erddap_cat.py b/intake_erddap/erddap_cat.py
index 72e9f03..1e12988 100644
--- a/intake_erddap/erddap_cat.py
+++ b/intake_erddap/erddap_cat.py
@@ -22,6 +22,8 @@
 from intake.catalog.base import Catalog
 from intake.catalog.local import LocalCatalogEntry
 
+from intake_erddap.cache import CacheStore
+
 from . import utils
 from .erddap import GridDAPSource, TableDAPSource
 from .utils import match_key_to_category
@@ -141,6 +143,7 @@ def __init__(
         self._query_type = query_type
         self.server = server
         self.search_url = None
+        self.cache_store = CacheStore()
 
         if kwargs_search is not None:
             checks = [
@@ -214,19 +217,22 @@ def __init__(
                 category, key = category_search
                 # Currently just take first match, but there could be more than one.
                 self.kwargs_search[category] = match_key_to_category(
-                    self.server, key, category
+                    self.server, key, category, cache_store=self.cache_store
                 )[0]
 
         metadata = metadata or {}
         metadata["kwargs_search"] = self.kwargs_search
 
+        # Clear stale data from the cache on initialization
+        self.cache_store.clear_cache(self.cache_store.cache_period)
+
         super(ERDDAPCatalog, self).__init__(metadata=metadata, **kwargs)
 
     def _load_df(self) -> pd.DataFrame:
         frames = []
         for url in self.get_search_urls():
             try:
-                df = pd.read_csv(url)
+                df = self.cache_store.read_csv(url)
             except HTTPError as e:
                 if e.code == 404:
                     log.warning(f"search {url} returned HTTP 404")
@@ -253,7 +259,7 @@
     def _load_metadata(self) -> Mapping[str, dict]:
         """Returns all of the dataset metadata available from allDatasets API."""
         if self._dataset_metadata is None:
            self._dataset_metadata = utils.get_erddap_metadata(
-                self.server, self.kwargs_search
+                self.server, self.kwargs_search, cache_store=self.cache_store
            )
         return self._dataset_metadata
diff --git a/intake_erddap/utils.py b/intake_erddap/utils.py
index 1d20b44..615dd84 100644
--- a/intake_erddap/utils.py
+++ b/intake_erddap/utils.py
@@ -12,6 +12,8 @@
 
 from pandas import DataFrame
 
+from intake_erddap.cache import CacheStore
+
 log = getLogger("intake-erddap")
 
 
@@ -29,6 +31,7 @@ def get_project_version() -> str:
 def return_category_options(
     server: str,
     category: str = "standard_name",
+    cache_store: Optional[CacheStore] = None,
 ) -> DataFrame:
     """Find category options for ERDDAP server.
 
@@ -39,6 +42,9 @@ def return_category_options(
     category : str, optional
         ERDDAP category for filtering results. Default is "standard_name" but
         another good option is "variableName".
+    cache_store : CacheStore
+        The cache store to use for caching responses. If one is provided, it will
+        be used instead of making the requests directly.
 
     Returns
     -------
@@ -47,11 +53,10 @@ def return_category_options(
         Column "Category" contains all the category values and column "URL" contains
         the link for search results for searching for a given category value.
     """
-    df = pd.read_csv(
-        f"{server}/categorize/{category}/index.csv?page=1&itemsPerPage=100000"
-    )
-
-    return df
+    url = f"{server}/categorize/{category}/index.csv?page=1&itemsPerPage=100000"
+    if cache_store is not None:
+        return cache_store.read_csv(url)
+    return pd.read_csv(url)
 
 
 def match_key_to_category(
@@ -59,6 +64,7 @@ def match_key_to_category(
     key: str,
     category: str = "standard_name",
     criteria: Optional[dict] = None,
+    cache_store: Optional[CacheStore] = None,
 ) -> list:
     """Find category values for server and return match to key.
 
@@ -75,6 +81,9 @@ def match_key_to_category(
     criteria : dict, optional
         Criteria to use to map from variable to attributes describing the variable.
         If user has defined custom_criteria, this will be used by default.
+    cache_store : CacheStore
+        The cache store to use for caching responses. If one is provided, it will
+        be used instead of making the requests directly.
 
     Returns
     -------
@@ -82,7 +91,7 @@ def match_key_to_category(
         Values from category results that match key, according to the custom
         criteria.
     """
-    df = return_category_options(server, category)
+    df = return_category_options(server, category, cache_store=cache_store)
     matching_category_value = cfp.match_criteria_key(
         df["Category"].values, key, criteria=criteria
     )
@@ -98,7 +107,10 @@ def as_a_list(value: Any) -> list:
 
 
 def get_erddap_metadata(
-    server: str, constraints: Mapping[str, Any], http_client: Any = None
+    server: str,
+    constraints: Mapping[str, Any],
+    http_client: Any = None,
+    cache_store: Optional[CacheStore] = None,
 ) -> Mapping[str, dict]:
     """Return a map for all the dataset metadata."""
     if http_client is None:
@@ -123,6 +135,8 @@ def get_erddap_metadata(
     url = f"{server}/tabledap/allDatasets.json?" + quote_plus(",".join(fields))
     if constraints_query:
         url += "&" + urlencode(constraints_query)
+    if cache_store:  # pragma: no cover
+        return parse_erddap_tabledap_response(cache_store.read_json(url))
     resp = http_client.get(url)
     resp.raise_for_status()
     return parse_erddap_tabledap_response(resp.json())
diff --git a/requirements.txt b/requirements.txt
index 515ca87..219f7c1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
+appdirs
 erddapy
 intake
 intake-xarray
diff --git a/tests/test_cache.py b/tests/test_cache.py
new file mode 100644
index 0000000..87308b7
--- /dev/null
+++ b/tests/test_cache.py
@@ -0,0 +1,140 @@
+#!/usr/bin/env pytest
+"""Unit tests for caching support."""
+import time
+import gzip
+from pathlib import Path
+import hashlib
+from unittest import mock
+from intake_erddap import cache
+import pytest
+import tempfile
+import shutil
+import os
+
+
+@pytest.fixture
+def tempdir():
+    tempdir = tempfile.mkdtemp()
+    yield tempdir
+    if os.path.exists(tempdir):
+        shutil.rmtree(tempdir)
+
+
+@mock.patch('appdirs.user_cache_dir')
+def test_cache_file(user_cache_dir_mock, tempdir):
+    user_cache_dir_mock.return_value = tempdir
+    url = "http://kevinbacon.invalid/erddap/advanced?blahbah"
+    store = cache.CacheStore()
+    filepath = store.cache_file(url)
+    assert filepath.parent == Path(tempdir)
+    sha = cache.CacheStore.hash_url(url)
+    assert filepath.name == f'{sha}.gz'
+
+
+@mock.patch('requests.get')
+@mock.patch('appdirs.user_cache_dir')
+def test_cache_csv(user_cache_dir_mock, http_get_mock, tempdir):
+    user_cache_dir_mock.return_value = tempdir
+    resp = mock.Mock()
+    resp.content = b'blahblah'
+    http_get_mock.return_value = resp
+    url = "http://kevinbacon.invalid/erddap/advanced?blahbah"
+    store = cache.CacheStore()
+    store.cache_response(url)
+    sha = store.hash_url(url)
+    target = (Path(tempdir) / f'{sha}.gz')
+    assert target.exists()
+    http_get_mock.assert_called_with(url)
+    with gzip.open(target, "rt", encoding="utf-8") as f:
+        buf = f.read()
+    assert buf == "blahblah"
+
+
+@mock.patch('requests.get')
+@mock.patch('appdirs.user_cache_dir')
+def test_clearing_cache(user_cache_dir_mock, http_get_mock, tempdir):
+    user_cache_dir_mock.return_value = tempdir
+    resp = mock.Mock()
+    resp.content = b'blahblah'
+    http_get_mock.return_value = resp
+    url = "http://kevinbacon.invalid/erddap/advanced?blahbah"
+    store = cache.CacheStore()
+    store.cache_response(url)
+    sha = store.hash_url(url)
+    target = (Path(tempdir) / f'{sha}.gz')
+
+    store.clear_cache()
+    assert not target.exists()
+    store.cache_response(url)
+    assert target.exists()
+
+    # Clear cached files older than 100 s. The file we just created is brand new so should remain.
+    store.clear_cache(100)
+    assert target.exists()
+
+    # Now change the mtime of the file to be 500 s old
+    now = time.time()
+    os.utime(target, (now - 500, now - 500))
+    store.clear_cache(100)
+    assert not target.exists()
+
+
+@mock.patch('appdirs.user_cache_dir')
+def test_cache_no_dir(user_cache_dir_mock, tempdir):
+    """Tests that the cache store will create the cache dir if it doesn't exist."""
+    user_cache_dir_mock.return_value = tempdir
+    tempdir = Path(tempdir)
+    tempdir.rmdir()
+    assert not tempdir.exists()
+    cache.CacheStore()
+    assert tempdir.exists()
+
+
+@mock.patch('requests.get')
+@mock.patch('appdirs.user_cache_dir')
+def test_cache_read_csv(user_cache_dir_mock, http_get_mock, tempdir):
+    user_cache_dir_mock.return_value = tempdir
+    resp = mock.Mock()
+    http_get_mock.return_value = resp
+    resp.content = b"col_a,col_b\n1,blue\n2,red\n"
+    store = cache.CacheStore()
+    url = "http://blah.invalid/erddap/search?q=bacon+egg+and+cheese"
+    df = store.read_csv(url)
+    assert len(df) == 2
+    filepath = store.cache_file(url)
+    with gzip.open(filepath, "wb") as f:
+        f.write(b"col_a,col_b\n3,green\n4,yellow\n")
+    df = store.read_csv(url)
+    assert df['col_a'].tolist() == [3, 4]
+    assert df['col_b'].tolist() == ["green", "yellow"]
+
+    # Force a cache miss
+    now = time.time()
+    os.utime(filepath, (now - 1000, now - 1000))
+    df = store.read_csv(url)
+    assert df['col_a'].tolist() == [1, 2]
+    assert df['col_b'].tolist() == ["blue", "red"]
+
+
+@mock.patch('requests.get')
+@mock.patch('appdirs.user_cache_dir')
+def test_cache_read_json(user_cache_dir_mock, http_get_mock, tempdir):
+    user_cache_dir_mock.return_value = tempdir
+    resp = mock.Mock()
+    http_get_mock.return_value = resp
+    resp.content = b'{"key":"value", "example": "blah"}'
+    store = cache.CacheStore()
+    url = "http://blah.invalid/erddap/search?q=bacon+egg+and+cheese"
+    data = store.read_json(url)
+    assert data == {'key': 'value', 'example': 'blah'}
+    filepath = store.cache_file(url)
+    with gzip.open(filepath, "wb") as f:
+        f.write(b'{"different": "is different"}')
+    data = store.read_json(url)
+    assert data["different"] == "is different"
+
+    # Force a cache miss
+    now = time.time()
+    os.utime(filepath, (now - 1000, now - 1000))
+    data = store.read_json(url)
+    assert data == {'key': 'value', 'example': 'blah'}
\ No newline at end of file
diff --git a/tests/test_erddap_cat.py b/tests/test_erddap_cat.py
index fc7f110..3b4951f 100644
--- a/tests/test_erddap_cat.py
+++ b/tests/test_erddap_cat.py
@@ -48,7 +48,7 @@ def temporary_catalog():
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_erddap_catalog(mock_read_csv, load_metadata_mock):
     """Test basic catalog API."""
     load_metadata_mock.return_value = {}
@@ -60,7 +60,7 @@ def test_erddap_catalog(mock_read_csv, load_metadata_mock):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_erddap_catalog_searching(mock_read_csv, load_metadata_mock):
     """Test catalog with search parameters."""
     load_metadata_mock.return_value = {}
@@ -80,12 +80,12 @@ def test_erddap_catalog_searching(mock_read_csv, load_metadata_mock):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_erddap_catalog_searching_variable(mock_read_csv, load_metadata_mock):
     load_metadata_mock.return_value = {}
     df1 = pd.DataFrame()
     df1["Category"] = ["sea_water_temperature"]
-    df1["URL"] = ["http://blah.com"]
+    df1["URL"] = ["http://blah.invalid"]
     df2 = pd.DataFrame()
     df2["Dataset ID"] = ["testID"]
     # pd.read_csv is called twice, so two return results
@@ -177,7 +177,7 @@ def test_invalid_kwarg_search():
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_uses_di_client(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -191,7 +191,7 @@ def test_catalog_uses_di_client(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_skips_all_datasets_row(mock_read_csv, load_metadata_mock):
     load_metadata_mock.return_value = {}
     """Tests that the catalog results ignore allDatasets special dataset."""
@@ -203,7 +203,7 @@ def test_catalog_skips_all_datasets_row(mock_read_csv, load_metadata_mock):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_params_search(mock_read_csv, load_metadata_mock):
     load_metadata_mock.return_value = {}
     df = pd.DataFrame()
@@ -232,7 +232,7 @@ def test_params_search(mock_read_csv, load_metadata_mock):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_constraints_present_in_source(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -255,7 +255,7 @@ def test_constraints_present_in_source(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_with_griddap(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -271,7 +271,7 @@ def test_catalog_with_griddap(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_with_unsupported_protocol(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -286,7 +286,7 @@ def test_catalog_with_unsupported_protocol(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_get_search_urls_by_category(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -303,7 +303,7 @@ def test_catalog_get_search_urls_by_category(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_bbox(mock_read_csv, load_metadata_mock, single_dataset_catalog):
     load_metadata_mock.return_value = {}
     mock_read_csv.return_value = single_dataset_catalog
@@ -320,7 +320,7 @@ def test_catalog_bbox(mock_read_csv, load_metadata_mock, single_dataset_catalog):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_standard_names_arg(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -336,7 +336,7 @@ def test_catalog_standard_names_arg(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_variable_names_arg(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -350,7 +350,7 @@ def test_catalog_variable_names_arg(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_times_arg(mock_read_csv, load_metadata_mock, single_dataset_catalog):
     load_metadata_mock.return_value = {}
     mock_read_csv.return_value = single_dataset_catalog
@@ -372,7 +372,7 @@ def test_catalog_times_arg(mock_read_csv, load_metadata_mock, single_dataset_catalog):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_search_for_arg(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -386,7 +386,7 @@ def test_catalog_search_for_arg(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_query_search_for(
     mock_read_csv, load_metadata_mock, single_dataset_catalog
 ):
@@ -409,7 +409,7 @@ def test_catalog_query_search_for(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_search_returns_404(mock_read_csv, load_metadata_mock):
     load_metadata_mock.return_value = {}
     mock_read_csv.side_effect = HTTPError(
@@ -425,7 +425,7 @@ def test_search_returns_404(mock_read_csv, load_metadata_mock):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_saving_catalog(
     mock_read_csv, load_metadata_mock, single_dataset_catalog, temporary_catalog
 ):
@@ -453,7 +453,7 @@ def test_saving_catalog(
 
 
 @mock.patch("intake_erddap.utils.get_erddap_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_loading_metadata(
     mock_read_csv, mock_get_erddap_metadata, single_dataset_catalog
 ):
@@ -467,7 +467,7 @@ def test_loading_metadata(
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_trailing_slash(mock_read_csv, load_metadata_mock, single_dataset_catalog):
     load_metadata_mock.return_value = {}
     mock_read_csv.return_value = single_dataset_catalog
@@ -476,7 +476,7 @@ def test_trailing_slash(mock_read_csv, load_metadata_mock, single_dataset_catalog):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_catalog_query_type_intersection(mock_read_csv, load_metadata_mock):
     data = [
         {
@@ -531,7 +531,7 @@ def test_catalog_query_type_intersection(mock_read_csv, load_metadata_mock):
 
 
 @mock.patch("intake_erddap.erddap_cat.ERDDAPCatalog._load_metadata")
-@mock.patch("pandas.read_csv")
+@mock.patch("intake_erddap.cache.CacheStore.read_csv")
 def test_query_type_invalid(mock_read_csv, load_metadata_mock, single_dataset_catalog):
     load_metadata_mock.return_value = {}
     mock_read_csv.return_value = single_dataset_catalog