Add multi-processing write example (#779)
dcherian authored Feb 27, 2025
1 parent e78387c commit 1c60e5b
Showing 2 changed files with 56 additions and 2 deletions.
docs/docs/icechunk-python/parallel.md (8 additions, 2 deletions)

@@ -81,6 +81,12 @@ xr.testing.assert_identical(ds, ondisk)

## Distributed writes

!!! info

    This code will not execute with a `ProcessPoolExecutor` without [some changes](https://docs.python.org/3/library/multiprocessing.html#programming-guidelines).
    Specifically, it requires wrapping the code in an `if __name__ == "__main__":` block.
    See a full executable example [here](https://github.com/earth-mover/icechunk/blob/main/icechunk-python/examples/mpwrite.py).
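
A minimal sketch of the required structure (`work` here is a hypothetical stand-in for a real write task):

```python
# Guard the driver code so ProcessPoolExecutor workers can re-import
# this module without re-running the submission logic itself.
from concurrent.futures import ProcessPoolExecutor


def work(i: int) -> int:
    # stand-in for a real write task
    return i * 2


if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(work, range(4)))
    print(results)  # [0, 2, 4, 6]
```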

Any task execution framework (e.g. `ProcessPoolExecutor`, Joblib, Lithops, Dask Distributed, Ray, etc.)
can be used instead of the `ThreadPoolExecutor`. However, such workloads should account for
Icechunk being a "stateful" store that records changes executed in a write session.
@@ -123,8 +129,8 @@ with ProcessPoolExecutor() as executor:
            executor.submit(write_timestamp, itime=i, session=session)
            for i in range(ds.sizes["time"])
        ]
        # grab the Session objects from each individual write task
        sessions = [f.result() for f in futures]

# manually merge the remote sessions into the local session
session = merge_sessions(session, *sessions)
icechunk-python/examples/mpwrite.py (new file, 48 additions)
@@ -0,0 +1,48 @@
# An example of using multiprocessing to write to an Icechunk dataset

import tempfile
from concurrent.futures import ProcessPoolExecutor

import xarray as xr
from icechunk import Repository, Session, local_filesystem_storage
from icechunk.distributed import merge_sessions


def write_timestamp(*, itime: int, session: Session) -> Session:
    # pass a list to isel to preserve the time dimension
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    # region="auto" tells Xarray to infer which "region" of the output arrays to write to.
    ds.to_zarr(session.store, region="auto", consolidated=False)
    return session


if __name__ == "__main__":
    ds = xr.tutorial.open_dataset("rasm").isel(time=slice(24))
    repo = Repository.create(local_filesystem_storage(tempfile.mkdtemp()))
    session = repo.writable_session("main")

    # chunk with one timestep per chunk, so each task writes whole chunks;
    # chunks must be an ordered sequence matching the order of ds.Tair.dims
    chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
    ds.to_zarr(
        session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w"
    )
    # this commit is optional, but may be useful in your workflow
    session.commit("initialize store")

    session = repo.writable_session("main")
    with ProcessPoolExecutor() as executor:
        # opt in to pickling the writable session so it can be sent to workers
        with session.allow_pickling():
            # submit the writes
            futures = [
                executor.submit(write_timestamp, itime=i, session=session)
                for i in range(ds.sizes["time"])
            ]
            # grab the Session objects from each individual write task
            sessions = [f.result() for f in futures]

    # manually merge the remote sessions into the local session
    session = merge_sessions(session, *sessions)
    session.commit("finished writes")

    ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
    xr.testing.assert_identical(ds, ondisk)
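
To run the example end to end, `python mpwrite.py` should be enough, assuming `icechunk` and `xarray` are installed along with `pooch` and a netCDF engine such as `netcdf4` (both needed by `xr.tutorial.open_dataset` to fetch the `rasm` dataset).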
