Skip to content

Commit

Permalink
Merge pull request #898 from microbiomedata/issue-1067-multiple-seq-r…
Browse files Browse the repository at this point in the history
…uns-neon-sw

Account for multiple sequencing runs in NEON surface water translator code
  • Loading branch information
sujaypatil96 authored Feb 13, 2025
2 parents 7cce886 + 1d17dbf commit 65b0607
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 86 deletions.
16 changes: 8 additions & 8 deletions nmdc_runtime/site/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down Expand Up @@ -694,7 +694,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down Expand Up @@ -737,7 +737,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
"get_neon_pipeline_benthic_data_product": {
Expand Down Expand Up @@ -779,7 +779,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down Expand Up @@ -822,14 +822,14 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
"get_neon_pipeline_surface_water_data_product": {
"config": {
"surface_water_data_product": {
"product_id": "DP1.20281.001",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent, mms_swRawDataFiles",
}
}
},
Expand All @@ -856,15 +856,15 @@ def biosample_submission_ingest():
"config": {
"surface_water_data_product": {
"product_id": "DP1.20281.001",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent, mms_swRawDataFiles",
}
}
},
"get_neon_pipeline_inputs": {
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down
206 changes: 128 additions & 78 deletions nmdc_runtime/site/translation/neon_surface_water_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
neon_amb_data_tables = (
"mms_swMetagenomeSequencing",
"mms_swMetagenomeDnaExtraction",
"mms_swRawDataFiles",
"amc_fieldGenetic",
"amc_fieldSuperParent",
)
Expand All @@ -88,6 +89,9 @@ def __init__(
if_exists="replace",
index=False,
)
surface_water_data["mms_swRawDataFiles"].to_sql(
"mms_swRawDataFiles", self.conn, if_exists="replace", index=False
)
surface_water_data["amc_fieldGenetic"].to_sql(
"amc_fieldGenetic", self.conn, if_exists="replace", index=False
)
Expand All @@ -103,10 +107,7 @@ def __init__(
"neonEnvoTerms", self.conn, if_exists="replace", index=False
)

self.neon_raw_data_file_mappings_df = neon_raw_data_file_mappings_file
self.neon_raw_data_file_mappings_df.to_sql(
"neonRawDataFile", self.conn, if_exists="replace", index=False
)
self.neon_raw_data_file_mappings_df = surface_water_data["mms_swRawDataFiles"]

self.site_code_mapping = site_code_mapping

Expand Down Expand Up @@ -371,7 +372,7 @@ def _translate_processed_sample(
)

def _translate_data_object(
self, do_id: str, url: str, do_type: str, checksum: str
self, do_id: str, url: str, do_type: str, manifest_id: str
) -> nmdc.DataObject:
"""Create nmdc DataObject which is the output of a NucleotideSequencing process. This
object mainly contains information about the sequencing file that was generated as
Expand All @@ -395,8 +396,15 @@ def _translate_data_object(
url=url,
description=f"sequencing results for {basename}",
type="nmdc:DataObject",
md5_checksum=checksum,
data_object_type=do_type,
in_manifest=manifest_id,
)

def _translate_manifest(self, manifest_id: str) -> nmdc.Manifest:
return nmdc.Manifest(
id=manifest_id,
manifest_category=nmdc.ManifestCategoryEnum.poolable_replicates,
type="nmdc:Manifest",
)

def get_database(self):
Expand Down Expand Up @@ -477,6 +485,9 @@ def get_database(self):
"""
surface_water_samples = pd.read_sql_query(query, self.conn)

# --------------------------------------------------
# Create mappings for minted NMDC IDs
# --------------------------------------------------
neon_biosample_ids = surface_water_samples["parentSampleID"]
nmdc_biosample_ids = self._id_minter("nmdc:Biosample", len(neon_biosample_ids))
neon_to_nmdc_biosample_ids = dict(zip(neon_biosample_ids, nmdc_biosample_ids))
Expand Down Expand Up @@ -511,30 +522,20 @@ def get_database(self):
zip(neon_lib_prep_processed_ids, nmdc_lib_prep_processed_ids)
)

