Python can do distributed writes #131
Conversation
It's quite hacky and usability needs work, but it works. There is a new Python test that uses Dask to run concurrent writers on the same array.
Serialization works like this:
- coordinator -> worker: we serialize a config dict, and the workers use it to instantiate their Store
- when a worker is done, worker -> coordinator: we serialize the bytes of its change set, and the coordinator uses them to do a distributed commit
Both paths can (and should) be improved; making the objects picklable would help. It's not trivial in pyo3, but doable.
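For context, here is a rough sketch of how the new test wires this up with Dask. The names Task, execute_task, and mk_store come from the test snippets quoted below; the driver code itself is an assumption, not a copy of the test.
import asyncio
from dask.distributed import Client

def run_task(task: Task) -> bytes:
    # coordinator -> worker: the task (carrying the store's config dict) is pickled to the
    # worker, which instantiates its own Store, writes its region, and serializes the change set
    return asyncio.run(execute_task(task))

def distributed_write(tasks: list[Task]) -> None:
    client = Client()
    futures = client.map(run_task, tasks)
    # worker -> coordinator: each worker hands back the raw bytes of its change set
    change_sets_bytes = client.gather(futures)

    async def commit() -> None:
        # the coordinator opens a store of its own and folds every change set into one commit
        store = await mk_store("w", tasks[0])
        await store.distributed_commit("distributed commit", change_sets_bytes)

    asyncio.run(commit())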
From a functional point of view, I think this is a great proof of concept that we have all the building blocks we need to do a distributed write from Python! 🚀
From an API point of view, I think we can do a lot better. I'm not sure if that's the priority right now. But here is how I would think about it.
Ultimately, in order for Dask to work, we will need to be able to serialize a Zarr Array, which refers to an Icechunk Store. So the Store needs to be serializable. We need to be able to take a store and send it over the network (via pickle).
This is straightforward to implement (we have to implement __setstate__ and __getstate__ for the class, and we can do this at the Python level if it's hard in the Rust world).
If we could also serialize the changeset in this same pickle, then we could do something like the following:
store = mk_store()
futures = client.map(mutate_store, tasks, store=store)  # do some distributed work; each task mutates its copy of the store
results = client.gather(futures)  # each result is also a store
store += sum(results)  # fold the workers' stores back into the coordinator
store.commit("distributed write")
__getstate__ on the store would look something like this:
class Store:
    def __getstate__(self):
        # ship the config needed to reopen the store, plus the pending change set
        return self.store_config, self.change_set
    def __setstate__(self, state):
        self.store_config, self.change_set = state
Overall, I think that making Store serializable is a good goal to work towards, and it would simplify this demo a lot.
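For illustration, assuming the store_config / change_set attributes from the snippet above (hypothetical), an ordinary pickle round trip would then carry the pending changes along:
import pickle

store = mk_store()
# ... write through zarr, accumulating a pending change set ...
blob = pickle.dumps(store)      # store_config and change_set travel together
restored = pickle.loads(blob)   # an equivalent store, pending changes included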
group = zarr.group(store=store, overwrite=False)
array = group["array"]
array[task.area] = generate_task_array(task)
return store.change_set_bytes()
It would be nice to be able to just return the store!
Yeah, we just need to hack a few things around to make it work nicely, and decide whether we want to serialize the config classes or the bytes, etc. But this is the direction we'll take.
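As a sketch of where that could land, assuming a picklable Store as discussed above (hypothetical, reusing the names from the diff):
async def execute_task(task: Task) -> Store:
    store = await mk_store("w", task)
    group = zarr.group(store=store, overwrite=False)
    group["array"][task.area] = generate_task_array(task)
    return store  # the pending change set rides back to the coordinator inside the pickled store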
async def execute_task(task: Task):
    store = await mk_store("w", task)
It would be nice to be able to pass the store as an argument to the task.
yes
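Roughly, continuing the driver sketch from the description above, what that could look like (hypothetical: assumes a picklable Store and a variant of execute_task that accepts the store; Dask's client.map forwards extra keyword arguments to the mapped function):
def run_task(task: Task, store: Store) -> bytes:
    # the store arrives on the worker already unpickled; no config dict needed
    return asyncio.run(execute_task(task, store))

futures = client.map(run_task, tasks, store=store)  # the same store is shipped to every task
change_sets_bytes = client.gather(futures)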
# We can use the current store as the commit coordinator because it doesn't have any pending changes:
# all changes come from the tasks. Icechunk doesn't care where the changes come from; the only
# important thing is not to count changes twice.
commit_res = await store.distributed_commit("distributed commit", change_sets_bytes)
I am not a big fan of having a different method for distributed commit vs regular commit.
I'd rather be able to merge the change sets into the store, or even better, merge stores themselves, and call regular commit.
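Something like this, as a hypothetical API (merge / += and the commit signature here are assumptions, not what the PR implements):
# merge each worker's returned store (or change set) into the coordinator,
# then go through the ordinary commit path
for result in results:
    store.merge(result)   # or: store += result
commit_res = await store.commit("distributed write")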
Good idea, yes, the API can improve a lot.
Approving for incremental progress, as a base to work from toward full serialization.