Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Account for multiple sequencing runs in NEON surface water translator code #898

Merged
merged 2 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions nmdc_runtime/site/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down Expand Up @@ -694,7 +694,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down Expand Up @@ -737,7 +737,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
"get_neon_pipeline_benthic_data_product": {
Expand Down Expand Up @@ -779,7 +779,7 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down Expand Up @@ -822,14 +822,14 @@ def biosample_submission_ingest():
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
"get_neon_pipeline_surface_water_data_product": {
"config": {
"surface_water_data_product": {
"product_id": "DP1.20281.001",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent, mms_swRawDataFiles",
}
}
},
Expand All @@ -856,15 +856,15 @@ def biosample_submission_ingest():
"config": {
"surface_water_data_product": {
"product_id": "DP1.20281.001",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent",
"product_tables": "mms_swMetagenomeSequencing, mms_swMetagenomeDnaExtraction, amc_fieldGenetic, amc_fieldSuperParent, mms_swRawDataFiles",
}
}
},
"get_neon_pipeline_inputs": {
"inputs": {
"neon_envo_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/neon_mixs_env_triad_mappings/neon-nlcd-local-broad-mappings.tsv",
"neon_raw_data_file_mappings_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/main/assets/misc/neon_raw_data_file_mappings.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/berkeley-schema-fy24/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
"neon_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/neon_sequencingMethod_to_nmdc_instrument_set.tsv",
}
},
},
Expand Down
206 changes: 128 additions & 78 deletions nmdc_runtime/site/translation/neon_surface_water_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(
neon_amb_data_tables = (
"mms_swMetagenomeSequencing",
"mms_swMetagenomeDnaExtraction",
"mms_swRawDataFiles",
"amc_fieldGenetic",
"amc_fieldSuperParent",
)
Expand All @@ -88,6 +89,9 @@ def __init__(
if_exists="replace",
index=False,
)
surface_water_data["mms_swRawDataFiles"].to_sql(
"mms_swRawDataFiles", self.conn, if_exists="replace", index=False
)
surface_water_data["amc_fieldGenetic"].to_sql(
"amc_fieldGenetic", self.conn, if_exists="replace", index=False
)
Expand All @@ -103,10 +107,7 @@ def __init__(
"neonEnvoTerms", self.conn, if_exists="replace", index=False
)

self.neon_raw_data_file_mappings_df = neon_raw_data_file_mappings_file
self.neon_raw_data_file_mappings_df.to_sql(
"neonRawDataFile", self.conn, if_exists="replace", index=False
)
self.neon_raw_data_file_mappings_df = surface_water_data["mms_swRawDataFiles"]

self.site_code_mapping = site_code_mapping

Expand Down Expand Up @@ -371,7 +372,7 @@ def _translate_processed_sample(
)

def _translate_data_object(
self, do_id: str, url: str, do_type: str, checksum: str
self, do_id: str, url: str, do_type: str, manifest_id: str
) -> nmdc.DataObject:
"""Create nmdc DataObject which is the output of a NucleotideSequencing process. This
object mainly contains information about the sequencing file that was generated as
Expand All @@ -395,8 +396,15 @@ def _translate_data_object(
url=url,
description=f"sequencing results for {basename}",
type="nmdc:DataObject",
md5_checksum=checksum,
data_object_type=do_type,
in_manifest=manifest_id,
)

def _translate_manifest(self, manifest_id: str) -> nmdc.Manifest:
    """Build the nmdc Manifest record identified by ``manifest_id``.

    The Manifest is always created with the ``poolable_replicates``
    manifest category and the ``nmdc:Manifest`` type.

    :param manifest_id: ID to assign to the new Manifest.
    :return: Manifest object carrying the given ID.
    """
    category = nmdc.ManifestCategoryEnum.poolable_replicates
    manifest = nmdc.Manifest(
        id=manifest_id,
        manifest_category=category,
        type="nmdc:Manifest",
    )
    return manifest

def get_database(self):
Expand Down Expand Up @@ -477,6 +485,9 @@ def get_database(self):
"""
surface_water_samples = pd.read_sql_query(query, self.conn)

# --------------------------------------------------
# Create mappings for minted NMDC IDs
# --------------------------------------------------
neon_biosample_ids = surface_water_samples["parentSampleID"]
nmdc_biosample_ids = self._id_minter("nmdc:Biosample", len(neon_biosample_ids))
neon_to_nmdc_biosample_ids = dict(zip(neon_biosample_ids, nmdc_biosample_ids))
Expand Down Expand Up @@ -511,30 +522,20 @@ def get_database(self):
zip(neon_lib_prep_processed_ids, nmdc_lib_prep_processed_ids)
)

