Commit cd097b5

ruff

paraseba committed Oct 4, 2024
1 parent 2f1537a

Showing 1 changed file with 101 additions and 28 deletions.

icechunk-python/examples/dask_write.py
@@ -9,6 +9,7 @@
 python ./examples/dask_write.py verify --name test --t-from 0 --t-to 1500 --workers 16
 ```
 """
+
 import argparse
 from dataclasses import dataclass
 import time
@@ -50,6 +51,7 @@ def generate_task_array(task: Task, shape):
 
 async def execute_write_task(task: Task):
     from zarr import config
+
     config.set({"async.concurrency": 10})
 
     store = await mk_store("w", task)
@@ -58,7 +60,7 @@ async def execute_write_task(task: Task):
     array = group["array"]
     print(f"Writing at t={task.time}")
     data = generate_task_array(task, array.shape[0:2])
-    array[:,:,task.time] = data
+    array[:, :, task.time] = data
     print(f"Writing at t={task.time} done")
     if task.sleep != 0:
         print(f"Sleeping for {task.sleep} secs")
@@ -72,26 +74,31 @@ async def execute_read_task(task: Task):
     group = zarr.group(store=store, overwrite=False)
     array = group["array"]
 
-    actual = array[:,:,task.time]
+    actual = array[:, :, task.time]
     expected = generate_task_array(task, array.shape[0:2])
     np.testing.assert_array_equal(actual, expected)
 
+
 def run_write_task(task: Task):
     return asyncio.run(execute_write_task(task))
 
+
 def run_read_task(task: Task):
     return asyncio.run(execute_read_task(task))
 
+
 def storage_config(args):
     prefix = f"seba-tests/icechunk/{args.name}"
     return {
-        "bucket":"arraylake-test",
+        "bucket": "arraylake-test",
         "prefix": prefix,
     }
 
+
 def store_config(args):
     return {"inline_chunk_threshold_bytes": 1}
 
+
 async def create(args):
     store = await icechunk.IcechunkStore.open(
         storage=icechunk.StorageConfig.s3_from_env(**storage_config(args)),
@@ -100,10 +107,14 @@ async def create(args):
     )
 
     group = zarr.group(store=store, overwrite=True)
-    shape = (args.x_chunks * args.chunk_x_size, args.y_chunks * args.chunk_y_size, args.t_chunks * 1)
+    shape = (
+        args.x_chunks * args.chunk_x_size,
+        args.y_chunks * args.chunk_y_size,
+        args.t_chunks * 1,
+    )
     chunk_shape = (args.chunk_x_size, args.chunk_y_size, 1)
 
-    array = group.create_array(
+    group.create_array(
        "array",
         shape=shape,
         chunk_shape=chunk_shape,
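
Aside: the reflowed shape expression above is unchanged in value. For readers of the example, this is the array geometry it defines, sketched standalone with the create subcommand's default flag values declared further down in this file (not code from the commit):

x_chunks, y_chunks, t_chunks = 4, 4, 1000  # --x-chunks / --y-chunks / --t-chunks defaults
chunk_x_size, chunk_y_size = 112, 112      # --chunk-x-size / --chunk-y-size defaults

shape = (x_chunks * chunk_x_size, y_chunks * chunk_y_size, t_chunks * 1)  # (448, 448, 1000)
chunk_shape = (chunk_x_size, chunk_y_size, 1)   # one chunk per time slice
total_chunks = x_chunks * y_chunks * t_chunks   # 16000 chunks over a full run
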
@@ -113,6 +124,7 @@ async def create(args):
     _first_snap = await store.commit("array created")
     print("Array initialized")
 
+
 async def update(args):
     storage_conf = storage_config(args)
     store_conf = store_config(args)
@@ -133,7 +145,11 @@ async def update(args):
             store_config=store_conf,
             time=time,
             seed=time,
-            sleep=max(0, args.max_sleep - ((args.max_sleep - args.min_sleep)/args.sleep_tasks * time))
+            sleep=max(
+                0,
+                args.max_sleep
+                - ((args.max_sleep - args.min_sleep) / args.sleep_tasks * time),
+            ),
         )
         for time in range(args.t_from, args.t_to, 1)
     ]
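
Aside: the multi-line sleep expression above is also purely cosmetic; what it computes is a linear ramp over the first tasks, sketched standalone below with illustrative parameter values (not code from the commit):

def ramp_sleep(time, max_sleep=0.3, min_sleep=0.0, sleep_tasks=10):
    # Task 0 sleeps max_sleep; the sleep shrinks linearly until task number
    # sleep_tasks reaches min_sleep; max() clamps every later task to 0.
    return max(0, max_sleep - ((max_sleep - min_sleep) / sleep_tasks * time))

# With these values: time 0 -> 0.30s, time 5 -> 0.15s, time 10 and beyond -> 0.0s
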
@@ -151,6 +167,7 @@ async def update(args):
     assert commit_res
     print("Distributed commit done")
 
+
 async def verify(args):
     storage_conf = storage_config(args)
     store_conf = store_config(args)
@@ -171,7 +188,7 @@ async def verify(args):
             store_config=store_conf,
             time=time,
             seed=time,
-            sleep=0
+            sleep=0,
         )
         for time in range(args.t_from, args.t_to, 1)
     ]
@@ -180,8 +197,7 @@ async def verify(args):
 
     map_result = client.map(run_read_task, tasks)
     client.gather(map_result)
-    print(f"done, all good")
-
+    print("done, all good")
 
 
 async def distributed_write():
@@ -198,30 +214,87 @@ async def distributed_write():
     subparsers = global_parser.add_subparsers(title="subcommands", required=True)
 
     create_parser = subparsers.add_parser("create", help="create repo and array")
-    create_parser.add_argument("--x-chunks", type=int, help="number of chunks in the x dimension", default=4)
-    create_parser.add_argument("--y-chunks", type=int, help="number of chunks in the y dimension", default=4)
-    create_parser.add_argument("--t-chunks", type=int, help="number of chunks in the t dimension", default=1000)
-    create_parser.add_argument("--chunk-x-size", type=int, help="size of chunks in the x dimension", default=112)
-    create_parser.add_argument("--chunk-y-size", type=int, help="size of chunks in the y dimension", default=112)
-    create_parser.add_argument("--name", type=str, help="repository name", required=True)
+    create_parser.add_argument(
+        "--x-chunks", type=int, help="number of chunks in the x dimension", default=4
+    )
+    create_parser.add_argument(
+        "--y-chunks", type=int, help="number of chunks in the y dimension", default=4
+    )
+    create_parser.add_argument(
+        "--t-chunks", type=int, help="number of chunks in the t dimension", default=1000
+    )
+    create_parser.add_argument(
+        "--chunk-x-size",
+        type=int,
+        help="size of chunks in the x dimension",
+        default=112,
+    )
+    create_parser.add_argument(
+        "--chunk-y-size",
+        type=int,
+        help="size of chunks in the y dimension",
+        default=112,
+    )
+    create_parser.add_argument(
+        "--name", type=str, help="repository name", required=True
+    )
     create_parser.set_defaults(command="create")
 
 
     update_parser = subparsers.add_parser("update", help="add chunks to the array")
-    update_parser.add_argument("--t-from", type=int, help="time position where to start adding chunks (included)", required=True)
-    update_parser.add_argument("--t-to", type=int, help="time position where to stop adding chunks (not included)", required=True)
-    update_parser.add_argument("--workers", type=int, help="number of workers to use", required=True)
-    update_parser.add_argument("--name", type=str, help="repository name", required=True)
-    update_parser.add_argument("--max-sleep", type=float, help="initial tasks sleep by these many seconds", default=0.3)
-    update_parser.add_argument("--min-sleep", type=float, help="last task that sleeps does it by these many seconds, a ramp from --max-sleep", default=0)
-    update_parser.add_argument("--sleep-tasks", type=int, help="this many tasks sleep", default=0.3)
+    update_parser.add_argument(
+        "--t-from",
+        type=int,
+        help="time position where to start adding chunks (included)",
+        required=True,
+    )
+    update_parser.add_argument(
+        "--t-to",
+        type=int,
+        help="time position where to stop adding chunks (not included)",
+        required=True,
+    )
+    update_parser.add_argument(
+        "--workers", type=int, help="number of workers to use", required=True
+    )
+    update_parser.add_argument(
+        "--name", type=str, help="repository name", required=True
+    )
+    update_parser.add_argument(
+        "--max-sleep",
+        type=float,
+        help="initial tasks sleep by these many seconds",
+        default=0.3,
+    )
+    update_parser.add_argument(
+        "--min-sleep",
+        type=float,
+        help="last task that sleeps does it by these many seconds, a ramp from --max-sleep",
+        default=0,
+    )
+    update_parser.add_argument(
+        "--sleep-tasks", type=int, help="this many tasks sleep", default=0.3
+    )
     update_parser.set_defaults(command="update")
 
     update_parser = subparsers.add_parser("verify", help="verify array chunks")
-    update_parser.add_argument("--t-from", type=int, help="time position where to start adding chunks (included)", required=True)
-    update_parser.add_argument("--t-to", type=int, help="time position where to stop adding chunks (not included)", required=True)
-    update_parser.add_argument("--workers", type=int, help="number of workers to use", required=True)
-    update_parser.add_argument("--name", type=str, help="repository name", required=True)
+    update_parser.add_argument(
+        "--t-from",
+        type=int,
+        help="time position where to start adding chunks (included)",
+        required=True,
+    )
+    update_parser.add_argument(
+        "--t-to",
+        type=int,
+        help="time position where to stop adding chunks (not included)",
+        required=True,
+    )
+    update_parser.add_argument(
+        "--workers", type=int, help="number of workers to use", required=True
+    )
+    update_parser.add_argument(
+        "--name", type=str, help="repository name", required=True
+    )
     update_parser.set_defaults(command="verify")
 
     args = global_parser.parse_args()
@@ -233,6 +306,6 @@ async def distributed_write():
         case "verify":
             await verify(args)
 
+
 if __name__ == "__main__":
     asyncio.run(distributed_write())
-
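
For completeness, a plausible end-to-end run of this example, assembled from the three subcommands defined above; the repository name and time range are illustrative, following the verify invocation shown in the module docstring:

python ./examples/dask_write.py create --name test
python ./examples/dask_write.py update --name test --t-from 0 --t-to 1500 --workers 16
python ./examples/dask_write.py verify --name test --t-from 0 --t-to 1500 --workers 16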