neon_omprc_ids = surface_water_samples["parentSampleID"]
nmdc_omprc_ids = self._id_minter(
"nmdc:NucleotideSequencing", len(neon_omprc_ids)
)
neon_to_nmdc_omprc_ids = dict(zip(neon_omprc_ids, nmdc_omprc_ids))

neon_raw_data_file_mappings_df = self.neon_raw_data_file_mappings_df
neon_raw_file_paths = neon_raw_data_file_mappings_df["rawDataFilePath"]
nmdc_data_object_ids = self._id_minter(
"nmdc:DataObject", len(neon_raw_file_paths)
)
neon_to_nmdc_data_object_ids = dict(
zip(neon_raw_file_paths, nmdc_data_object_ids)
)

# --------------------------------------------------
# STEP 1: Insert Biosamples
# --------------------------------------------------
for neon_id, nmdc_id in neon_to_nmdc_biosample_ids.items():
biosample_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
]
# database.biosample_set.append(
# self._translate_biosample(neon_id, nmdc_id, biosample_row)
# )

database.biosample_set.append(
self._translate_biosample(neon_id, nmdc_id, biosample_row)
)

# --------------------------------------------------
# STEP 2: Insert Extraction Processes
# --------------------------------------------------
for neon_id, nmdc_id in neon_to_nmdc_extraction_ids.items():
extraction_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
Expand All @@ -557,30 +558,17 @@ def get_database(self):
extraction_row, "genomicsSampleID"
)

# Each Extraction process output => ProcessedSample
database.processed_sample_set.append(
self._translate_processed_sample(
processed_sample_id,
f"Extracted DNA from {genomics_sample_id}",
)
)

query = """
SELECT dnaSampleID, GROUP_CONCAT(rawDataFilePath, '|') AS rawDataFilePaths
FROM neonRawDataFile
GROUP BY dnaSampleID
"""
neon_raw_data_files = pd.read_sql_query(query, self.conn)
neon_raw_data_files_dict = (
neon_raw_data_files.set_index("dnaSampleID")["rawDataFilePaths"]
.str.split("|")
.to_dict()
)
filtered_neon_raw_data_files_dict = {
key: value
for key, value in neon_raw_data_files_dict.items()
if len(value) <= 2
}

# --------------------------------------------------
# STEP 3: Insert LibraryPreparation Processes
# --------------------------------------------------
for neon_id, nmdc_id in neon_to_nmdc_lib_prep_ids.items():
lib_prep_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
Expand All @@ -601,49 +589,111 @@ def get_database(self):

dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID")

# Each LibraryPreparation process output => ProcessedSample
database.processed_sample_set.append(
self._translate_processed_sample(
processed_sample_id,
f"Library preparation for {dna_sample_id}",
)
)

has_output = None
has_output_do_ids = []

if dna_sample_id in filtered_neon_raw_data_files_dict:
has_output = filtered_neon_raw_data_files_dict[dna_sample_id]
for item in has_output:
if item in neon_to_nmdc_data_object_ids:
has_output_do_ids.append(neon_to_nmdc_data_object_ids[item])

checksum = None
do_type = None

checksum = neon_raw_data_file_mappings_df[
neon_raw_data_file_mappings_df["rawDataFilePath"] == item
]["checkSum"].values[0]
if "_R1.fastq.gz" in item:
do_type = "Metagenome Raw Read 1"
elif "_R2.fastq.gz" in item:
do_type = "Metagenome Raw Read 2"

database.data_object_set.append(
self._translate_data_object(
neon_to_nmdc_data_object_ids.get(item),
item,
do_type,
checksum,
)
)