neon_omprc_ids = surface_water_samples["parentSampleID"]
nmdc_omprc_ids = self._id_minter(
"nmdc:NucleotideSequencing", len(neon_omprc_ids)
)
neon_to_nmdc_omprc_ids = dict(zip(neon_omprc_ids, nmdc_omprc_ids))

neon_raw_data_file_mappings_df = self.neon_raw_data_file_mappings_df
neon_raw_file_paths = neon_raw_data_file_mappings_df["rawDataFilePath"]
nmdc_data_object_ids = self._id_minter(
"nmdc:DataObject", len(neon_raw_file_paths)
)
neon_to_nmdc_data_object_ids = dict(
zip(neon_raw_file_paths, nmdc_data_object_ids)
)

# --------------------------------------------------
# STEP 1: Insert Biosamples
# --------------------------------------------------
for neon_id, nmdc_id in neon_to_nmdc_biosample_ids.items():
biosample_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
]
# database.biosample_set.append(
# self._translate_biosample(neon_id, nmdc_id, biosample_row)
# )

database.biosample_set.append(
self._translate_biosample(neon_id, nmdc_id, biosample_row)
)

# --------------------------------------------------
# STEP 2: Insert Extraction Processes
# --------------------------------------------------
for neon_id, nmdc_id in neon_to_nmdc_extraction_ids.items():
extraction_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
Expand All @@ -557,30 +558,17 @@ def get_database(self):
extraction_row, "genomicsSampleID"
)

# Each Extraction process output => ProcessedSample
database.processed_sample_set.append(
self._translate_processed_sample(
processed_sample_id,
f"Extracted DNA from {genomics_sample_id}",
)
)

query = """
SELECT dnaSampleID, GROUP_CONCAT(rawDataFilePath, '|') AS rawDataFilePaths
FROM neonRawDataFile
GROUP BY dnaSampleID
"""
neon_raw_data_files = pd.read_sql_query(query, self.conn)
neon_raw_data_files_dict = (
neon_raw_data_files.set_index("dnaSampleID")["rawDataFilePaths"]
.str.split("|")
.to_dict()
)
filtered_neon_raw_data_files_dict = {
key: value
for key, value in neon_raw_data_files_dict.items()
if len(value) <= 2
}

# --------------------------------------------------
# STEP 3: Insert LibraryPreparation Processes
# --------------------------------------------------
for neon_id, nmdc_id in neon_to_nmdc_lib_prep_ids.items():
lib_prep_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
Expand All @@ -601,49 +589,111 @@ def get_database(self):

dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID")

# Each LibraryPreparation process output => ProcessedSample
database.processed_sample_set.append(
self._translate_processed_sample(
processed_sample_id,
f"Library preparation for {dna_sample_id}",
)
)

has_output = None
has_output_do_ids = []

if dna_sample_id in filtered_neon_raw_data_files_dict:
has_output = filtered_neon_raw_data_files_dict[dna_sample_id]
for item in has_output:
if item in neon_to_nmdc_data_object_ids:
has_output_do_ids.append(neon_to_nmdc_data_object_ids[item])

checksum = None
do_type = None

checksum = neon_raw_data_file_mappings_df[
neon_raw_data_file_mappings_df["rawDataFilePath"] == item
]["checkSum"].values[0]
if "_R1.fastq.gz" in item:
do_type = "Metagenome Raw Read 1"
elif "_R2.fastq.gz" in item:
do_type = "Metagenome Raw Read 2"

database.data_object_set.append(
self._translate_data_object(
neon_to_nmdc_data_object_ids.get(item),
item,
do_type,
checksum,
)
)

