Merge pull request #429 from euroargodev/float-store
For a given float WMO, a new class to easily open netCDF datasets from local and remote GDACs
gmaze authored Jan 24, 2025
2 parents 0a4585b + 9084465 commit 32d88da
Showing 55 changed files with 1,964 additions and 56 deletions.
83 changes: 83 additions & 0 deletions .github/workflows/tmp_debug.yml
@@ -0,0 +1,83 @@
name: Debug windows

on:
  workflow_dispatch: # allows you to trigger the workflow run manually
  pull_request:
    types: [synchronize]

concurrency:
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

permissions:
  id-token: write
  contents: read

jobs:
  core-pinned:

    name: Core - Pinned - Py${{matrix.python-version}}
    runs-on: ${{ matrix.os }}
    defaults:
      run:
        shell: bash -l {0}
    continue-on-error: ${{ matrix.experimental }}
    # timeout-minutes: 45
    strategy:
      max-parallel: 12
      fail-fast: false
      matrix:
        python-version: ["3.10", "3.11"]
        os: ["windows-latest"]
        experimental: [false]

    steps:
      - uses: actions/checkout@v4

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4.0.2
        with:
          aws-region: us-west-1
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/ga-ci-tests-argopy-01

      - name: Set environment variables
        run: |
          echo "CONDA_ENV_FILE=ci/requirements/py${{matrix.python-version}}-core-pinned.yml" >> $GITHUB_ENV
          echo "PYTHON_VERSION=${{ matrix.python-version }}" >> $GITHUB_ENV
          echo "LOG_FILE=argopy-tests-Core-Pinned-Py${{matrix.python-version}}-${{matrix.os}}.log" >> $GITHUB_ENV

      - name: Setup Micromamba ${{ matrix.python-version }}
        uses: mamba-org/setup-micromamba@v2
        with:
          micromamba-version: '1.5.10-0'
          environment-name: argopy-tests
          environment-file: ${{ env.CONDA_ENV_FILE }}
          init-shell: bash
          cache-environment: true
          cache-environment-key: "${{runner.os}}-${{runner.arch}}-py${{matrix.python-version}}-${{env.TODAY}}-${{hashFiles(env.CONDA_ENV_FILE)}}"
          create-args: >-
            python=${{matrix.python-version}}

      - name: Install argopy
        run: |
          python -m pip install --no-deps -e .

      - name: Version info
        run: |
          micromamba info
          micromamba list

      - name: Test 1
        continue-on-error: true
        run: |
          python -c "import argopy; print(argopy.utils.format.argo_split_path('/dac/coriolis/3902131/profiles/BD3902131_001.nc'))"

      - name: Test 2
        continue-on-error: true
        run: |
          python -c "import argopy; print(argopy.utils.format.argo_split_path('C:/Users/runneradmin/.argopy_tutorial_data/ftp/dac/aoml/13857/profiles/R13857_001.nc'))"

      - name: Test 3
        continue-on-error: true
        run: |
          python -c "import argopy; print(argopy.utils.format.argo_split_path('s3://argo-gdac-sandbox/pub/dac/aoml/13857/profiles/R13857_001.nc'))"
3 changes: 2 additions & 1 deletion argopy/__init__.py
@@ -36,7 +36,7 @@
 from .plot import dashboard, ArgoColors  # noqa: E402
 from .options import set_options, reset_options  # noqa: E402
 from .data_fetchers import CTDRefDataFetcher  # noqa: E402
-from .stores import ArgoIndex  # noqa: E402
+from .stores import ArgoIndex, ArgoFloat  # noqa: E402
 from .utils import show_versions, show_options  # noqa: E402
 from .utils import clear_cache, lscache  # noqa: E402
 from .utils import MonitoredThreadPoolExecutor  # noqa: E402, F401
@@ -67,6 +67,7 @@
     "OceanOPSDeployments",  # Class
     "CTDRefDataFetcher",  # Class
     "ArgoIndex",  # Class
+    "ArgoFloat",  # Class
     "ArgoDocs",  # Class
     "TopoFetcher",  # Class
     "ArgoDOI",  # Class
4 changes: 2 additions & 2 deletions argopy/data_fetchers/gdac_index.py
@@ -21,11 +21,11 @@
 has_pyarrow = importlib.util.find_spec('pyarrow') is not None
 if has_pyarrow:
     from argopy.stores.argo_index_pa import indexstore_pyarrow as indexstore
-    log.debug("Using pyarrow indexstore")
+    # log.debug("Using pyarrow indexstore")
 else:
     from argopy.stores.argo_index_pd import indexstore_pandas as indexstore
     # warnings.warn("Consider installing pyarrow in order to improve performances when fetching GDAC data")
-    log.debug("Using pandas indexstore")
+    # log.debug("Using pandas indexstore")

 access_points = ["wmo", "box"]
 exit_formats = ["xarray"]
3 changes: 3 additions & 0 deletions argopy/stores/__init__.py
@@ -7,11 +7,14 @@
 from .argo_index_pd import indexstore_pandas as indexstore_pd

 from .argo_index import ArgoIndex
+from .float.argo_float import ArgoFloat
+

 #
 __all__ = (
     # Classes:
     "ArgoIndex",
+    "ArgoFloat",
     "indexstore_pa",
     "indexstore_pd",
     "filestore",
24 changes: 14 additions & 10 deletions argopy/stores/argo_index_proto.py
Expand Up @@ -66,29 +66,28 @@ class ArgoIndexStoreProto(ABC):

def __init__(
self,
host: str = "https://data-argo.ifremer.fr",
host: str = None,
index_file: str = "ar_index_global_prof.txt",
convention: str = None,
cache: bool = False,
cachedir: str = "",
timeout: int = 0,
**kwargs,
):
"""Create an Argo index file store
"""Create an Argo index store
Parameters
----------
host: str, default: ``https://data-argo.ifremer.fr``
Local or remote (ftp, https or s3) path to a `dac` folder (GDAC structure compliant).
host: str, optional, default=OPTIONS["gdac"]
Local or remote (http, ftp or s3) path to a `dac` folder (compliant with GDAC structure).
This parameter takes values like:
- ``https://data-argo.ifremer.fr``
- ``ftp://ftp.ifremer.fr/ifremer/argo``
- ``s3://argo-gdac-sandbox/pub/idx``
- a local absolute path
You can also use the following keywords: ``http``/``https``, ``ftp`` and ``s3``/``aws``, respectively.
- ``https://data-argo.ifremer.fr``, shortcut with ``http`` or ``https``
- ``https://usgodae.org/pub/outgoing/argo``, shortcut with ``us-http`` or ``us-https``
- ``ftp://ftp.ifremer.fr/ifremer/argo``, shortcut with ``ftp``
- ``s3://argo-gdac-sandbox/pub/idx``, shortcut with ``s3`` or ``aws``
index_file: str, default: ``ar_index_global_prof.txt``
Name of the csv-like text file with the index.
Expand All @@ -114,14 +113,19 @@ def __init__(
timeout: int, default: OPTIONS['api_timeout']
Time out in seconds to connect to a remote host (ftp or http).
"""
host = OPTIONS["gdac"] if host is None else host

# Catchup keywords for host:
if str(host).lower() in ["ftp"]:
host = "ftp://ftp.ifremer.fr/ifremer/argo"
elif str(host).lower() in ["http", "https"]:
elif str(host).lower() in ["http", "https", "fr-http", "fr-https"]:
host = "https://data-argo.ifremer.fr"
elif str(host).lower() in ["us-http", "us-https"]:
host = "https://usgodae.org/pub/outgoing/argo"
elif str(host).lower() in ["s3", "aws"]:
host = "s3://argo-gdac-sandbox/pub/idx"
elif str(host).lower() in ["s3://argo-gdac-sandbox/pub", "s3://argo-gdac-sandbox/pub/"]:
host = "s3://argo-gdac-sandbox/pub/idx" # Fix s3 anomaly whereby index files are not at the 'dac' level
self.host = host

# Catchup keyword for the main profile index files:
Expand Down
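For reference, the new host keywords and default can be exercised directly; a minimal sketch (index loading itself omitted):

    from argopy import ArgoIndex

    idx = ArgoIndex()                 # host falls back on OPTIONS["gdac"]
    idx = ArgoIndex(host="us-http")   # -> https://usgodae.org/pub/outgoing/argo
    idx = ArgoIndex(host="ftp")       # -> ftp://ftp.ifremer.fr/ifremer/argo
    idx = ArgoIndex(host="s3")        # -> s3://argo-gdac-sandbox/pub/idx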
12 changes: 10 additions & 2 deletions argopy/stores/filesystems.py
Expand Up @@ -353,12 +353,12 @@ def open_json(self, url, **kwargs):
return js

def open_dataset(self, path, *args, **kwargs):
"""Return a xarray.dataset from a path.
"""Return a :class:`xarray.Dataset` from a path.
Parameters
----------
path: str
Path to resources passed to xarray.open_dataset
Path to resources passed to :func:`xarray.open_dataset`
*args, **kwargs:
Other arguments are passed to :func:`xarray.open_dataset`
Expand Down Expand Up @@ -1700,6 +1700,14 @@ def open_dataset(self, url, *args, **kwargs):
# except aiohttp.ClientResponseError as e:
raise

if data[0:3] != b"CDF" and data[0:3] != b"\x89HD":
raise TypeError(
"We didn't get a CDF or HDF5 binary data as expected ! We get: %s"
% data
)
if data[0:3] == b"\x89HD":
data = io.BytesIO(data)

xr_opts = {}
if "xr_opts" in kwargs:
xr_opts.update(kwargs["xr_opts"])
Expand Down
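The added guard distinguishes netCDF3 from netCDF4/HDF5 payloads by their magic bytes before handing them to xarray; a standalone sketch of the same sniffing logic (helper name hypothetical):

    import io
    import xarray as xr

    def open_netcdf_bytes(data: bytes) -> xr.Dataset:
        # netCDF3 files start with b"CDF"; netCDF4/HDF5 files start with the
        # HDF5 signature b"\x89HDF..." (the guard above checks only 3 bytes).
        if data[0:3] == b"CDF":
            return xr.open_dataset(data)  # the scipy backend accepts raw bytes
        if data[0:3] == b"\x89HD":
            return xr.open_dataset(io.BytesIO(data))  # netCDF4/HDF5 backends need a file-like object
        raise TypeError("Not netCDF3 or HDF5 binary data")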
Empty file added argopy/stores/float/__init__.py
Empty file.
73 changes: 73 additions & 0 deletions argopy/stores/float/argo_float.py
@@ -0,0 +1,73 @@
"""
If client is online (connected to the web) we work with the 'online' implementation
otherwise we fall back on an offline implementation.
The choice is really meaningfull when the client is using a local host. In this case
we don't know if client intends to be online or offline, so we check and implement.
"""

import logging

from ...utils import isconnected


log = logging.getLogger("argopy.stores.ArgoFloat")


if isconnected():
from .implementations.argo_float_online import ArgoFloatOnline as FloatStore

log.info("Using ONLINE Argo Float implementation")
else:
from .implementations.argo_float_offline import ArgoFloatOffline as FloatStore

log.info("Using OFFLINE Argo Float implementation")


class ArgoFloat(FloatStore):
"""Argo GDAC float store
This store makes it easy to load/read data for a given float from any GDAC location and netcdf files
Examples
--------
.. code-block:: python
:caption: A float store is instantiated with float WMO number and a host (any access path: local, http, ftp or s3) where float files are to be found.
>>> from argopy import ArgoFloat
>>> af = ArgoFloat(WMO) # Use argopy 'gdac' option by default
>>> af = ArgoFloat(WMO, host='/home/ref-argo/gdac') # Use your local GDAC copy
>>> af = ArgoFloat(WMO, host='http') # Shortcut for https://data-argo.ifremer.fr
>>> af = ArgoFloat(WMO, host='ftp') # shortcut for ftp://ftp.ifremer.fr/ifremer/argo
>>> af = ArgoFloat(WMO, host='s3') # Shortcut for s3://argo-gdac-sandbox/pub
.. code-block:: python
:caption: Load/read GDAC netcdf files as a :class:`xarray.Dataset`
>>> af.list_dataset() # Return a dictionary with all available datasets for this float
>>> ds = af.open_dataset('prof') # Use keys from the available datasets dictionary
>>> ds = af.open_dataset('meta')
>>> ds = af.open_dataset('tech')
>>> ds = af.open_dataset('Rtraj')
>>> ds = af.open_dataset('Sprof')
.. code-block:: python
:caption: Other attributes and methods
>>> af.N_CYCLES # Number of cycles (estimated)
>>> af.path # root path for all float datasets
>>> af.dac # name of the DAC this float belongs to
>>> af.metadata # a dictionary with all available metadata for this file (from netcdf or fleetmonitoring API)
>>> af.ls() # list af.path folder content
.. code-block:: python
:caption: Working with float profiles
>>> af.lsprofiles() # list float "profiles" folder content
>>> af.describe_profiles() # Pandas DataFrame describing all available float profile files
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
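Putting the new class to work end to end, a minimal sketch (the WMO number is an arbitrary choice for illustration, and a network connection is assumed):

    from argopy import ArgoFloat

    WMO = 6902746                      # arbitrary float WMO number
    af = ArgoFloat(WMO, host="https")  # remote GDAC, shortcut for https://data-argo.ifremer.fr
    print(af.dac)                      # DAC this float belongs to
    ds = af.open_dataset("prof")       # load the multi-profile file as a xarray.Dataset
    print(ds)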
11 changes: 11 additions & 0 deletions argopy/stores/float/implementations/__init__.py
@@ -0,0 +1,11 @@
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# HELP
#
# Created by gmaze on 09/01/2025
__author__ = 'gmaze@ifremer.fr'

import os
import sys
import xarray as xr
91 changes: 91 additions & 0 deletions argopy/stores/float/implementations/argo_float_offline.py
@@ -0,0 +1,91 @@
import pandas as pd
import numpy as np
from pathlib import Path
import logging

from ....errors import InvalidOption
from ..spec import ArgoFloatProto


log = logging.getLogger("argopy.stores.ArgoFloat")


class ArgoFloatOffline(ArgoFloatProto):
    """Offline :class:`ArgoFloat` implementation"""

    _online = False

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)

        if self.host_protocol != "file":
            raise InvalidOption(
                "Trying to work with the offline store using a remote host!"
            )

        # Load some data (in a perfect world, this should be done asynchronously):
        self.load_dac()
        self.load_metadata()  # Must come after load_dac because metadata are read from netcdf files requiring the dac folder name

    def load_metadata(self):
        """Method to load float meta-data"""
        data = {}

        ds = self.open_dataset("meta")
        data.update(
            {
                "deployment": {
                    "launchDate": pd.to_datetime(ds["LAUNCH_DATE"].values, utc=True)
                }
            }
        )
        data.update(
            {"platform": {"type": ds["PLATFORM_TYPE"].values[np.newaxis][0].strip()}}
        )
        data.update({"maker": ds["PLATFORM_MAKER"].values[np.newaxis][0].strip()})

        def infer_network(this_ds):
            if this_ds["PLATFORM_FAMILY"].values[np.newaxis][0].strip() == "FLOAT_DEEP":
                network = ["DEEP"]
                if len(this_ds["SENSOR"].values) > 4:
                    network.append("BGC")

            elif this_ds["PLATFORM_FAMILY"].values[np.newaxis][0].strip() == "FLOAT":
                if len(this_ds["SENSOR"].values) > 4:
                    network = ["BGC"]
                else:
                    network = ["CORE"]

            else:
                network = ["?"]

            return network

        data.update({"networks": infer_network(ds)})

        data.update({"cycles": np.unique(self.open_dataset("prof")["CYCLE_NUMBER"])})

        self._metadata = data

    def load_dac(self):
        """Load the DAC short name for this float"""
        try:
            dac = [
                p.parts[-2]
                for p in Path(self.host).glob(
                    self.host_sep.join(["dac", "*", "%i" % self.WMO])
                )
            ]
            if len(dac) > 0:
                self._dac = dac[0]

        except Exception:
            raise ValueError(
                f"DAC name for Float {self.WMO} cannot be found from {self.host}"
            )

        # For the record, another method to get the DAC name, based on the profile index:
        # self._dac = self.idx.search_wmo(self.WMO).read_dac_wmo()[0][0]  # Get the DAC from the Argo index
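The `load_dac` lookup relies on the GDAC tree layout `dac/<dac_name>/<wmo>`; the same idea as a standalone sketch (function name and example path hypothetical):

    from pathlib import Path

    def find_dac(gdac_root: str, wmo: int) -> str:
        # Scan <gdac_root>/dac/*/<wmo> and return the intermediate folder name,
        # which is the DAC short name (e.g. 'aoml', 'coriolis').
        hits = [p.parts[-2] for p in Path(gdac_root).glob(f"dac/*/{wmo}")]
        if not hits:
            raise ValueError(f"DAC name for float {wmo} cannot be found from {gdac_root}")
        return hits[0]

    # find_dac("/home/ref-argo/gdac", 13857)  # hypothetical local GDAC copy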