Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/refactor streaming #371

Merged
merged 14 commits into from
May 15, 2024
214 changes: 85 additions & 129 deletions docs/examples/data_from_stream.ipynb

Large diffs are not rendered by default.

218 changes: 75 additions & 143 deletions docs/examples/fdb.ipynb

Large diffs are not rendered by default.

274 changes: 226 additions & 48 deletions docs/examples/url_stream.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"tags": []
},
"source": [
"earthkit-data can read GRIB data from a URL as a stream without writing anything to disk. This can be activated with the **stream=True** kwarg when calling :ref:`from_source() <data-sources-url>`."
"earthkit-data can read GRIB data from a URL as a stream without writing anything to disk. This can be activated with the ``stream=True`` kwarg when calling :ref:`from_source() <data-sources-url>`."
]
},
{
Expand All @@ -65,28 +65,45 @@
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "51964579-bf3f-4948-918d-fcd35581950e",
"cell_type": "markdown",
"id": "1661f2d1-2c30-4021-826f-1c2c5c565fd0",
"metadata": {},
"outputs": [],
"source": [
"for f in ds:\n",
" # f is GribField object. It gets deleted when going out of scope\n",
" print(f)"
"The resulting object only supports one iteration. Having finsihed the iteration the stream is consumed and no more data is available."
]
},
{
"cell_type": "markdown",
"id": "7f595ec8-008f-4198-8a3a-f8346fbfe09c",
"metadata": {},
"cell_type": "code",
"execution_count": 3,
"id": "51964579-bf3f-4948-918d-fcd35581950e",
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"GribField(t,500,20070101,1200,0,0)\n",
"GribField(z,500,20070101,1200,0,0)\n",
"GribField(t,850,20070101,1200,0,0)\n",
"GribField(z,850,20070101,1200,0,0)\n"
]
}
],
"source": [
"### Stream options"
"for f in ds:\n",
" # f is GribField object. It gets deleted when going out of scope\n",
" print(f)"
]
},
{
"cell_type": "raw",
"id": "328b1d7b-500d-4014-8346-b6d47c40ee86",
"id": "df31a770-f5e9-46c7-8d5f-c9f73c57483c",
"metadata": {
"editable": true,
"raw_mimetype": "text/restructuredtext",
Expand All @@ -96,13 +113,13 @@
"tags": []
},
"source": [
"To control how the stream is read use :ref:`batch_size <data-sources-url>` and :ref:`group_by <data-sources-url>`. E.g. the following code reads the GRIB data in messages of 2."
"The iteration can be done in batches by using :py:meth:`batched <data.sources.stream.StreamSource.batched>` or :py:meth:`group_by <data.sources.stream.StreamSource.group_by>`."
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "b7c9a685-6b38-4236-ae74-a14445e8bb95",
"id": "754dc030-da97-4076-88ba-f31fed2f091d",
"metadata": {
"editable": true,
"slideshow": {
Expand All @@ -115,25 +132,186 @@
"name": "stdout",
"output_type": "stream",
"text": [
"len=2\n",
" GribField(t,500,20070101,1200,0,0)\n",
" GribField(z,500,20070101,1200,0,0)\n",
"len=2\n",
" GribField(t,850,20070101,1200,0,0)\n",
" GribField(z,850,20070101,1200,0,0)\n"
"len=2 [('t', 500), ('z', 500)]\n",
"len=2 [('t', 850), ('z', 850)]\n"
]
}
],
"source": [
"ds = earthkit.data.from_source(\"url\", \n",
" \"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test4.grib\", \n",
" stream=True, batch_size=2)\n",
" stream=True)\n",
"\n",
"for f in ds:\n",
"for f in ds.batched(2):\n",
" # f is a fieldlist\n",
" print(f\"len={len(f)}\")\n",
" for g in f:\n",
" print(f\" {g}\")"
" print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")"
]
},
{
"cell_type": "markdown",
"id": "9db87b26-20e2-4cb5-89a1-2643d97fcf13",
"metadata": {},
"source": [
"#### Reading the whole stream into memory"
]
},
{
"cell_type": "raw",
"id": "f8622372-d2c9-4261-86cc-db46531b0cd5",
"metadata": {
"editable": true,
"raw_mimetype": "text/restructuredtext",
"slideshow": {
"slide_type": ""
},
"tags": []
},
"source": [
"We can load the whole stream into memory by using ``read_all=True`` in :ref:`from_source() <data-sources-url>`. The resulting object will be a py:class:`FieldList` storing all the GRIB messages in memory."
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "359ceb8d-3c88-411d-af86-78c7cb403a08",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds = earthkit.data.from_source(\"url\", \n",
" \"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test4.grib\", \n",
" stream=True, read_all=True)\n",
"\n",
"len(ds)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "4608964a-3849-4c40-bcd3-086f54b2f67a",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>centre</th>\n",
" <th>shortName</th>\n",
" <th>typeOfLevel</th>\n",
" <th>level</th>\n",
" <th>dataDate</th>\n",
" <th>dataTime</th>\n",
" <th>stepRange</th>\n",
" <th>dataType</th>\n",
" <th>number</th>\n",
" <th>gridType</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>ecmf</td>\n",
" <td>t</td>\n",
" <td>isobaricInhPa</td>\n",
" <td>500</td>\n",
" <td>20070101</td>\n",
" <td>1200</td>\n",
" <td>0</td>\n",
" <td>an</td>\n",
" <td>0</td>\n",
" <td>regular_ll</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>ecmf</td>\n",
" <td>z</td>\n",
" <td>isobaricInhPa</td>\n",
" <td>500</td>\n",
" <td>20070101</td>\n",
" <td>1200</td>\n",
" <td>0</td>\n",
" <td>an</td>\n",
" <td>0</td>\n",
" <td>regular_ll</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>ecmf</td>\n",
" <td>t</td>\n",
" <td>isobaricInhPa</td>\n",
" <td>850</td>\n",
" <td>20070101</td>\n",
" <td>1200</td>\n",
" <td>0</td>\n",
" <td>an</td>\n",
" <td>0</td>\n",
" <td>regular_ll</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>ecmf</td>\n",
" <td>z</td>\n",
" <td>isobaricInhPa</td>\n",
" <td>850</td>\n",
" <td>20070101</td>\n",
" <td>1200</td>\n",
" <td>0</td>\n",
" <td>an</td>\n",
" <td>0</td>\n",
" <td>regular_ll</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" centre shortName typeOfLevel level dataDate dataTime stepRange \\\n",
"0 ecmf t isobaricInhPa 500 20070101 1200 0 \n",
"1 ecmf z isobaricInhPa 500 20070101 1200 0 \n",
"2 ecmf t isobaricInhPa 850 20070101 1200 0 \n",
"3 ecmf z isobaricInhPa 850 20070101 1200 0 \n",
"\n",
" dataType number gridType \n",
"0 an 0 regular_ll \n",
"1 an 0 regular_ll \n",
"2 an 0 regular_ll \n",
"3 an 0 regular_ll "
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ds.ls()"
]
},
{
Expand Down Expand Up @@ -166,49 +344,49 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 7,
"id": "891af3e5-5aa5-451f-b7bb-a86eaf7c8ffe",
"metadata": {},
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"len=3\n",
" GribField(t,500,20070101,1200,0,0)\n",
" GribField(z,500,20070101,1200,0,0)\n",
" GribField(t,850,20070101,1200,0,0)\n",
"len=3\n",
" GribField(z,850,20070101,1200,0,0)\n",
" GribField(t,1000,20180801,1200,0,0)\n",
" GribField(u,1000,20180801,1200,0,0)\n",
"len=3\n",
" GribField(v,1000,20180801,1200,0,0)\n",
" GribField(t,850,20180801,1200,0,0)\n",
" GribField(u,850,20180801,1200,0,0)\n",
"len=1\n",
" GribField(v,850,20180801,1200,0,0)\n"
"len=3 [('t', 500), ('z', 500), ('t', 850)]\n",
"len=3 [('z', 850), ('t', 1000), ('u', 1000)]\n",
"len=3 [('v', 1000), ('t', 850), ('u', 850)]\n",
"len=1 [('v', 850)]\n"
]
}
],
"source": [
"ds = earthkit.data.from_source(\"url\", \n",
" [\"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test4.grib\", \n",
" \"https://get.ecmwf.int/repository/test-data/earthkit-data/examples/test6.grib\"], \n",
" stream=True, batch_size=3)\n",
" stream=True)\n",
"\n",
"for f in ds:\n",
"for f in ds.batched(3):\n",
" # f is a fieldlist\n",
" print(f\"len={len(f)}\")\n",
" for g in f:\n",
" print(f\" {g}\")"
" print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d287fb70-66fb-4b2f-8c07-a3cca7c3035d",
"metadata": {},
"metadata": {
"editable": true,
"slideshow": {
"slide_type": ""
},
"tags": []
},
"outputs": [],
"source": []
}
Expand Down
Loading
Loading