database.data_generation_set.append(
self._translate_nucleotide_sequencing(
neon_to_nmdc_omprc_ids.get(neon_id),
processed_sample_id,
has_output_do_ids,
lib_prep_row,
)
# --------------------------------------------------
# STEP 4: Group raw files by (dnaSampleID, sequencerRunID)
# and insert DataObjects + DataGeneration processes
# --------------------------------------------------
raw_query = """
SELECT dnaSampleID, sequencerRunID, rawDataFilePath
FROM mms_swRawDataFiles
"""
neon_raw_data_files_df = pd.read_sql_query(raw_query, self.conn)

for neon_id, nmdc_libprep_id in neon_to_nmdc_lib_prep_ids.items():
# 1) Pull out the row that corresponds to this parentSampleID
lib_prep_row = surface_water_samples[
surface_water_samples["parentSampleID"] == neon_id
]

# 2) Grab the dnaSampleID from that row
dna_sample_id = _get_value_or_none(lib_prep_row, "dnaSampleID")
if not dna_sample_id:
# No dnaSampleID => skip
continue

# 3) Find all raw files for that dnaSampleID
dna_files = neon_raw_data_files_df[
neon_raw_data_files_df["dnaSampleID"] == dna_sample_id
]
if dna_files.empty:
# No raw files => skip
continue

# -----------------------------------------
# LOOKUP DICT: get "has_input" for this neon_id
# -----------------------------------------
has_input_value = self.samp_procsm_dict.get(neon_id)
# If some neon_id isn't in the dictionary, handle it as needed
if not has_input_value:
# Could skip, or raise an error, or set a default
continue

# -------------------------------------------
# 4) CREATE A MANIFEST IF MORE THAN TWO RAW FILES
# exist for this row's dnaSampleID
# -------------------------------------------
manifest_id = None
if len(dna_files) > 2:
# When this row's dnaSampleID has more than two raw files (e.g. more than
# a single R1/R2 read pair), mint exactly one new manifest record for it
manifest_id = self._id_minter("nmdc:Manifest", 1)[0]
new_manifest = self._translate_manifest(manifest_id)
# Add to the database
database.manifest_set.append(new_manifest)

# -------------------------------------------
# 5) NOW GROUP FILES BY sequencerRunID
# => one data_generation record per run
# -------------------------------------------
lib_prep_processed_sample_id = neon_to_nmdc_lib_prep_processed_ids.get(
neon_id
)
if not lib_prep_processed_sample_id:
# If we don't have a ProcessedSample for some reason, skip
continue

for run_id, group_df in dna_files.groupby("sequencerRunID"):
# a) Mint new data_generation (NucleotideSequencing) ID for this run
data_generation_id = self._id_minter("nmdc:NucleotideSequencing", 1)[0]

# b) Create DataObjects for each raw file in this run
data_object_ids = []
for raw_fp in group_df["rawDataFilePath"]:
do_id = self._id_minter("nmdc:DataObject", 1)[0]

# Distinguish read type
do_type = None
if "_R1.fastq.gz" in raw_fp:
do_type = "Metagenome Raw Read 1"
elif "_R2.fastq.gz" in raw_fp:
do_type = "Metagenome Raw Read 2"

# Create the DataObject
data_obj = self._translate_data_object(
do_id=do_id,
url=raw_fp,
do_type=do_type,
manifest_id=manifest_id, # link to the new Manifest if it exists
)
database.data_object_set.append(data_obj)
data_object_ids.append(do_id)

# c) Finally, create the data generation record for this run
database.data_generation_set.append(
self._translate_nucleotide_sequencing(
nucleotide_sequencing_id=data_generation_id,
processed_sample_id=has_input_value,
raw_data_file_data=data_object_ids,
nucleotide_sequencing_row=lib_prep_row,
)
)

return database
3 changes: 3 additions & 0 deletions nmdc_runtime/site/workspace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ load_from:
- python_package:
package_name: nmdc_runtime.site.repository
attribute: biosample_submission_ingest
- python_package:
package_name: nmdc_runtime.site.repository
attribute: biosample_export
- python_package:
package_name: nmdc_runtime.site.repository
attribute: database_records_stitching
Expand Down