Skip to content

Commit

Permalink
fix: don't take up more memory than needed when calculating schema
Browse files Browse the repository at this point in the history
Previously, we expanded the iterator of rows we were handed and turned
it into an in-memory list.

This isn't actually necessary - we can just iterate through the rows
and calculate what we need.
  • Loading branch information
mikix committed Jan 29, 2025
1 parent c6a4698 commit 503c739
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cumulus_fhir_support/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""FHIR support code for the Cumulus project"""

__version__ = "1.3.0"
__version__ = "1.3.1"

from .ml_json import list_multiline_json_in_dir, read_multiline_json, read_multiline_json_from_dir
from .schemas import pyarrow_schema_from_rows
35 changes: 17 additions & 18 deletions cumulus_fhir_support/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,26 @@ def pyarrow_schema_from_rows(
:param rows: an optional iterable of JSON FHIR resources to ensure are covered by the schema
:returns: a PyArrow schema that covers the unified shape of all provided rows
"""
rows = list(rows or [])
rows = rows or []

# Examine batch to see the full shape of it, in order to detect any deeply nested fields
# that we want to make sure to include in the final schema (normally, we go wide but only as
# deep as we need to)
batch_shape = _get_shape_of_dicts(None, rows)

# Note: be careful to only iterate through `rows` once, to allow passing in pure iterables.
batch_shape = {}
contained_types = set()
for row in rows:
# Build up a complete picture of the shape of all rows
batch_shape = _get_shape_of_dicts(batch_shape, row)

# Also gather up which kind of contained resources exist.
for contained_obj in row.get("contained", []):
if contained_type := contained_obj.get("resourceType"):
contained_types.add(contained_type)

# Now actually create the schema
schema = _create_pyarrow_schema_for_resource(resource_type, batch_shape)
schema = _include_contained_schemas(schema, rows, batch_shape)
schema = _include_contained_schemas(schema, contained_types, batch_shape)
return schema


Expand Down Expand Up @@ -86,7 +97,7 @@ def _get_shape_of_dicts(total_shape: Optional[dict], item: Any) -> dict:


def _include_contained_schemas(
schema: pyarrow.Schema, rows: list[dict], batch_shape: dict
schema: pyarrow.Schema, contained_types: set[str], batch_shape: dict
) -> pyarrow.Schema:
"""
This will include all contained resource schemas into one big contained schema.
Expand All @@ -97,25 +108,13 @@ def _include_contained_schemas(
Also see https://github.com/smart-on-fhir/cumulus-etl/issues/250 for discussion
of whether it is wise to just commingle the schemas like this.
"""
# Grab all contained resource types that we have in the source data,
# which will inform the expected schema inside there.
contained_types = sorted(
filter(
None,
{
contained_obj.get("resourceType")
for row in rows
for contained_obj in row.get("contained", [])
},
)
)
if not contained_types:
return schema # no need to do anything
contained_shape = batch_shape.get("contained")

# Allow any found fields in any of the contained types
fields = {}
for contained_type in contained_types:
for contained_type in sorted(contained_types):
subschema = _create_pyarrow_schema_for_resource(contained_type, contained_shape, wide=False)
for name in subschema.names:
fields[name] = subschema.field(name) # will overwrite previous field of same name
Expand Down

0 comments on commit 503c739

Please sign in to comment.