From 2ce53d1bfd9aa6acefb5ecd9606317bc32930c00 Mon Sep 17 00:00:00 2001 From: Scott Wales Date: Thu, 26 Aug 2021 17:20:53 +1000 Subject: [PATCH 1/4] Allow setting malloc_threshold, add OOD Client --- src/climtas/nci/__init__.py | 87 ++++++++++++++++++++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/src/climtas/nci/__init__.py b/src/climtas/nci/__init__.py index c156e55..7de58e5 100644 --- a/src/climtas/nci/__init__.py +++ b/src/climtas/nci/__init__.py @@ -25,7 +25,72 @@ _tmpdir = None -def GadiClient(threads=1): +def Client(threads=1, malloc_trim_threshold=None): + """Start a Dask client at NCI + + An appropriate client will be started for the current system + + Args: + threads: Number of threads per worker process. The total number of + workers will be ncpus/threads, so that each thread gets its own + CPU + malloc_trim_threshold: Threshold for automatic memory trimming. Can be + either a string e.g. '64kib' or a number of bytes e.g. 65536. + Smaller values may reduce out of memory errors at the cost of + running slower + + https://distributed.dask.org/en/latest/worker.html?highlight=worker#automatically-trim-memory + """ + + if os.environ["HOSTNAME"].startswith("ood"): + return OODClient(threads, malloc_trim_threshold) + else: + return GadiClient(threads, malloc_trim_threshold) + + +def OODClient(threads=1, malloc_trim_threshold=None): + """Start a Dask client on OOD + + This function is mostly to be consistent with the Gadi version + + Args: + threads: Number of threads per worker process. The total number of + workers will be ncpus/threads, so that each thread gets its own + CPU + malloc_trim_threshold: Threshold for automatic memory trimming. Can be + either a string e.g. '64kib' or a number of bytes e.g. 65536. + Smaller values may reduce out of memory errors at the cost of + running slower + + https://distributed.dask.org/en/latest/worker.html?highlight=worker#automatically-trim-memory + """ + global _dask_client, _tmpdir + + env = {} + + if malloc_trim_threshold is not None: + env["MALLOC_TRIM_THRESHOLD_"] = str( + dask.utils.parse_bytes(malloc_trim_threshold) + ) + + if _dask_client is None: + try: + # Works in sidebar and can follow the link + dask.config.set( + { + "distributed.dashboard.link": f'/node/{os.environ["host"]}/{os.environ["port"]}/proxy/{{port}}/status' + } + ) + except KeyError: + # Works in sidebar, but can't follow the link + dask.config.set({"distributed.dashboard.link": "/proxy/{port}/status"}) + + _dask_client = dask.distributed.Client(threads_per_worker=threads, env=env) + + return _dask_client + + +def GadiClient(threads=1, malloc_trim_threshold=None): """Start a Dask client on Gadi If run on a compute node it will check the PBS resources to know how many @@ -33,9 +98,27 @@ def GadiClient(threads=1): If run on a login node it will ask for 2 workers each with a 1GB memory limit + + Args: + threads: Number of threads per worker process. The total number of + workers will be $PBS_NCPUS/threads, so that each thread gets its own + CPU + malloc_trim_threshold: Threshold for automatic memory trimming. Can be + either a string e.g. '64kib' or a number of bytes e.g. 65536. + Smaller values may reduce out of memory errors at the cost of + running slower + + https://distributed.dask.org/en/latest/worker.html?highlight=worker#automatically-trim-memory """ global _dask_client, _tmpdir + env = {} + + if malloc_trim_threshold is not None: + env["MALLOC_TRIM_THRESHOLD_"] = str( + dask.utils.parse_bytes(malloc_trim_threshold) + ) + if _dask_client is None: _tmpdir = tempfile.TemporaryDirectory("dask-worker-space") @@ -45,6 +128,7 @@ def GadiClient(threads=1): threads_per_worker=threads, memory_limit="1000mb", local_directory=_tmpdir.name, + env=env, ) else: workers = int(os.environ["PBS_NCPUS"]) // threads @@ -53,5 +137,6 @@ def GadiClient(threads=1): threads_per_worker=threads, memory_limit=int(os.environ["PBS_VMEM"]) / workers, local_directory=_tmpdir.name, + env=env, ) return _dask_client From 1d8553dde16cd52417a4ca730081af8278b1226c Mon Sep 17 00:00:00 2001 From: Scott Wales Date: Fri, 27 Aug 2021 09:43:54 +1000 Subject: [PATCH 2/4] Fix mypy, blacken --- src/climtas/blocked.py | 44 +++++++++++------------------------------- src/climtas/helpers.py | 7 +------ test/test_blocked.py | 26 +++---------------------- test/test_event.py | 9 +++------ test/test_helpers.py | 18 ++--------------- 5 files changed, 20 insertions(+), 84 deletions(-) diff --git a/src/climtas/blocked.py b/src/climtas/blocked.py index 0539ee3..2bca6fe 100644 --- a/src/climtas/blocked.py +++ b/src/climtas/blocked.py @@ -669,23 +669,8 @@ def _merge_approx_percentile( out[ii + numpy.s_[...,] + kk] = merge_percentiles( finalpcts, pcts, - chunk_pcts[ - numpy.s_[ - :, - ] - + ii - + numpy.s_[ - :, - ] - + kk - ].T, - Ns=chunk_counts[ - ii - + numpy.s_[ - :, - ] - + kk - ], + chunk_pcts[numpy.s_[:,] + ii + numpy.s_[:,] + kk].T, + Ns=chunk_counts[ii + numpy.s_[:,] + kk], interpolation=interpolation, ) @@ -693,11 +678,7 @@ def _merge_approx_percentile( def dask_approx_percentile( - array: dask.array.array, - pcts, - axis: int, - interpolation="linear", - skipna=True, + array: dask.array.array, pcts, axis: int, interpolation="linear", skipna=True, ): """ Get the approximate percentiles of a Dask array along 'axis', using the 'dask' @@ -776,7 +757,7 @@ def dask_approx_percentile( def approx_percentile( da: T.Union[xarray.DataArray, dask.array.Array, numpy.ndarray], - q: float, + q: T.Union[float, T.List[float]], dim: str = None, axis: int = None, skipna: bool = True, @@ -803,8 +784,10 @@ def approx_percentile( Array of the same type as da, otherwise as :func:`numpy.percentile` """ - if isinstance(q, numbers.Number): - q = [q] + if isinstance(q, float): + qlist = [q] + else: + qlist = q if skipna: pctile = numpy.nanpercentile @@ -818,17 +801,12 @@ def approx_percentile( data = dask_approx_percentile(da.data, pcts=q, axis=axis, skipna=skipna) dims = ["percentile", *[d for i, d in enumerate(da.dims) if i != axis]] coords = {k: v for k, v in da.coords.items() if k in dims} - coords["percentile"] = q - return xarray.DataArray( - data, - name=da.name, - dims=dims, - coords=coords, - ) + coords["percentile"] = xarray.DataArray(q, dims="percentile") + return xarray.DataArray(data, name=da.name, dims=dims, coords=coords,) if isinstance(da, xarray.DataArray): # Xarray+Numpy - return da.quantile([p / 100 for p in q], dim=dim, skipna=skipna) + return da.quantile([p / 100 for p in qlist], dim=dim, skipna=skipna) assert dim is None assert axis is not None diff --git a/src/climtas/helpers.py b/src/climtas/helpers.py index 2460605..a49c12b 100644 --- a/src/climtas/helpers.py +++ b/src/climtas/helpers.py @@ -33,12 +33,7 @@ def map_blocks_to_delayed( - da: xarray.DataArray, - func, - axis=None, - name="blocks-to-delayed", - args=[], - kwargs={}, + da: xarray.DataArray, func, axis=None, name="blocks-to-delayed", args=[], kwargs={}, ) -> T.List[T.Tuple[T.List[int], T.Any]]: """ Run some function 'func' on each dask chunk of 'da' diff --git a/test/test_blocked.py b/test/test_blocked.py index e0160c5..b507219 100644 --- a/test/test_blocked.py +++ b/test/test_blocked.py @@ -243,21 +243,9 @@ def test_percentile(sample): b = numpy.zeros(sample[0].shape) for ii in numpy.ndindex(b.shape): - b[ii] = dask.array.percentile( - sample.data[ - numpy.s_[ - :, - ] - + ii - ], - 90, - )[0] + b[ii] = dask.array.percentile(sample.data[numpy.s_[:,] + ii], 90,)[0] else: - b = numpy.percentile( - sample.data, - [90], - axis=0, - ) + b = numpy.percentile(sample.data, [90], axis=0,) numpy.testing.assert_array_equal(a, b) @@ -277,15 +265,7 @@ def test_dask_approx_percentile(): # Compare with applying dask.percentile along the time axis b = numpy.zeros(sample[0].shape) for ii in numpy.ndindex(b.shape): - b[ii] = dask.array.percentile( - sample[ - numpy.s_[ - :, - ] - + ii - ], - 90, - )[0] + b[ii] = dask.array.percentile(sample[numpy.s_[:,] + ii], 90,)[0] numpy.testing.assert_array_equal(a, b) diff --git a/test/test_event.py b/test/test_event.py index 6779ab0..f5dd4f2 100644 --- a/test/test_event.py +++ b/test/test_event.py @@ -192,8 +192,7 @@ def test_event_values(): events = find_events(da > 0) values = event_values(da_dask, events).compute().sort_values(["time", "event_id"]) numpy.testing.assert_array_equal( - values.to_numpy(), - [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], + values.to_numpy(), [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], ) # Make sure the values aren't evaluated when using dask @@ -215,8 +214,7 @@ def test_event_values_dask_nd(): values = values.compute().sort_values(["time", "event_id"]) numpy.testing.assert_array_equal( - values.to_numpy(), - [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], + values.to_numpy(), [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], ) @@ -252,6 +250,5 @@ def test_event_values_reduce(): stats = values.groupby("event_id")["value"].min().compute() numpy.testing.assert_array_equal( - stats.to_numpy(), - [2, 8, 7], + stats.to_numpy(), [2, 8, 7], ) diff --git a/test/test_helpers.py b/test/test_helpers.py index 655d23d..18d51f8 100644 --- a/test/test_helpers.py +++ b/test/test_helpers.py @@ -25,26 +25,12 @@ def func(da, block_info=None): numpy.testing.assert_array_equal( df["chunk-location"].sort_values().apply(lambda x: x[0]), - numpy.array( - [ - 0, - 0, - 1, - 1, - ] - ), + numpy.array([0, 0, 1, 1,]), ) numpy.testing.assert_array_equal( df["chunk-location"].sort_values().apply(lambda x: x[1]), - numpy.array( - [ - 0, - 1, - 0, - 1, - ] - ), + numpy.array([0, 1, 0, 1,]), ) From ac33c4987443f5cdbe686cdd9ac8a11a17387828 Mon Sep 17 00:00:00 2001 From: Scott Wales Date: Fri, 27 Aug 2021 09:55:23 +1000 Subject: [PATCH 3/4] Reblacken with newer version --- src/climtas/blocked.py | 32 ++++++++++++++++++++++++++++---- src/climtas/helpers.py | 7 ++++++- test/test_blocked.py | 26 +++++++++++++++++++++++--- test/test_event.py | 9 ++++++--- test/test_helpers.py | 18 ++++++++++++++++-- 5 files changed, 79 insertions(+), 13 deletions(-) diff --git a/src/climtas/blocked.py b/src/climtas/blocked.py index 2bca6fe..c5b1c2f 100644 --- a/src/climtas/blocked.py +++ b/src/climtas/blocked.py @@ -669,8 +669,23 @@ def _merge_approx_percentile( out[ii + numpy.s_[...,] + kk] = merge_percentiles( finalpcts, pcts, - chunk_pcts[numpy.s_[:,] + ii + numpy.s_[:,] + kk].T, - Ns=chunk_counts[ii + numpy.s_[:,] + kk], + chunk_pcts[ + numpy.s_[ + :, + ] + + ii + + numpy.s_[ + :, + ] + + kk + ].T, + Ns=chunk_counts[ + ii + + numpy.s_[ + :, + ] + + kk + ], interpolation=interpolation, ) @@ -678,7 +693,11 @@ def _merge_approx_percentile( def dask_approx_percentile( - array: dask.array.array, pcts, axis: int, interpolation="linear", skipna=True, + array: dask.array.array, + pcts, + axis: int, + interpolation="linear", + skipna=True, ): """ Get the approximate percentiles of a Dask array along 'axis', using the 'dask' @@ -802,7 +821,12 @@ def approx_percentile( dims = ["percentile", *[d for i, d in enumerate(da.dims) if i != axis]] coords = {k: v for k, v in da.coords.items() if k in dims} coords["percentile"] = xarray.DataArray(q, dims="percentile") - return xarray.DataArray(data, name=da.name, dims=dims, coords=coords,) + return xarray.DataArray( + data, + name=da.name, + dims=dims, + coords=coords, + ) if isinstance(da, xarray.DataArray): # Xarray+Numpy diff --git a/src/climtas/helpers.py b/src/climtas/helpers.py index a49c12b..2460605 100644 --- a/src/climtas/helpers.py +++ b/src/climtas/helpers.py @@ -33,7 +33,12 @@ def map_blocks_to_delayed( - da: xarray.DataArray, func, axis=None, name="blocks-to-delayed", args=[], kwargs={}, + da: xarray.DataArray, + func, + axis=None, + name="blocks-to-delayed", + args=[], + kwargs={}, ) -> T.List[T.Tuple[T.List[int], T.Any]]: """ Run some function 'func' on each dask chunk of 'da' diff --git a/test/test_blocked.py b/test/test_blocked.py index b507219..e0160c5 100644 --- a/test/test_blocked.py +++ b/test/test_blocked.py @@ -243,9 +243,21 @@ def test_percentile(sample): b = numpy.zeros(sample[0].shape) for ii in numpy.ndindex(b.shape): - b[ii] = dask.array.percentile(sample.data[numpy.s_[:,] + ii], 90,)[0] + b[ii] = dask.array.percentile( + sample.data[ + numpy.s_[ + :, + ] + + ii + ], + 90, + )[0] else: - b = numpy.percentile(sample.data, [90], axis=0,) + b = numpy.percentile( + sample.data, + [90], + axis=0, + ) numpy.testing.assert_array_equal(a, b) @@ -265,7 +277,15 @@ def test_dask_approx_percentile(): # Compare with applying dask.percentile along the time axis b = numpy.zeros(sample[0].shape) for ii in numpy.ndindex(b.shape): - b[ii] = dask.array.percentile(sample[numpy.s_[:,] + ii], 90,)[0] + b[ii] = dask.array.percentile( + sample[ + numpy.s_[ + :, + ] + + ii + ], + 90, + )[0] numpy.testing.assert_array_equal(a, b) diff --git a/test/test_event.py b/test/test_event.py index f5dd4f2..6779ab0 100644 --- a/test/test_event.py +++ b/test/test_event.py @@ -192,7 +192,8 @@ def test_event_values(): events = find_events(da > 0) values = event_values(da_dask, events).compute().sort_values(["time", "event_id"]) numpy.testing.assert_array_equal( - values.to_numpy(), [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], + values.to_numpy(), + [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], ) # Make sure the values aren't evaluated when using dask @@ -214,7 +215,8 @@ def test_event_values_dask_nd(): values = values.compute().sort_values(["time", "event_id"]) numpy.testing.assert_array_equal( - values.to_numpy(), [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], + values.to_numpy(), + [[0, 0, 9], [1, 0, 8], [2, 1, 3], [3, 1, 2], [4, 2, 7]], ) @@ -250,5 +252,6 @@ def test_event_values_reduce(): stats = values.groupby("event_id")["value"].min().compute() numpy.testing.assert_array_equal( - stats.to_numpy(), [2, 8, 7], + stats.to_numpy(), + [2, 8, 7], ) diff --git a/test/test_helpers.py b/test/test_helpers.py index 18d51f8..655d23d 100644 --- a/test/test_helpers.py +++ b/test/test_helpers.py @@ -25,12 +25,26 @@ def func(da, block_info=None): numpy.testing.assert_array_equal( df["chunk-location"].sort_values().apply(lambda x: x[0]), - numpy.array([0, 0, 1, 1,]), + numpy.array( + [ + 0, + 0, + 1, + 1, + ] + ), ) numpy.testing.assert_array_equal( df["chunk-location"].sort_values().apply(lambda x: x[1]), - numpy.array([0, 1, 0, 1,]), + numpy.array( + [ + 0, + 1, + 0, + 1, + ] + ), ) From 17fb26c9edc80ff44a3960b4f126545bcb66f583 Mon Sep 17 00:00:00 2001 From: Scott Wales Date: Fri, 27 Aug 2021 10:13:28 +1000 Subject: [PATCH 4/4] Fixup tests --- src/climtas/blocked.py | 6 +++--- src/climtas/grid.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/climtas/blocked.py b/src/climtas/blocked.py index c5b1c2f..cd9c80b 100644 --- a/src/climtas/blocked.py +++ b/src/climtas/blocked.py @@ -776,7 +776,7 @@ def dask_approx_percentile( def approx_percentile( da: T.Union[xarray.DataArray, dask.array.Array, numpy.ndarray], - q: T.Union[float, T.List[float]], + q: T.Union[numbers.Real, T.List[numbers.Real]], dim: str = None, axis: int = None, skipna: bool = True, @@ -803,7 +803,7 @@ def approx_percentile( Array of the same type as da, otherwise as :func:`numpy.percentile` """ - if isinstance(q, float): + if isinstance(q, numbers.Real): qlist = [q] else: qlist = q @@ -820,7 +820,7 @@ def approx_percentile( data = dask_approx_percentile(da.data, pcts=q, axis=axis, skipna=skipna) dims = ["percentile", *[d for i, d in enumerate(da.dims) if i != axis]] coords = {k: v for k, v in da.coords.items() if k in dims} - coords["percentile"] = xarray.DataArray(q, dims="percentile") + coords["percentile"] = xarray.DataArray(qlist, dims="percentile") return xarray.DataArray( data, name=da.name, diff --git a/src/climtas/grid.py b/src/climtas/grid.py index 67ffe90..c011357 100644 --- a/src/climtas/grid.py +++ b/src/climtas/grid.py @@ -135,7 +135,7 @@ def to_cdo_grid(self, outfile): def to_netcdf(self, outfile): ds = xarray.DataArray( data=numpy.zeros((len(self.lats), len(self.lons))), - coords=[("lat", self.lats), ("lon", self.lons)], + coords=[("lat", self.lats.data), ("lon", self.lons.data)], ) ds.lat.attrs["units"] = "degrees_north" ds.lon.attrs["units"] = "degrees_east"