diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py
index de6b99f7..a7fbd3ed 100644
--- a/nmdc_runtime/site/repository.py
+++ b/nmdc_runtime/site/repository.py
@@ -744,7 +744,7 @@ def biosample_submission_ingest():
                 "config": {
                     "benthic_data_product": {
                         "product_id": "DP1.20279.001",
-                        "product_tables": "mms_benthicMetagenomeSequencing, mms_benthicMetagenomeDnaExtraction, mms_benthicRawDataFiles, amb_fieldParent",
+                        "product_tables": "mms_benthicMetagenomeSequencing, mms_benthicMetagenomeDnaExtraction, mms_benthicRawDataFiles, amb_fieldParent",
                     }
                 }
             },
@@ -771,7 +771,7 @@ def biosample_submission_ingest():
                 "config": {
                     "benthic_data_product": {
                         "product_id": "DP1.20279.001",
-                        "product_tables": "mms_benthicMetagenomeSequencing, mms_benthicMetagenomeDnaExtraction, mms_benthicRawDataFiles, amb_fieldParent",
+                        "product_tables": "mms_benthicMetagenomeSequencing, mms_benthicMetagenomeDnaExtraction, mms_benthicRawDataFiles, amb_fieldParent",
                     }
                 }
             },
diff --git a/nmdc_runtime/site/translation/neon_benthic_translator.py b/nmdc_runtime/site/translation/neon_benthic_translator.py
index efbd9e7e..06da6754 100644
--- a/nmdc_runtime/site/translation/neon_benthic_translator.py
+++ b/nmdc_runtime/site/translation/neon_benthic_translator.py
@@ -1,6 +1,6 @@
 import re
 import sqlite3
-from typing import Union
+from typing import Optional, Union
 
 import pandas as pd
 import requests_cache
@@ -61,6 +61,7 @@ def __init__(
             "mms_benthicMetagenomeSequencing",
             "mms_benthicMetagenomeDnaExtraction",
             "amb_fieldParent",
+            "mms_benthicRawDataFiles",
         )
 
         if all(k in benthic_data for k in neon_amb_data_tables):
