diff --git a/docs/docs/icechunk-python/parallel.md b/docs/docs/icechunk-python/parallel.md
index 9a830a9e..c9970683 100644
--- a/docs/docs/icechunk-python/parallel.md
+++ b/docs/docs/icechunk-python/parallel.md
@@ -16,7 +16,7 @@ including those executed remotely in a multi-processing or any other remote exec
 Here is how you can execute such writes with Icechunk, illustrated with a `ThreadPoolExecutor`.
 First read some example data, and create an Icechunk Repository.
 
-```python
+```python exec="on" session="parallel" source="material-block"
 import xarray as xr
 import tempfile
 from icechunk import Repository, local_filesystem_storage
@@ -29,7 +29,7 @@ session = repo.writable_session("main")
 We will orchestrate the writes so that each task writes one timestep.
 This is an arbitrary choice but determines what we set for the Zarr chunk size.
 
-```python
+```python exec="on" session="parallel" source="material-block"
 chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
 ```
 
@@ -37,17 +37,17 @@ Initialize the dataset using [`Dataset.to_zarr`](https://docs.xarray.dev/en/stab
 and `compute=False`; this will NOT write any chunked array data, but will write all array metadata,
 and any in-memory arrays (only `time` in this case).
 
-```python
+```python exec="on" session="parallel" source="material-block"
 ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w")
 # this commit is optional, but may be useful in your workflow
-session.commit("initialize store")
+print(session.commit("initialize store"))
 ```
 
 ## Multi-threading
 
 First define a function that constitutes one "write task".
 
-```python
+```python exec="on" session="parallel" source="material-block"
 from icechunk import Session
 
 def write_timestamp(*, itime: int, session: Session) -> None:
@@ -59,7 +59,7 @@ def write_timestamp(*, itime: int, session: Session) -> None:
 
 Now execute the writes.
 
-```python
+```python exec="on" session="parallel" source="material-block"
 from concurrent.futures import ThreadPoolExecutor, wait
 from icechunk.distributed import merge_sessions
 
@@ -69,12 +69,12 @@ with ThreadPoolExecutor() as executor:
     futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
     wait(futures)
 
-session.commit("finished writes")
+print(session.commit("finished writes"))
 ```
 
 Verify that the writes worked as expected:
 
-```python
+```python exec="on" session="parallel" source="material-block"
 ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
 xr.testing.assert_identical(ds, ondisk)
 ```
@@ -96,7 +96,7 @@ There are three key points to keep in mind:
 
 First we modify `write_task` to return the `Session`:
 
-```python
+```python exec="on" session="parallel" source="material-block"
 from icechunk import Session
 
 def write_timestamp(*, itime: int, session: Session) -> Session:
@@ -110,7 +110,7 @@ def write_timestamp(*, itime: int, session: Session) -> Session:
 Now we issue write tasks within the [`session.allow_pickling()`](./reference.md#icechunk.Session.allow_pickling) context,
 gather the Sessions from individual tasks, merge them, and make a successful commit.
 
-```python
+```python exec="on" session="parallel" source="material-block"
 from concurrent.futures import ProcessPoolExecutor
 from icechunk.distributed import merge_sessions
 
@@ -128,12 +128,12 @@ with ProcessPoolExecutor() as executor:
 # manually merge the remote sessions into the local session
 session = merge_sessions(session, *sessions)
 
-session.commit("finished writes")
+print(session.commit("finished writes"))
 ```
 
 Verify that the writes worked as expected:
 
-```python
+```python exec="on" session="parallel" source="material-block"
 ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
 xr.testing.assert_identical(ds, ondisk)
 ```
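
Note for reviewers: the body of the `ProcessPoolExecutor` block falls between the `@@ -110` and `@@ -128` hunks, so the patch itself never shows the task-submission and gathering lines. The sketch below pieces the complete multiprocessing flow together from the surrounding context lines and the page's own description ("issue write tasks within the `session.allow_pickling()` context, gather the Sessions from individual tasks, merge them"); the `futures` and `sessions` names in the middle are illustrative assumptions, not taken from the patch.

```python
# Sketch of the full multiprocessing flow, assuming the elided middle of the
# `with` block mirrors the threaded example earlier on the page.
from concurrent.futures import ProcessPoolExecutor

from icechunk.distributed import merge_sessions

session = repo.writable_session("main")
with ProcessPoolExecutor() as executor:
    # allow_pickling() lets the Session cross process boundaries
    with session.allow_pickling():
        # `futures`/`sessions` are illustrative names, not from the patch
        futures = [
            executor.submit(write_timestamp, itime=i, session=session)
            for i in range(ds.sizes["time"])
        ]
        # each task returns a Session carrying its uncommitted changes
        sessions = [f.result() for f in futures]

# manually merge the remote sessions into the local session
session = merge_sessions(session, *sessions)
print(session.commit("finished writes"))
```

The `print(session.commit(...))` wrappers added throughout appear to serve the new `exec="on"` fences: with the blocks executed at docs-build time, printing makes the returned commit snapshot ID visible in the rendered output.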