
Python can do distributed writes #131

Merged
merged 3 commits into main from seba/distributed-python on Oct 3, 2024

Conversation

@paraseba (Collaborator) commented Oct 2, 2024

It's quite hacky and usability needs work, but it works.

There is a new python test that uses Dask to do concurrent writers on the same array.

Serialization works like this:

coordinator -> worker: we serialize a config dict, and workers use it to instantiate the Store

worker -> coordinator (when done): we serialize the bytes of the change set, and the coordinator uses them to do a distributed commit

Both paths can (and should) be improved. Making objects picklable would help; it's not trivial in pyo3, but it is doable.
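
As a rough illustration, here is a minimal sketch of that round trip, assuming a Dask distributed Client. change_set_bytes() and distributed_commit() appear in the test code quoted below; mk_store_from_config is a hypothetical helper standing in for whatever instantiates a Store from the config dict.

from dask.distributed import Client

def worker_write(store_config: dict, task) -> bytes:
    # coordinator -> worker: rebuild the Store from the serialized config dict
    store = mk_store_from_config(store_config)  # hypothetical helper
    # ... write this worker's region of the array through zarr ...
    # worker -> coordinator: return only the serialized change set
    return store.change_set_bytes()

async def coordinate(client: Client, store, store_config: dict, tasks) -> None:
    futures = [client.submit(worker_write, store_config, t) for t in tasks]
    change_sets_bytes = client.gather(futures)
    # the coordinator folds every worker's change set into a single commit
    await store.distributed_commit("distributed write", change_sets_bytes)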

@rabernat (Contributor) left a comment:

From a functional point of view, I think this is a great proof of concept that we have all the building blocks we need to do a distributed write from Python! 🚀

From an API point of view, I think we can do a lot better. I'm not sure if that's the priority right now. But here is how I would think about it.

Ultimately, in order for Dask to work, we will need to be able to serialize a Zarr Array, which refers to an Icechunk Store. So the Store needs to be serializable. We need to be able to take a store and send it over the network (via pickle).

This is straightforward to implement: we have to implement __setstate__ and __getstate__ for the class, and we can do this at the Python level if it's hard on the Rust side.

If we could also serialize the changeset in this same pickle, then we could do something like the following:

store = mk_store()
results = client.map(mutate_store, store)  # do some distributed work on each store
store += sum(results)  # each result is also a store
store.commit("distributed write")

__getstate__ on the store would look something like this:

class Store:

    def __getstate__(self):
        return self.store_config, self.change_set
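
And a matching __setstate__ might look like this (a sketch only, assuming the Store can be rebuilt from its config and then handed the pending change set; the internals are hypothetical):

class Store:

    def __setstate__(self, state):
        store_config, change_set = state
        # rebuild the underlying store from its serialized config (hypothetical)
        self.__init__(store_config)
        # reattach the uncommitted changes captured by __getstate__
        self.change_set = change_set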

Overall, I think that making Store serializable is a good goal to work towards, and it would simplify this demo a lot.

group = zarr.group(store=store, overwrite=False)
array = group["array"]
array[task.area] = generate_task_array(task)
return store.change_set_bytes()
Contributor:

It would be nice to be able to just return the store!

Collaborator Author:

Yeah, we just need to hack a few things around to make it work nicely, and decide whether we want to serialize the config classes or bytes, etc. But this is the direction we'll take.



async def execute_task(task: Task):
    store = await mk_store("w", task)
Contributor:

It would be nice to be able to pass the store as an argument to the task.

Collaborator Author:

Yes.

# We can use the current store as the commit coordinator because it doesn't have any pending changes;
# all changes come from the tasks. Icechunk doesn't care where the changes come from; the only
# important thing is to not count changes twice.
commit_res = await store.distributed_commit("distributed commit", change_sets_bytes)
Contributor:

I am not a big fan of having a different method for distributed commit vs regular commit.

I'd rather be able to merge the change sets into the store, or even better, merge the stores themselves, and then call the regular commit.
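
Under that API, the coordinator side might shrink to something like this (a sketch; merge is a hypothetical method standing in for folding one store's change set into another):

for result_store in results:
    store.merge(result_store)  # fold each worker's changes into the coordinator's store
store.commit("distributed write")  # one ordinary commit, no special method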

Collaborator Author:

Good idea, yes. The API can improve a lot.

@mpiannucci self-requested a review on October 3, 2024.
@mpiannucci (Contributor) left a comment:

Approving for incremental progress, as a base to work with toward full serialization.

@paraseba merged commit 70e707f into main on Oct 3, 2024.
3 checks passed
@paraseba deleted the seba/distributed-python branch on October 3, 2024.