Skip to content

Commit

Permalink
Add GRIB field, handle and metadata cache (#428)
Browse files Browse the repository at this point in the history
* Add grib handle and metadata cache

---------

Co-authored-by: Iain Russell <40060766+iainrussell@users.noreply.github.com>
Co-authored-by: Iain Russell <Iain.Russell@ecmwf.int>
  • Loading branch information
3 people authored Aug 20, 2024
1 parent 81d5d6b commit 8060550
Show file tree
Hide file tree
Showing 20 changed files with 1,203 additions and 156 deletions.
195 changes: 98 additions & 97 deletions docs/examples/netcdf_fieldlist.ipynb

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions docs/guide/data_format/grib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,9 @@ Examples:
- :ref:`/examples/grib_metadata.ipynb`
- :ref:`/examples/grib_selection.ipynb`
- :ref:`/examples/grib_missing.ipynb`


Memory management
++++++++++++++++++++

See details :ref:`here <grib-memory>`.
130 changes: 130 additions & 0 deletions docs/guide/misc/grib_memory.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
.. _grib-memory:

GRIB field memory management
//////////////////////////////

:ref:`grib` is a message-based binary format, where each message is regarded as a field. For reading GRIB, earthkit-data relies on :xref:`eccodes`, which, when loading a message into memory, represents it as a ``GRIB handle``. In the low level API, the GRIB handle is the object that holds the data and metadata of a GRIB field, therefore it can use up a significant amount of memory.

Determining when a GRIB handle needs to be created and when it can be released is important for memory management. Earthkit-data provides several settings to control this behaviour depending on how we actually read the data.

Reading GRIB data as a stream iterator
========================================

We can read :ref:`grib` data as a :ref:`stream <streams>` iterator e.g. with the following code:

.. code-block:: python
import earthkit.data
url = "https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib"
ds = earthkit.data.from_source("url", url, stream=True)
for f in fields:
print(f)
Here, field ``f`` is not attached to a fieldlist and only exists in the scope of the iteration (in the for loop). During its existence the field keeps the GRIB handle in memory and if used in the way shown above, only one field can exist at a time. Once the stream is consumed there is no way to access the data again (unless we read it with :func:`from_source` again).

Reading all GRIB data from a stream into memory
===============================================

We can load :ref:`grib` data fully into memory when we read it as a :ref:`stream <streams>` with the ``read_all=True`` option in :func:`from_source`.

.. code-block:: python
import earthkit.data
url = "https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib"
ds = earthkit.data.from_source("url", url, stream=True, read_all=True)
With this, the entire ``ds`` fieldlist, including all the fields and the related GRIB handles, are stored in memory.

Reading data from disk and managing its memory
==============================================

When reading :ref:`grib` data from disk as a :ref:`file source <data-sources-file>`, it is represented as a fieldlist and loaded lazily. After the (fast) initial scan for field offsets and lengths, no actual fields are created and no data is read into memory. When we start using the fieldlist, e.g. by iterating over the fields, accessing data or metadata etc., the fields will be created **on demand** and the related GRIB handles will be loaded from disk **when needed**. Whether this data or part of it stays in memory depends on the following :ref:`settings <settings>`:

- :ref:`grib-field-policy <grib-field-policy>`
- :ref:`grib-handle-policy <grib-handle-policy>`
- :ref:`grib-handle-cache-size <grib-handle-cache-size>`
- :ref:`use-grib-metadata-cache <use-grib-metadata-cache>`

.. _grib-field-policy:

grib-field-policy
++++++++++++++++++++++++++++

Controls whether fields are kept in memory. The default is ``"persistent"``. The possible values are:

- ``"persistent"``: fields are kept in memory until the fieldlist is deleted
- ``"temporary"``: fields are deleted when they go out of scope and recreated on demand

The actual memory used by a field depends on whether it owns the GRIB handle of the related GRIB message. This is controlled by the :ref:`grib-handle-policy <grib-handle-policy>` settings.

A field can also cache its metadata access for performance, thus increasing memory usage. This is controlled by the :ref:`use-grib-metadata-cache <use-grib-metadata-cache>` settings.

.. _grib-handle-policy:

grib-handle-policy
++++++++++++++++++++++++++++

Controls whether GRIB handles are kept in memory. The default is ``"cache"``. The possible values are:

- ``"cache"``: a separate in-memory LRU cache is created for the GRIB handles in the fieldlist. The maximum number of GRIB handles kept in this cache is controlled by :ref:`grib-handle-cache-size <grib-handle-cache-size>`. In this mode, field objects are lightweight and only store the GRIB handle cache index, and can only access the GRIB handles via the cache.
- ``"persistent"``: once a GRIB handle is created, a field keeps it in memory until the field is deleted
- ``"temporary"``: for each call to data and metadata access on a field, a new GRIB handle is created and released once the access has finished.

.. _grib-handle-cache-size:

grib-handle-cache-size
++++++++++++++++++++++++++++

When :ref:`grib-handle-policy <grib-handle-policy>` is ``"cache"``, the setting ``grib-handle-cache-size`` (default is ``1``) specifies the maximum number of GRIB handles kept in an in-memory cache per fieldlist. This is an LRU cache, so when it is full, the least recently used GRIB handle is removed and a new GRIB message is loaded from disk and added to the cache.

.. _use-grib-metadata-cache:

use-grib-metadata-cache
+++++++++++++++++++++++++++++++++++

When ``use-grib-metadata-cache`` is ``True`` (this is the default) all the fields will cache their metadata access. This is an in-memory cache attached to the field and implemented for the low-level metadata accessor for individual keys. This cache can be useful when the same metadata keys are accessed multiple times for the same field.


Overriding the settings
++++++++++++++++++++++++++++

In addition to changing the :ref:`settings` themselves, it is possible to override the 4 parameters above when loading a given fieldlist by passing them as keyword arguments to :func:`from_source`. The parameter names are the same but the dashes are replaced by underscores. When a parameter is not specified in :func:`from_source` or is set to None, its value is taken from the actual :ref:`settings`. E.g.:

.. code-block:: python
import earthkit.data
ds = earthkit.data.from_source(
"file",
"test6.grib",
grib_field_policy="persistent",
grib_handle_policy="temporary",
grib_handle_cache_size=0,
use_grib_metadata_cache=True,
)
Reading data from disk as a stream
++++++++++++++++++++++++++++++++++

Whilst the usual way of reading GRIB data from disk loads fields lazily (i.e. only when they are actually used), it is also possible to read all
fields up-front and keep them in memory by reading it as a :ref:`stream source <data-sources-stream>` with the ``read_all=True`` option.

.. code-block:: python
import earthkit.data
f = open("test6.grib", "rb")
ds = earthkit.data.from_source("stream", f, read_all=True)
.. warning::

Use this option carefully since your data might not fit into memory.



.. note::
The default settings are chosen to keep the memory usage low and the performance high. However, depending on the use case, the settings can be adjusted to optimize the memory
usage and performance.
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dependencies:
- eccovjson>=0.0.5
- earthkit-geo>=0.2.0
- tqdm>=4.63.0
- lru-dict
- markdown
- make
- mypy
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies = [
"filelock",
"jinja2",
"jsonschema",
"lru-dict",
"markdown",
"multiurl",
"netcdf4",
Expand Down
12 changes: 7 additions & 5 deletions src/earthkit/data/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ def unique_values(self, *coords, remapping=None, patches=None, progress_bar=Fals
returns the list of unique values for each attributes
"""
from earthkit.data.core.order import build_remapping
from earthkit.data.utils.progbar import progress_bar

assert len(coords)
assert all(isinstance(k, str) for k in coords), coords
Expand All @@ -112,10 +111,13 @@ def unique_values(self, *coords, remapping=None, patches=None, progress_bar=Fals
iterable = self

if progress_bar:
iterable = progress_bar(
iterable=self,
desc=f"Finding coords in dataset for {coords}",
)
from earthkit.data.utils.progbar import progress_bar

if progress_bar:
iterable = progress_bar(
iterable=self,
desc=f"Finding coords in dataset for {coords}",
)

vals = defaultdict(dict)
for f in iterable:
Expand Down
15 changes: 7 additions & 8 deletions src/earthkit/data/core/fieldlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,8 +461,7 @@ def datetime(self):
def metadata(self, *keys, astype=None, **kwargs):
r"""Return metadata values from the field.
When called without any arguments returns a :obj:`Metadata` object, which for GRIB data
it always owns its own copy of the ecCodes handle of the GRIB message.
When called without any arguments returns a :obj:`Metadata` object.
Parameters
----------
Expand Down Expand Up @@ -565,7 +564,7 @@ def metadata(self, *keys, astype=None, **kwargs):
'2 metre temperature'
"""
# when called without arguments returns the metadata object
if len(keys) == 0 and astype is None and len(kwargs) == 0:
if len(keys) == 0 and astype is None and not kwargs:
return self._metadata

namespace = kwargs.pop("namespace", None)
Expand Down Expand Up @@ -745,7 +744,7 @@ def ignore(self):
@cached_method
def _default_index_keys(self):
if len(self) > 0:
return self[0].metadata().index_keys()
return self[0]._metadata.index_keys()
else:
return []

Expand Down Expand Up @@ -1108,7 +1107,7 @@ def _proc(keys, n):
@cached_method
def _default_ls_keys(self):
if len(self) > 0:
return self[0].metadata().ls_keys()
return self[0]._metadata.ls_keys()
else:
return []

Expand Down Expand Up @@ -1199,7 +1198,7 @@ def _proc():
@cached_method
def _describe_keys(self):
if len(self) > 0:
return self[0].metadata().describe_keys()
return self[0]._metadata.describe_keys()
else:
return []

Expand Down Expand Up @@ -1368,9 +1367,9 @@ def bounding_box(self):
@cached_method
def _is_shared_grid(self):
if len(self) > 0:
grid = self[0].metadata().geography._unique_grid_id()
grid = self[0]._metadata.geography._unique_grid_id()
if grid is not None:
return all(f.metadata().geography._unique_grid_id() == grid for f in self)
return all(f._metadata.geography._unique_grid_id() == grid for f in self)
return False

@detect_out_filename
Expand Down
9 changes: 7 additions & 2 deletions src/earthkit/data/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,13 @@ def build_actions(self, kwargs):

def compare_elements(self, a, b):
assert callable(self.remapping), (type(self.remapping), self.remapping)
a_metadata = self.remapping(a.metadata)
b_metadata = self.remapping(b.metadata)
if self.remapping:
a_metadata = self.remapping(a.metadata)
b_metadata = self.remapping(b.metadata)
else:
a_metadata = a.metadata
b_metadata = b.metadata

for k, v in self.actions.items():
n = v(a_metadata(k, default=None), b_metadata(k, default=None))
if n != 0:
Expand Down
34 changes: 26 additions & 8 deletions src/earthkit/data/core/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@

from abc import ABCMeta
from abc import abstractmethod
from functools import lru_cache

from earthkit.data.core.constants import DATETIME
from earthkit.data.core.constants import GRIDSPEC

try:
from functools import cache as memoise # noqa
except ImportError:
memoise = lru_cache


class Metadata(metaclass=ABCMeta):
r"""Base class to represent metadata.
Expand All @@ -22,6 +28,14 @@ class Metadata(metaclass=ABCMeta):
Implemented in subclasses: :obj:`RawMetadata`, :obj:`GribMetadata`.
Parameters
----------
extra: dict, None
Extra key/value pairs to be added on top of the underlying metadata. Default is None.
cache: bool
Enable caching of all the calls to :meth:`get`. Default is False. The cache
is attached to the instance.
Examples
--------
- :ref:`/examples/metadata.ipynb`
Expand All @@ -37,9 +51,11 @@ class Metadata(metaclass=ABCMeta):

extra = None

def __init__(self, extra=None):
def __init__(self, extra=None, cache=False):
if extra is not None:
self.extra = extra
if cache:
self.get = memoise(self.get)

def __iter__(self):
"""Return an iterator over the metadata keys."""
Expand Down Expand Up @@ -150,6 +166,8 @@ def _items(self):
def get(self, key, default=None, *, astype=None, raise_on_missing=False):
r"""Return the value for ``key``.
When the instance is created with ``cache=True`` all the result is cached.
Parameters
----------
key: str
Expand Down Expand Up @@ -179,13 +197,12 @@ def get(self, key, default=None, *, astype=None, raise_on_missing=False):
"""
if self._is_extra_key(key):
return self._get_extra_key(key, default=default, astype=astype)
if self._is_custom_key(key):
return self._get_custom_key(
key, default=default, astype=astype, raise_on_missing=raise_on_missing
)
v = self._get_extra_key(key, default=default, astype=astype)
elif self._is_custom_key(key):
v = self._get_custom_key(key, default=default, astype=astype, raise_on_missing=raise_on_missing)
else:
return self._get(key, default=default, astype=astype, raise_on_missing=raise_on_missing)
v = self._get(key, default=default, astype=astype, raise_on_missing=raise_on_missing)
return v

@abstractmethod
def _get(self, key, astype=None, default=None, raise_on_missing=False):
Expand All @@ -196,7 +213,6 @@ def _is_extra_key(self, key):

def _get_extra_key(self, key, default=None, astype=None, **kwargs):
v = self.extra.get(key, default)

if astype is not None and v is not None:
try:
return astype(v)
Expand Down Expand Up @@ -279,6 +295,7 @@ def datetime(self):
dict of datatime.datetime
Dict with items "base_time" and "valid_time".
>>> import earthkit.data
>>> ds = earthkit.data.from_source("file", "tests/data/t_time_series.grib")
>>> ds[4].datetime()
Expand Down Expand Up @@ -376,6 +393,7 @@ class RawMetadata(Metadata):

def __init__(self, *args, **kwargs):
self._d = dict(*args, **kwargs)
super().__init__()

def override(self, *args, **kwargs):
d = dict(**self._d)
Expand Down
25 changes: 25 additions & 0 deletions src/earthkit/data/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,31 @@ def validate(self, name, value):
{validator}""",
validator=IntervalValidator(Interval(8, 4096)),
),
"grib-field-policy": _(
"persistent",
"""GRIB field management policy for fieldlists with data on disk. {validator}
See :doc:`/guide/misc/grib_memory` for more information.""",
validator=ListValidator(["persistent", "temporary"]),
),
"grib-handle-policy": _(
"cache",
"""GRIB handle management policy for fieldlists with data on disk. {validator}
See :doc:`/guide/misc/grib_memory` for more information.""",
validator=ListValidator(["cache", "persistent", "temporary"]),
),
"grib-handle-cache-size": _(
1,
"""Maximum number of GRIB handles cached in memory per fieldlist with data on disk.
Used when ``grib-handle-policy`` is ``cache``.
See :doc:`/guide/misc/grib_memory` for more information.""",
none_ok=True,
),
"use-grib-metadata-cache": _(
True,
"""Use in-memory cache kept in each field for GRIB metadata access in
fieldlists with data on disk.
See :doc:`/guide/misc/grib_memory` for more information.""",
),
}


Expand Down
Loading

0 comments on commit 8060550

Please sign in to comment.