Skip to content

Commit

Permalink
[AG-1621] Adds dataversion Uploading & [AG-543] Fixes Manifest Version Bug (#168)
Browse files Browse the repository at this point in the history

* adds dataversion uploading

* updates tests

* fixes manifest version bug

* reverses logic for clarity

* handle no id
  • Loading branch information
BWMac authored Feb 3, 2025
1 parent 306e9c5 commit 5e592f8
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 18 deletions.
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ destination: &dest syn12177492
staging_path: ./staging
gx_folder: syn52948668
gx_table: syn60527066
team_images_id: syn12861877
sources:
- genes_biodomains:
genes_biodomains_files: &genes_biodomains_files
Expand Down
70 changes: 63 additions & 7 deletions src/agoradatatools/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Union
from typing import Optional, Union

import synapseclient
from pandas import DataFrame
Expand Down Expand Up @@ -69,6 +69,43 @@ def apply_custom_transformations(
return None


def upload_dataversion_metadata(
    syn: synapseclient.Synapse,
    file_id: str,
    file_version: str,
    staging_path: str,
    destination: str,
    team_images_id: Optional[str] = None,
) -> None:
    """Stages and uploads a ``dataversion.json`` file describing the manifest.

    The JSON payload records the manifest file's Synapse ID and version.
    Model-AD runs have no ``team_images_id``; in that case the key is
    omitted from the payload entirely.

    Args:
        syn (synapseclient.Synapse): Synapse client session
        file_id (str): Synapse ID of the manifest file
        file_version (str): Version number of the manifest file
        staging_path (str): Path to the staging directory
        destination (str): Synapse ID of the destination folder
        team_images_id (str, optional): Synapse ID of the team_images folder
            if provided. Defaults to None.
    """
    metadata = {"data_file": file_id, "data_version": file_version}
    if team_images_id:
        metadata["team_images_id"] = team_images_id

    # Write the payload to the staging area, then push it to Synapse with
    # the manifest file recorded as provenance.
    json_path = load.dict_to_json(
        df=metadata, staging_path=staging_path, filename="dataversion.json"
    )
    load.load(
        file_path=json_path,
        provenance=[file_id],
        destination=destination,
        syn=syn,
    )


@log_time(func_name="process_dataset", logger=logger)
def process_dataset(
dataset_obj: dict,
Expand Down Expand Up @@ -193,9 +230,9 @@ def process_dataset(


def create_data_manifest(
    syn: synapseclient.Synapse, parent: Union[synapseclient.Folder, str] = None
) -> Union[DataFrame, None]:
    """Creates data manifest (dataframe) that has the IDs and version numbers
    of child synapse files.

    The manifest file itself (``data_manifest.csv``) is listed at its *next*
    version number: the file returned by ``getChildren`` is the previously
    uploaded manifest, and the version recorded here must match the version
    that will exist once this run uploads the new manifest.

    Args:
        syn (synapseclient.Synapse): Synapse client session.
        parent (synapseclient.Folder or str, optional): Synapse Folder (or its
            Synapse ID) whose child files are listed. Defaults to None.

    Returns:
        DataFrame or None: one row per child file with ``id`` and ``version``
            columns, or None when no parent is provided.
    """
    if not parent:
        return None

    files = syn.getChildren(parent)

    manifest_rows = [
        {
            "id": file["id"],
            # AG-543: pre-increment the manifest's own version so the row
            # reflects the version created when the new manifest is uploaded.
            "version": (
                file["versionNumber"] + 1
                if file["name"] == "data_manifest.csv"
                else file["versionNumber"]
            ),
        }
        for file in files
    ]

    return DataFrame(manifest_rows)


@log_time(func_name="process_all_files", logger=logger)
Expand Down Expand Up @@ -292,6 +338,16 @@ def process_all_files(
destination=destination,
syn=syn,
)

upload_dataversion_metadata(
syn=syn,
file_id=file_id,
file_version=file_version,
team_images_id=config.get("team_images_id", None),
staging_path=staging_path,
destination=destination,
)

reporter.data_manifest_file = file_id
reporter.data_manifest_version = file_version
reporter.data_manifest_link = DatasetReport.format_link(
Expand Down
1 change: 1 addition & 0 deletions test_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ destination: &dest syn17015333
staging_path: ./staging
gx_folder: syn52948670
gx_table: syn60527065
team_images_id: syn12861877
sources:
- genes_biodomains:
genes_biodomains_files: &genes_biodomains_files
Expand Down
124 changes: 113 additions & 11 deletions tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,91 @@
import pandas as pd
import pytest

from synapseclient import File

from agoradatatools import process
from agoradatatools.errors import ADTDataProcessingError
from agoradatatools.etl import load, utils, extract
from agoradatatools.reporter import DatasetReport, ADTGXReporter
from agoradatatools.constants import Platform
from agoradatatools.gx import GreatExpectationsRunner


STAGING_PATH = "./staging"
GX_FOLDER = "test_folder"


class TestUploadDataversionMetadata:
    """Tests for process.upload_dataversion_metadata with and without a team_images_id."""

    file_id = "syn1111111"
    file_version = "1"
    team_images_id = "syn12861877"
    destination = "syn1111113"
    dataversion_dict_with_team_images_id = {
        "data_file": file_id,
        "data_version": file_version,
        "team_images_id": team_images_id,
    }
    dataversion_dict_without_team_images_id = {
        "data_file": file_id,
        "data_version": file_version,
    }

    @pytest.fixture(scope="function", autouse=True)
    def setup_method(self):
        # Stub out the staging-file writer and the Synapse uploader so no
        # real files are written and no network calls are made.
        self.patch_dict_to_json = patch.object(
            load, "dict_to_json", return_value="path/to/json"
        ).start()
        self.patch_load = patch.object(load, "load", return_value=("syn123", 1)).start()

    def teardown_method(self):
        # Stop started patches so they do not leak into other test classes
        # (matches the convention used by the other test classes in this file).
        mock.patch.stopall()

    def _assert_upload_calls(self, expected_dict: dict, syn: Any) -> None:
        """Shared assertions: the JSON payload written and the upload performed."""
        self.patch_dict_to_json.assert_called_once_with(
            df=expected_dict,
            staging_path=STAGING_PATH,
            filename="dataversion.json",
        )
        self.patch_load.assert_called_once_with(
            file_path="path/to/json",
            provenance=[self.file_id],
            destination=self.destination,
            syn=syn,
        )

    def test_upload_dataversion_metadata_with_team_images_id(self, syn: Any):
        # WHEN I call upload_dataversion_metadata with a team_images_id
        process.upload_dataversion_metadata(
            syn=syn,
            file_id=self.file_id,
            file_version=self.file_version,
            team_images_id=self.team_images_id,
            staging_path=STAGING_PATH,
            destination=self.destination,
        )
        # THEN the payload includes team_images_id and is uploaded correctly
        self._assert_upload_calls(self.dataversion_dict_with_team_images_id, syn)

    def test_upload_dataversion_metadata_without_team_images_id(self, syn: Any):
        # WHEN I call upload_dataversion_metadata without a team_images_id
        process.upload_dataversion_metadata(
            syn=syn,
            file_id=self.file_id,
            file_version=self.file_version,
            staging_path=STAGING_PATH,
            destination=self.destination,
            team_images_id=None,
        )
        # THEN the payload omits team_images_id and is uploaded correctly
        self._assert_upload_calls(self.dataversion_dict_without_team_images_id, syn)


class TestProcessDataset:
dataset_object = {
"neuropath_corr": {
Expand Down Expand Up @@ -383,27 +457,40 @@ def test_process_dataset_upload_false_gx_enabled(self, syn: Any):


class TestCreateDataManifest:
    """Tests for process.create_data_manifest, including the AG-543 behavior of
    pre-incrementing the version number of data_manifest.csv itself."""

    files = [
        File(id="syn123", name="not_a_manifest", versionNumber=1),
        File(id="syn456", name="data_manifest.csv", versionNumber=1),
    ]
    manifest_rows = [
        {"id": "syn123", "version": 1},
        {"id": "syn456", "version": 2},
    ]

    @pytest.fixture(scope="function", autouse=True)
    def setup_method(self, syn: Any):
        # Stub out Synapse login and the child-file listing so no network
        # calls are made.
        self.patch_syn_login = patch.object(
            utils, "_login_to_synapse", return_value=syn
        ).start()
        self.patch_get_children = patch.object(
            syn, "getChildren", return_value=self.files
        ).start()

    def teardown_method(self):
        mock.patch.stopall()

    def test_create_data_manifest_parent_none(self, syn: Any):
        # WHEN I call create_data_manifest with a parent of None
        result = process.create_data_manifest(syn=syn, parent=None)
        # THEN I expect the result to be None
        assert result is None
        # AND I expect the getChildren method to not be called
        self.patch_get_children.assert_not_called()

    def test_create_data_manifest_with_parent(self, syn: Any):
        # WHEN I call create_data_manifest with a parent
        result_df = process.create_data_manifest(syn=syn, parent="syn1111111")
        # THEN I expect the getChildren method to be called with the parent
        self.patch_get_children.assert_called_once_with("syn1111111")
        self.patch_syn_login.assert_not_called()
        # AND I expect the result to be a dataframe with the correct rows
        # Including incrementing the version number for the data_manifest.csv file
        pd.testing.assert_frame_equal(result_df, pd.DataFrame(self.manifest_rows))


class TestProcessAllFiles:
Expand All @@ -430,6 +517,7 @@ def setup_method(self):
"gx_folder": GX_FOLDER,
"gx_table": "syn321",
"staging_path": STAGING_PATH,
"team_images_id": "syn987",
"datasets": [{"a": {"b": "c"}}, {"d": {"e": "f"}}, {"g": {"h": "i"}}],
},
).start()
Expand All @@ -455,6 +543,9 @@ def setup_method(self):
load, "df_to_csv", return_value="path/to/csv"
).start()
self.patch_load = patch.object(load, "load", return_value=("syn123", 1)).start()
self.patch_upload_dataversion_metadata = patch.object(
process, "upload_dataversion_metadata", return_value=None
).start()
self.patch_update_table = patch.object(
ADTGXReporter,
"update_table",
Expand Down Expand Up @@ -505,6 +596,7 @@ def test_process_all_files_upload_false(self, syn: Any):
staging_path=STAGING_PATH,
filename="data_manifest.csv",
)
self.patch_upload_dataversion_metadata.assert_not_called()
self.patch_load.assert_not_called()
self.patch_format_link.assert_not_called()
self.patch_update_table.assert_called_once()
Expand Down Expand Up @@ -551,6 +643,14 @@ def test_process_all_files_upload_true(self, syn: Any):
staging_path=STAGING_PATH,
filename="data_manifest.csv",
)
self.patch_upload_dataversion_metadata.assert_called_once_with(
syn=syn,
file_id="syn123",
file_version=1,
team_images_id="syn987",
staging_path=STAGING_PATH,
destination="destination",
)
self.patch_load.assert_called_once_with(
file_path="path/to/csv",
provenance=["a", "b", "c"],
Expand Down Expand Up @@ -603,6 +703,7 @@ def test_process_all_files_upload_false_gx_failure(self, syn: Any):
)
self.patch_create_data_manifest.assert_not_called()
self.patch_df_to_csv.assert_not_called()
self.patch_upload_dataversion_metadata.assert_not_called()
self.patch_load.assert_not_called()
self.patch_format_link.assert_not_called()
self.patch_update_table.assert_called_once()
Expand Down Expand Up @@ -647,6 +748,7 @@ def test_process_all_files_upload_false_process_dataset_fail(self, syn: Any):
)
self.patch_create_data_manifest.assert_not_called()
self.patch_df_to_csv.assert_not_called()
self.patch_upload_dataversion_metadata.assert_not_called()
self.patch_load.assert_not_called()
self.patch_format_link.assert_not_called()
self.patch_update_table.assert_called_once()

0 comments on commit 5e592f8

Please sign in to comment.