diff --git a/gen3_tracker/gen3/jobs.py b/gen3_tracker/gen3/jobs.py
index c1d44d78..f544d4a6 100644
--- a/gen3_tracker/gen3/jobs.py
+++ b/gen3_tracker/gen3/jobs.py
@@ -9,6 +9,7 @@
 from gen3.auth import Gen3Auth
 from gen3.jobs import Gen3Jobs
+import pytz
 from gen3_tracker import Config
 from gen3_tracker.common import Push, Commit
@@ -129,11 +130,13 @@ def publish_commits(config: Config, wait: bool, auth: Gen3Auth, bucket_name: str
     from cdislogging import get_logger  # noqa
     cdis_logging = get_logger("__name__")
     cdis_logging.setLevel(logging.WARN)
-
+
     if wait:
+        # async_run_job_and_wait monkeypatched below
         _ = asyncio.run(jobs_client.async_run_job_and_wait(job_name='fhir_import_export', job_input=args, spinner=spinner))
     else:
         _ = jobs_client.create_job('fhir_import_export', args)
+
     if not isinstance(_, dict):
         _ = {'output': _}
     if isinstance(_['output'], str):
@@ -183,7 +186,15 @@ async def async_run_job_and_wait(self, job_name, job_input, spinner=None, _ssl=N
             spinner.text = f"{status.get('name')} {status.get('status')}"

     if status.get("status") != "Completed":
-        raise Exception(f"Job status not complete: {status.get('status')}.")
+        # write failed output to log file before raising exception
+        response = await self.async_get_output(job_create_response.get("uid"))
+        with open("logs/publish.log", 'a') as f:
+            log_msg = {'timestamp': datetime.now(pytz.UTC).isoformat()}
+            log_msg.update(response)
+            f.write(json.dumps(log_msg, separators=(',', ':')))
+            f.write('\n')
+
+        raise Exception(f"Job status not complete: {status.get('status')}")

     response = await self.async_get_output(job_create_response.get("uid"))
     return response
diff --git a/gen3_tracker/git/cli.py b/gen3_tracker/git/cli.py
index bc655f8b..f3aed5cf 100644
--- a/gen3_tracker/git/cli.py
+++ b/gen3_tracker/git/cli.py
@@ -452,7 +452,7 @@ def push(ctx, step: str, transfer_method: str, overwrite: bool, re_run: bool, wa
             headers = {"Authorization": f"{auth._access_token}"}
             result = requests.delete(url=f'{auth.endpoint}/Bundle', data=orjson.dumps(bundle_data, default=_default_json_serializer, option=orjson.OPT_APPEND_NEWLINE).decode(), headers=headers)
-
+
             with open("logs/publish.log", 'a') as f:
                 log_msg = {'timestamp': datetime.now(pytz.UTC).isoformat(), "result": f"{result}"}
                 click.secho('Published project. See logs/publish.log', fg=SUCCESS_COLOR, file=sys.stderr)
@@ -489,20 +489,28 @@ def push(ctx, step: str, transfer_method: str, overwrite: bool, re_run: bool, wa
         return

     if step in ['publish', 'all'] and not fhir_server:
+        log_path = "logs/publish.log"
+
         with Halo(text='Uploading snapshot', spinner='line', placement='right', color='white'):
             # push the snapshot of the `.git` sub-directory in the current directory
             push_snapshot(config, auth=auth)

         if transfer_method == 'gen3':
-            with Halo(text='Publishing', spinner='line', placement='right', color='white') as spinner:
+            try:
                 # legacy, "old" fhir_import_export use publish_commits to publish the META
-                _ = publish_commits(config, wait=wait, auth=auth, bucket_name=bucket_name, spinner=spinner)
-            click.secho('Published project. See logs/publish.log', fg=SUCCESS_COLOR, file=sys.stderr)
-            with open("logs/publish.log", 'a') as f:
+                with Halo(text='Publishing', spinner='line', placement='right', color='white') as spinner:
+                    _ = publish_commits(config, wait=wait, auth=auth, bucket_name=bucket_name, spinner=spinner)
+            except Exception as e:
+                click.secho(f'Unable to publish project. See {log_path} for more info', fg=ERROR_COLOR, file=sys.stderr)
+                raise e
+
+            # print success message and save logs
+            with open(log_path, 'a') as f:
                 log_msg = {'timestamp': datetime.now(pytz.UTC).isoformat()}
                 log_msg.update(_)
                 f.write(json.dumps(log_msg, separators=(',', ':')))
                 f.write('\n')
+            click.secho(f'Published project. Logs found at {log_path}', fg=SUCCESS_COLOR, file=sys.stderr)
         else:
             click.secho(f'Auto-publishing not supported for {transfer_method}. Please use --step publish after uploading', fg=ERROR_COLOR, file=sys.stderr)
diff --git a/gen3_tracker/meta/cli.py b/gen3_tracker/meta/cli.py
index d7659fa5..3dd37bfd 100644
--- a/gen3_tracker/meta/cli.py
+++ b/gen3_tracker/meta/cli.py
@@ -101,14 +101,16 @@ def render_graph(config: Config, directory_path: str, output_path: str, browser:
 @meta.command("dataframe")
+@click.argument('data_type',
+                required=True,
+                type=click.Choice(['Specimen', 'DocumentReference', 'ResearchSubject']),
+                default=None)
 @click.argument("directory_path",
                 type=click.Path(exists=True, file_okay=False),
                 default="./META",
                 required=False)
 @click.argument("output_path",
-                type=click.Path(file_okay=True),
-                default="meta.csv", required=False)
+                type=click.Path(file_okay=True), required=False)
 @click.option('--dtale', 'launch_dtale', default=False, show_default=True, is_flag=True,
               help='Open the graph in a browser using the dtale package for interactive data exploration.')
-@click.option('--data_type', required=True, type=click.Choice(['Specimen', 'DocumentReference', 'ResearchSubject']), default=None, show_default=True, help='Create a data frame for a specific data type.')
 @click.option('--debug', is_flag=True)
 @click.pass_obj
 def render_df(config: Config, directory_path: str, output_path: str, launch_dtale: bool, data_type: str, debug: bool):
@@ -127,9 +129,13 @@ def render_df(config: Config, directory_path: str, output_path: str, launch_dtal
             dtale.show(df, subprocess=False, open_browser=True, port=40000)
         else:
             # export to csv
-            df.to_csv(output_path, index=False)
-            click.secho(f"Saved {output_path}", fg=INFO_COLOR, file=sys.stderr)
+            file_name = output_path if output_path else f"{data_type}.csv"
+            df.to_csv(file_name, index=False)
+            click.secho(f"Saved {file_name}", fg=INFO_COLOR, file=sys.stderr)
     except Exception as e:
         click.secho(str(e), fg=ERROR_COLOR, file=sys.stderr)
         if config.debug or debug:
             raise
+
+
+meta.add_command(render_df, name='df')
diff --git a/gen3_tracker/meta/dataframer.py b/gen3_tracker/meta/dataframer.py
index b2c8a23d..c7d1cd3f 100644
--- a/gen3_tracker/meta/dataframer.py
+++ b/gen3_tracker/meta/dataframer.py
@@ -405,13 +405,11 @@ def handle_units(self, value_normalized: str):
         string and float data in the same column and gives errors
         because it is expecting only one data type per column"""

-        print("UNITS: ", value_normalized)
         if value_normalized is not None:
             value_normalized_split = value_normalized.split(" ")
             if isinstance(value_normalized_split, list):
                 value_numeric = value_normalized_split[0]
                 if is_number(value_numeric):
-                    # print("VALUE NUMERIC: ", float(value_numeric))
                     value_normalized = float(value_numeric)
                 return value_normalized
         return None
@@ -699,20 +697,19 @@ def get_resources_by_reference(
             # determine which how to process the field
             if reference_field == "focus":
-                # error if multiple focuses
-                if reference_field in resource:
-                    assert (
-                        len(resource["focus"]) <= 1
-                    ), "unable to support more than 1 focus for a single observation"
-                nested_keys = ["focus", 0]
-            elif reference_field == "subject":
-                nested_keys = ["subject"]
+                # add the resource (e.g. observation) for each focus reference to the dict
+                for i in range(len(resource["focus"])):
+                    reference_key = get_nested_value(resource, [reference_field, i, "reference"])
+                    if reference_key is not None and reference_type in reference_key:
+                        reference_id = reference_key.split("/")[-1]
+                        resource_by_reference_id[reference_id].append(resource)

-            # add observation to dict if a reference resource exists
-            reference_key = get_nested_value(resource, [*nested_keys, "reference"])
-            if reference_key is not None and reference_type in reference_key:
-                reference_id = reference_key.split("/")[-1]
-                resource_by_reference_id[reference_id].append(resource)
+            elif reference_field == "subject":
+                # add the resource (e.g. observation) to the dict
+                reference_key = get_nested_value(resource, [reference_field, "reference"])
+                if reference_key is not None and reference_type in reference_key:
+                    reference_id = reference_key.split("/")[-1]
+                    resource_by_reference_id[reference_id].append(resource)

     return resource_by_reference_id
diff --git a/gen3_tracker/meta/entities.py b/gen3_tracker/meta/entities.py
index 9c9ce4d5..888cf903 100644
--- a/gen3_tracker/meta/entities.py
+++ b/gen3_tracker/meta/entities.py
@@ -3,6 +3,7 @@
 from pydantic import BaseModel, computed_field
 from typing import Dict, List, Optional, Tuple

+
 #######################
 # FHIR HELPER METHODS #
 #######################
@@ -185,7 +186,7 @@ class SimplifiedFHIR(BaseModel):
     @computed_field()
     @property
     def simplified(self) -> dict:
-        _ = {"identifier": self.identifier.get("value", None)}
+        _ = self.identifiers.copy() if self.identifiers else {}
         _.update(self.scalars)
         _.update(self.codings)
         _.update(self.extensions)
@@ -245,6 +246,7 @@ def scalars(self) -> dict:
             if (not isinstance(v, list) and not isinstance(v, dict))
         }

+
     @computed_field
     @property
     def codings(self) -> dict:
@@ -259,25 +261,31 @@ def codings(self) -> dict:
                 if isinstance(elem, dict):
                     # TODO: implement hierarchy of codes rather than just taking last code?
                     for value, source in normalize_coding(elem):
-                        _codings[k] = value
+                        if len(v) > 1 and get_nested_value(elem, [source, 0, 'system']):
+                            _codings[elem[source][0]["system"].split("/")[-1]] = value
+                        else:
+                            _codings[k] = value
             elif isinstance(v, dict):
                 for value, elem in normalize_coding(v):
                     _codings[k] = value
+
         return _codings

+
     @computed_field
     @property
-    def identifier(self) -> dict:
-        """Return the official identifier, or first of a resource."""
+    def identifiers(self) -> dict:
+        """Return the first identifier of a resource, plus any other identifiers."""
         identifiers = self.resource.get("identifier", [])
-        official_identifiers = [
-            _ for _ in identifiers if _.get("use", "") == "official"
-        ]
-        if not official_identifiers and identifiers:
-            return identifiers[0]
-        elif official_identifiers:
-            return official_identifiers[0]
+        identifiers_len = len(identifiers)
+
+        if not identifiers_len:
+            return {"identifier": None}
+        elif identifiers_len == 1:
+            return {"identifier": identifiers[0].get('value')}
         else:
-            return {}
+            base_identifier = {"identifier": identifiers[0].get('value')}
+            base_identifier.update({identifier.get("system").split("/")[-1]: identifier.get("value") for identifier in identifiers[1:]})
+            return base_identifier
@@ -364,6 +372,9 @@ def values(self) -> dict:
             if not value:
                 continue
             _values[source] = value
+        if "code" in self.resource and "text" in self.resource["code"]:
+            _values["observation_code"] = self.resource["code"]["text"]
+
         assert len(_values) > 0, f"no values found in Observation: {self.resource}"
diff --git a/tests/fixtures/negative-examples/fhir-gdc-DocumentReference-invalid-date.ndjson b/tests/fixtures/negative-examples/fhir-gdc-DocumentReference-invalid-date.ndjson
new file mode 100644
index 00000000..1129ab88
--- /dev/null
+++ b/tests/fixtures/negative-examples/fhir-gdc-DocumentReference-invalid-date.ndjson
@@ -0,0 +1 @@
+{"resourceType":"DocumentReference","id":"6b024bdd-810f-5af0-8f36-9effc5da9b02","identifier":[{"system":"https://gdc.cancer.gov/file_id","value":"adb14cb5-6c6a-4c9a-9611-4f560e696e21"}],"version":"2","basedOn":[{"reference":"Specimen/91749e5f-aff9-5da6-a396-e53da6572bb7"},{"reference":"Specimen/ef84de6b-474e-5f96-a22f-2f105e4006ce"}],"status":"current","type":{"coding":[{"system":"https://gdc.cancer.gov/data_type","code":"VCF","display":"VCF"}]},"category":[{"coding":[{"system":"https://gdc.cancer.gov/data_category","code":"Simple Nucleotide Variation","display":"Simple Nucleotide Variation"}]},{"coding":[{"system":"https://gdc.cancer.gov/experimental_strategy","code":"WXS","display":"WXS"}]}],"subject":{"reference":"Patient/ccf801ae-7d28-5446-9c98-d79185ce2eb1"},"date":"2022-02-03T20:41:18.844488-06:00","content":[{"attachment":{"contentType":"Annotated Somatic Mutation","url":"https://api.gdc.cancer.gov/data/adb14cb5-6c6a-4c9a-9611-4f560e696e21","size":192763,"hash":"e389d0afe4c523b366853d32d89455ad","title":"TCGA_LUAD.c78eaf74-7747-40c3-be4d-0065e4e4d0f2.wxs.VarScan2.somatic_annotation.vcf.gz", "creation": "2023-10-05T12:07:50"},"profile":[{"valueCoding":{"system":"https://gdc.cancer.gov/data_format","code":"VCF","display":"VCF"}}]}]}
\ No newline at end of file
diff --git a/tests/integration/test_end_to_end_workflow.py b/tests/integration/test_end_to_end_workflow.py
index 2f868b9f..25c843b6 100644
--- a/tests/integration/test_end_to_end_workflow.py
+++ b/tests/integration/test_end_to_end_workflow.py
@@ -1,21 +1,21 @@
 import os
-import pathlib
-import pytest
+import shutil
+import pandas as pd
 import yaml
 from click.testing import CliRunner

 from gen3_tracker.config import ensure_auth, default
 from gen3_tracker.git import DVC, run_command
+from pathlib import Path
 from tests.integration import validate_document_in_elastic, validate_document_in_grip
 from tests import run


-# @pytest.mark.skip(reason="dataframer is not currently operational for adding single file use case")
 def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     """Test the init command."""
     # change to the temporary directory
     assert tmpdir.chdir()
-    print(pathlib.Path.cwd())
+    print(Path.cwd())

     assert os.environ.get("G3T_PROFILE"), "G3T_PROFILE environment variable must be set."
@@ -28,7 +28,7 @@ def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     run(runner, ["--debug", "ping"], expected_output=["bucket_programs", "your_access", "endpoint", "username"])

     # create a test file
-    test_file = pathlib.Path("my-project-data/hello.txt")
+    test_file = Path("my-project-data/hello.txt")
     test_file.parent.mkdir(parents=True, exist_ok=True)
     test_file.write_text('hello\n')

@@ -36,7 +36,7 @@ def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     run(runner, ["--debug", "add", str(test_file)], expected_files=["MANIFEST/my-project-data/hello.txt.dvc"])

     # should create a dvc file
-    dvc_path = pathlib.Path("MANIFEST/my-project-data/hello.txt.dvc")
+    dvc_path = Path("MANIFEST/my-project-data/hello.txt.dvc")
     assert dvc_path.exists(), f"{dvc_path} does not exist."
     with open(dvc_path) as f:
         yaml_data = yaml.safe_load(f)
@@ -61,7 +61,7 @@ def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     run(runner, ["--debug", "meta", "graph"], expected_files=["meta.html"])

     # create a dataframe
-    run(runner, ["--debug", "meta", "dataframe", '--data_type', 'DocumentReference'], expected_files=["meta.csv"])
+    run(runner, ["--debug", "meta", "dataframe", 'DocumentReference'], expected_files=["DocumentReference.csv"])

     # push to the server
     run(runner, ["--debug", "push"])
@@ -75,7 +75,7 @@ def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     validate_document_in_elastic(object_id, auth=auth)

     # clone the project in new directory
-    clone_dir = pathlib.Path("clone")
+    clone_dir = Path("clone")
     os.mkdir(clone_dir)
     os.chdir("clone")
     run(runner, ["--debug", "clone", project_id])
@@ -93,7 +93,7 @@ def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     # check the files exist in the cloned directory
     run_command("ls -l")
-    assert pathlib.Path("my-project-data/hello.txt").exists(), "hello.txt does not exist in the cloned directory."
+    assert Path("my-project-data/hello.txt").exists(), "hello.txt does not exist in the cloned directory."

     # remove the project from the server.
     # TODO note, this does not remove the files from the bucket (UChicago bug)
@@ -115,12 +115,11 @@ def test_simple_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     run(runner, ["--debug", "collaborator", "add", "foo2@bar.com", f"/programs/{program}/projects/{project}", "--write", "--approve"])


-@pytest.mark.skip(reason="dataframer is not currently operational for adding single file use case")
 def test_simple_fhir_server_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     """Test the init command."""
     # change to the temporary directory
     assert tmpdir.chdir()
-    print(pathlib.Path.cwd())
+    print(Path.cwd())

     assert os.environ.get("G3T_PROFILE"), "G3T_PROFILE environment variable must be set."
@@ -130,7 +129,7 @@ def test_simple_fhir_server_workflow(runner: CliRunner, project_id, tmpdir) -> N
                        expected_files=[".g3t", ".git"])

     # create a test file
-    test_file = pathlib.Path("my-project-data/hello.txt")
+    test_file = Path("my-project-data/hello.txt")
     test_file.parent.mkdir(parents=True, exist_ok=True)
     test_file.write_text('hello\n')

@@ -138,7 +137,7 @@ def test_simple_fhir_server_workflow(runner: CliRunner, project_id, tmpdir) -> N
     run(runner, ["--debug", "add", str(test_file)], expected_files=["MANIFEST/my-project-data/hello.txt.dvc"])

     # should create a dvc file
-    dvc_path = pathlib.Path("MANIFEST/my-project-data/hello.txt.dvc")
+    dvc_path = Path("MANIFEST/my-project-data/hello.txt.dvc")
     assert dvc_path.exists(), f"{dvc_path} does not exist."
     with open(dvc_path) as f:
         yaml_data = yaml.safe_load(f)
@@ -176,3 +175,82 @@ def test_simple_fhir_server_workflow(runner: CliRunner, project_id, tmpdir) -> None:
     # TODO note, this does not remove the files from the bucket (UChicago bug)
     # See https://ohsucomputationalbio.slack.com/archives/C043HPV0VMY/p1714065633867229
     run(runner, ["--debug", "projects", "empty", "--project_id", project_id, "--confirm", "empty"])
+
+
+def test_push_fails_with_invalid_doc_ref_creation_date(runner: CliRunner, project_id: str, tmp_path: Path):
+
+    # check
+    assert os.environ.get("G3T_PROFILE"), "G3T_PROFILE environment variable must be set."
+
+    # copy fixture to temp test dir
+    project_dir = "fhir-gdc-examples"
+    fixtures_path = Path(os.path.dirname(__file__)).parent / "fixtures"
+    fhir_gdc_dir = fixtures_path / project_dir
+    modified_doc_ref_path = fixtures_path / "negative-examples/fhir-gdc-DocumentReference-invalid-date.ndjson"
+
+    # init project
+    new_project_dir = tmp_path / project_dir
+    shutil.copytree(fhir_gdc_dir, new_project_dir)
+    shutil.copy(modified_doc_ref_path, new_project_dir / "META" / "DocumentReference.ndjson")
+
+    # get invalid date from fixture
+    doc_ref_content = pd.read_json(modified_doc_ref_path, lines=True)["content"][0]
+    invalid_date = doc_ref_content[0]["attachment"]["creation"]
+
+    # ensure that push fails and writes to logs
+    log_file_path = "logs/publish.log"
+    os.chdir(new_project_dir)
+    run(runner, ["init", project_id, "--approve"])
+    result = run(runner,
+                 ["push", "--skip_validate", "--overwrite"],
+                 expected_exit_code=0,
+                 expected_files=[log_file_path]
+                 )
+
+    # ensure push has useful error logs
+    assert log_file_path in result.output, f"expected log file path in stdout, instead got:\n{result.output}"
+
+    # ensure saved log file contains info about invalid date
+    with open(log_file_path, "r") as log_file:
+        lines = log_file.readlines()
+        str_lines = str(lines)
+
+        assert "/content/0/attachment/creation" in str_lines, f"expected errors to refer to /content/0/attachment/creation, instead got: \n{str_lines}"
+        assert "jsonschema" in str_lines, f"expected errors to mention jsonschema, instead got: \n{str_lines}"
+        assert invalid_date in str_lines, f"expected invalid date {invalid_date} to be logged, instead got: \n{str_lines}"
+
+
+def test_push_fails_with_no_write_permissions(runner: CliRunner, project_id: str, tmp_path: Path):
+
+    # setup
+    assert os.environ.get("G3T_PROFILE"), "G3T_PROFILE environment variable must be set."
+    os.chdir(tmp_path)
+
+    # initialize project without approving permissions
+    log_file_path = "logs/publish.log"
+    run(runner, ["init", project_id],
+        expected_files=[".g3t", ".git"])
+
+    # create test file
+    test_file = Path("my-project-data/hello.txt")
+    test_file.parent.mkdir(parents=True, exist_ok=True)
+    test_file.write_text('hello\n')
+
+    # prepare test file for submission
+    run(runner, ["add", str(test_file)], expected_files=["MANIFEST/my-project-data/hello.txt.dvc"])
+    run(runner, ["meta", "init"], expected_files=["META/DocumentReference.ndjson"])
+    print("current directory:", os.getcwd())
+    run(runner, ["commit", "-m", "initial commit"])
+
+    # push
+    result = run(runner, ["push"], expected_exit_code=1, expected_files=[log_file_path])
+
+    # ensure stdout mentions log files
+    assert log_file_path in result.output, f"expected log file path in stdout, instead got:\n{result.output}"
+
+    # check valid error messages within the log
+    with open(log_file_path, "r") as log_file:
+        # grab the last line
+        line = log_file.readlines()[-1]
+        for output in ["401", "permission"]:
+            assert output in line, f"expected {log_file_path} to contain {output}, instead got: \n{line}"
diff --git a/tests/unit/dataframer/test_dataframer.py b/tests/unit/dataframer/test_dataframer.py
index 1779bd6d..87a51f22 100644
--- a/tests/unit/dataframer/test_dataframer.py
+++ b/tests/unit/dataframer/test_dataframer.py
@@ -176,7 +176,7 @@ def simplified_resources(
             "identifier": "LabA_ORGANIZATION",
             "resourceType": "Organization",
             "id": "89c8dc4c-2d9c-48c7-8862-241a49a78f14",
-            "type": "Educational Institute",
+            "organization-type": "Educational Institute",
         },
         patient_key: {
             "identifier": "patientX_1234",