database.data_generation_set.append(
self._translate_nucleotide_sequencing(
neon_to_nmdc_omprc_ids.get(neon_id),
processed_sample_id,
has_output_do_ids,
lib_prep_row,
)
# --------------------------------------------------
# STEP 4: Group raw files by (dnaSampleID, sequencerRunID)
# and insert DataObjects + DataGeneration processes
# --------------------------------------------------
raw_query = """
SELECT dnaSampleID, sequencerRunID, rawDataFilePath
FROM mms_swRawDataFiles
"""
neon_raw_data_files_df = pd.read_sql_query(raw_query, self.conn)

for neon_id, nmdc_libprep_id in neon_to_nmdc_lib_prep_ids.items():
# 1) Pull out the row that corresponds to this parentSampleID
lib_prep_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
]

# 2) Grab the dnaSampleID from that row
dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID")
if not dna_sample_id:
# No dnaSampleID => skip
continue

# 3) Find all raw files for that dnaSampleID
dna_files = neon_raw_data_files_df[
neon_raw_data_files_df["dnaSampleID"] == dna_sample_id
]
if dna_files.empty:
# No raw files => skip
continue

# -----------------------------------------
# LOOKUP DICT: get "has_input" for this neon_id
# -----------------------------------------
has_input_value = self.samp_procsm_dict.get(neon_id)
# If some neon_id isn't in the dictionary, handle it as needed
if not has_input_value:
# Could skip, or raise an error, or set a default
continue

# -------------------------------------------
# 4) CREATE A MANIFEST IF MULTIPLE RAW FILES
# for this row's dnaSampleID
# -------------------------------------------
manifest_id = None
if len(dna_files) > 2:
# For each row that references a dnaSampleID with multiple raw files,
# mint exactly one new manifest record
manifest_id = self._id_minter("nmdc:Manifest", 1)[0]
new_manifest = self._translate_manifest(manifest_id)
# Add to the database
database.manifest_set.append(new_manifest)

# -------------------------------------------
# 5) NOW GROUP FILES BY sequencerRunID
# => one data_generation record per run
# -------------------------------------------
lib_prep_processed_sample_id = neon_to_nmdc_lib_prep_processed_ids.get(
neon_id
)
if not lib_prep_processed_sample_id:
# If we don't have a ProcessedSample for some reason, skip
continue

for run_id, group_df in dna_files.groupby("sequencerRunID"):
# a) Mint new data_generation (NucleotideSequencing) ID for this run
data_generation_id = self._id_minter("nmdc:NucleotideSequencing", 1)[0]

# b) Create DataObjects for each raw file in this run
data_object_ids = []
for raw_fp in group_df["rawDataFilePath"]:
do_id = self._id_minter("nmdc:DataObject", 1)[0]

# Distinguish read type
do_type = None
if "_R1.fastq.gz" in raw_fp:
do_type = "Metagenome Raw Read 1"
elif "_R2.fastq.gz" in raw_fp:
do_type = "Metagenome Raw Read 2"

# Create the DataObject
data_obj = self._translate_data_object(
do_id=do_id,
url=raw_fp,
do_type=do_type,
manifest_id=manifest_id, # link to the new Manifest if it exists
)
database.data_object_set.append(data_obj)
data_object_ids.append(do_id)

# c) Finally, create the data generation record for this run
database.data_generation_set.append(
self._translate_nucleotide_sequencing(
nucleotide_sequencing_id=data_generation_id,
processed_sample_id=has_input_value,
raw_data_file_data=data_object_ids,
nucleotide_sequencing_row=lib_prep_row,
)
)

return database
3 changes: 3 additions & 0 deletions nmdc_runtime/site/workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ load_from:
- python_package:
package_name: nmdc_runtime.site.repository
attribute: biosample_submission_ingest
- python_package:
package_name: nmdc_runtime.site.repository
attribute: biosample_export
- python_package:
package_name: nmdc_runtime.site.repository
attribute: database_records_stitching
Expand Down

0 comments on commit 65b0607

Please sign in to comment.