From 835313c90aa52bb8fe91198617873896b108e421 Mon Sep 17 00:00:00 2001 From: Sujay Patil Date: Wed, 12 Feb 2025 15:06:00 -0800 Subject: [PATCH 1/2] account for multiple sequencing runs in NEON surface water translator code --- nmdc_runtime/site/repository.py | 4 +- .../neon_surface_water_translator.py | 206 +++++++++++------- nmdc_runtime/site/workspace.yaml | 3 + 3 files changed, 133 insertions(+), 80 deletions(-) diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py index 0a503459..b3300ded 100644 --- a/nmdc_runtime/site/repository.py +++ b/nmdc_runtime/site/repository.py @@ -829,7 +829,7 @@ def biosample_submission_ingest(): "config": { "surface_water_data_product": { "product_id": "DP1.20281.001", - "product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent", + "product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent, mms_swRawDataFiles", } } }, @@ -856,7 +856,7 @@ def biosample_submission_ingest(): "config": { "surface_water_data_product": { "product_id": "DP1.20281.001", - "product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent", + "product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent, mms_swRawDataFiles", } } }, diff --git a/nmdc_runtime/site/translation/neon_surface_water_translator.py b/nmdc_runtime/site/translation/neon_surface_water_translator.py index 2e05c6eb..67831667 100644 --- a/nmdc_runtime/site/translation/neon_surface_water_translator.py +++ b/nmdc_runtime/site/translation/neon_surface_water_translator.py @@ -71,6 +71,7 @@ def __init__( neon_amb_data_tables = ( "mms_swMetagenomeSequencing", "mms_swMetagenomeDnaExtraction", + "mms_swRawDataFiles", "amc_fieldGenetic", "amc_fieldSuperParent", ) @@ -88,6 +89,9 @@ def __init__( if_exists="replace", index=False, ) + surface_water_data["mms_swRawDataFiles"].to_sql( + "mms_swRawDataFiles", self.conn, if_exists="replace", index=False + ) surface_water_data["amc_fieldGenetic"].to_sql( "amc_fieldGenetic", self.conn, if_exists="replace", index=False ) @@ -103,10 +107,7 @@ def __init__( "neonEnvoTerms", self.conn, if_exists="replace", index=False ) - self.neon_raw_data_file_mappings_df = neon_raw_data_file_mappings_file - self.neon_raw_data_file_mappings_df.to_sql( - "neonRawDataFile", self.conn, if_exists="replace", index=False - ) + self.neon_raw_data_file_mappings_df = surface_water_data["mms_swRawDataFiles"] self.site_code_mapping = site_code_mapping @@ -371,7 +372,7 @@ def _translate_processed_sample( ) def _translate_data_object( - self, do_id: str, url: str, do_type: str, checksum: str + self, do_id: str, url: str, do_type: str, manifest_id: str ) -> nmdc.DataObject: """Create nmdc DataObject which is the output of a NucleotideSequencing process. This object mainly contains information about the sequencing file that was generated as @@ -395,8 +396,15 @@ def _translate_data_object( url=url, description=f"sequencing results for {basename}", type="nmdc:DataObject", - md5_checksum=checksum, data_object_type=do_type, + in_manifest=manifest_id, + ) + + def _translate_manifest(self, manifest_id: str) -> nmdc.Manifest: + return nmdc.Manifest( + id=manifest_id, + manifest_category=nmdc.ManifestCategoryEnum.poolable_replicates, + type="nmdc:Manifest", ) def get_database(self): @@ -477,6 +485,9 @@ def get_database(self): """ surface_water_samples = pd.read_sql_query(query, self.conn) + # -------------------------------------------------- + # Create mappings for minted NMDC IDs + # -------------------------------------------------- neon_biosample_ids = surface_water_samples["parentSampleID"] nmdc_biosample_ids = self._id_minter("nmdc:Biosample", len(neon_biosample_ids)) neon_to_nmdc_biosample_ids = dict(zip(neon_biosample_ids, nmdc_biosample_ids)) @@ -511,30 +522,20 @@ def get_database(self): zip(neon_lib_prep_processed_ids, nmdc_lib_prep_processed_ids) ) - neon_omprc_ids = surface_water_samples["parentSampleID"] - nmdc_omprc_ids = self._id_minter( - "nmdc:NucleotideSequencing", len(neon_omprc_ids) - ) - neon_to_nmdc_omprc_ids = dict(zip(neon_omprc_ids, nmdc_omprc_ids)) - - neon_raw_data_file_mappings_df = self.neon_raw_data_file_mappings_df - neon_raw_file_paths = neon_raw_data_file_mappings_df["rawDataFilePath"] - nmdc_data_object_ids = self._id_minter( - "nmdc:DataObject", len(neon_raw_file_paths) - ) - neon_to_nmdc_data_object_ids = dict( - zip(neon_raw_file_paths, nmdc_data_object_ids) - ) - + # -------------------------------------------------- + # STEP 1: Insert Biosamples + # -------------------------------------------------- for neon_id, nmdc_id in neon_to_nmdc_biosample_ids.items(): biosample_row = surface_water_samples[ surface_water_samples["parentSampleID"] == neon_id ] + # database.biosample_set.append( + # self._translate_biosample(neon_id, nmdc_id, biosample_row) + # ) - database.biosample_set.append( - self._translate_biosample(neon_id, nmdc_id, biosample_row) - ) - + # -------------------------------------------------- + # STEP 2: Insert Extraction Processes + # -------------------------------------------------- for neon_id, nmdc_id in neon_to_nmdc_extraction_ids.items(): extraction_row = surface_water_samples[ surface_water_samples["parentSampleID"] == neon_id @@ -557,6 +558,7 @@ def get_database(self): extraction_row, "genomicsSampleID" ) + # Each Extraction process output => ProcessedSample database.processed_sample_set.append( self._translate_processed_sample( processed_sample_id, @@ -564,23 +566,9 @@ def get_database(self): ) ) - query = """ - SELECT dnaSampleID, GROUP_CONCAT(rawDataFilePath, '|') AS rawDataFilePaths - FROM neonRawDataFile - GROUP BY dnaSampleID - """ - neon_raw_data_files = pd.read_sql_query(query, self.conn) - neon_raw_data_files_dict = ( - neon_raw_data_files.set_index("dnaSampleID")["rawDataFilePaths"] - .str.split("|") - .to_dict() - ) - filtered_neon_raw_data_files_dict = { - key: value - for key, value in neon_raw_data_files_dict.items() - if len(value) <= 2 - } - + # -------------------------------------------------- + # STEP 3: Insert LibraryPreparation Processes + # -------------------------------------------------- for neon_id, nmdc_id in neon_to_nmdc_lib_prep_ids.items(): lib_prep_row = surface_water_samples[ surface_water_samples["parentSampleID"] == neon_id @@ -601,6 +589,7 @@ def get_database(self): dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID") + # Each LibraryPreparation process output => ProcessedSample database.processed_sample_set.append( self._translate_processed_sample( processed_sample_id, @@ -608,42 +597,103 @@ def get_database(self): ) ) - has_output = None - has_output_do_ids = [] - - if dna_sample_id in filtered_neon_raw_data_files_dict: - has_output = filtered_neon_raw_data_files_dict[dna_sample_id] - for item in has_output: - if item in neon_to_nmdc_data_object_ids: - has_output_do_ids.append(neon_to_nmdc_data_object_ids[item]) - - checksum = None - do_type = None - - checksum = neon_raw_data_file_mappings_df[ - neon_raw_data_file_mappings_df["rawDataFilePath"] == item - ]["checkSum"].values[0] - if "_R1.fastq.gz" in item: - do_type = "Metagenome Raw Read 1" - elif "_R2.fastq.gz" in item: - do_type = "Metagenome Raw Read 2" - - database.data_object_set.append( - self._translate_data_object( - neon_to_nmdc_data_object_ids.get(item), - item, - do_type, - checksum, - ) - ) - - database.data_generation_set.append( - self._translate_nucleotide_sequencing( - neon_to_nmdc_omprc_ids.get(neon_id), - processed_sample_id, - has_output_do_ids, - lib_prep_row, - ) + # -------------------------------------------------- + # STEP 4: Group raw files by (dnaSampleID, sequencerRunID) + # and insert DataObjects + DataGeneration processes + # -------------------------------------------------- + raw_query = """ + SELECT dnaSampleID, sequencerRunID, rawDataFilePath + FROM mms_swRawDataFiles + """ + neon_raw_data_files_df = pd.read_sql_query(raw_query, self.conn) + + for neon_id, nmdc_libprep_id in neon_to_nmdc_lib_prep_ids.items(): + # 1) Pull out the row that corresponds to this parentSampleID + lib_prep_row = surface_water_samples[ + surface_water_samples["parentSampleID"] == neon_id + ] + + # 2) Grab the dnaSampleID from that row + dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID") + if not dna_sample_id: + # No dnaSampleID => skip + continue + + # 3) Find all raw files for that dnaSampleID + dna_files = neon_raw_data_files_df[ + neon_raw_data_files_df["dnaSampleID"] == dna_sample_id + ] + if dna_files.empty: + # No raw files => skip + continue + + # ----------------------------------------- + # LOOKUP DICT: get "has_input" for this neon_id + # ----------------------------------------- + has_input_value = self.samp_procsm_dict.get(neon_id) + # If some neon_id isn't in the dictionary, handle it as needed + if not has_input_value: + # Could skip, or raise an error, or set a default + continue + + # ------------------------------------------- + # 4) CREATE A MANIFEST IF MULTIPLE RAW FILES + # for this row's dnaSampleID + # ------------------------------------------- + manifest_id = None + if len(dna_files) > 2: + # For each row that references a dnaSampleID with multiple raw files, + # mint exactly one new manifest record + manifest_id = self._id_minter("nmdc:Manifest", 1)[0] + new_manifest = self._translate_manifest(manifest_id) + # Add to the database + database.manifest_set.append(new_manifest) + + # ------------------------------------------- + # 5) NOW GROUP FILES BY sequencerRunID + # => one data_generation record per run + # ------------------------------------------- + lib_prep_processed_sample_id = neon_to_nmdc_lib_prep_processed_ids.get( + neon_id + ) + if not lib_prep_processed_sample_id: + # If we don't have a ProcessedSample for some reason, skip + continue + + for run_id, group_df in dna_files.groupby("sequencerRunID"): + # a) Mint new data_generation (NucleotideSequencing) ID for this run + data_generation_id = self._id_minter("nmdc:NucleotideSequencing", 1)[0] + + # b) Create DataObjects for each raw file in this run + data_object_ids = [] + for raw_fp in group_df["rawDataFilePath"]: + do_id = self._id_minter("nmdc:DataObject", 1)[0] + + # Distinguish read type + do_type = None + if "_R1.fastq.gz" in raw_fp: + do_type = "Metagenome Raw Read 1" + elif "_R2.fastq.gz" in raw_fp: + do_type = "Metagenome Raw Read 2" + + # Create the DataObject + data_obj = self._translate_data_object( + do_id=do_id, + url=raw_fp, + do_type=do_type, + manifest_id=manifest_id, # link to the new Manifest if it exists + ) + database.data_object_set.append(data_obj) + data_object_ids.append(do_id) + + # c) Finally, create the data generation record for this run + database.data_generation_set.append( + self._translate_nucleotide_sequencing( + nucleotide_sequencing_id=data_generation_id, + processed_sample_id=has_input_value, + raw_data_file_data=data_object_ids, + nucleotide_sequencing_row=lib_prep_row, ) + ) return database diff --git a/nmdc_runtime/site/workspace.yaml b/nmdc_runtime/site/workspace.yaml index 7a96368a..1a161625 100644 --- a/nmdc_runtime/site/workspace.yaml +++ b/nmdc_runtime/site/workspace.yaml @@ -11,6 +11,9 @@ load_from: - python_package: package_name: nmdc_runtime.site.repository attribute: biosample_submission_ingest + - python_package: + package_name: nmdc_runtime.site.repository + attribute: biosample_export - python_package: package_name: nmdc_runtime.site.repository attribute: database_records_stitching From 1d17dbf3b6a123c9806553e1317973a7d7d17e0d Mon Sep 17 00:00:00 2001 From: Sujay Patil Date: Wed, 12 Feb 2025 15:25:18 -0800 Subject: [PATCH 2/2] update NEON instrument mapping file URL in @repository --- nmdc_runtime/site/repository.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py index b3300ded..de6b99f7 100644 --- a/nmdc_runtime/site/repository.py +++ b/nmdc_runtime/site/repository.py @@ -652,7 +652,7 @@ def biosample_submission_ingest(): "inputs": { "neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv", "neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv", - "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", + "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", } }, }, @@ -694,7 +694,7 @@ def biosample_submission_ingest(): "inputs": { "neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv", "neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv", - "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", + "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", } }, }, @@ -737,7 +737,7 @@ def biosample_submission_ingest(): "inputs": { "neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv", "neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv", - "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", + "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", } }, "get_neon_pipeline_benthic_data_product": { @@ -779,7 +779,7 @@ def biosample_submission_ingest(): "inputs": { "neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv", "neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv", - "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", + "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", } }, }, @@ -822,7 +822,7 @@ def biosample_submission_ingest(): "inputs": { "neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv", "neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv", - "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", + "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", } }, "get_neon_pipeline_surface_water_data_product": { @@ -864,7 +864,7 @@ def biosample_submission_ingest(): "inputs": { "neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv", "neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv", - "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", + "neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv", } }, },