diff --git a/docs/examples/data_from_stream.ipynb b/docs/examples/data_from_stream.ipynb index 478cb06e..4401778f 100644 --- a/docs/examples/data_from_stream.ipynb +++ b/docs/examples/data_from_stream.ipynb @@ -96,7 +96,7 @@ "tags": [] }, "source": [ - "We load it into earthkit-data by using the default settings (:ref:`batch_size `\\=1). With this when we iterate through *ds* it will consume one message from the stream at a time:" + "We load it into earthkit-data." ] }, { @@ -127,7 +127,7 @@ "tags": [] }, "source": [ - "At this point nothing is read from the stream. As we progressing with the iteration :py:class:`~data.readers.grib.codes.GribField` objects are created then get deleted when going out of scope. As a result, only one GRIB message is kept in memory at a time." + "Using this object we can iterate through the stream. As we progress through the iteration :py:class:`~data.readers.grib.codes.GribField` objects are created and then deleted when going out of scope. As a result, only one GRIB message is kept in memory at a time." ] }, { @@ -181,15 +181,15 @@ }, { "cell_type": "markdown", - "id": "7122a9a4-9ca0-4d75-9194-144074c6dcad", + "id": "judicial-backing", "metadata": {}, "source": [ - "### Using group_by" + "### Using batched" ] }, { "cell_type": "raw", - "id": "d970d832-7203-498f-81d6-99434ce42b88", + "id": "fb4528e4-f649-4a5b-92b4-e92e9391851b", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", @@ -199,13 +199,13 @@ "tags": [] }, "source": [ - "When we use the :ref:`group_by ` option :ref:`from_source() ` gives us a stream iterator object. Each iteration step results in a :py:class:`FieldList ` object, which is built by consuming GRIB messages from the stream until the values of the metadata keys specified in :ref:`group_by ` change. The generated :py:class:`FieldList ` keeps GRIB messages in memory then gets deleted when going out of scope." + "When we use the :py:meth:`batched ` method we can iterate through the stream in batches of fixed size. In this example we create a stream and read 2 fields from it at a time."
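As an aside, the fixed-size batching behaviour that ``batched()`` provides can be pictured in a few lines of plain Python. This is only an illustrative sketch built on ``itertools.islice``; ``batched_sketch`` is a hypothetical name and this is not earthkit-data's actual implementation.

.. code-block:: python

    from itertools import islice

    def batched_sketch(iterable, n):
        # Yield lists of up to n items from a one-shot iterator;
        # the last batch may be shorter than n.
        it = iter(iterable)
        while batch := list(islice(it, n)):
            yield batch

    for batch in batched_sketch(range(6), 2):
        print(len(batch), batch)
    # 2 [0, 1]
    # 2 [2, 3]
    # 2 [4, 5]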
] }, { "cell_type": "code", "execution_count": 7, - "id": "8e1be478-6eb6-4732-bb96-9d6fa942c20d", + "id": "placed-blues", "metadata": { "editable": true, "slideshow": { @@ -213,44 +213,29 @@ }, "tags": [] }, - "outputs": [], - "source": [ - "stream = open(\"test6.grib\", \"rb\")\n", - "ds = earthkit.data.from_source(\"stream\", stream, group_by=\"level\")" - ] - }, - { - "cell_type": "code", - "execution_count": 8, - "id": "810cc3eb-4a3f-4806-b799-23e1bc5523c6", - "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "len=3\n", - " GribField(t,1000,20180801,1200,0,0)\n", - " GribField(u,1000,20180801,1200,0,0)\n", - " GribField(v,1000,20180801,1200,0,0)\n", - "len=3\n", - " GribField(t,850,20180801,1200,0,0)\n", - " GribField(u,850,20180801,1200,0,0)\n", - " GribField(v,850,20180801,1200,0,0)\n" + "len=2 [('t', 1000), ('u', 1000)]\n", + "len=2 [('v', 1000), ('t', 850)]\n", + "len=2 [('u', 850), ('v', 850)]\n" ] } ], "source": [ - "for f in ds:\n", - " # f is a fieldlist\n", - " print(f\"len={len(f)}\")\n", - " for g in f:\n", - " print(f\" {g}\")" + "stream = open(\"test6.grib\", \"rb\")\n", + "ds = earthkit.data.from_source(\"stream\", stream)\n", + "\n", + "# f is a fieldlist\n", + "for f in ds.batched(2):\n", + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "markdown", - "id": "89d33d4c-00d2-4f0b-996e-aaf5a5c9e161", + "id": "unavailable-actress", "metadata": {}, "source": [ "Having finished the iteration there is no data available in *ds*. We can close the stream:" ] }, { "cell_type": "code", - "execution_count": 9, - "id": "2ac79f14-cb43-40c0-8a56-9bc16943d7e7", + "execution_count": 8, + "id": "jewish-season", "metadata": {}, "outputs": [], "source": [ "stream.close()" ] }, - { - "cell_type": "markdown", - "id": "judicial-backing", - "metadata": {}, - "source": [ - "### Using batch_size" - ] - }, { "cell_type": "raw", - "id": "fb4528e4-f649-4a5b-92b4-e92e9391851b", + "id": "ed0b0e9c-1016-474b-8ea0-c65d864d2427", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", @@ -286,13 +263,13 @@ "tags": [] }, "source": [ - "The :ref:`batch_size ` option controls how many fields we read from the stream in one go. Please note that :ref:`batch_size ` cannot be used together with *group_by*. In this example we create a stream and read 2 fields from it at a time by using :ref:`batch_size `\\=2 in :ref:`from_source() `:" + "It is possible to use a batch size that is not a factor of the total number of fields in the stream. In this case the last batch will simply contain fewer fields than the specified batch size."
] }, { "cell_type": "code", - "execution_count": 10, - "id": "placed-blues", + "execution_count": 9, + "id": "bf94a190-ec0e-4172-8e75-6518e48f50a4", "metadata": { "editable": true, "slideshow": { @@ -300,63 +277,36 @@ }, "tags": [] }, - "outputs": [], - "source": [ - "stream = open(\"test6.grib\", \"rb\")\n", - "ds = earthkit.data.from_source(\"stream\", stream, batch_size=2)" - ] - }, - { - "cell_type": "code", - "execution_count": 11, - "id": "outside-tennis", - "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "len=2\n", - " GribField(t,1000,20180801,1200,0,0)\n", - " GribField(u,1000,20180801,1200,0,0)\n", - "len=2\n", - " GribField(v,1000,20180801,1200,0,0)\n", - " GribField(t,850,20180801,1200,0,0)\n", - "len=2\n", - " GribField(u,850,20180801,1200,0,0)\n", - " GribField(v,850,20180801,1200,0,0)\n" + "len=4 [('t', 1000), ('u', 1000), ('v', 1000), ('t', 850)]\n", + "len=2 [('u', 850), ('v', 850)]\n" ] } ], "source": [ - "for f in ds:\n", - " # f is a fieldlist\n", - " print(f\"len={len(f)}\")\n", - " for g in f:\n", - " print(f\" {g}\")" + "stream = open(\"test6.grib\", \"rb\")\n", + "ds = earthkit.data.from_source(\"stream\", stream)\n", + "\n", + "# f is a fieldlist\n", + "for f in ds.batched(4):\n", + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "markdown", - "id": "unavailable-actress", - "metadata": {}, - "source": [ - "Having finished the iteration there is no data available in *ds*. We can close the stream:" - ] - }, - { - "cell_type": "code", - "execution_count": 12, - "id": "jewish-season", + "id": "7122a9a4-9ca0-4d75-9194-144074c6dcad", "metadata": {}, - "outputs": [], "source": [ - "stream.close()" + "### Using group_by" ] }, { "cell_type": "raw", - "id": "ed0b0e9c-1016-474b-8ea0-c65d864d2427", + "id": "d970d832-7203-498f-81d6-99434ce42b88", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", @@ -366,13 +316,13 @@ "tags": [] }, "source": [ - "It is possible to set a batch size that is not a factor of the total number fields in the stream. In this case the last batch will simply contain less fields than prescribed by :ref:`batch_size `." + "When we use the :py:meth:`group_by ` method we can iterate through the stream in groups defined by metadata keys. Each iteration step results in a :py:class:`FieldList ` object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated :py:class:`FieldList ` keeps GRIB messages in memory and gets deleted when going out of scope."
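The grouping behaviour described here (keep consuming messages until the watched metadata values change) can be sketched independently of earthkit-data. ``group_by_sketch`` and ``key_func`` are hypothetical names; this is not the library's implementation, which additionally wraps each group in a FieldList.

.. code-block:: python

    def group_by_sketch(iterable, key_func):
        # Emit a new group whenever the key value changes,
        # consuming the underlying iterator exactly once.
        group, current = [], object()  # sentinel never equal to a real key
        for item in iterable:
            key = key_func(item)
            if group and key != current:
                yield group
                group = []
            group.append(item)
            current = key
        if group:
            yield group

    fields = [("t", 1000), ("u", 1000), ("v", 1000), ("t", 850), ("u", 850)]
    for g in group_by_sketch(fields, key_func=lambda f: f[1]):
        print(len(g), g)
    # 3 [('t', 1000), ('u', 1000), ('v', 1000)]
    # 2 [('t', 850), ('u', 850)]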
] }, { "cell_type": "code", - "execution_count": 13, - "id": "bf94a190-ec0e-4172-8e75-6518e48f50a4", + "execution_count": 10, + "id": "8e1be478-6eb6-4732-bb96-9d6fa942c20d", "metadata": { "editable": true, "slideshow": { @@ -385,26 +335,36 @@ "name": "stdout", "output_type": "stream", "text": [ - "len=4\n", - " GribField(t,1000,20180801,1200,0,0)\n", - " GribField(u,1000,20180801,1200,0,0)\n", - " GribField(v,1000,20180801,1200,0,0)\n", - " GribField(t,850,20180801,1200,0,0)\n", - "len=2\n", - " GribField(u,850,20180801,1200,0,0)\n", - " GribField(v,850,20180801,1200,0,0)\n" + "len=3 [('t', 1000), ('u', 1000), ('v', 1000)]\n", + "len=3 [('t', 850), ('u', 850), ('v', 850)]\n" ] } ], "source": [ "stream = open(\"test6.grib\", \"rb\")\n", - "ds = earthkit.data.from_source(\"stream\", stream, batch_size=4)\n", + "ds = earthkit.data.from_source(\"stream\", stream)\n", "\n", - "for f in ds:\n", - " # f is a fieldlist\n", - " print(f\"len={len(f)}\")\n", - " for g in f:\n", - " print(f\" {g}\")" + "# f is a fieldlist\n", + "for f in ds.group_by(\"level\"):\n", + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" + ] + }, + { + "cell_type": "markdown", + "id": "89d33d4c-00d2-4f0b-996e-aaf5a5c9e161", + "metadata": {}, + "source": [ + "Having finished the iteration there is no data available in *ds*. We can close the stream:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "2ac79f14-cb43-40c0-8a56-9bc16943d7e7", + "metadata": {}, + "outputs": [], + "source": [ + "stream.close()" ] }, { @@ -427,12 +387,12 @@ "tags": [] }, "source": [ - "We can also set :ref:`batch_size `\\=0 in :ref:`from_source() `." + "We can load the whole stream into memory by using ``read_all=True`` in :ref:`from_source() `. The resulting object will be a :py:class:`FieldList` storing all the GRIB messages in memory." ] }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 12, "id": "simple-london", "metadata": { "editable": true, @@ -444,22 +404,12 @@ "outputs": [], "source": [ "stream = open(\"test6.grib\", \"rb\")\n", - "ds = earthkit.data.from_source(\"stream\", stream, batch_size=0)" - ] - }, - { - "cell_type": "markdown", - "id": "seeing-freight", - "metadata": {}, - "source": [ - "The resulting earthkit-data object is empty at this point. However, as soon as we call any method on it it will consume the whole stream and load all the GRIB messages into memory. 
They will be stored in memory as long as *ds* exists.\n", - "\n", - "We can call all the standard earthkit-data methods on *ds*:" + "ds = earthkit.data.from_source(\"stream\", stream, read_all=True)" ] }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 13, "id": "meaning-oxide", "metadata": {}, "outputs": [ @@ -469,7 +419,7 @@ "6" ] }, - "execution_count": 15, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } @@ -480,9 +430,15 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 14, "id": "copyrighted-walnut", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [ { "data": { @@ -618,7 +574,7 @@ "5 an 0 regular_ll " ] }, - "execution_count": 16, + "execution_count": 14, "metadata": {}, "output_type": "execute_result" } @@ -629,7 +585,7 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 15, "id": "static-reasoning", "metadata": {}, "outputs": [ @@ -707,7 +663,7 @@ "1 an 0 regular_ll " ] }, - "execution_count": 17, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -719,7 +675,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 16, "id": "spanish-wagon", "metadata": {}, "outputs": [ @@ -1109,9 +1065,9 @@ " GRIB_subCentre: 0\n", " Conventions: CF-1.7\n", " institution: European Centre for Medium-Range Weather Forecasts\n", - " history: 2024-01-02T11:25 GRIB to CDM+CF via cfgrib-0.9.1...
+ [HTML repr of the dataset attributes: GRIB_edition: 1, GRIB_centre: ecmf, GRIB_centreDescription: European Centre for Medium-Range Weather Forecasts, GRIB_subCentre: 0, Conventions: CF-1.7, institution: European Centre for Medium-Range Weather Forecasts, history: 2024-04-23T17:21 GRIB to CDM+CF via cfgrib-0.9.10.4/ecCodes-2.36.0 with {"source": "N/A", "filter_by_keys": {}, "encode_cf": ["parameter", "time", "geography", "vertical"]}]
  • " ], "text/plain": [ "\n", @@ -1134,10 +1090,10 @@ " GRIB_subCentre: 0\n", " Conventions: CF-1.7\n", " institution: European Centre for Medium-Range Weather Forecasts\n", - " history: 2024-01-02T11:25 GRIB to CDM+CF via cfgrib-0.9.1..." + " history: 2024-04-23T17:21 GRIB to CDM+CF via cfgrib-0.9.1..." ] }, - "execution_count": 18, + "execution_count": 16, "metadata": {}, "output_type": "execute_result" } @@ -1157,7 +1113,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 17, "id": "through-mistress", "metadata": { "editable": true, @@ -1174,9 +1130,9 @@ ], "metadata": { "kernelspec": { - "display_name": "dev", + "display_name": "dev_ecc", "language": "python", - "name": "dev" + "name": "dev_ecc" }, "language_info": { "codemirror_mode": { diff --git a/docs/examples/fdb.ipynb b/docs/examples/fdb.ipynb index 0b71df89..96a4417c 100644 --- a/docs/examples/fdb.ipynb +++ b/docs/examples/fdb.ipynb @@ -47,7 +47,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "drawn-renewal", "metadata": {}, "outputs": [], @@ -56,7 +56,7 @@ " 'class': 'od',\n", " 'expver': '0001',\n", " 'stream': 'oper',\n", - " 'date': '20230607',\n", + " 'date': '20240421',\n", " 'time': [0, 12],\n", " 'domain': 'g',\n", " 'type': 'an',\n", @@ -101,7 +101,7 @@ "tags": [] }, "source": [ - "#### Stream: iteration with one field at a time in memory" + "#### Iteration with one field at a time in memory" ] }, { @@ -135,12 +135,12 @@ "name": "stdout", "output_type": "stream", "text": [ - "GribField(msl,None,20230607,0,0,0)\n", - "GribField(2t,None,20230607,0,0,0)\n", - "GribField(2d,None,20230607,0,0,0)\n", - "GribField(msl,None,20230607,1200,0,0)\n", - "GribField(2t,None,20230607,1200,0,0)\n", - "GribField(2d,None,20230607,1200,0,0)\n" + "GribField(msl,None,20240421,0,0,0)\n", + "GribField(2t,None,20240421,0,0,0)\n", + "GribField(2d,None,20240421,0,0,0)\n", + "GribField(msl,None,20240421,1200,0,0)\n", + "GribField(2t,None,20240421,1200,0,0)\n", + "GribField(2d,None,20240421,1200,0,0)\n" ] } ], @@ -186,7 +186,7 @@ "tags": [] }, "source": [ - "#### Stream: using group_by" + "#### Iteration with group_by" ] }, { @@ -201,7 +201,7 @@ "tags": [] }, "source": [ - "When we use the :ref:`group_by ` option :ref:`from_source() ` gives us a stream iterator object. Each iteration step results in a Fieldlist object, which is built by consuming GRIB messages from the stream until the values of the metadata keys specified in :ref:`group_by ` change. The generated Fieldlist keeps GRIB messages in memory then gets deleted when going out of scope." + "When we use the :py:meth:`group_by ` method we can iterate throught the stream in groups defined by metadata keys. Each iteration step results in a :py:class:`FieldList ` object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated :py:class:`FieldList ` keeps GRIB messages in memory then gets deleted when going out of scope." 
] }, { @@ -220,23 +220,15 @@ "name": "stdout", "output_type": "stream", "text": [ - "\n", - " GribField(msl,None,20230607,0,0,0)\n", - " GribField(2t,None,20230607,0,0,0)\n", - " GribField(2d,None,20230607,0,0,0)\n", - "\n", - " GribField(msl,None,20230607,1200,0,0)\n", - " GribField(2t,None,20230607,1200,0,0)\n", - " GribField(2d,None,20230607,1200,0,0)\n" + "len=3 [('msl', 0), ('2t', 0), ('2d', 0)]\n", + "len=3 [('msl', 0), ('2t', 0), ('2d', 0)]\n" ] } ], "source": [ - "ds = earthkit.data.from_source(\"fdb\", request, group_by=\"time\")\n", - "for f in ds:\n", - " print(type(f))\n", - " for g in f:\n", - " print(f\" {g}\")" + "ds = earthkit.data.from_source(\"fdb\", request)\n", + "for f in ds.group_by(\"time\"):\n", + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "markdown", "id": "ethical-canyon", "metadata": {}, "source": [ - "#### Stream: using batch_size" + "#### Iteration with batched" ] }, { @@ -259,7 +251,7 @@ "tags": [] }, "source": [ - "We can read multiple fields into memory from the stream at a time by using :ref:`batch_size ` in :ref:`from_source `, Please note that :ref:`batch_size ` cannot be used together with :ref:`group_by `." + "When we use the :py:meth:`batched ` method we can iterate through the stream in batches of fixed size. In this example we read 2 fields from the stream at a time." ] }, { @@ -278,24 +270,16 @@ "name": "stdout", "output_type": "stream", "text": [ - "\n", - " GribField(msl,None,20230607,0,0,0)\n", - " GribField(2t,None,20230607,0,0,0)\n", - "\n", - " GribField(2d,None,20230607,0,0,0)\n", - " GribField(msl,None,20230607,1200,0,0)\n", - "\n", - " GribField(2t,None,20230607,1200,0,0)\n", - " GribField(2d,None,20230607,1200,0,0)\n" + "len=2 [('msl', 0), ('2t', 0)]\n", + "len=2 [('2d', 0), ('msl', 0)]\n", + "len=2 [('2t', 0), ('2d', 0)]\n" ] } ], "source": [ - "ds = earthkit.data.from_source(\"fdb\", request, batch_size=2)\n", - "for f in ds:\n", - " print(type(f))\n", - " for g in f:\n", - " print(f\" {g}\")" + "ds = earthkit.data.from_source(\"fdb\", request)\n", + "for f in ds.batched(2):\n", + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "markdown", "id": "through-seven", "metadata": {}, "source": [ - "#### Stream: storing all the fields in memory" + "#### Storing all the fields in memory" ] }, { @@ -318,7 +302,7 @@ "tags": [] }, "source": [ - "When we use :ref:`batch_size `\\=0 all the fields are loaded into memory and the resulting object iswill behave like a FieldList:" + "We can load the whole stream into memory by using ``read_all=True`` in :ref:`from_source() `. The resulting object will be a FieldList."
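Conceptually, ``read_all=True`` just drains the stream once and keeps the decoded messages in memory. A minimal sketch of that idea (the ``InMemoryList`` wrapper is a hypothetical stand-in, not the library's FieldList):

.. code-block:: python

    class InMemoryList:
        def __init__(self, stream_iter):
            # Consumes the whole stream up front; every item stays in memory.
            self._items = list(stream_iter)

        def __len__(self):
            return len(self._items)

        def __getitem__(self, i):
            return self._items[i]

    ds = InMemoryList(iter(["msl", "2t", "2d"]))
    print(len(ds), ds[0])  # 3 msl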
] }, { @@ -334,46 +318,12 @@ }, "outputs": [], "source": [ - "ds = earthkit.data.from_source(\"fdb\", request, batch_size=0)" - ] - }, - { - "cell_type": "markdown", - "id": "concrete-filling", - "metadata": {}, - "source": [ - "Nothing is read at this moment:" + "ds = earthkit.data.from_source(\"fdb\", request, read_all=True)" ] }, { "cell_type": "code", "execution_count": 8, - "id": "defensive-spell", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "stored fields count=0\n" - ] - } - ], - "source": [ - "print(f\"stored fields count={len(ds._reader._fields)}\")" - ] - }, - { - "cell_type": "markdown", - "id": "blind-houston", - "metadata": {}, - "source": [ - "If we call any function on the fieldlist it reads the messages into memory" - ] - }, - { - "cell_type": "code", - "execution_count": 9, "id": "exciting-accused", "metadata": {}, "outputs": [ @@ -383,7 +333,7 @@ "6" ] }, - "execution_count": 9, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } @@ -394,25 +344,7 @@ }, { "cell_type": "code", - "execution_count": 10, - "id": "efficient-submission", - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "stored fields count=6\n" - ] - } - ], - "source": [ - "print(f\"stored fields count={len(ds._reader._fields)}\")" - ] - }, - { - "cell_type": "code", - "execution_count": 11, + "execution_count": 9, "id": "minus-horizon", "metadata": {}, "outputs": [ @@ -456,7 +388,7 @@ " msl\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -469,7 +401,7 @@ " 2t\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -482,7 +414,7 @@ " 2d\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -495,7 +427,7 @@ " msl\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -508,7 +440,7 @@ " 2t\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -521,7 +453,7 @@ " 2d\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -534,12 +466,12 @@ ], "text/plain": [ " centre shortName typeOfLevel level dataDate dataTime stepRange dataType \\\n", - "0 ecmf msl surface 0 20230607 0 0 an \n", - "1 ecmf 2t surface 0 20230607 0 0 an \n", - "2 ecmf 2d surface 0 20230607 0 0 an \n", - "3 ecmf msl surface 0 20230607 1200 0 an \n", - "4 ecmf 2t surface 0 20230607 1200 0 an \n", - "5 ecmf 2d surface 0 20230607 1200 0 an \n", + "0 ecmf msl surface 0 20240421 0 0 an \n", + "1 ecmf 2t surface 0 20240421 0 0 an \n", + "2 ecmf 2d surface 0 20240421 0 0 an \n", + "3 ecmf msl surface 0 20240421 1200 0 an \n", + "4 ecmf 2t surface 0 20240421 1200 0 an \n", + "5 ecmf 2d surface 0 20240421 1200 0 an \n", "\n", " number gridType \n", "0 0 reduced_gg \n", @@ -550,7 +482,7 @@ "5 0 reduced_gg " ] }, - "execution_count": 11, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } @@ -561,7 +493,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 10, "id": "tamil-tattoo", "metadata": {}, "outputs": [ @@ -605,7 +537,7 @@ " 2t\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -618,7 +550,7 @@ " 2t\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -631,15 +563,15 @@ ], "text/plain": [ " centre shortName typeOfLevel level dataDate dataTime stepRange dataType \\\n", - "0 ecmf 2t surface 0 20230607 0 0 
an \n", - "1 ecmf 2t surface 0 20230607 1200 0 an \n", + "0 ecmf 2t surface 0 20240421 0 0 an \n", + "1 ecmf 2t surface 0 20240421 1200 0 an \n", "\n", " number gridType \n", "0 0 reduced_gg \n", "1 0 reduced_gg " ] }, - "execution_count": 12, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } @@ -650,7 +582,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 11, "id": "assumed-month", "metadata": {}, "outputs": [ @@ -1024,9 +956,9 @@ "Dimensions: (number: 1, time: 2, step: 1, surface: 1, values: 6599680)\n", "Coordinates:\n", " * number (number) int64 0\n", - " * time (time) datetime64[ns] 2023-06-07 2023-06-07T12:00:00\n", + " * time (time) datetime64[ns] 2024-04-21 2024-04-21T12:00:00\n", " * step (step) timedelta64[ns] 00:00:00\n", - " * surface (surface) int64 0\n", + " * surface (surface) float64 0.0\n", " latitude (values) float64 ...\n", " longitude (values) float64 ...\n", " valid_time (time, step) datetime64[ns] ...\n", @@ -1042,17 +974,17 @@ " GRIB_subCentre: 0\n", " Conventions: CF-1.7\n", " institution: European Centre for Medium-Range Weather Forecasts\n", - " history: 2023-06-08T11:49 GRIB to CDM+CF via cfgrib-0.9.1..." + " history: 2024-04-22T11:01 GRIB to CDM+CF via cfgrib-0.9.1..." ], "text/plain": [ "\n", "Dimensions: (number: 1, time: 2, step: 1, surface: 1, values: 6599680)\n", "Coordinates:\n", " * number (number) int64 0\n", - " * time (time) datetime64[ns] 2023-06-07 2023-06-07T12:00:00\n", + " * time (time) datetime64[ns] 2024-04-21 2024-04-21T12:00:00\n", " * step (step) timedelta64[ns] 00:00:00\n", - " * surface (surface) int64 0\n", + " * surface (surface) float64 0.0\n", " latitude (values) float64 ...\n", " longitude (values) float64 ...\n", " valid_time (time, step) datetime64[ns] ...\n", @@ -1068,10 +1000,10 @@ " GRIB_subCentre: 0\n", " Conventions: CF-1.7\n", " institution: European Centre for Medium-Range Weather Forecasts\n", - " history: 2023-06-08T11:49 GRIB to CDM+CF via cfgrib-0.9.1..." + " history: 2024-04-22T11:01 GRIB to CDM+CF via cfgrib-0.9.1..." 
] }, - "execution_count": 13, + "execution_count": 11, "metadata": {}, "output_type": "execute_result" } @@ -1105,7 +1037,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 12, "id": "passing-georgia", "metadata": { "editable": true, @@ -1121,7 +1053,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 13, "id": "foster-profile", "metadata": {}, "outputs": [ @@ -1165,7 +1097,7 @@ " msl\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -1178,7 +1110,7 @@ " 2t\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -1191,7 +1123,7 @@ " 2d\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 0\n", " 0\n", " an\n", @@ -1204,7 +1136,7 @@ " msl\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -1217,7 +1149,7 @@ " 2t\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -1230,7 +1162,7 @@ " 2d\n", " surface\n", " 0\n", - " 20230607\n", + " 20240421\n", " 1200\n", " 0\n", " an\n", @@ -1243,12 +1175,12 @@ ], "text/plain": [ " centre shortName typeOfLevel level dataDate dataTime stepRange dataType \\\n", - "0 ecmf msl surface 0 20230607 0 0 an \n", - "1 ecmf 2t surface 0 20230607 0 0 an \n", - "2 ecmf 2d surface 0 20230607 0 0 an \n", - "3 ecmf msl surface 0 20230607 1200 0 an \n", - "4 ecmf 2t surface 0 20230607 1200 0 an \n", - "5 ecmf 2d surface 0 20230607 1200 0 an \n", + "0 ecmf msl surface 0 20240421 0 0 an \n", + "1 ecmf 2t surface 0 20240421 0 0 an \n", + "2 ecmf 2d surface 0 20240421 0 0 an \n", + "3 ecmf msl surface 0 20240421 1200 0 an \n", + "4 ecmf 2t surface 0 20240421 1200 0 an \n", + "5 ecmf 2d surface 0 20240421 1200 0 an \n", "\n", " number gridType \n", "0 0 reduced_gg \n", @@ -1259,7 +1191,7 @@ "5 0 reduced_gg " ] }, - "execution_count": 15, + "execution_count": 13, "metadata": {}, "output_type": "execute_result" } @@ -1300,9 +1232,9 @@ ], "metadata": { "kernelspec": { - "display_name": "dev", + "display_name": "pyfdb", "language": "python", - "name": "dev" + "name": "pyfdb" }, "language_info": { "codemirror_mode": { @@ -1314,7 +1246,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.13" + "version": "3.8.12" } }, "nbformat": 4, diff --git a/docs/examples/url_stream.ipynb b/docs/examples/url_stream.ipynb index 27f294cd..29cb0df4 100644 --- a/docs/examples/url_stream.ipynb +++ b/docs/examples/url_stream.ipynb @@ -43,7 +43,7 @@ "tags": [] }, "source": [ - "earthkit-data can read GRIB data from a URL as a stream without writing anything to disk. This can be activated with the **stream=True** kwarg when calling :ref:`from_source() `." + "earthkit-data can read GRIB data from a URL as a stream without writing anything to disk. This can be activated with the ``stream=True`` kwarg when calling :ref:`from_source() `." ] }, { @@ -65,28 +65,45 @@ ] }, { - "cell_type": "code", - "execution_count": 6, - "id": "51964579-bf3f-4948-918d-fcd35581950e", + "cell_type": "markdown", + "id": "1661f2d1-2c30-4021-826f-1c2c5c565fd0", "metadata": {}, - "outputs": [], "source": [ - "for f in ds:\n", - " # f is GribField object. It gets deleted when going out of scope\n", - " print(f)" + "The resulting object only supports one iteration. Once the iteration is finished, the stream is consumed and no more data is available."
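This single-pass behaviour is ordinary Python iterator semantics, which a plain generator demonstrates:

.. code-block:: python

    stream = (n for n in range(3))  # any one-shot iterator
    print(list(stream))  # [0, 1, 2]
    print(list(stream))  # [] - the iterator is already exhausted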
] }, { - "cell_type": "markdown", - "id": "7f595ec8-008f-4198-8a3a-f8346fbfe09c", - "metadata": {}, + "cell_type": "code", + "execution_count": 3, + "id": "51964579-bf3f-4948-918d-fcd35581950e", + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "GribField(t,500,20070101,1200,0,0)\n", + "GribField(z,500,20070101,1200,0,0)\n", + "GribField(t,850,20070101,1200,0,0)\n", + "GribField(z,850,20070101,1200,0,0)\n" + ] + } + ], "source": [ - "### Stream options" + "for f in ds:\n", + " # f is GribField object. It gets deleted when going out of scope\n", + " print(f)" ] }, { "cell_type": "raw", - "id": "328b1d7b-500d-4014-8346-b6d47c40ee86", + "id": "df31a770-f5e9-46c7-8d5f-c9f73c57483c", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", @@ -96,13 +113,13 @@ "tags": [] }, "source": [ - "To control how the stream is read use :ref:`batch_size ` and :ref:`group_by `. E.g. the following code reads the GRIB data in messages of 2." + "The iteration can be done in batches by using :py:meth:`batched ` or :py:meth:`group_by `." ] }, { "cell_type": "code", "execution_count": 4, - "id": "b7c9a685-6b38-4236-ae74-a14445e8bb95", + "id": "754dc030-da97-4076-88ba-f31fed2f091d", "metadata": { "editable": true, "slideshow": { @@ -115,25 +132,186 @@ "name": "stdout", "output_type": "stream", "text": [ - "len=2\n", - " GribField(t,500,20070101,1200,0,0)\n", - " GribField(z,500,20070101,1200,0,0)\n", - "len=2\n", - " GribField(t,850,20070101,1200,0,0)\n", - " GribField(z,850,20070101,1200,0,0)\n" + "len=2 [('t', 500), ('z', 500)]\n", + "len=2 [('t', 850), ('z', 850)]\n" ] } ], "source": [ "ds = earthkit.data.from_source(\"url\", \n", " \"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test4.grib\", \n", - " stream=True, batch_size=2)\n", + " stream=True)\n", "\n", - "for f in ds:\n", + "for f in ds.batched(2):\n", " # f is a fieldlist\n", - " print(f\"len={len(f)}\")\n", - " for g in f:\n", - " print(f\" {g}\")" + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { + "cell_type": "markdown", + "id": "9db87b26-20e2-4cb5-89a1-2643d97fcf13", + "metadata": {}, + "source": [ + "#### Reading the whole stream into memory" + ] + }, + { + "cell_type": "raw", + "id": "f8622372-d2c9-4261-86cc-db46531b0cd5", + "metadata": { + "editable": true, + "raw_mimetype": "text/restructuredtext", + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, + "source": [ + "We can load the whole stream into memory by using ``read_all=True`` in :ref:`from_source() `. The resulting object will be a :py:class:`FieldList` storing all the GRIB messages in memory." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "359ceb8d-3c88-411d-af86-78c7cb403a08", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "4" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds = earthkit.data.from_source(\"url\", \n", + " \"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test4.grib\", \n", + " stream=True, read_all=True)\n", + "\n", + "len(ds)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "4608964a-3849-4c40-bcd3-086f54b2f67a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
    \n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
    centreshortNametypeOfLevelleveldataDatedataTimestepRangedataTypenumbergridType
    0ecmftisobaricInhPa5002007010112000an0regular_ll
    1ecmfzisobaricInhPa5002007010112000an0regular_ll
    2ecmftisobaricInhPa8502007010112000an0regular_ll
    3ecmfzisobaricInhPa8502007010112000an0regular_ll
    \n", + "
    " + ], + "text/plain": [ + " centre shortName typeOfLevel level dataDate dataTime stepRange \\\n", + "0 ecmf t isobaricInhPa 500 20070101 1200 0 \n", + "1 ecmf z isobaricInhPa 500 20070101 1200 0 \n", + "2 ecmf t isobaricInhPa 850 20070101 1200 0 \n", + "3 ecmf z isobaricInhPa 850 20070101 1200 0 \n", + "\n", + " dataType number gridType \n", + "0 an 0 regular_ll \n", + "1 an 0 regular_ll \n", + "2 an 0 regular_ll \n", + "3 an 0 regular_ll " + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ds.ls()" ] }, { @@ -166,28 +344,24 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 7, "id": "891af3e5-5aa5-451f-b7bb-a86eaf7c8ffe", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "len=3\n", - " GribField(t,500,20070101,1200,0,0)\n", - " GribField(z,500,20070101,1200,0,0)\n", - " GribField(t,850,20070101,1200,0,0)\n", - "len=3\n", - " GribField(z,850,20070101,1200,0,0)\n", - " GribField(t,1000,20180801,1200,0,0)\n", - " GribField(u,1000,20180801,1200,0,0)\n", - "len=3\n", - " GribField(v,1000,20180801,1200,0,0)\n", - " GribField(t,850,20180801,1200,0,0)\n", - " GribField(u,850,20180801,1200,0,0)\n", - "len=1\n", - " GribField(v,850,20180801,1200,0,0)\n" + "len=3 [('t', 500), ('z', 500), ('t', 850)]\n", + "len=3 [('z', 850), ('t', 1000), ('u', 1000)]\n", + "len=3 [('v', 1000), ('t', 850), ('u', 850)]\n", + "len=1 [('v', 850)]\n" ] } ], @@ -195,20 +369,24 @@ "ds = earthkit.data.from_source(\"url\", \n", " [\"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test4.grib\", \n", " \"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib\"], \n", - " stream=True, batch_size=3)\n", + " stream=True)\n", "\n", - "for f in ds:\n", + "for f in ds.batched(3):\n", " # f is a fieldlist\n", - " print(f\"len={len(f)}\")\n", - " for g in f:\n", - " print(f\" {g}\")" + " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "code", "execution_count": null, "id": "d287fb70-66fb-4b2f-8c07-a3cca7c3035d", - "metadata": {}, + "metadata": { + "editable": true, + "slideshow": { + "slide_type": "" + }, + "tags": [] + }, "outputs": [], "source": [] } diff --git a/docs/guide/data.rst b/docs/guide/data.rst index 495a893a..3eb74942 100644 --- a/docs/guide/data.rst +++ b/docs/guide/data.rst @@ -10,6 +10,8 @@ The list of common methods/operators: - :ref:`conversion` - :ref:`concat` - :ref:`iter` + - :ref:`batched` + - :ref:`group_by` - :ref:`slice` - :ref:`sel` - :ref:`order_by` @@ -77,6 +79,49 @@ In the the following example we read a GRIB file from disk. In the iteration eac GribField(u,850,20180801,1200,0,0) GribField(v,850,20180801,1200,0,0) +.. _batched: + +Iteration with ``.batched()`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When an earthkit-data data `source` or dataset provides a :class:`~data.core.fieldlist.FieldList` or message list, we can iterate through it in batches of fixed size using :meth:`~data.core.fieldlist.FieldList.batched`. This method also works for :ref:`streams `. + +In the the following example we read a GRIB file from disk and iterate through it in batches of 2. Each iteration step yields a :class:`~data.core.fieldlist.FieldList` of 2 fields. + +.. code-block:: python + + >>> import earthkit.data + >>> ds = earthkit.data.from_source("file", "docs/examples/test6.grib") + + >>> for f in ds.batched(2): + ... 
print(f"len={len(f)} {f.metadata(('param', 'level'))}") + ... + len=2 [('t', 1000), ('u', 1000)] + len=2 [('v', 1000), ('t', 850)] + len=2 [('u', 850), ('v', 850)] + + +.. _group_by: + +Iteration with ``.group_by()`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +When an earthkit-data data `source` or dataset provides a :class:`~data.core.fieldlist.FieldList` or message list, we can iterate through it in groups defined by metadata keys using :meth:`~data.core.fieldlist.FieldList.group_by`. This method also works for :ref:`streams `. + +In the the following example we read a GRIB file from disk and iterate through it in groups defined by the "level" metadata key. Each iteration step yields a :class:`~data.core.fieldlist.FieldList` containing fields with the same "level" value. + +.. code-block:: python + + >>> import earthkit.data + >>> ds = earthkit.data.from_source("file", "docs/examples/test6.grib") + + >>> for f in ds.group_by("level"): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") + ... + len=3 [('t', 1000), ('u', 1000), ('v', 1000)] + len=3 [('t', 850), ('u', 850), ('v', 850)] + + .. _slice: Selection with ``[...]`` diff --git a/docs/guide/data_format/grib.rst b/docs/guide/data_format/grib.rst index 818f79a3..daf6f918 100644 --- a/docs/guide/data_format/grib.rst +++ b/docs/guide/data_format/grib.rst @@ -39,6 +39,10 @@ The following table gives us an overview of the GRIB :class:`~data.readers.grib. - * - :ref:`iter` - + * - :ref:`batched` + - :meth:`~data.readers.grib.index.GribFieldList.batched` + * - :ref:`group_by` + - :meth:`~data.readers.grib.index.GribFieldList.group_by` * - :ref:`slice` - * - :ref:`sel` diff --git a/docs/guide/index.rst b/docs/guide/index.rst index feed122e..dc9deeae 100644 --- a/docs/guide/index.rst +++ b/docs/guide/index.rst @@ -8,6 +8,7 @@ User guide sources data data_format/index.rst + streams settings caching misc/index.rst diff --git a/docs/guide/sources.rst b/docs/guide/sources.rst index 263f708d..8c1cde9d 100644 --- a/docs/guide/sources.rst +++ b/docs/guide/sources.rst @@ -181,7 +181,7 @@ Further examples: url --- -.. py:function:: from_source("url", url, unpack=True, parts=None, stream=False, batch_size=1, group_by=None) +.. py:function:: from_source("url", url, unpack=True, parts=None, stream=False, read_all=False) :noindex: The ``url`` source will download the data from the address specified and store it in the :ref:`cache `. The supported data formats are the same as for the :ref:`file ` data source above. @@ -191,9 +191,8 @@ url :param bool unpack: for archive formats such as ``.zip``, ``.tar``, ``.tar.gz``, etc, *earthkit-data* will attempt to open it and extract any usable file. To keep the downloaded file as is use ``unpack=False`` :param parts: the :ref:`parts ` to read from the resource(s) specified by ``url``. Cannot be used when ``url`` already defines the :ref:`parts `. :type parts: pair, list or tuple of pairs, None - :param bool stream: when it is ``True`` the data is read as a stream. Otherwise the data is retrieved into a file and stored in the :ref:`cache `. This option only works for GRIB data. No archive formats supported (``unpack`` is ignored). ``stream`` only works for ``http`` and ``https`` URLs. - :param int batch_size: used when ``stream=True`` and ``group_by`` is unset. It defines how many GRIB messages are consumed from the stream and kept in memory at a time. For details see :ref:`stream source `. 
- :param group_by: used when ``stream=True`` and can specify one or more metadata keys to control how GRIB messages are read from the stream. For details see :ref:`stream source `. + :param bool stream: when it is ``True`` the data is read as a :ref:`stream `. Otherwise the data is retrieved into a file and stored in the :ref:`cache `. This option only works for GRIB data. No archive formats supported (``unpack`` is ignored). ``stream`` only works for ``http`` and ``https`` URLs. See details about streams :ref:`here `. + :param bool read_all: when it is ``True`` all the data is read straight into memory from a :ref:`stream `. Used when ``stream=True``. *New in version 0.8.0* :type group_by: str, list of str :param dict **kwargs: other keyword arguments specifying the request @@ -227,6 +226,7 @@ url Further examples: - :ref:`/examples/url.ipynb` + - :ref:`/examples/url_parts.ipynb` - :ref:`/examples/url_stream.ipynb` @@ -302,19 +302,16 @@ sample stream -------------- -.. py:function:: from_source("stream", stream, batch_size=1, group_by=None) +.. py:function:: from_source("stream", stream, read_all=False) :noindex: - The ``stream`` source will read data from a stream, which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods. At the moment, it only works for :ref:`grib` and CoverageJson data. - - :param stream: the stream - :param int batch_size: used when ``group_by`` is unset. It defines how many GRIB messages are consumed from the stream and kept in memory at a time. ``batch_size=0`` means all the messages will be loaded and stored in memory. When ``batch_size`` is not zero ``from_source`` gives us a stream iterator object. During the iteration temporary objects are created for each message then get deleted when going out of scope. Used when ``group_by`` is unset. - :param group_by: specify one or more metadata keys to control how GRIB messages are read from the stream. When it is set ``from_source`` gives us a stream iterator object. Each iteration step results in a Fieldlist object, which is built by consuming GRIB messages from the stream until the values of the ``group_by`` metadata keys change. The generated Fieldlist keeps GRIB messages in memory then gets deleted when going out of scope. When ``group_by`` is set ``batch_size`` is ignored. - :type group_by: str, list of str - :param dict **kwargs: other keyword arguments specifying the request + The ``stream`` source will read data from a stream (or streams), which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods. At the moment it only works for :ref:`grib` and CoverageJson data. For more details see :ref:`here `. + :param stream: the stream(s) + :type stream: stream, list, tuple + :param bool read_all: when it is ``True`` all the data is read into memory from the stream. *New in version 0.8.0* - In the examples below, for simplicity, we create a stream from a :ref:`grib` file. By default (``batch_size=1``) we will consume one message at a time: + In the examples below, for simplicity, we create a file stream from a :ref:`grib` file. By default :ref:`from_source() ` returns an object that can only be used as an iterator. .. code-block:: python >>> import earthkit.data >>> stream = open("docs/examples/test4.grib", "rb") >>> ds = earthkit.data.from_source("stream", stream) >>> for f in ds: ... print(f) ... GribField(t,500,20070101,1200,0,0) GribField(z,500,20070101,1200,0,0) GribField(t,850,20070101,1200,0,0) GribField(z,850,20070101,1200,0,0) - - We can use ``group_by`` to read fields with a matching level. 
``ds`` is still just an iterator, but ``f`` is now a :obj:`FieldList `: + We can also iterate through the stream in batches of fixed size using ``batched()``: .. code-block:: python >>> import earthkit.data >>> stream = open("docs/examples/test4.grib", "rb") - >>> ds = earthkit.data.from_source("stream", stream, group_by="level") - >>> for f in ds: - ... print(len(f)) - ... for g in f: - ... print(f" {g}") + >>> ds = earthkit.data.from_source("stream", stream) + + # f is a FieldList + >>> for f in ds.batched(2): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") ... - 2 - GribField(t,500,20070101,1200,0,0) - GribField(z,500,20070101,1200,0,0) - 2 - GribField(t,850,20070101,1200,0,0) - GribField(z,850,20070101,1200,0,0) + len=2 [('t', 500), ('z', 500)] + len=2 [('t', 850), ('z', 850)] - We can use ``batch_size=2`` to read 2 messages at a time: + + When using ``group_by()`` we can iterate through the stream in groups defined by metadata keys. In this case each iteration step yields a :obj:`FieldList `. .. code-block:: python >>> import earthkit.data >>> stream = open("docs/examples/test4.grib", "rb") - >>> ds = earthkit.data.from_source("stream", stream, batch_size=2) + >>> ds = earthkit.data.from_source("stream", stream) - # f is a FieldList containing 2 GribFields - >>> for f in ds: - ... print(len(f)) - ... for g in f: - ... print(f" {g}") + # f is a FieldList + >>> for f in ds.group_by("level"): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") ... - 2 - GribField(t,500,20070101,1200,0,0) - GribField(z,500,20070101,1200,0,0) - 2 - GribField(t,850,20070101,1200,0,0) - GribField(z,850,20070101,1200,0,0) + len=2 [('t', 500), ('z', 500)] + len=2 [('t', 850), ('z', 850)] - With ``batch_size=0`` the whole stream will be consumed resulting in a FieldList object storing all the messages in memory. **Use this option carefully!** + We can consume the whole stream and load all the data into memory by using ``read_all=True`` in :ref:`from_source() `. **Use this option carefully!** .. code-block:: python >>> import earthkit.data >>> stream = open("docs/examples/test4.grib", "rb") - >>> ds = earthkit.data.from_source("stream", stream, batch_size=0) + >>> ds = earthkit.data.from_source("stream", stream, read_all=True) # ds is empty at this point, but calling any method on it will # consume the whole stream @@ -585,16 +572,14 @@ ecmwf-open-data fdb --- -.. py:function:: from_source("fdb", *args, stream=True, batch_size=1, group_by=None, **kwargs) +.. py:function:: from_source("fdb", *args, stream=True, read_all=False, **kwargs) :noindex: The ``fdb`` source accesses the `FDB (Fields DataBase) `_, which is a domain-specific object store developed at ECMWF for storing, indexing and retrieving GRIB data. earthkit-data uses the `pyfdb `_ package to retrieve data from FDB. :param tuple *args: positional arguments specifying the request as a dict - :param bool stream: when it is ``True`` the data is read as a stream. Otherwise the it is retrieved into a file and stored in the :ref:`cache `. Stream-based access only works for :ref:`grib` data. - :param int batch_size: used when ``stream=True`` and ``group_by`` is unset. It defines how many GRIB messages are consumed from the stream and kept in memory at a time. ``batch_size=0`` means all the data is read into memory. For details see :ref:`stream source `. - :param group_by: used when ``stream=True`` and can specify one or more metadata keys to control how GRIB messages are read from the stream. 
For details see :ref:`stream source `. - :type group_by: str, list of str + :param bool stream: when it is ``True`` the data is read as a :ref:`stream `. Otherwise it is retrieved into a file and stored in the :ref:`cache `. Stream-based access only works for :ref:`grib` and CoverageJson data. See details about streams :ref:`here `. + :param bool read_all: when it is ``True`` all the data is read into memory from a :ref:`stream `. Used when ``stream=True``. *New in version 0.8.0* :param dict **kwargs: other keyword arguments specifying the request The following example retrieves analysis :ref:`grib` data for 3 surface parameters as stream. @@ -607,7 +592,7 @@ fdb ... "class": "od", ... "expver": "0001", ... "stream": "oper", - ... "date": "20230607", + ... "date": "20240421", ... "time": [0, 12], ... "domain": "g", ... "type": "an", @@ -620,49 +605,50 @@ fdb >>> for f in ds: ... print(f) ... - GribField(msl,None,20230607,0,0,0) - GribField(2t,None,20230607,0,0,0) - GribField(2d,None,20230607,0,0,0) - GribField(msl,None,20230607,1200,0,0) - GribField(2t,None,20230607,1200,0,0) - GribField(2d,None,20230607,1200,0,0) + GribField(msl,None,20240421,0,0,0) + GribField(2t,None,20240421,0,0,0) + GribField(2d,None,20240421,0,0,0) + GribField(msl,None,20240421,1200,0,0) + GribField(2t,None,20240421,1200,0,0) + GribField(2d,None,20240421,1200,0,0) + We can also iterate through the stream in batches of fixed size using ``batched``: - We can use ``group_by`` to read fields with a matching time. ``ds`` is still just an iterator, but ``f`` is now a :obj:`FieldList `: + .. code-block:: python - >>> ds = earthkit.data.from_source("fdb", request, group_by="time") - >>> for f in ds: - ... print(f) - ... for g in f: - ... print(f" {g}") + >>> ds = earthkit.data.from_source("fdb", request) + >>> for f in ds.batched(2): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") ... - - GribField(msl,None,20230607,0,0,0) - GribField(2t,None,20230607,0,0,0) - GribField(2d,None,20230607,0,0,0) - - GribField(msl,None,20230607,1200,0,0) - GribField(2t,None,20230607,1200,0,0) - GribField(2d,None,20230607,1200,0,0) - - We can use ``batch_size=2`` to read 2 fields at a time. ``ds`` is still just an iterator, but ``f`` is now a :obj:`FieldList ` containing 2 fields: - - >>> ds = earthkit.data.from_source("fdb", request, batch_size=2) - >>> for f in ds: - ... print(f) - ... for g in f: - ... print(f" {g}") + len=2 [('msl', 0), ('2t', 0)] + len=2 [('2d', 0), ('msl', 0)] + len=2 [('2t', 0), ('2d', 0)] + + + When using ``group_by()`` we can iterate through the stream in groups defined by metadata keys. In this case each iteration step yields a :obj:`FieldList `. + + .. code-block:: python + + >>> ds = earthkit.data.from_source("fdb", request) + >>> for f in ds.group_by("time"): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") ... - - GribField(msl,None,20230607,0,0,0) - GribField(2t,None,20230607,0,0,0) - - GribField(2d,None,20230607,0,0,0) - GribField(msl,None,20230607,1200,0,0) - - GribField(2t,None,20230607,1200,0,0) - GribField(2d,None,20230607,1200,0,0) + len=3 [('msl', 0), ('2t', 0), ('2d', 0)] + len=3 [('msl', 0), ('2t', 0), ('2d', 0)] + + We can consume the whole stream and load all the data into memory by using ``read_all=True`` in :ref:`from_source() `. **Use this option carefully!** + + .. 
code-block:: python >>> import earthkit.data >>> ds = earthkit.data.from_source("fdb", request, read_all=True) + + # ds is empty at this point, but calling any method on it will + # consume the whole stream + >>> len(ds) + 6 + + # now ds stores all the messages in memory Further examples: @@ -740,7 +726,7 @@ opendap polytope -------- -.. py:function:: from_source("polytope", collection, *args, address=None, user_email=None, user_key=None, stream=True, batch_size=1, group_by=None, **kwargs) +.. py:function:: from_source("polytope", collection, *args, address=None, user_email=None, user_key=None, stream=True, read_all=False, **kwargs) :noindex: The ``polytope`` source accesses the `Polytope web services `_ , using the polytope-client_ package. :param str collection: the name of the polytope collection :param tuple *args: specify the request as a dict :param str address: specify the address of the polytope service - :param str user_email: specify the user email credential. Must be used together with ``user_key``. This is an alternative to using the ``POLYTOPE_USER_EMAIL`` environment variable. *Added in version 0.7.0* - :param str user_key: specify the user key credential. Must be used together with ``user_email``. This is an alternative to using the ``POLYTOPE_USER_KEY`` environment variable. *Added in version 0.7.0* - :param bool stream: when it is ``True`` the data is read as a stream. Otherwise the data is retrieved into a file and stored in the :ref:`cache `. Stream-based access only works for :ref:`grib` and CoverageJson data. - :param int batch_size: used when ``stream=True`` and ``group_by`` is unset. It defines how many GRIB messages are consumed from the stream and kept in memory at a time. ``batch_size=0`` means all the data is read into memory. For details see :ref:`stream source `. - :param group_by: used when ``stream=True`` and can specify one or more metadata keys to control how GRIB messages are read from the stream. For details see :ref:`stream source `. - :type group_by: str, list of str + :param str user_email: specify the user email credential. Must be used together with ``user_key``. This is an alternative to using the ``POLYTOPE_USER_EMAIL`` environment variable. *New in version 0.7.0* + :param str user_key: specify the user key credential. Must be used together with ``user_email``. This is an alternative to using the ``POLYTOPE_USER_KEY`` environment variable. *New in version 0.7.0* + :param bool stream: when ``True`` the data is read as a :ref:`stream `. Otherwise it is retrieved into a file and stored in the :ref:`cache `. Stream-based access only works for :ref:`grib` and CoverageJson data. See details about streams :ref:`here `. + :param bool read_all: when ``True`` all the data is read into memory from a :ref:`stream `. Used when ``stream=True``. *New in version 0.8.0* :param dict **kwargs: other keyword arguments, these can include options passed to the polytope-client_ @@ -789,7 +773,6 @@ polytope - :ref:`/examples/polytope.ipynb` - .. _data-sources-wekeo: wekeo diff --git a/docs/guide/streams.rst b/docs/guide/streams.rst new file mode 100644 index 00000000..9b3ed0f1 --- /dev/null +++ b/docs/guide/streams.rst @@ -0,0 +1,73 @@ +.. _streams: + +Streams +========== + +We can read :ref:`grib` and CoverageJson data as a stream by using the ``stream=True`` option in :func:`from_source`. 
It is only available for the following sources: + +- :ref:`data-sources-url` +- :ref:`data-sources-fdb` +- :ref:`data-sources-polytope` + +Iterating over a stream +------------------------ + +When reading a stream the returned object offers limited access to the data and primarily serves as an iterator. Once the iteration is finished, the stream data is no longer available. + +The example below shows how we iterate through a GRIB data stream field by field: + +.. code-block:: python + + >>> import earthkit.data + >>> url = "https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib" + >>> ds = earthkit.data.from_source("url", url, stream=True) + >>> for f in ds: + ... print(f) + ... + GribField(t,1000,20180801,1200,0,0) + GribField(u,1000,20180801,1200,0,0) + GribField(v,1000,20180801,1200,0,0) + GribField(t,850,20180801,1200,0,0) + GribField(u,850,20180801,1200,0,0) + GribField(v,850,20180801,1200,0,0) + +We can also use :meth:`~data.core.fieldlist.FieldList.batched` to iterate in batches of fixed size. Each iteration step now yields a :class:`FieldList`. + +.. code-block:: python + + >>> import earthkit.data + >>> url = "https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib" + >>> ds = earthkit.data.from_source("url", url, stream=True) + >>> for f in ds.batched(2): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") + ... + len=2 [('t', 1000), ('u', 1000)] + len=2 [('v', 1000), ('t', 850)] + len=2 [('u', 850), ('v', 850)] + +Another option is to use :meth:`~data.core.fieldlist.FieldList.group_by` to iterate in groups defined by metadata keys. Each iteration step results in a :class:`FieldList`, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. + +.. code-block:: python + + >>> import earthkit.data + >>> url = "https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib" + >>> ds = earthkit.data.from_source("url", url, stream=True) + >>> for f in ds.group_by("level"): + ... print(f"len={len(f)} {f.metadata(('param', 'level'))}") + ... + len=3 [('t', 1000), ('u', 1000), ('v', 1000)] + len=3 [('t', 850), ('u', 850), ('v', 850)] + + +Reading all the data into memory +---------------------------------- + +We can load the whole stream into memory by using ``read_all=True`` in :func:`from_source`. The resulting object will be a :py:class:`FieldList` storing all the GRIB messages in memory. **Use this option carefully!** + +.. 
code-block:: python + + >>> import earthkit.data + >>> url = "https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib" + >>> ds = earthkit.data.from_source("url", url, stream=True, read_all=True) + >>> len(ds) + 6 diff --git a/src/earthkit/data/core/__init__.py b/src/earthkit/data/core/__init__.py index 8bb8337b..31c13072 100644 --- a/src/earthkit/data/core/__init__.py +++ b/src/earthkit/data/core/__init__.py @@ -152,3 +152,11 @@ def _not_implemented(self): if hasattr(self, "path"): extra = f" on {self.path}" raise NotImplementedError(f"{module}.{name}.{func}(){extra}") + + def batched(self, *args): + """Return iterator for batches of data""" + self._not_implemented() + + def group_by(self, *args): + """Return iterator for batches of data grouped by metadata keys""" + self._not_implemented() diff --git a/src/earthkit/data/core/fieldlist.py b/src/earthkit/data/core/fieldlist.py index b6a799e3..36cb1368 100644 --- a/src/earthkit/data/core/fieldlist.py +++ b/src/earthkit/data/core/fieldlist.py @@ -653,6 +653,10 @@ def _init_from_mask(self, index): def _init_from_multi(self, index): self._array_backend = index._indexes[0].array_backend + @staticmethod + def from_fields(fields): + raise NotImplementedError + @staticmethod def from_numpy(array, metadata): from earthkit.data.sources.array_list import ArrayFieldList diff --git a/src/earthkit/data/core/index.py b/src/earthkit/data/core/index.py index f0634491..4db9ac8b 100644 --- a/src/earthkit/data/core/index.py +++ b/src/earthkit/data/core/index.py @@ -531,6 +531,9 @@ def from_mask(self, lst): def from_multi(self, a): import numpy as np + if not isinstance(a, list): + a = list(a) + # will raise IndexError if an index is out of bounds n = len(self) indices = np.arange(0, n if n > 0 else 0) @@ -553,6 +556,51 @@ def to_numpy(self, *args, **kwargs): def full(self, *coords): return FullIndex(self, *coords) + def batched(self, n): + """Iterate through the object in batches of ``n``. + + Parameters + ---------- + n: int + Batch size. + + Returns + ------- + object + Returns an iterator yielding batches of ``n`` elements. Each batch is a new object + containing a view to the data in the original object, so no data is copied. The last + batch may contain fewer than ``n`` elements. + + """ + from earthkit.data.utils.batch import batched + + return batched(self, n, mode="indexed") + + def group_by(self, *keys, sort=True): + """Iterate through the object in groups defined by metadata keys. + + Parameters + ---------- + *keys: tuple + Positional arguments specifying the metadata keys to group by. + Keys can be a single or multiple str, or a list or tuple of str. + + sort: bool, optional + If ``True`` (default), the object is sorted by the metadata ``keys`` before grouping. + Sorting is only applied if the object supports the sorting operation. + + Returns + ------- + object + Returns an iterator yielding batches of elements grouped by the metadata ``keys``. Each + batch is a new object containing a view to the data in the original object, so no data + is copied. It generates a new group every time the values of the ``keys`` change. 
+ + """ + from earthkit.data.utils.batch import group_by + + return group_by(self, *keys, sort=sort, mode="indexed") + class MaskIndex(Index): def __init__(self, index, indices): diff --git a/src/earthkit/data/readers/covjson.py b/src/earthkit/data/readers/covjson.py index 3d366343..048d8bf1 100644 --- a/src/earthkit/data/readers/covjson.py +++ b/src/earthkit/data/readers/covjson.py @@ -47,6 +47,9 @@ class CovjsonStreamReader(Reader): def __init__(self, stream): self.stream = stream + def __iter__(self): + return self + def __next__(self): import json diff --git a/src/earthkit/data/readers/grib/memory.py b/src/earthkit/data/readers/grib/memory.py index b75a902c..f6bbafdf 100644 --- a/src/earthkit/data/readers/grib/memory.py +++ b/src/earthkit/data/readers/grib/memory.py @@ -20,7 +20,7 @@ class GribMemoryReader(Reader): - def __init__(self, array_backend=None): + def __init__(self, array_backend=None, **kwargs): self._peeked = None self._array_backend = ensure_backend(array_backend) @@ -36,6 +36,7 @@ def __next__(self): msg = self._message_from_handle(handle) if handle is not None: return msg + self.consumed_ = True raise StopIteration def _next_handle(self): @@ -47,46 +48,17 @@ def _message_from_handle(self, handle): GribCodesHandle(handle, None, None), self._array_backend ) - def peek(self): - """Returns the next available message without consuming it""" - if self._peeked is None: - handle = self._next_handle() - self._peeked = self._message_from_handle(handle) - return self._peeked - - def read_batch(self, n): - fields = [] - for _ in range(n): - try: - fields.append(self.__next__()) - except StopIteration: - break - if not fields: - raise StopIteration + def batched(self, n): + from earthkit.data.utils.batch import batched - return GribFieldListInMemory.from_fields(fields) + return batched(self, n, create=self.to_fieldlist) + + def group_by(self, *args, **kwargs): + from earthkit.data.utils.batch import group_by - def read_group(self, group): - assert isinstance(group, list) - - fields = [] - current_group = {} - while True: - f = self.peek() - if f is not None: - group_md = f._attributes(group) - if not current_group: - current_group = group_md - if current_group == group_md: - fields.append(f) - self.__next__() - else: - break - elif fields: - break - else: - raise StopIteration + return group_by(self, *args, create=self.to_fieldlist, sort=False) + def to_fieldlist(self, fields): return GribFieldListInMemory.from_fields(fields) @@ -126,7 +98,7 @@ class GribStreamReader(GribMemoryReader): """ def __init__(self, stream, **kwargs): - super().__init__() + super().__init__(**kwargs) self._stream = stream self._reader = eccodes.StreamReader(stream) @@ -158,6 +130,10 @@ def handle(self): def offset(self): return None + @staticmethod + def to_fieldlist(fields): + return GribFieldListInMemory.from_fields(fields) + class GribFieldListInMemory(GribFieldList, Reader): """Represent a GRIB field list in memory""" diff --git a/src/earthkit/data/sources/fdb.py b/src/earthkit/data/sources/fdb.py index 738d7106..bd8f3393 100644 --- a/src/earthkit/data/sources/fdb.py +++ b/src/earthkit/data/sources/fdb.py @@ -28,8 +28,14 @@ class FDBSource(Source): def __init__(self, *args, stream=True, **kwargs): super().__init__() - self._stream_kwargs = dict() for k in ["group_by", "batch_size"]: + if k in kwargs: + raise ValueError( + f"Invalid argument '{k}' for FDBSource. Deprecated since 0.8.0." 
+ ) + + self._stream_kwargs = dict() + for k in ["read_all"]: if k in kwargs: self._stream_kwargs[k] = kwargs.pop(k) diff --git a/src/earthkit/data/sources/file.py b/src/earthkit/data/sources/file.py index 03fb0d25..870b8aef 100644 --- a/src/earthkit/data/sources/file.py +++ b/src/earthkit/data/sources/file.py @@ -205,6 +205,12 @@ def path(self, v): def parts(self): return self._path_and_parts.parts + def batched(self, *args): + return self._reader.batched(*args) + + def group_by(self, *args): + return self._reader.group_by(*args) + class IndexedFileSource(FileSource): def mutate(self): diff --git a/src/earthkit/data/sources/memory.py b/src/earthkit/data/sources/memory.py index 365a1457..d17a3772 100644 --- a/src/earthkit/data/sources/memory.py +++ b/src/earthkit/data/sources/memory.py @@ -84,6 +84,12 @@ def bounding_box(self): def statistics(self, **kwargs): return self._reader.statistics(**kwargs) + def batched(self, *args): + return self._reader.batched(*args) + + def group_by(self, *args): + return self._reader.group_by(*args) + class MemorySource(MemoryBaseSource): def __init__(self, buf, **kwargs): diff --git a/src/earthkit/data/sources/polytope.py b/src/earthkit/data/sources/polytope.py index 6230ed86..e62cc0a5 100644 --- a/src/earthkit/data/sources/polytope.py +++ b/src/earthkit/data/sources/polytope.py @@ -69,8 +69,14 @@ def __init__( super().__init__() assert isinstance(dataset, str) - self._stream_kwargs = dict() for k in ["group_by", "batch_size"]: + if k in kwargs: + raise ValueError( + f"Invalid argument '{k}' for Polytope. Deprecated since 0.8.0." + ) + + self._stream_kwargs = dict() + for k in ["read_all"]: if k in kwargs: self._stream_kwargs[k] = kwargs.pop(k) diff --git a/src/earthkit/data/sources/stream.py b/src/earthkit/data/sources/stream.py index 37e9ca56..4751e0b1 100644 --- a/src/earthkit/data/sources/stream.py +++ b/src/earthkit/data/sources/stream.py @@ -7,9 +7,10 @@ # nor does it submit to any jurisdiction. 
# +import itertools import logging -from collections import deque +from earthkit.data.core.fieldlist import FieldList from earthkit.data.readers import stream_reader from earthkit.data.sources.memory import MemoryBaseSource @@ -19,47 +20,25 @@ def parse_stream_kwargs(**kwargs): - group_by = kwargs.pop("group_by", None) - batch_size = kwargs.pop("batch_size", 1) - batch_size, group_by = check_stream_kwargs(batch_size, group_by) - stream_kwargs = dict(batch_size=batch_size, group_by=group_by) + read_all = kwargs.pop("read_all", False) + stream_kwargs = dict(read_all=read_all) return (stream_kwargs, kwargs) -def check_stream_kwargs(batch_size, group_by): - if group_by is None: - group_by = [] - - if isinstance(group_by, str): - group_by = [group_by] - else: - try: - group_by = list(group_by) - except Exception: - raise TypeError(f"unsupported types in group_by={group_by}") - - if group_by and not all([isinstance(x, str) for x in group_by]): - raise TypeError(f"group_by={group_by} must contain str values") - - if batch_size is None: - batch_size = 1 - - if batch_size < 0: - raise ValueError(f"batch_size={batch_size} cannot be negative") - - return (batch_size, group_by) - - class StreamMemorySource(MemoryBaseSource): def __init__(self, stream, **kwargs): super().__init__(**kwargs) + if not isinstance(stream, Stream): + raise ValueError(f"Invalid stream={stream}") self._stream = stream self._reader_ = None @property def _reader(self): if self._reader_ is None: - self._reader_ = stream_reader(self, self._stream, True, **self._kwargs) + self._reader_ = stream_reader( + self, self._stream.stream, True, **self._kwargs + ) if self._reader_ is None: raise TypeError(f"could not create reader for stream={self._stream}") return self._reader_ @@ -72,209 +51,198 @@ def mutate(self): return self -class StreamSourceBase(Source): - def __init__(self, stream, *, batch_size=1, group_by=None, **kwargs): +class StreamSource(Source): + def __init__(self, stream, *, read_all=False, **kwargs): super().__init__() self._reader_ = None - self._stream = stream - self.batch_size, self.group_by = check_stream_kwargs(batch_size, group_by) + self._stream = self._wrap_stream(stream) + self.memory = read_all + for k in ["group_by", "batch_size"]: + if k in kwargs: + raise ValueError( + f"Invalid argument '{k}' for StreamSource. Deprecated since 0.8.0." 
+ + ) + + self._kwargs = kwargs def __iter__(self): - return self + return iter(self._reader) def mutate(self): + if isinstance(self._stream, (list, tuple)): + return MultiStreamSource(self._stream, read_all=self.memory) + else: + if self.memory: + return StreamMemorySource(self._stream, **self._kwargs) + elif hasattr(self._reader, "to_fieldlist"): + return StreamFieldList(self._reader, **self._kwargs) return self @property def _reader(self): if self._reader_ is None: - self._reader_ = stream_reader(self, self._stream, False, **self._kwargs) + self._reader_ = stream_reader( + self, self._stream.stream, False, **self._kwargs + ) if self._reader_ is None: - raise TypeError(f"could not create reader for stream={self._stream}") + raise TypeError( + f"could not create reader for stream={self._stream.stream}" + ) return self._reader_ - -class StreamSingleSource(StreamSourceBase): - def __init__(self, stream, **kwargs): - super().__init__(stream, **kwargs) - assert self.batch_size == 1 - - def __next__(self): - return self._reader.__next__() - - -class StreamBatchSource(StreamSourceBase): - def __init__(self, stream, **kwargs): - super().__init__(stream, **kwargs) - assert self.batch_size > 1 - - def __next__(self): - return self._get_batch(self.batch_size) - - def _get_batch(self, n): - return self._reader.read_batch(n) - - -class StreamGroupSource(StreamSourceBase): - def __init__(self, stream, **kwargs): - super().__init__(stream, **kwargs) - assert self.group_by - - def __next__(self): - return self._reader.read_group(self.group_by) + def batched(self, n): + """Iterate through the stream in batches of ``n``. + + Parameters + ---------- + n: int + Batch size. + + Returns + ------- + object + Returns an iterator yielding batches of ``n`` elements. Each batch is a new object + holding the fields consumed from the stream. The last + batch may contain fewer than ``n`` elements. + + """ + return self._reader.batched(n) + + def group_by(self, *keys, **kwargs): + """Iterate through the stream in groups defined by metadata keys. + + Parameters + ---------- + *keys: tuple + Positional arguments specifying the metadata keys to group by. + Keys can be given as one or more str arguments, or as a list or tuple of str. + + Returns + ------- + object + Returns an iterator yielding batches of elements grouped by the metadata ``keys``. Each + batch is a new object holding the fields consumed from the stream. + A new group is generated every time the values of the ``keys`` change.
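+ + Examples + -------- + An illustrative sketch (assuming ``test6.grib`` from the earthkit-data + examples); the stream is consumed in the order the messages arrive: + + >>> import earthkit.data + >>> stream = open("test6.grib", "rb") + >>> ds = earthkit.data.from_source("stream", stream) + >>> for f in ds.group_by("level"): + ... print(len(f), f.metadata(("param", "level"))) + 3 [('t', 1000), ('u', 1000), ('v', 1000)] + 3 [('t', 850), ('u', 850), ('v', 850)]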
+ + """ + return self._reader.group_by(*keys) + + def _wrap_stream(self, stream): + if isinstance(stream, (list, tuple)): + r = [] + for s in stream: + if not isinstance(s, Stream): + r.append(Stream(s)) + else: + r.append(s) + return r + elif not isinstance(stream, Stream): + return Stream(stream) + + return stream + + def _status(self): + """For testing purposes.""" + return { + "reader": self._reader_ is not None, + "stream": self._stream._stream is not None, + } class MultiStreamSource(Source): - def __init__(self, sources, group_by=None, batch_size=1): - self.sources = sources - if not isinstance(self.sources, deque): - self.sources = deque(self.sources) - self.group_by = group_by - self.batch_size = batch_size - self.current = None + def __init__(self, sources, read_all=False, **kwargs): + super().__init__(**kwargs) + self.memory = read_all + self.sources = self._from_sources(sources) def mutate(self): - if not self.group_by: - if self.batch_size == 0: - from .multi import MultiSource - - return MultiSource([s() for s in self.sources]) - elif self.batch_size > 1: - return MultiStreamBatchSource( - self.sources, batch_size=self.batch_size, group_by=self.group_by - ) + if self.memory: + from .multi import MultiSource + + return MultiSource([s.mutate() for s in self.sources]) + else: + first = self.sources[0] + if hasattr(first._reader, "to_fieldlist"): + return StreamFieldList(self, **self._kwargs) return self def __iter__(self): - return self + return itertools.chain(*self.sources) - def _next_source(self): - try: - return self.sources.popleft()() - except IndexError: - self.sources.clear() - pass - - def __next__(self): - if self.current is None: - self.current = self._next_source() - if self.current is None: - raise StopIteration - - try: - return self.current.__next__() - except StopIteration: - self.current = self._next_source() - if self.current is None: - raise StopIteration - return self.current.__next__() - - -class MultiStreamBatchSource(MultiStreamSource): - def mutate(self): - return self + def batched(self, n): + from earthkit.data.utils.batch import batched - def __next__(self): - if self.current is None: - if self.sources: - self.current = self._next_source() - else: - raise StopIteration - - delta = self.batch_size - try: - r = self.current._get_batch(self.batch_size) - delta -= len(r) - except StopIteration: - r = None - - while delta > 0: - self.current = self._next_source() - if self.current is None: - break - else: - try: - r1 = self.current._get_batch(delta) - assert r1 is not None - assert len(r1) > 0 - r = r + r1 if r is not None else r1 - assert len(r) > 0 - delta = self.batch_size - len(r) - except StopIteration: - break - - if r is None or len(r) == 0: - raise StopIteration + return batched(self, n) - return r + def group_by(self, *args): + from earthkit.data.utils.batch import group_by + return group_by(self, *args) -class StreamSource(StreamSourceBase): - def __init__(self, stream, **kwargs): - super().__init__(stream, **kwargs) - - def mutate(self): - assert self._reader_ is None + def _from_sources(self, sources): + r = [] + for s in sources: + if isinstance(s, StreamSource): + r.append(s) + elif isinstance(s, Stream): + r.append(StreamSource(s, read_all=self.memory)) + else: + raise TypeError(f"Invalid source={s}") + return r - return _from_stream( - self._stream, - batch_size=self.batch_size, - group_by=self.group_by, - **self._kwargs, - ) + def _status(self): + """For testing purposes.""" + return [s._status() for s in self.sources] -class 
StreamSourceMaker: - def __init__(self, source, stream_kwargs, **kwargs): - self.in_source = source - self._kwargs = kwargs - self.stream_kwargs = dict(stream_kwargs) - self.source = None +class StreamFieldList(FieldList, Source): + def __init__(self, source, **kwargs): + FieldList.__init__(self, **kwargs) + self._source = source - def __call__(self): - if self.source is None: - stream = self.in_source.to_stream() - self.source = _from_stream(stream, **self.stream_kwargs, **self._kwargs) + def mutate(self): + return self - prev = None - src = self.source - while src is not prev: - prev = src - src = src.mutate() - self.source = src + def __iter__(self): + return iter(self._source) - return self.source + def batched(self, n): + return self._source.batched(n) + def group_by(self, *keys, **kwargs): + return self._source.group_by(*keys) -def _from_stream(stream, group_by, batch_size, **kwargs): - _kwargs = dict(batch_size=batch_size, group_by=group_by) - if group_by: - return StreamGroupSource(stream, **_kwargs, **kwargs) - elif batch_size == 0: - return StreamMemorySource(stream, **kwargs) - elif batch_size > 1: - return StreamBatchSource(stream, **_kwargs, **kwargs) - elif batch_size == 1: - return StreamSingleSource(stream, **_kwargs, **kwargs) +class Stream: + def __init__(self, stream=None, maker=None, **kwargs): + self._stream = stream + self.maker = maker + self.kwargs = kwargs + if self._stream is None and self.maker is None: + raise ValueError("Either stream or maker must be provided") - raise ValueError(f"Unsupported stream parameters {batch_size=} {group_by=}") + @property + def stream(self): + if self._stream is None: + self._stream = self.maker() + return self._stream -def _from_source(source, **kwargs): +def make_stream_source_from_other(source, **kwargs): stream_kwargs, kwargs = parse_stream_kwargs(**kwargs) if not isinstance(source, (list, tuple)): source = [source] + for i, s in enumerate(source): + stream = Stream(maker=s.to_stream) + source[i] = StreamSource(stream, **stream_kwargs, **kwargs) + if len(source) == 1: - maker = StreamSourceMaker(source[0], stream_kwargs, **kwargs) - return maker() + return source[0] else: - sources = [StreamSourceMaker(s, stream_kwargs, **kwargs) for s in source] - return MultiStreamSource(sources, **stream_kwargs) + return MultiStreamSource(source, **stream_kwargs) source = StreamSource diff --git a/src/earthkit/data/sources/url.py b/src/earthkit/data/sources/url.py index 85d0b1db..dab94acd 100644 --- a/src/earthkit/data/sources/url.py +++ b/src/earthkit/data/sources/url.py @@ -273,9 +273,9 @@ def mutate(self): ) ) - from .stream import _from_source + from .stream import make_stream_source_from_other - return _from_source(s, **self._kwargs) + return make_stream_source_from_other(s, **self._kwargs) else: return super().mutate() @@ -435,9 +435,9 @@ def __init__( raise NotImplementedError(f"Streams are not supported for {o.scheme} urls") def mutate(self): - from .stream import _from_source + from .stream import make_stream_source_from_other - return _from_source(self, **self._kwargs) + return make_stream_source_from_other(self, **self._kwargs) def to_stream(self): downloader = Downloader( diff --git a/src/earthkit/data/utils/batch.py b/src/earthkit/data/utils/batch.py new file mode 100644 index 00000000..34af316c --- /dev/null +++ b/src/earthkit/data/utils/batch.py @@ -0,0 +1,117 @@ +# (C) Copyright 2020 ECMWF. 
+# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + + +from abc import ABCMeta, abstractmethod + + +class Iter(metaclass=ABCMeta): + def __init__(self, data, create=None): + self.data = data + self.create = create + + @staticmethod + def _create(obj, batch): + if hasattr(obj, "from_list"): + return obj.from_list + elif hasattr(obj, "from_fields"): + return obj.from_fields + elif len(batch) > 0 and hasattr(batch[0], "to_fieldlist"): + return batch[0].to_fieldlist + + return lambda x: x + + @abstractmethod + def _iterator(self, data): + pass + + @abstractmethod + def _from_batch(self, obj, batch): + pass + + @abstractmethod + def _metadata(self, data, keys): + pass + + def batched(self, n): + if n < 1: + raise ValueError("n must be at least one") + + from itertools import islice + + it = self._iterator(self.data) + while batch := tuple(islice(it, n)): + yield self._from_batch(self.data, batch) + + def group_by(self, *args, sort=True): + keys = self._flatten(args) + + r = self.data.order_by(*keys) if sort else self.data + + from itertools import groupby + + it = self._iterator(r) + for batch in groupby(it, self._metadata(r, keys)): + batch = list(batch[1]) + yield self._from_batch(r, batch) + + def _flatten(self, keys): + _keys = [] + for v in keys: + if isinstance(v, str): + _keys.append(v) + elif isinstance(v, (tuple, list)): + _keys.extend(v) + return _keys + + +class BasicIter(Iter): + def _iterator(self, data): + return iter(data) + + def _from_batch(self, obj, batch): + if self.create is None: + self.create = self._create(obj, batch) + return self.create(batch) + + def _metadata(self, data, keys): + return lambda f: f._attributes(keys) + + +class IndexedIter(Iter): + def _iterator(self, data): + # iterate over positional indices; contiguous batches become slices + return iter(range(len(data))) + + def _from_batch(self, obj, batch): + if len(batch) >= 2: + batch = slice(batch[0], batch[-1] + 1) + return obj[batch] + + def _metadata(self, data, keys): + return lambda idx: data[idx]._attributes(keys) + + +def batched(data, n, mode="iter", create=None): + it = _ITERS.get(mode, None) + if it is not None: + return it(data, create=create).batched(n) + else: + raise ValueError(f"invalid mode={mode}") + + +def group_by(data, *args, mode="iter", sort=True, create=None): + it = _ITERS.get(mode, None) + if it is not None: + return it(data, create=create).group_by(*args, sort=sort) + else: + raise ValueError(f"invalid mode={mode}") + + +_ITERS = {"iter": BasicIter, "indexed": IndexedIter}
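The new ``earthkit.data.utils.batch`` helpers are generic: ``mode="iter"`` works on any iterable, while ``mode="indexed"`` turns contiguous batches into slices (views) of the original object. A minimal sketch of the ``mode="iter"`` behaviour on a plain Python iterable (illustrative only; with no ``create`` callable and no ``from_list``/``from_fields``/``to_fieldlist`` support on the data, each batch comes back as a plain tuple):

.. code-block:: python

    from earthkit.data.utils.batch import batched

    # the default mode="iter" slices the iterator with itertools.islice;
    # the last batch may be shorter than n
    for batch in batched(range(7), 3):
        print(batch)

    # -> (0, 1, 2), (3, 4, 5), (6,)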
diff --git a/tests/data/test4.nc b/tests/data/test4.nc new file mode 100644 index 00000000..301468b8 Binary files /dev/null and b/tests/data/test4.nc differ diff --git a/tests/data/test6.nc b/tests/data/test6.nc new file mode 100644 index 00000000..b8fd58d4 Binary files /dev/null and b/tests/data/test6.nc differ diff --git a/tests/grib/test_grib_iter.py b/tests/grib/test_grib_iter.py new file mode 100644 index 00000000..501cde2b --- /dev/null +++ b/tests/grib/test_grib_iter.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 + +# (C) Copyright 2020 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + + +import pytest + +from earthkit.data import from_source +from earthkit.data.testing import ARRAY_BACKENDS, earthkit_examples_file + + +@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) +@pytest.mark.parametrize("group", ["param"]) +def test_grib_group_by(array_backend, group): + ds = from_source( + "file", earthkit_examples_file("test6.grib"), array_backend=array_backend + ) + + ref = [ + [("t", 1000), ("t", 850)], + [("u", 1000), ("u", 850)], + [("v", 1000), ("v", 850)], + ] + cnt = 0 + for i, f in enumerate(ds.group_by(group)): + assert len(f) == 2 + assert f.metadata(("param", "level")) == ref[i] + afl = f.to_fieldlist(array_backend=array_backend) + assert afl is not f + assert len(afl) == 2 + cnt += len(f) + + assert cnt == len(ds) + + +@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) +@pytest.mark.parametrize("group", ["level", ["level", "gridType"]]) +def test_grib_multi_group_by(array_backend, group): + ds = from_source( + "file", + [earthkit_examples_file("test4.grib"), earthkit_examples_file("test6.grib")], + array_backend=array_backend, + ) + + ref = [ + [("t", 500), ("z", 500)], + [("t", 850), ("z", 850), ("t", 850), ("u", 850), ("v", 850)], + [("t", 1000), ("u", 1000), ("v", 1000)], + ] + cnt = 0 + for i, f in enumerate(ds.group_by(group)): + assert f.metadata(("param", "level")) == ref[i] + cnt += len(f) + + assert cnt == len(ds) + + +@pytest.mark.parametrize( + "_kwargs,expected_meta", + [ + ({"n": 1}, [["t"], ["u"], ["v"], ["t"], ["u"], ["v"]]), + ({"n": 3}, [["t", "u", "v"], ["t", "u", "v"]]), + ({"n": 4}, [["t", "u", "v", "t"], ["u", "v"]]), + ], +) +def test_grib_batched(_kwargs, expected_meta): + ds = from_source("file", earthkit_examples_file("test6.grib")) + + cnt = 0 + for i, f in enumerate(ds.batched(_kwargs["n"])): + assert len(f) == len(expected_meta[i]) + assert f.metadata("param") == expected_meta[i] + cnt += len(f) + + assert cnt == len(ds) + + +@pytest.mark.parametrize( + "_kwargs,expected_meta", + [ + ({"n": 1}, [["2t"], ["msl"], ["t"], ["z"], ["t"], ["z"]]), + ({"n": 2}, [["2t", "msl"], ["t", "z"], ["t", "z"]]), + ({"n": 3}, [["2t", "msl", "t"], ["z", "t", "z"]]), + ({"n": 4}, [["2t", "msl", "t", "z"], ["t", "z"]]), + ], +) +def test_grib_multi_batched(_kwargs, expected_meta): + ds = from_source( + "file", + [earthkit_examples_file("test.grib"), earthkit_examples_file("test4.grib")], + ) + + cnt = 0 + n = _kwargs["n"] + for i, f in enumerate(ds.batched(n)): + assert len(f) == len(expected_meta[i]) + assert f.metadata("param") == expected_meta[i] + cnt += len(f) + + assert cnt == len(ds) + + +if __name__ == "__main__": + from earthkit.data.testing import main + + main() diff --git a/tests/grib/test_grib_stream.py b/tests/grib/test_grib_stream.py index 59d4a1f8..49711b56 100644 --- a/tests/grib/test_grib_stream.py +++ b/tests/grib/test_grib_stream.py @@ -14,6 +14,7 @@ from earthkit.data import from_source from earthkit.data.core.temporary import temp_file +from earthkit.data.sources.stream import StreamFieldList from earthkit.data.testing import ARRAY_BACKENDS, earthkit_examples_file @@ -21,107 +22,28 @@ def repeat_list_items(items, count): return sum([[x] * count for x in items], []) -@pytest.mark.parametrize( - "_kwargs,error", - [ - # (dict(order_by="level"), TypeError), - (dict(group_by=1), TypeError), - (dict(group_by=["level", 1]), TypeError), - #
(dict(group_by="level", batch_size=1), TypeError), - (dict(batch_size=-1), ValueError), - ], -) -def test_grib_from_stream_invalid_args(_kwargs, error): - with open(earthkit_examples_file("test6.grib"), "rb") as stream: - with pytest.raises(error): - from_source("stream", stream, **_kwargs) - - -@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) -@pytest.mark.parametrize( - "_kwargs", - [ - {"group_by": "level"}, - {"group_by": "level", "batch_size": 0}, - {"group_by": "level", "batch_size": 1}, - {"group_by": ["level", "gridType"]}, - ], -) -def test_grib_from_stream_group_by(array_backend, _kwargs): - with open(earthkit_examples_file("test6.grib"), "rb") as stream: - fs = from_source("stream", stream, **_kwargs, array_backend=array_backend) - - # no methods are available - with pytest.raises(TypeError): - len(fs) - - ref = [ - [("t", 1000), ("u", 1000), ("v", 1000)], - [("t", 850), ("u", 850), ("v", 850)], - ] - for i, f in enumerate(fs): - assert len(f) == 3 - assert f.metadata(("param", "level")) == ref[i] - afl = f.to_fieldlist(array_backend=array_backend) - assert afl is not f - assert len(afl) == 3 - - # stream consumed, no data is available - assert sum([1 for _ in fs]) == 0 - - -@pytest.mark.parametrize( - "convert_kwargs,expected_shape", - [ - ({}, (3, 7, 12)), - (None, (3, 7, 12)), - (None, (3, 7, 12)), - ({"flatten": False}, (3, 7, 12)), - ({"flatten": True}, (3, 84)), - ], -) -def test_grib_from_stream_group_by_convert_to_numpy(convert_kwargs, expected_shape): - group_by = "level" - with open(earthkit_examples_file("test6.grib"), "rb") as stream: - ds = from_source("stream", stream, group_by=group_by) - - # no fieldlist methods are available on a StreamSource - with pytest.raises(TypeError): - len(ds) - - ref = [ - [("t", 1000), ("u", 1000), ("v", 1000)], - [("t", 850), ("u", 850), ("v", 850)], - ] - - if convert_kwargs is None: - convert_kwargs = {} - - for i, f in enumerate(ds): - df = f.to_fieldlist(array_backend="numpy", **convert_kwargs) - assert len(df) == 3 - assert df.metadata(("param", "level")) == ref[i] - assert df._array.shape == expected_shape - assert df.to_numpy(**convert_kwargs).shape == expected_shape - assert df.to_fieldlist(array_backend="numpy", **convert_kwargs) is df - - # stream consumed, no data is available - assert sum([1 for _ in ds]) == 0 +# @pytest.mark.parametrize( +# "_kwargs,error", +# [ +# # (dict(order_by="level"), TypeError), +# (dict(group_by=1), TypeError), +# (dict(group_by=["level", 1]), TypeError), +# # (dict(group_by="level", batch_size=1), TypeError), +# (dict(batch_size=-1), ValueError), +# ], +# ) +# def test_grib_from_stream_invalid_args(_kwargs, error): +# with open(earthkit_examples_file("test6.grib"), "rb") as stream: +# with pytest.raises(error): +# from_source("stream", stream, **_kwargs) -@pytest.mark.parametrize( - "_kwargs", - [ - {}, - {"batch_size": 1}, - ], -) -def test_grib_from_stream_single_batch(_kwargs): +def test_grib_from_stream_iter(): with open(earthkit_examples_file("test6.grib"), "rb") as stream: - ds = from_source("stream", stream, **_kwargs) + ds = from_source("stream", stream) # no fieldlist methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) ref = [ @@ -140,22 +62,39 @@ def test_grib_from_stream_single_batch(_kwargs): assert sum([1 for _ in ds]) == 0 +@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) +def test_grib_from_stream_fieldlist_backend(array_backend): + with open(earthkit_examples_file("test6.grib"), "rb") as stream: 
+ ds = from_source("stream", stream, array_backend=array_backend) + + assert isinstance(ds, StreamFieldList) + + assert ds.array_backend.name == array_backend + assert ds.to_array().shape == (6, 7, 12) + + assert sum([1 for _ in ds]) == 0 + + with pytest.raises((RuntimeError, ValueError)): + ds.to_array() + + @pytest.mark.parametrize( "_kwargs,expected_meta", [ - ({"batch_size": 3}, [["t", "u", "v"], ["t", "u", "v"]]), - ({"batch_size": 4}, [["t", "u", "v", "t"], ["u", "v"]]), + ({"n": 1}, [["t"], ["u"], ["v"], ["t"], ["u"], ["v"]]), + ({"n": 3}, [["t", "u", "v"], ["t", "u", "v"]]), + ({"n": 4}, [["t", "u", "v", "t"], ["u", "v"]]), ], ) -def test_grib_from_stream_multi_batch(_kwargs, expected_meta): +def test_grib_from_stream_batched(_kwargs, expected_meta): with open(earthkit_examples_file("test6.grib"), "rb") as stream: - ds = from_source("stream", stream, **_kwargs) + ds = from_source("stream", stream) # no methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) - for i, f in enumerate(ds): + for i, f in enumerate(ds.batched(_kwargs["n"])): assert len(f) == len(expected_meta[i]) f.metadata("param") == expected_meta[i] @@ -179,9 +118,9 @@ def test_grib_from_stream_multi_batch(_kwargs, expected_meta): ), ], ) -def test_grib_from_stream_multi_batch_convert_to_numpy(convert_kwargs, expected_shape): +def test_grib_from_stream_batched_convert_to_numpy(convert_kwargs, expected_shape): with open(earthkit_examples_file("test6.grib"), "rb") as stream: - ds = from_source("stream", stream, batch_size=2) + ds = from_source("stream", stream) ref = [ [("t", 1000), ("u", 1000)], @@ -192,7 +131,7 @@ def test_grib_from_stream_multi_batch_convert_to_numpy(convert_kwargs, expected_ if convert_kwargs is None: convert_kwargs = {} - for i, f in enumerate(ds): + for i, f in enumerate(ds.batched(2)): df = f.to_fieldlist(array_backend="numpy", **convert_kwargs) assert df.metadata(("param", "level")) == ref[i], i assert df._array.shape == expected_shape, i @@ -203,12 +142,76 @@ def test_grib_from_stream_multi_batch_convert_to_numpy(convert_kwargs, expected_ assert sum([1 for _ in ds]) == 0 +@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) +@pytest.mark.parametrize("group", ["level", ["level", "gridType"]]) +def test_grib_from_stream_group_by(array_backend, group): + with open(earthkit_examples_file("test6.grib"), "rb") as stream: + ds = from_source("stream", stream, array_backend=array_backend) + + # no methods are available + with pytest.raises((TypeError, NotImplementedError)): + len(ds) + + ref = [ + [("t", 1000), ("u", 1000), ("v", 1000)], + [("t", 850), ("u", 850), ("v", 850)], + ] + for i, f in enumerate(ds.group_by(group)): + assert len(f) == 3 + assert f.metadata(("param", "level")) == ref[i] + afl = f.to_fieldlist(array_backend=array_backend) + assert afl is not f + assert len(afl) == 3 + + # stream consumed, no data is available + assert sum([1 for _ in ds]) == 0 + + +@pytest.mark.parametrize( + "convert_kwargs,expected_shape", + [ + ({}, (3, 7, 12)), + (None, (3, 7, 12)), + (None, (3, 7, 12)), + ({"flatten": False}, (3, 7, 12)), + ({"flatten": True}, (3, 84)), + ], +) +def test_grib_from_stream_group_by_convert_to_numpy(convert_kwargs, expected_shape): + group = "level" + with open(earthkit_examples_file("test6.grib"), "rb") as stream: + ds = from_source("stream", stream) + + # no fieldlist methods are available on a StreamSource + with pytest.raises((TypeError, NotImplementedError)): + len(ds) + + ref = [ + [("t", 1000), ("u", 
1000), ("v", 1000)], + [("t", 850), ("u", 850), ("v", 850)], + ] + + if convert_kwargs is None: + convert_kwargs = {} + + for i, f in enumerate(ds.group_by(group)): + df = f.to_fieldlist(array_backend="numpy", **convert_kwargs) + assert len(df) == 3 + assert df.metadata(("param", "level")) == ref[i] + assert df._array.shape == expected_shape + assert df.to_numpy(**convert_kwargs).shape == expected_shape + assert df.to_fieldlist(array_backend="numpy", **convert_kwargs) is df + + # stream consumed, no data is available + assert sum([1 for _ in ds]) == 0 + + def test_grib_from_stream_in_memory(): with open(earthkit_examples_file("test6.grib"), "rb") as stream: ds = from_source( "stream", stream, - batch_size=0, + read_all=True, ) assert len(ds) == 6 @@ -222,17 +225,12 @@ def test_grib_from_stream_in_memory(): ("u", 850), ("v", 850), ] - val = [] # iteration - for f in ds: - v = f.metadata(("param", "level")) - val.append(v) - + val = [f.metadata(("param", "level")) for f in ds] assert val == md_ref, "iteration" # metadata - val = [] val = ds.metadata(("param", "level")) assert val == md_ref, "method" @@ -283,28 +281,19 @@ def test_grib_from_stream_in_memory(): ) def test_grib_from_stream_in_memory_convert_to_numpy(convert_kwargs, expected_shape): with open(earthkit_examples_file("test6.grib"), "rb") as stream: - ds_s = from_source( - "stream", - stream, - batch_size=0, - ) + ds_s = from_source("stream", stream, read_all=True) ds = ds_s.to_fieldlist(array_backend="numpy", **convert_kwargs) assert len(ds) == 6 ref = ["t", "u", "v", "t", "u", "v"] - val = [] # iteration - for f in ds: - v = f.metadata("param") - val.append(v) - + val = [f.metadata("param") for f in ds] assert val == ref, "iteration" # metadata - val = [] val = ds.metadata("param") assert val == ref, "method" @@ -334,7 +323,7 @@ def test_grib_from_stream_in_memory_convert_to_numpy(convert_kwargs, expected_sh def test_grib_save_when_loaded_from_stream(): with open(earthkit_examples_file("test6.grib"), "rb") as stream: - fs = from_source("stream", stream, batch_size=0) + fs = from_source("stream", stream, read_all=True) assert len(fs) == 6 with temp_file() as tmp: fs.save(tmp) @@ -342,6 +331,130 @@ def test_grib_save_when_loaded_from_stream(): assert len(fs) == len(fs_saved) +def test_grib_multi_from_stream_iter(): + stream1 = open(earthkit_examples_file("test.grib"), "rb") + stream2 = open(earthkit_examples_file("test4.grib"), "rb") + ds = from_source("stream", [stream1, stream2]) + + assert isinstance(ds, StreamFieldList) + + # no fieldlist methods are available + with pytest.raises((TypeError, NotImplementedError)): + len(ds) + + ref = [ + ("2t", 0), + ("msl", 0), + ("t", 500), + ("z", 500), + ("t", 850), + ("z", 850), + ] + + for i, f in enumerate(ds): + assert f.metadata(("param", "level")) == ref[i], i + + # stream consumed, no data is available + assert sum([1 for _ in ds]) == 0 + + +@pytest.mark.parametrize( + "_kwargs,expected_meta", + [ + ({"n": 1}, [["2t"], ["msl"], ["t"], ["z"], ["t"], ["z"]]), + ({"n": 2}, [["2t", "msl"], ["t", "z"], ["t", "z"]]), + ({"n": 3}, [["2t", "msl", "t"], ["z", "t", "z"]]), + ({"n": 4}, [["2t", "msl", "t", "z"], ["t", "z"]]), + ], +) +def test_grib_multi_grib_from_stream_batched(_kwargs, expected_meta): + stream1 = open(earthkit_examples_file("test.grib"), "rb") + stream2 = open(earthkit_examples_file("test4.grib"), "rb") + ds = from_source("stream", [stream1, stream2]) + + assert isinstance(ds, StreamFieldList) + + # no methods are available + with pytest.raises((TypeError, 
NotImplementedError)): + len(ds) + + cnt = 0 + for i, f in enumerate(ds.batched(_kwargs["n"])): + assert len(f) == len(expected_meta[i]) + f.metadata("param") == expected_meta[i] + cnt += 1 + + assert cnt == len(expected_meta) + + # stream consumed, no data is available + assert sum([1 for _ in ds]) == 0 + + +def test_grib_multi_stream_memory(): + stream1 = open(earthkit_examples_file("test.grib"), "rb") + stream2 = open(earthkit_examples_file("test4.grib"), "rb") + ds = from_source("stream", [stream1, stream2], read_all=True) + + assert len(ds) == 6 + + md_ref = [ + ("2t", 0), + ("msl", 0), + ("t", 500), + ("z", 500), + ("t", 850), + ("z", 850), + ] + # iteration + val = [f.metadata(("param", "level")) for f in ds] + assert val == md_ref, "iteration" + + # metadata + val = ds.metadata(("param", "level")) + assert val == md_ref, "method" + + # data + with pytest.raises(ValueError): + ds.to_numpy().shape + + # first part + expected_shape = (2, 11, 19) + assert ds[0:2].to_numpy().shape == expected_shape + + ref = np.array([262.78027344, 101947.8125]) + + vals = ds[0:2].to_numpy()[:, 0, 0] + assert np.allclose(vals, ref) + + # second part + expected_shape = (4, 181, 360) + assert ds[2:].to_numpy().shape == expected_shape + + ref = np.array([228.04600525, 48126.859375, 246.61032104, 11786.1132812]) + + vals = ds[2:].to_numpy()[:, 0, 0] + assert np.allclose(vals, ref) + + # slicing + r = ds[0:3] + assert len(r) == 3 + val = r.metadata(("param", "level")) + assert val == md_ref[0:3] + + r = ds[-2:] + assert len(r) == 2 + val = r.metadata(("param", "level")) + assert val == md_ref[-2:] + + r = ds.sel(param="t") + assert len(r) == 2 + val = r.metadata(("param", "level")) + assert val == [ + ("t", 500), + ("t", 850), + ] + + if __name__ == "__main__": from earthkit.data.testing import main diff --git a/tests/grib/test_grib_url_stream.py b/tests/grib/test_grib_url_stream.py index 0bd26dc0..3a88450c 100644 --- a/tests/grib/test_grib_url_stream.py +++ b/tests/grib/test_grib_url_stream.py @@ -14,6 +14,7 @@ from earthkit.data import from_source from earthkit.data.core.temporary import temp_file +from earthkit.data.sources.stream import StreamFieldList from earthkit.data.testing import earthkit_remote_test_data_file @@ -21,81 +22,53 @@ def repeat_list_items(items, count): return sum([[x] * count for x in items], []) -@pytest.mark.parametrize( - "_kwargs,error", - [ - # (dict(order_by="level"), TypeError), - (dict(group_by=1), TypeError), - (dict(group_by=["level", 1]), TypeError), - # (dict(group_by="level", batch_size=1), TypeError), - (dict(batch_size=-1), ValueError), - ], -) -def test_grib_url_stream_invalid_args(_kwargs, error): - with pytest.raises(error): - from_source( - "url", - earthkit_remote_test_data_file("examples/test6.grib"), - stream=True, - **_kwargs, - ) - - -@pytest.mark.parametrize( - "_kwargs", - [ - {"group_by": "level"}, - {"group_by": "level", "batch_size": 0}, - {"group_by": "level", "batch_size": 1}, - {"group_by": ["level", "gridType"]}, - ], -) -def test_grib_url_stream_group_by(_kwargs): - fs = from_source( - "url", - earthkit_remote_test_data_file("examples/test6.grib"), - stream=True, - **_kwargs, - ) - - # no methods are available - with pytest.raises(TypeError): - len(fs) - - ref = [ - [("t", 1000), ("u", 1000), ("v", 1000)], - [("t", 850), ("u", 850), ("v", 850)], - ] - cnt = 0 - for i, f in enumerate(fs): - assert len(f) == 3 - assert f.metadata(("param", "level")) == ref[i] - assert f.to_fieldlist(array_backend="numpy") is not f - cnt += 1 - - assert cnt == len(ref) - 
- # stream consumed, no data is available - assert sum([1 for _ in fs]) == 0 - - -@pytest.mark.parametrize( - "_kwargs", - [ - {}, - {"batch_size": 1}, - ], -) -def test_grib_url_stream_single_batch(_kwargs): +# @pytest.mark.parametrize( +# "_kwargs,error", +# [ +# # (dict(order_by="level"), TypeError), +# (dict(group_by=1), TypeError), +# (dict(group_by=["level", 1]), TypeError), +# # (dict(group_by="level", batch_size=1), TypeError), +# (dict(batch_size=-1), ValueError), +# ], +# ) +# def test_grib_url_stream_invalid_args(_kwargs, error): +# with pytest.raises(error): +# from_source( +# "url", +# earthkit_remote_test_data_file("examples/test6.grib"), +# stream=True, +# **_kwargs, +# ) + + +# @pytest.mark.parametrize( +# "_kwargs", +# [ +# {"group_by": "level"}, +# {"group_by": "level", "batch_size": 0}, +# {"group_by": "level", "batch_size": 1}, +# {"group_by": ["level", "gridType"]}, +# ], +# ) + + +# @pytest.mark.parametrize( +# "_kwargs", +# [ +# {}, +# {"batch_size": 1}, +# ], +# ) +def test_grib_url_stream_iter(): ds = from_source( "url", earthkit_remote_test_data_file("examples/test6.grib"), stream=True, - **_kwargs, ) # no fieldlist methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) ref = [ @@ -120,24 +93,24 @@ def test_grib_url_stream_single_batch(_kwargs): @pytest.mark.parametrize( "_kwargs,expected_meta", [ - ({"batch_size": 3}, [["t", "u", "v"], ["t", "u", "v"]]), - ({"batch_size": 4}, [["t", "u", "v", "t"], ["u", "v"]]), + ({"n": 1}, [["t"], ["u"], ["v"], ["t"], ["u"], ["v"]]), + ({"n": 3}, [["t", "u", "v"], ["t", "u", "v"]]), + ({"n": 4}, [["t", "u", "v", "t"], ["u", "v"]]), ], ) -def test_grib_url_stream_multi_batch(_kwargs, expected_meta): +def test_grib_from_stream_batched(_kwargs, expected_meta): ds = from_source( "url", earthkit_remote_test_data_file("examples/test6.grib"), stream=True, - **_kwargs, ) # no methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) cnt = 0 - for i, f in enumerate(ds): + for i, f in enumerate(ds.batched(_kwargs["n"])): assert len(f) == len(expected_meta[i]) f.metadata("param") == expected_meta[i] cnt += 1 @@ -148,29 +121,51 @@ def test_grib_url_stream_multi_batch(_kwargs, expected_meta): assert sum([1 for _ in ds]) == 0 +@pytest.mark.parametrize("group", ["level", ["level", "gridType"]]) +def test_grib_url_stream_group_by(group): + ds = from_source( + "url", earthkit_remote_test_data_file("examples/test6.grib"), stream=True + ) + + # no methods are available + with pytest.raises((TypeError, NotImplementedError)): + len(ds) + + ref = [ + [("t", 1000), ("u", 1000), ("v", 1000)], + [("t", 850), ("u", 850), ("v", 850)], + ] + cnt = 0 + for i, f in enumerate(ds.group_by(group)): + assert len(f) == 3 + assert f.metadata(("param", "level")) == ref[i] + assert f.to_fieldlist(array_backend="numpy") is not f + cnt += 1 + + assert cnt == len(ref) + + # stream consumed, no data is available + assert sum([1 for _ in ds]) == 0 + + def test_grib_url_stream_in_memory(): ds = from_source( "url", earthkit_remote_test_data_file("examples/test6.grib"), stream=True, - batch_size=0, + read_all=True, ) assert len(ds) == 6 expected_shape = (6, 7, 12) ref = ["t", "u", "v", "t", "u", "v"] - val = [] # iteration - for f in ds: - v = f.metadata("param") - val.append(v) - + val = [f.metadata(("param")) for f in ds] assert val == ref, "iteration" # metadata - val = [] val = ds.metadata("param") assert val == ref, "method" @@ -197,7 +192,7 @@ 
def test_grib_save_when_loaded_from_url_stream(): "url", earthkit_remote_test_data_file("examples/test6.grib"), stream=True, - batch_size=0, + read_all=True, ) assert len(ds) == 6 with temp_file() as tmp: @@ -206,14 +201,14 @@ def test_grib_save_when_loaded_from_url_stream(): assert len(ds) == len(ds_saved) -@pytest.mark.parametrize( - "_kwargs", - [ - {}, - {"batch_size": 1}, - ], -) -def test_grib_multi_url_stream_single_batch(_kwargs): +# @pytest.mark.parametrize( +# "_kwargs", +# [ +# {}, +# {"batch_size": 1}, +# ], +# ) +def test_grib_multi_url_stream_iter(): ds = from_source( "url", [ @@ -221,11 +216,17 @@ def test_grib_multi_url_stream_single_batch(_kwargs): earthkit_remote_test_data_file("examples/test4.grib"), ], stream=True, - **_kwargs, ) + assert isinstance(ds, StreamFieldList) + assert len(ds._source.sources) == 2 + assert ds._source._status() == [ + {"reader": True, "stream": True}, + {"reader": False, "stream": False}, + ] + # no fieldlist methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) ref = [ @@ -246,16 +247,22 @@ def test_grib_multi_url_stream_single_batch(_kwargs): # stream consumed, no data is available assert sum([1 for _ in ds]) == 0 + assert ds._source._status() == [ + {"reader": True, "stream": True}, + {"reader": True, "stream": True}, + ] + @pytest.mark.parametrize( "_kwargs,expected_meta", [ - ({"batch_size": 2}, [["2t", "msl"], ["t", "z"], ["t", "z"]]), - ({"batch_size": 3}, [["2t", "msl", "t"], ["z", "t", "z"]]), - ({"batch_size": 4}, [["2t", "msl", "t", "z"], ["t", "z"]]), + ({"n": 1}, [["2t"], ["msl"], ["t"], ["z"], ["t"], ["z"]]), + ({"n": 2}, [["2t", "msl"], ["t", "z"], ["t", "z"]]), + ({"n": 3}, [["2t", "msl", "t"], ["z", "t", "z"]]), + ({"n": 4}, [["2t", "msl", "t", "z"], ["t", "z"]]), ], ) -def test_grib_multi_url_stream_batch(_kwargs, expected_meta): +def test_grib_multi_url_stream_batched(_kwargs, expected_meta): ds = from_source( "url", [ @@ -263,15 +270,14 @@ def test_grib_multi_url_stream_batch(_kwargs, expected_meta): earthkit_remote_test_data_file("examples/test4.grib"), ], stream=True, - **_kwargs, ) # no methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) cnt = 0 - for i, f in enumerate(ds): + for i, f in enumerate(ds.batched(_kwargs["n"])): assert len(f) == len(expected_meta[i]) f.metadata("param") == expected_meta[i] cnt += 1 @@ -290,7 +296,7 @@ def test_grib_multi_url_stream_memory(): earthkit_remote_test_data_file("examples/test4.grib"), ], stream=True, - batch_size=0, + read_all=True, ) assert len(ds) == 6 @@ -303,18 +309,11 @@ def test_grib_multi_url_stream_memory(): ("t", 850), ("z", 850), ] - - val = [] - # iteration - for f in ds: - v = f.metadata(("param", "level")) - val.append(v) - + val = [f.metadata(("param", "level")) for f in ds] assert val == md_ref, "iteration" # metadata - val = [] val = ds.metadata(("param", "level")) assert val == md_ref, "method" @@ -385,7 +384,7 @@ def test_grib_single_url_stream_parts(path, parts, expected_meta): ) # no fieldlist methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) cnt = 0 @@ -417,7 +416,7 @@ def test_grib_single_url_stream_parts_as_arg_valid(parts, expected_meta): ) # no fieldlist methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) cnt = 0 @@ -485,7 +484,7 @@ def test_grib_multi_url_stream_parts(parts1, parts2, 
expected_meta): ) # no fieldlist methods are available - with pytest.raises(TypeError): + with pytest.raises((TypeError, NotImplementedError)): len(ds) cnt = 0 diff --git a/tests/netcdf/test_netcdf_iter.py b/tests/netcdf/test_netcdf_iter.py new file mode 100644 index 00000000..ca9c830b --- /dev/null +++ b/tests/netcdf/test_netcdf_iter.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 + +# (C) Copyright 2020 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation +# nor does it submit to any jurisdiction. +# + + +import pytest + +from earthkit.data import from_source +from earthkit.data.testing import ( + ARRAY_BACKENDS, + earthkit_examples_file, + earthkit_test_data_file, +) + + +@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) +@pytest.mark.parametrize("group", ["param"]) +def test_netcdf_group_by(array_backend, group): + ds = from_source( + "file", earthkit_test_data_file("test6.nc"), array_backend=array_backend + ) + + ref = [ + [("t", 1000), ("t", 850)], + [("u", 1000), ("u", 850)], + [("v", 1000), ("v", 850)], + ] + cnt = 0 + for i, f in enumerate(ds.group_by(group)): + assert len(f) == 2 + assert f.metadata(("param", "level")) == ref[i] + cnt += len(f) + + assert cnt == len(ds) + + +@pytest.mark.parametrize("array_backend", ARRAY_BACKENDS) +@pytest.mark.parametrize("group", ["level", ["level", "gridType"]]) +def test_netcdf_multi_group_by(array_backend, group): + ds = from_source( + "file", + [earthkit_test_data_file("test4.nc"), earthkit_test_data_file("test6.nc")], + array_backend=array_backend, + ) + + ref = [ + [("t", 500), ("z", 500)], + [("t", 850), ("z", 850), ("t", 850), ("u", 850), ("v", 850)], + [("t", 1000), ("u", 1000), ("v", 1000)], + ] + cnt = 0 + for i, f in enumerate(ds.group_by(group)): + assert f.metadata(("param", "level")) == ref[i] + cnt += len(f) + + assert cnt == len(ds) + + +@pytest.mark.parametrize( + "_kwargs,expected_meta", + [ + ({"n": 1}, [["t"], ["u"], ["v"], ["t"], ["u"], ["v"]]), + ({"n": 3}, [["t", "u", "v"], ["t", "u", "v"]]), + ({"n": 4}, [["t", "u", "v", "t"], ["u", "v"]]), + ], +) +def test_netcdf_batched(_kwargs, expected_meta): + ds = from_source("file", earthkit_test_data_file("test6.nc")) + + cnt = 0 + for i, f in enumerate(ds.batched(_kwargs["n"])): + assert len(f) == len(expected_meta[i]) + assert f.metadata("param") == expected_meta[i] + cnt += len(f) + + assert cnt == len(ds) + + +@pytest.mark.parametrize( + "_kwargs,expected_meta", + [ + ({"n": 1}, [["2t"], ["msl"], ["t"], ["z"], ["t"], ["z"]]), + ({"n": 2}, [["2t", "msl"], ["t", "z"], ["t", "z"]]), + ({"n": 3}, [["2t", "msl", "t"], ["z", "t", "z"]]), + ({"n": 4}, [["2t", "msl", "t", "z"], ["t", "z"]]), + ], +) +def test_netcdf_multi_batched(_kwargs, expected_meta): + ds = from_source( + "file", + [earthkit_examples_file("test.nc"), earthkit_test_data_file("test4.nc")], + ) + + cnt = 0 + n = _kwargs["n"] + for i, f in enumerate(ds.batched(n)): + assert len(f) == len(expected_meta[i]) + assert f.metadata("param") == expected_meta[i] + cnt += len(f) + + assert cnt == len(ds) + + +if __name__ == "__main__": + from earthkit.data.testing import main + + main() diff --git a/tests/readers/test_covjson_reader.py b/tests/readers/test_covjson_reader.py index 84a88c4a..4297da76 100644 --- a/tests/readers/test_covjson_reader.py +++
b/tests/readers/test_covjson_reader.py @@ -44,19 +44,20 @@ def test_covjson_stream(): ds = from_source("stream", stream) assert ds - c = next(ds) + it = iter(ds) + c = next(it) a = c.to_xarray() assert len(a.data_vars) == 1 with pytest.raises(StopIteration): - next(ds) + next(it) @pytest.mark.skipif(NO_ECCOVJSON, reason="no eccovjson available") def test_covjson_stream_memory(): stream = open(earthkit_test_data_file("time_series.covjson"), "rb") - ds = from_source("stream", stream, batch_size=0) + ds = from_source("stream", stream, read_all=True) assert ds a = ds.to_xarray() assert len(a.data_vars) == 1
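For downstream code, a minimal migration sketch from the removed ``batch_size``/``group_by`` keyword arguments to the new methods (illustrative only; assumes ``test6.grib`` from the earthkit-data examples):

.. code-block:: python

    import earthkit.data

    # before: from_source("stream", stream, batch_size=2)
    # now: request batches explicitly with batched()
    with open("test6.grib", "rb") as stream:
        ds = earthkit.data.from_source("stream", stream)
        for fields in ds.batched(2):
            print(len(fields), fields.metadata("param"))

    # before: from_source("stream", stream, group_by="level")
    # now: group explicitly with group_by()
    with open("test6.grib", "rb") as stream:
        ds = earthkit.data.from_source("stream", stream)
        for fields in ds.group_by("level"):
            print(len(fields), fields.metadata(("param", "level")))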