[wip]: Helper to create a collection from a summary object. #176

Draft: wants to merge 3 commits into base: main
98 changes: 98 additions & 0 deletions pctasks/core/pctasks/core/utils/dask_summary.py
@@ -0,0 +1,98 @@
import contextlib
import functools
import json
import logging
import random
import typing

import azure.storage.blob
import dask
import dask.bag
import dask_kubernetes.operator

import pctasks.core.utils.summary

logger = logging.getLogger(__name__)


@dask.delayed
def list_prefixes(
    prefix: str, depth: int, storage_options: dict[str, typing.Any]
) -> typing.List[str]:
    """Recursively walk blob prefixes until they are ``depth`` segments deep.

    Returns every prefix under ``prefix`` whose name contains exactly
    ``depth`` ``/``-separated components.
    """
    prefix = prefix.rstrip("/") + "/"
    d = prefix.count("/")
    cc = azure.storage.blob.ContainerClient(**storage_options)
    blob_names = []
    with cc:
        if d < depth:
            # ``walk_blobs`` yields the immediate children of ``prefix``;
            # recurse into each until the requested depth is reached.
            prefixes = [x.name for x in cc.walk_blobs(prefix)]
            xs = [list_prefixes(x, depth, storage_options) for x in prefixes]
            for x in xs:
                blob_names.extend(x.compute())
        elif d == depth:
            return [prefix]
    return blob_names


@dask.delayed
def read_prefix(
    x: str, storage_options: dict[str, typing.Any], fraction: float = 1.0
) -> typing.List[bytes]:
    """Download the contents of every blob under the prefix ``x``.

    ``fraction`` optionally samples a random subset of the blobs.
    """
    assert 0 <= fraction <= 1.0
    cc = azure.storage.blob.ContainerClient(**storage_options)

    items: typing.List[bytes] = []
    with cc:
        blobs = list(cc.list_blobs(x))
        blobs = random.sample(blobs, int(len(blobs) * fraction))
        for blob in blobs:
            content = cc.get_blob_client(blob).download_blob().readall()
            items.append(content)

    return items


def summarize_partition(
    items: typing.List[typing.Dict],
) -> pctasks.core.utils.summary.ObjectSummary:
    """Fold a partition of parsed JSON documents into a single ObjectSummary."""
    first, *rest = items
    result = pctasks.core.utils.summary.ObjectSummary.summarize_dict(first)
    for item in rest:
        result = result.merge(
            pctasks.core.utils.summary.ObjectSummary.summarize_dict(item)
        )
    return result
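
# ``summarize_partition`` and ``aggregate`` pair up as the per-partition and
# combine steps of ``dask.bag.Bag.reduction`` in ``summarize`` below.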


def aggregate(
    summaries: typing.Iterable[pctasks.core.utils.summary.ObjectSummary],
) -> pctasks.core.utils.summary.ObjectSummary:
    """Merge per-partition summaries into one summary."""
    return functools.reduce(lambda a, b: a.merge(b), summaries)


@contextlib.contextmanager
def get_compute():
    with dask_kubernetes.operator.KubeCluster(
        namespace="dask",
        image="pccomponents.azurecr.io/pctasks-dask:2023.4.13.0",
        resources={
            "requests": {"memory": "7Gi", "cpu": "0.9"},
            "limits": {"memory": "8Gi", "cpu": "1"},
        },
    ) as cluster:
        with cluster.get_client() as client:
            cluster.scale(8)
            logger.info("Dashboard: %s", client.dashboard_link)
            yield client


def summarize(
    prefix: str, depth: int, storage_options: dict[str, typing.Any]
) -> pctasks.core.utils.summary.ObjectSummary:
    """
    Summarize all JSON documents under a blob prefix.

    Example: prefix="OLCI/OL_2_LFR___/", depth=5
    """
    with get_compute():
        logger.info("Listing prefixes. prefix=%s", prefix)
        prefixes = list_prefixes(prefix, depth, storage_options).compute()
        logger.info("prefix_count=%d", len(prefixes))
        bag = dask.bag.from_delayed(
            [read_prefix(x, storage_options) for x in prefixes]
        ).map(json.loads)
        summary = bag.reduction(summarize_partition, aggregate).compute()

    return summary
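
A minimal usage sketch for reviewers (the account URL, container name, and credential below are placeholders, not values from this PR):

import pctasks.core.utils.dask_summary as dask_summary

# Placeholder connection details; any kwargs accepted by
# azure.storage.blob.ContainerClient work here.
storage_options = {
    "account_url": "https://<account>.blob.core.windows.net",
    "container_name": "<container>",
    "credential": "<sas-token>",
}

summary = dask_summary.summarize(
    prefix="OLCI/OL_2_LFR___/", depth=5, storage_options=storage_options
)
print(summary.json(indent=2))  # ObjectSummary is a pydantic model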
146 changes: 145 additions & 1 deletion pctasks/core/pctasks/core/utils/summary.py
@@ -16,7 +16,6 @@


class SummarySettings(BaseModel):

    max_distinct_values = 4
    """The max number of distinct values to collect in a distinct value summary.

@@ -853,3 +852,148 @@ def empty(cls) -> "ObjectSummary":
ObjectListSummary.update_forward_refs()
ObjectPropertySummary.update_forward_refs()
MixedObjectListSummary.update_forward_refs()


def make_collection(
    summary: ObjectSummary,
    collection_id: str,
    keywords: list[str] | None = None,
    stac_extensions: list[str] | None = None,
    title: str | None = None,
    description: str = "{{ collection.description }}",
    links: list[str] | None = None,
    assets: dict[str, dict] | None = None,
    extra_summary_exclude: set[str] | None = None,
    item_assets_exclude: set[str] | None = None,
    extra_fields: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Create a STAC collection from an ObjectSummary.

Parameters
----------
summary
The summary of the items in the collection, built from analyzing many items
and merging the results.
collection_id
The ID of the STAC collection.
keywords
An optional list of keywords to include in the collection.
stac_extensions
An optional list of STAC extensions to include in the collection.
title
An optional title for the collection.
description
An optional description for the collection.
links
Optional list of links to include in the collection.
assets
Optional mapping of collection-level assets for the collection.
extra_summary_exclude
Additional keys to exclude from ``summaries``. By default, all
keys in the ``properties`` will be included *except*

- datetime
- start_datetime
- end_datetime

item_assets_exclude
A set of keys to exclude from the automatic ``item_assets``.
For example, passing ``item_assets_exclude={'eo:bands'}`` will
prevent the ``eo:bands`` property from being included in
the ``item_assets`` for any asset that does have ``eo:bands``.

extra_fields
A mapping of additional fields to include on the collection.

Returns
-------
dict
The dictionary containing the STAC collection.

Notes
-----
This returns ...
"""
    # TODO: auto-include item_assets when finding some
    # TODO: flag to enable / disable item_assets, summaries
    # TODO: Custom merge for `geometry` type to get the union (for extent)
    item_assets = {}
    item_assets_exclude = item_assets_exclude or set()

    for k, asset_summary in summary.keys["assets"].summary.keys.items():
        # assuming we'll move these from description to title
        # TODO: assert one
        item_assets[k] = {}

        for field, value in asset_summary.summary.keys.items():
            if value.type == "distinct" and field not in item_assets_exclude:
                item_assets[k][field] = value.values[0].value

        # ``eo:bands`` is a list summary rather than a distinct value, so it
        # is handled separately; honor ``item_assets_exclude`` here as well.
        if "eo:bands" not in item_assets_exclude and (
            eo_bands := asset_summary.summary.keys.get("eo:bands")
        ):
            item_assets[k]["eo:bands"] = [
                {
                    "name": band.keys["name"].values[0].value,
                    "description": band.keys["description"].values[0].value,
                    "center_wavelength": band.keys["center_wavelength"]
                    .values[0]
                    .value,
                    "band_width": band.keys["band_width"].values[0].value,
                }
                for band in eo_bands.values
            ]

    collection = {
        "stac_version": "1.0.0",
        "id": collection_id,
        "type": "Collection",
        "description": description,
        "links": links or [],
        "keywords": keywords or [],
        "stac_extensions": stac_extensions or [],
        "summaries": {},
        "item_assets": item_assets,
    }

    optional = {"title": title, "assets": assets}
    for k, v in optional.items():
        if v is not None:
            collection[k] = v

    exclude = {
        "start_datetime",
        "end_datetime",
        "datetime",
    }

    exclude |= extra_summary_exclude or set()

    properties = summary.keys["properties"].summary

    for key, summary_value in properties.keys.items():
        if key in exclude:
            continue
        if summary_value.type == "distinct":
            value = [v.value for v in summary_value.values]
        elif summary_value.type in ("int-range", "float-range"):
            value = {
                "minimum": summary_value.min,
                "maximum": summary_value.max,
            }
        else:
            value = [x.value for x in summary_value.values]

        collection["summaries"][key] = value

    collection.update(extra_fields or {})

    return collection
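
And a hypothetical end-to-end call tying the two new modules together (the collection ID is illustrative, and ``storage_options`` is the placeholder mapping from the earlier sketch):

from pctasks.core.utils.dask_summary import summarize
from pctasks.core.utils.summary import make_collection

summary = summarize("OLCI/OL_2_LFR___/", depth=5, storage_options=storage_options)
collection = make_collection(
    summary,
    collection_id="example-collection",  # illustrative ID
    item_assets_exclude={"eo:bands"},
)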