From 1c60e5b9b16586a4128bef0097b895bc4437346d Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Thu, 27 Feb 2025 10:35:44 -0700
Subject: [PATCH] Add multi-processing write example (#779)

---
 docs/docs/icechunk-python/parallel.md | 10 ++++--
 icechunk-python/examples/mpwrite.py   | 48 +++++++++++++++++++++++++++
 2 files changed, 56 insertions(+), 2 deletions(-)
 create mode 100644 icechunk-python/examples/mpwrite.py

diff --git a/docs/docs/icechunk-python/parallel.md b/docs/docs/icechunk-python/parallel.md
index 9a830a9e..8a23a713 100644
--- a/docs/docs/icechunk-python/parallel.md
+++ b/docs/docs/icechunk-python/parallel.md
@@ -81,6 +81,12 @@ xr.testing.assert_identical(ds, ondisk)
 
 ## Distributed writes
 
+!!! info
+
+    This code will not execute with a `ProcessPoolExecutor` without [some changes](https://docs.python.org/3/library/multiprocessing.html#programming-guidelines).
+    Specifically, it requires wrapping the code in an `if __name__ == "__main__":` block.
+    See a full executable example [here](https://github.com/earth-mover/icechunk/blob/main/icechunk-python/examples/mpwrite.py).
+
 Any task execution framework (e.g. `ProcessPoolExecutor`, Joblib, Lithops, Dask Distributed, Ray, etc.)
 can be used instead of the `ThreadPoolExecutor`. However, such workloads should account for
 Icechunk being a "stateful" store that records changes executed in a write session.
@@ -123,8 +129,8 @@ with ProcessPoolExecutor() as executor:
             executor.submit(write_timestamp, itime=i, session=session)
             for i in range(ds.sizes["time"])
         ]
-    # grab the Session objects from each individual write task
-    sessions = [f.result() for f in futures]
+        # grab the Session objects from each individual write task
+        sessions = [f.result() for f in futures]
 
 # manually merge the remote sessions into the local session
 session = merge_sessions(session, *sessions)
diff --git a/icechunk-python/examples/mpwrite.py b/icechunk-python/examples/mpwrite.py
new file mode 100644
index 00000000..a86f8a3a
--- /dev/null
+++ b/icechunk-python/examples/mpwrite.py
@@ -0,0 +1,48 @@
+# An example of using multiprocessing to write to an Icechunk dataset
+
+import tempfile
+from concurrent.futures import ProcessPoolExecutor
+
+import xarray as xr
+from icechunk import Repository, Session, local_filesystem_storage
+from icechunk.distributed import merge_sessions
+
+
+def write_timestamp(*, itime: int, session: Session) -> Session:
+    # pass a list to isel to preserve the time dimension
+    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
+    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
+    ds.to_zarr(session.store, region="auto", consolidated=False)
+    return session
+
+
+if __name__ == "__main__":
+    ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
+    repo = Repository.create(local_filesystem_storage(tempfile.mkdtemp()))
+    session = repo.writable_session("main")
+
+    chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
+    ds.to_zarr(
+        session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w"
+    )
+    # this commit is optional, but may be useful in your workflow
+    session.commit("initialize store")
+
+    session = repo.writable_session("main")
+    with ProcessPoolExecutor() as executor:
+        # opt-in to successful pickling of a writable session
+        with session.allow_pickling():
+            # submit the writes
+            futures = [
+                executor.submit(write_timestamp, itime=i, session=session)
+                for i in range(ds.sizes["time"])
+            ]
+            # grab the Session objects from each individual write task
+            sessions = [f.result() for f in futures]
+
+    # manually merge the remote sessions into the local session
+    session = merge_sessions(session, *sessions)
+    session.commit("finished writes")
+
+    ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
+    xr.testing.assert_identical(ds, ondisk)
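
For anyone who wants to inspect the committed data after running the example, here is a minimal read-back sketch. It is not part of the patch: `store_path` is an assumed persistent directory, whereas mpwrite.py itself writes to a throwaway `tempfile.mkdtemp()`.

```python
# Hypothetical read-back sketch; store_path is an assumption, not from the patch.
import xarray as xr
from icechunk import Repository, local_filesystem_storage

store_path = "/tmp/icechunk-mpwrite"  # assumed persistent location
repo = Repository.open(local_filesystem_storage(store_path))
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
print(ondisk.sizes["time"])  # expect 24, one per submitted write task
```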