docs: add parallel to executed docs
ianhi committed Feb 24, 2025
1 parent 506ce49 commit 27e3e89
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions docs/docs/icechunk-python/parallel.md
@@ -16,7 +16,7 @@ including those executed remotely in a multi-processing or any other remote exec
Here is how you can execute such writes with Icechunk, illustrated with a `ThreadPoolExecutor`.
First, read some example data and create an Icechunk Repository.

-```python
+```python exec="on" session="parallel" source="material-block"
import xarray as xr
import tempfile
from icechunk import Repository, local_filesystem_storage
@@ -29,25 +29,25 @@ session = repo.writable_session("main")
We will orchestrate the writes so that each task writes one timestep.
This choice is arbitrary, but it determines the Zarr chunk size we set.

-```python
+```python exec="on" session="parallel" source="material-block"
# one chunk per timestep; every other dimension gets a single full-size chunk
chunks = tuple(1 if dim == "time" else ds.sizes[dim] for dim in ds.Tair.dims)
```
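
For concreteness, here is an editorial sketch of what `chunks` evaluates to, assuming `Tair` has dimensions `("time", "y", "x")` as in the `rasm` tutorial dataset:

```python
# hypothetical illustration: one timestep per chunk, full extent along y and x
print(chunks)  # e.g. (1, ds.sizes["y"], ds.sizes["x"])
```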

Initialize the dataset using [`Dataset.to_zarr`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.to_zarr.html)
with `compute=False`; this will NOT write any chunked array data, but it will write all array metadata and any
in-memory arrays (only `time` in this case).

-```python
+```python exec="on" session="parallel" source="material-block"
ds.to_zarr(session.store, compute=False, encoding={"Tair": {"chunks": chunks}}, mode="w")
# this commit is optional, but may be useful in your workflow
-session.commit("initialize store")
+print(session.commit("initialize store"))
```
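
As a quick sanity check (an editorial sketch, not part of the original page), reopen the store read-only and confirm that the array metadata exists even though no `Tair` chunk data has been written yet:

```python
import zarr

# open the just-committed snapshot read-only and inspect the array metadata
group = zarr.open_group(store=repo.readonly_session("main").store, mode="r")
print(group["Tair"].shape)   # the full shape is already recorded
print(group["Tair"].chunks)  # one timestep per chunk
```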

## Multi-threading

First define a function that constitutes one "write task".

-```python
+```python exec="on" session="parallel" source="material-block"
from icechunk import Session

def write_timestamp(*, itime: int, session: Session) -> None:
@@ -59,7 +59,7 @@ def write_timestamp(*, itime: int, session: Session) -> None:
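
The body of `write_timestamp` is collapsed in this diff view. A minimal sketch of what such a task can look like, assuming the `rasm` tutorial dataset and region-inferred writes (not necessarily the exact body):

```python
def write_timestamp(*, itime: int, session: Session) -> None:
    # pass a list to isel to preserve the time dimension in the result
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    # region="auto" lets Xarray infer which region of the target arrays to write
    ds.to_zarr(session.store, region="auto", consolidated=False)
```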

Now execute the writes.

-```python
+```python exec="on" session="parallel" source="material-block"
from concurrent.futures import ThreadPoolExecutor, wait
from icechunk.distributed import merge_sessions

@@ -69,12 +69,12 @@ with ThreadPoolExecutor() as executor:
futures = [executor.submit(write_timestamp, itime=i, session=session) for i in range(ds.sizes["time"])]
wait(futures)

-session.commit("finished writes")
+print(session.commit("finished writes"))
```
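
Optionally (an editorial sketch; `Repository.ancestry` and the `SnapshotInfo` fields are assumed from the Icechunk API), list the commits on `main` to confirm that both commits landed:

```python
# walk the snapshot history of the "main" branch, newest first
for ancestor in repo.ancestry(branch="main"):
    print(ancestor.id, ancestor.message)
```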

Verify that the writes worked as expected:

-```python
+```python exec="on" session="parallel" source="material-block"
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
```
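
Note that `consolidated=False` is passed here because Icechunk manages its own metadata and does not use Zarr's consolidated-metadata convention; without it, Xarray may emit a warning while probing for consolidated metadata.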
@@ -96,7 +96,7 @@ There are three key points to keep in mind:

First we modify `write_timestamp` to return the `Session`:

-```python
+```python exec="on" session="parallel" source="material-block"
from icechunk import Session

def write_timestamp(*, itime: int, session: Session) -> Session:
@@ -110,7 +110,7 @@ def write_timestamp(*, itime: int, session: Session) -> Session:
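
Again, the function body is collapsed here. A plausible sketch (an assumption mirroring the threaded version, but returning the `Session` so the parent process can merge it):

```python
def write_timestamp(*, itime: int, session: Session) -> Session:
    ds = xr.tutorial.open_dataset("rasm").isel(time=[itime])
    ds.to_zarr(session.store, region="auto", consolidated=False)
    # return the session so its changeset can be merged in the parent process
    return session
```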
Now we issue write tasks within the [`session.allow_pickling()`](./reference.md#icechunk.Session.allow_pickling) context, gather the Sessions from the individual tasks,
merge them, and make a successful commit.

-```python
+```python exec="on" session="parallel" source="material-block"
from concurrent.futures import ProcessPoolExecutor
from icechunk.distributed import merge_sessions

@@ -128,12 +128,12 @@ with ProcessPoolExecutor() as executor:

# manually merge the remote sessions into the local session
session = merge_sessions(session, *sessions)
-session.commit("finished writes")
+print(session.commit("finished writes"))
```

Verify that the writes worked as expected:

-```python
+```python exec="on" session="parallel" source="material-block"
ondisk = xr.open_zarr(repo.readonly_session("main").store, consolidated=False)
xr.testing.assert_identical(ds, ondisk)
```