@@ -79,6 +80,12 @@ def __init__(
             benthic_data["amb_fieldParent"].to_sql(
                 "amb_fieldParent", self.conn, if_exists="replace", index=False
             )
+            benthic_data["mms_benthicRawDataFiles"].to_sql(
+                "mms_benthicRawDataFiles",
+                self.conn,
+                if_exists="replace",
+                index=False,
+            )
         else:
             raise
ValueError(
                f"You are missing one of the aquatic benthic microbiome tables: {neon_amb_data_tables}"
@@ -88,14 +95,19 @@ def __init__(
             "neonEnvoTerms", self.conn, if_exists="replace", index=False
         )
 
-        self.neon_raw_data_file_mappings_df = neon_raw_data_file_mappings_file
-        self.neon_raw_data_file_mappings_df.to_sql(
-            "neonRawDataFile", self.conn, if_exists="replace", index=False
-        )
+        self.neon_raw_data_file_mappings_df = benthic_data["mms_benthicRawDataFiles"]
 
         self.site_code_mapping = site_code_mapping
+        self.neon_nmdc_instrument_map_df = neon_nmdc_instrument_map_df
+
+    def _translate_manifest(self, manifest_id: str) -> nmdc.Manifest:
+        return nmdc.Manifest(
+            id=manifest_id,
+            manifest_category=nmdc.ManifestCategoryEnum.poolable_replicates,
+            type="nmdc:Manifest",
+        )
+
     def _translate_biosample(
         self, neon_id: str, nmdc_id: str, biosample_row: pd.DataFrame
     ) -> nmdc.Biosample:
@@ -313,7 +325,7 @@ def _translate_processed_sample(
         )
 
     def _translate_data_object(
-        self, do_id: str, url: str, do_type: str, checksum: str
+        self, do_id: str, url: str, do_type: str, manifest_id: str
     ) -> nmdc.DataObject:
         """Create nmdc DataObject which is the output of a NucleotideSequencing process. This
         object mainly contains information about the sequencing file that was generated as
@@ -324,7 +336,7 @@ def _translate_data_object(
         :param url: URL of zipped FASTQ file on NEON file server. Retrieved
             from file provided by Hugh Cross at NEON.
         :param do_type: Indicate whether it is FASTQ for Read 1 or Read 2 (paired end sequencing).
-        :param checksum: Checksum value for FASTQ in zip file, once again provided by Hugh Cross at NEON.
+        :param manifest_id: id of the nmdc:Manifest record grouping this file with its poolable replicates; None when the run has two or fewer files.
         :return: DataObject with all the sequencing file metadata.
""" @@ -337,14 +348,14 @@ def _translate_data_object( url=url, description=f"sequencing results for {basename}", type="nmdc:DataObject", - md5_checksum=checksum, data_object_type=do_type, + in_manifest=manifest_id, ) - def get_database(self): + def get_database(self) -> nmdc.Database: database = nmdc.Database() - query = """ + join_query = """ SELECT merged.laboratoryName, merged.sequencingFacilityID, @@ -372,202 +383,190 @@ def get_database(self): afp.siteID, afp.sampleID, afp.collectDate - FROM - ( - SELECT - bs.collectDate, - bs.laboratoryName, - bs.sequencingFacilityID, - bs.processedDate, - bs.dnaSampleID, - bs.dnaSampleCode, - bs.internalLabID, - bs.instrument_model, - bs.sequencingMethod, - bs.investigation_type, - bs.qaqcStatus, - bs.ncbiProjectID, - bd.genomicsSampleID, - bd.sequenceAnalysisType, - bd.sampleMass, - bd.nucleicAcidConcentration - FROM - mms_benthicMetagenomeSequencing AS bs - JOIN - mms_benthicMetagenomeDnaExtraction AS bd - ON - bs.dnaSampleID = bd.dnaSampleID - ) AS merged + FROM ( + SELECT + bs.collectDate, + bs.laboratoryName, + bs.sequencingFacilityID, + bs.processedDate, + bs.dnaSampleID, + bs.dnaSampleCode, + bs.internalLabID, + bs.instrument_model, + bs.sequencingMethod, + bs.investigation_type, + bs.qaqcStatus, + bs.ncbiProjectID, + bd.genomicsSampleID, + bd.sequenceAnalysisType, + bd.sampleMass, + bd.nucleicAcidConcentration + FROM mms_benthicMetagenomeSequencing AS bs + JOIN mms_benthicMetagenomeDnaExtraction AS bd + ON bs.dnaSampleID = bd.dnaSampleID + ) AS merged LEFT JOIN amb_fieldParent AS afp - ON - merged.genomicsSampleID = afp.geneticSampleID + ON merged.genomicsSampleID = afp.geneticSampleID """ - benthic_samples = pd.read_sql_query(query, self.conn) + benthic_samples = pd.read_sql_query(join_query, self.conn) benthic_samples.to_sql( "benthicSamples", self.conn, if_exists="replace", index=False ) - neon_biosample_ids = benthic_samples["sampleID"] - nmdc_biosample_ids = self._id_minter("nmdc:Biosample", 
len(neon_biosample_ids)) - neon_to_nmdc_biosample_ids = dict(zip(neon_biosample_ids, nmdc_biosample_ids)) + sample_ids = benthic_samples["sampleID"] + nmdc_biosample_ids = self._id_minter("nmdc:Biosample", len(sample_ids)) + neon_to_nmdc_biosample_ids = dict(zip(sample_ids, nmdc_biosample_ids)) - neon_extraction_ids = benthic_samples["sampleID"] - nmdc_extraction_ids = self._id_minter( - "nmdc:Extraction", len(neon_extraction_ids) - ) - neon_to_nmdc_extraction_ids = dict( - zip(neon_extraction_ids, nmdc_extraction_ids) - ) + nmdc_extraction_ids = self._id_minter("nmdc:Extraction", len(sample_ids)) + neon_to_nmdc_extraction_ids = dict(zip(sample_ids, nmdc_extraction_ids)) - neon_extraction_processed_ids = benthic_samples["sampleID"] nmdc_extraction_processed_ids = self._id_minter( - "nmdc:ProcessedSample", len(neon_extraction_processed_ids) + "nmdc:ProcessedSample", len(sample_ids) ) neon_to_nmdc_extraction_processed_ids = dict( - zip(neon_extraction_processed_ids, nmdc_extraction_processed_ids) + zip(sample_ids, nmdc_extraction_processed_ids) ) - neon_lib_prep_ids = benthic_samples["sampleID"] - nmdc_lib_prep_ids = self._id_minter( - "nmdc:LibraryPreparation", len(neon_lib_prep_ids) - ) - neon_to_nmdc_lib_prep_ids = dict(zip(neon_lib_prep_ids, nmdc_lib_prep_ids)) + nmdc_libprep_ids = self._id_minter("nmdc:LibraryPreparation", len(sample_ids)) + neon_to_nmdc_libprep_ids = dict(zip(sample_ids, nmdc_libprep_ids)) - neon_lib_prep_processed_ids = benthic_samples["sampleID"] - nmdc_lib_prep_processed_ids = self._id_minter( - "nmdc:ProcessedSample", len(neon_lib_prep_processed_ids) + nmdc_libprep_processed_ids = self._id_minter( + "nmdc:ProcessedSample", len(sample_ids) ) - neon_to_nmdc_lib_prep_processed_ids = dict( - zip(neon_lib_prep_processed_ids, nmdc_lib_prep_processed_ids) + neon_to_nmdc_libprep_processed_ids = dict( + zip(sample_ids, nmdc_libprep_processed_ids) ) - neon_omprc_ids = benthic_samples["sampleID"] - nmdc_omprc_ids = self._id_minter( - 
"nmdc:NucleotideSequencing", len(neon_omprc_ids) - ) - neon_to_nmdc_omprc_ids = dict(zip(neon_omprc_ids, nmdc_omprc_ids)) + nmdc_ntseq_ids = self._id_minter("nmdc:NucleotideSequencing", len(sample_ids)) + neon_to_nmdc_ntseq_ids = dict(zip(sample_ids, nmdc_ntseq_ids)) - neon_raw_data_file_mappings_df = self.neon_raw_data_file_mappings_df - neon_raw_file_paths = neon_raw_data_file_mappings_df["rawDataFilePath"] - nmdc_data_object_ids = self._id_minter( - "nmdc:DataObject", len(neon_raw_file_paths) - ) - neon_to_nmdc_data_object_ids = dict( - zip(neon_raw_file_paths, nmdc_data_object_ids) - ) + raw_df = self.neon_raw_data_file_mappings_df + raw_file_paths = raw_df["rawDataFilePath"] + dataobject_ids = self._id_minter("nmdc:DataObject", len(raw_file_paths)) + neon_to_nmdc_dataobject_ids = dict(zip(raw_file_paths, dataobject_ids)) - for neon_id, nmdc_id in neon_to_nmdc_biosample_ids.items(): - biosample_row = benthic_samples[benthic_samples["sampleID"] == neon_id] + for neon_id, biosample_id in neon_to_nmdc_biosample_ids.items(): + row = benthic_samples[benthic_samples["sampleID"] == neon_id] + if row.empty: + continue + # Example of how you might call _translate_biosample: database.biosample_set.append( - self._translate_biosample(neon_id, nmdc_id, biosample_row) + self._translate_biosample(neon_id, biosample_id, row) ) - for neon_id, nmdc_id in neon_to_nmdc_extraction_ids.items(): - extraction_row = benthic_samples[benthic_samples["sampleID"] == neon_id] + for neon_id, extraction_id in neon_to_nmdc_extraction_ids.items(): + row = benthic_samples[benthic_samples["sampleID"] == neon_id] + if row.empty: + continue - extraction_input = neon_to_nmdc_biosample_ids.get(neon_id) - processed_sample_id = neon_to_nmdc_extraction_processed_ids.get(neon_id) + biosample_id = neon_to_nmdc_biosample_ids.get(neon_id) + extraction_ps_id = neon_to_nmdc_extraction_processed_ids.get(neon_id) - if extraction_input is not None and processed_sample_id is not None: + if biosample_id and 
extraction_ps_id: database.material_processing_set.append( self._translate_extraction_process( - nmdc_id, - extraction_input, - processed_sample_id, - extraction_row, + extraction_id, biosample_id, extraction_ps_id, row ) ) - - genomics_sample_id = _get_value_or_none( - extraction_row, "genomicsSampleID" - ) - + genomics_sample_id = _get_value_or_none(row, "genomicsSampleID") database.processed_sample_set.append( self._translate_processed_sample( - processed_sample_id, + extraction_ps_id, f"Extracted DNA from {genomics_sample_id}", ) ) - query = """ + query2 = """ SELECT dnaSampleID, GROUP_CONCAT(rawDataFilePath, '|') AS rawDataFilePaths - FROM neonRawDataFile + FROM mms_benthicRawDataFiles GROUP BY dnaSampleID """ - neon_raw_data_files = pd.read_sql_query(query, self.conn) - neon_raw_data_files_dict = ( - neon_raw_data_files.set_index("dnaSampleID")["rawDataFilePaths"] + raw_data_files_df = pd.read_sql_query(query2, self.conn) + dna_files_dict = ( + raw_data_files_df.set_index("dnaSampleID")["rawDataFilePaths"] .str.split("|") .to_dict() ) - filtered_neon_raw_data_files_dict = { - key: value - for key, value in neon_raw_data_files_dict.items() - if len(value) <= 2 - } - for neon_id, nmdc_id in neon_to_nmdc_lib_prep_ids.items(): - lib_prep_row = benthic_samples[benthic_samples["sampleID"] == neon_id] + dna_sample_to_manifest_id: dict[str, str] = {} - lib_prep_input = neon_to_nmdc_extraction_processed_ids.get(neon_id) - processed_sample_id = neon_to_nmdc_lib_prep_processed_ids.get(neon_id) + for neon_id, libprep_id in neon_to_nmdc_libprep_ids.items(): + row = benthic_samples[benthic_samples["sampleID"] == neon_id] + if row.empty: + continue - if lib_prep_input is not None and processed_sample_id is not None: - database.material_processing_set.append( - self._translate_library_preparation( - nmdc_id, - lib_prep_input, - processed_sample_id, - lib_prep_row, - ) + extr_ps_id = neon_to_nmdc_extraction_processed_ids.get(neon_id) + libprep_ps_id = 
neon_to_nmdc_libprep_processed_ids.get(neon_id)
+            if not extr_ps_id or not libprep_ps_id:
+                continue
+
+            database.material_processing_set.append(
+                self._translate_library_preparation(
+                    libprep_id, extr_ps_id, libprep_ps_id, row
                 )
+            )
 
-            dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID")
+            dna_sample_id = _get_value_or_none(row, "dnaSampleID")
+            database.processed_sample_set.append(
+                self._translate_processed_sample(
+                    libprep_ps_id,
+                    f"Library preparation for {dna_sample_id}",
+                )
+            )
 
-            database.processed_sample_set.append(
-                self._translate_processed_sample(
-                    processed_sample_id,
-                    f"Library preparation for {dna_sample_id}",
+            filepaths_for_dna: list[str] = dna_files_dict.get(dna_sample_id, [])
+            if not filepaths_for_dna:
+                # no raw files => skip this sample entirely
+                # NOTE(review): the ntseq_id lookup that previously sat here
+                # was dead code — both of its branches continued
+                # unconditionally, so a single continue suffices.
+                continue
+
+            # More than two raw files => group them under a poolable-replicates Manifest
+            manifest_id: Optional[str] = None
+            if len(filepaths_for_dna) > 2:
+                if dna_sample_id not in dna_sample_to_manifest_id:
+                    new_man_id = self._id_minter("nmdc:Manifest", 1)[0]
+                    dna_sample_to_manifest_id[dna_sample_id] = new_man_id
+                    database.manifest_set.append(self._translate_manifest(new_man_id))
+                manifest_id = dna_sample_to_manifest_id[dna_sample_id]
+
+            has_input_value = self.samp_procsm_dict.get(neon_id)  # NOTE(review): samp_procsm_dict is not set in the visible __init__ — confirm it is initialized before get_database() runs
+            if not has_input_value:
+                continue
+
+            dataobject_ids_for_run: list[str] = []
+            for fp in filepaths_for_dna:
+                if fp not in neon_to_nmdc_dataobject_ids:
+                    continue
+                do_id = neon_to_nmdc_dataobject_ids[fp]
+
+                do_type = None
+                if "_R1.fastq.gz" in fp:
+                    do_type = "Metagenome Raw Read 1"
+                elif "_R2.fastq.gz" in fp:
+                    do_type = "Metagenome Raw Read 2"
+
+                database.data_object_set.append(
+                    self._translate_data_object(
+                        do_id=do_id,
+                        url=fp,
+                        do_type=do_type,
+                        manifest_id=manifest_id,
                     )
                 )
-
-            has_output = None
-            has_output_do_ids = []
-
-            if dna_sample_id in filtered_neon_raw_data_files_dict:
-                has_output = filtered_neon_raw_data_files_dict[dna_sample_id]
-                for item in 
has_output: - if item in neon_to_nmdc_data_object_ids: - has_output_do_ids.append(neon_to_nmdc_data_object_ids[item]) - - checksum = None - do_type = None - - checksum = neon_raw_data_file_mappings_df[ - neon_raw_data_file_mappings_df["rawDataFilePath"] == item - ]["checkSum"].values[0] - if "_R1.fastq.gz" in item: - do_type = "Metagenome Raw Read 1" - elif "_R2.fastq.gz" in item: - do_type = "Metagenome Raw Read 2" - - database.data_object_set.append( - self._translate_data_object( - neon_to_nmdc_data_object_ids.get(item), - item, - do_type, - checksum, - ) - ) - - database.data_generation_set.append( - self._translate_nucleotide_sequencing( - neon_to_nmdc_omprc_ids.get(neon_id), - processed_sample_id, - has_output_do_ids, - lib_prep_row, - ) + dataobject_ids_for_run.append(do_id) + + ntseq_id = neon_to_nmdc_ntseq_ids.get(neon_id) + if ntseq_id: + database.data_generation_set.append( + self._translate_nucleotide_sequencing( + ntseq_id, + has_input_value, # <--- from self.samp_procsm_dict + dataobject_ids_for_run, + row, ) + ) return database diff --git a/tests/test_data/test_neon_benthic_data_translator.py b/tests/test_data/test_neon_benthic_data_translator.py index 530dfaab..1edfa7aa 100644 --- a/tests/test_data/test_neon_benthic_data_translator.py +++ b/tests/test_data/test_neon_benthic_data_translator.py @@ -127,6 +127,48 @@ } ] ), + "mms_benthicRawDataFiles": pd.DataFrame( + [ + { + "uid": "74cfedfb-b369-43f2-81e8-035dadaabd34", + "domainID": "D13", + "siteID": "WLOU", + "namedLocation": "WLOU.AOS.reach", + "laboratoryName": "Battelle Applied Genomics", + "sequencingFacilityID": "Battelle Memorial Institute", + "setDate": "2018-07-26T15:51Z", + "collectDate": "2018-07-26T15:51Z", + "sequencerRunID": "HWVWKBGX7", + "dnaSampleID": "WLOU.20180726.AMC.EPILITHON.1-DNA1", + "dnaSampleCode": "LV7005092900", + "internalLabID": "BMI_AquaticPlate6WellA5", + "rawDataFileName": "BMI_HWVWKBGX7_AquaticPlate6WellA5_R2.fastq.gz", + "rawDataFileDescription": "R2 metagenomic 
archive of fastq files", + "rawDataFilePath": "https://storage.neonscience.org/neon-microbial-raw-seq-files/2023/BMI_HWVWKBGX7_mms_R2/BMI_HWVWKBGX7_AquaticPlate6WellA5_R2.fastq.gz", + "remarks": "", + "dataQF": "", + }, + { + "uid": "6dfc7444-3878-4db1-85da-b4430f52a023", + "domainID": "D13", + "siteID": "WLOU", + "namedLocation": "WLOU.AOS.reach", + "laboratoryName": "Battelle Applied Genomics", + "sequencingFacilityID": "Battelle Memorial Institute", + "setDate": "2018-07-26T15:51Z", + "collectDate": "2018-07-26T15:51Z", + "sequencerRunID": "HWVWKBGX7", + "dnaSampleID": "WLOU.20180726.AMC.EPILITHON.1-DNA1", + "dnaSampleCode": "LV7005092900", + "internalLabID": "BMI_AquaticPlate6WellA5", + "rawDataFileName": "BMI_HWVWKBGX7_AquaticPlate6WellA5_R1.fastq.gz", + "rawDataFileDescription": "R1 metagenomic archive of fastq files", + "rawDataFilePath": "https://storage.neonscience.org/neon-microbial-raw-seq-files/2023/BMI_HWVWKBGX7_mms_R1/BMI_HWVWKBGX7_AquaticPlate6WellA5_R1.fastq.gz", + "remarks": "", + "dataQF": "", + }, + ] + ), } @@ -176,55 +218,87 @@ def translator(self, test_minter): ) def test_get_database(self, translator): + """Full end-to-end test for get_database() method in NeonBenthicDataTranslator. + This test checks that the objects created for the various classes connected in + the MaterialEntity/PlannedProcess bipartite graph represented in the schema have + the correct inputs and outputs (`has_input`, `has_output`) between them. 
+ """ + translator.samp_procsm_dict = { + "WLOU.20180726.AMC.EPILITHON.1": "nmdc:procsm-11-x1y2z3" + } + database = translator.get_database() - # verify lengths of all collections in database assert len(database.biosample_set) == 1 assert len(database.material_processing_set) == 2 assert len(database.data_generation_set) == 1 assert len(database.processed_sample_set) == 2 + assert len(database.data_object_set) == 2 - # verify contents of biosample_set biosample_list = database.biosample_set - expected_biosample_names = [ - "WLOU.20180726.AMC.EPILITHON.1", + biosample = biosample_list[0] + assert biosample.name == "WLOU.20180726.AMC.EPILITHON.1" + + extraction_list = [ + proc + for proc in database.material_processing_set + if proc.type == "nmdc:Extraction" ] - for biosample in biosample_list: - actual_biosample_name = biosample["name"] - assert actual_biosample_name in expected_biosample_names - - # verify contents of data_generation_set - data_generation_list = database.data_generation_set - expected_nucleotide_sequencing = [ - "Benthic microbial communities - WLOU.20180726.AMC.EPILITHON.1-DNA1" + library_prep_list = [ + proc + for proc in database.material_processing_set + if proc.type == "nmdc:LibraryPreparation" ] - for data_generation in data_generation_list: - if data_generation["type"] == "nmdc:NucleotideSequencing": - actual_nucleotide_sequencing = data_generation["name"] - assert actual_nucleotide_sequencing in expected_nucleotide_sequencing - - extraction_list = [] - library_preparation_list = [] - nucleotide_sequencing_list = [] - for data_generation_obj in database.data_generation_set: - if data_generation_obj["type"] == "nmdc:Extraction": - extraction_list.append(data_generation_obj) - elif data_generation_obj["type"] == "nmdc:LibraryPreparation": - library_preparation_list.append(data_generation_obj) - elif data_generation_obj["type"] == "nmdc:NucleotideSequencing": - nucleotide_sequencing_list.append(data_generation_obj) - - biosample_id = [bsm["id"] 
for bsm in biosample_list] - for extraction in extraction_list: - extraction_input = extraction.has_input - extraction_output = extraction.has_output - assert extraction_input == biosample_id - - for lib_prep in library_preparation_list: - lib_prep_input = lib_prep.has_input - lib_prep_output = lib_prep.has_output - assert lib_prep_input == extraction_output - - for omics_processing in nucleotide_sequencing_list: - omics_processing_input = omics_processing.has_input - assert omics_processing_input == lib_prep_output + ntseq_list = [ + proc + for proc in database.data_generation_set + if proc.type == "nmdc:NucleotideSequencing" + ] + + assert len(extraction_list) == 1 + assert len(library_prep_list) == 1 + assert len(ntseq_list) == 1 + + extraction = extraction_list[0] + libprep = library_prep_list[0] + ntseq = ntseq_list[0] + + ext_input_list = extraction.has_input + ext_output_list = extraction.has_output + assert len(ext_input_list) == 1 + assert len(ext_output_list) == 1 + + biosample_ids = [b.id for b in database.biosample_set] + assert ext_input_list[0] in biosample_ids + + processed_sample_ids = [ps.id for ps in database.processed_sample_set] + assert ext_output_list[0] in processed_sample_ids + + lp_input_list = libprep.has_input + lp_output_list = libprep.has_output + assert len(lp_input_list) == 1 + assert len(lp_output_list) == 1 + + assert lp_input_list == ext_output_list + assert lp_output_list[0] in processed_sample_ids + + ntseq_input_list = ntseq.has_input + ntseq_output_list = ntseq.has_output + + assert len(ntseq_input_list) == 1 + assert ntseq_input_list[0] == "nmdc:procsm-11-x1y2z3" + + assert len(ntseq_output_list) == 2 + data_object_ids = [obj.id for obj in database.data_object_set] + for do_id in ntseq_output_list: + assert do_id in data_object_ids + + for do_id in ntseq_output_list: + matching_dobj = [x for x in database.data_object_set if x.id == do_id] + assert len(matching_dobj) == 1 + dobj = matching_dobj[0] + assert dobj.type == 
"nmdc:DataObject" + assert dobj.name in [ + "BMI_HWVWKBGX7_AquaticPlate6WellA5_R1.fastq.gz", + "BMI_HWVWKBGX7_AquaticPlate6WellA5_R2.fastq.gz", + ]