Skip to content

Commit

Permalink
adding bold records records metrics (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
johrstrom authored Feb 24, 2022
1 parent 9a90b5c commit 2dcfb1d
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 89 deletions.
27 changes: 14 additions & 13 deletions add_bold_records.sbatch
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,37 @@
#SBATCH -t 01:30:00
#SBATCH --job-name="add_bold_phylogatr"
#SBATCH --output=log/%x-%j.out
#SBATCH --gres=pfsdir

cd $SLURM_SUBMIT_DIR
[ -f "$SLURM_SUBMIT_DIR/env" ] && source "$SLURM_SUBMIT_DIR/env"

set -xe

cp $WORKDIR/bold.tsv $TMPDIR
cp $WORKDIR/genes.tar.gz $TMPDIR
time (cd $TMPDIR; tar xzf genes.tar.gz)
BOLD_ROOT="$PFSDIR/bold"
mkdir -p "$BOLD_ROOT"

cp $WORKDIR/bold.tsv $PFSDIR
cp $WORKDIR/genes.tar.gz $PFSDIR
time (cd $PFSDIR; tar xzf genes.tar.gz)

if [[ "$PHYLOGATR_DATABASE_URL" == *sqlite3 ]]; then
# serial
time GENBANK_ROOT=$TMPDIR/genes bin/rake pipeline:add_bold_records < $TMPDIR/bold.tsv 2> $TMPDIR/bold.errors
mv "$PFSDIR/bold.tsv" "$BOLD_ROOT/bold.tsv"
time BOLD_ROOT="$BOLD_ROOT" GENBANK_ROOT=$PFSDIR/genes bin/rake pipeline:add_bold_records
else
# parallel
# NOTE: 14 not 28 because more than that puts too much load on MySQL
# this only takes 10 minutes anyways
split -d -n 14 $TMPDIR/bold.tsv $TMPDIR/x
split -d --additional-suffix '.tsv' -n 14 $PFSDIR/bold.tsv $BOLD_ROOT/x

for tsv in $TMPDIR/x*
do
time GENBANK_ROOT=$TMPDIR/genes bin/rake pipeline:add_bold_records < $tsv 2>$tsv.errors &
done
wait
BOLD_ROOT="$BOLD_ROOT" GENBANK_ROOT=$PFSDIR/genes bin/rake pipeline:add_bold_records

# print out all errors with header per error file
echo "Errors:"
tail -n +1 $TMPDIR/*errors
tail -n +1 $PFSDIR/*errors
fi


time (cd $TMPDIR; tar czf genes_with_bold.tar.gz genes)
cp $TMPDIR/genes_with_bold.tar.gz $WORKDIR/genes.tar.gz
time (cd $PFSDIR; tar czf genes_with_bold.tar.gz genes)
cp $PFSDIR/genes_with_bold.tar.gz $WORKDIR/genes.tar.gz
162 changes: 86 additions & 76 deletions lib/tasks/pipeline.rake
Original file line number Diff line number Diff line change
Expand Up @@ -216,15 +216,15 @@ namespace :pipeline do

totals = Parallel.map(files) do |file|
puts "reading file #{file}"
output_records = invalid_records = duplicate_records = 0
invalid_records = duplicate_records = 0

records = CSV.read(file, col_sep: "\t", headers: BoldRecord::HEADERS)
input_records = records.size

all = records.map do |record|
BoldRecord.new(record.to_h)
end.select do |record|
invalid_records += 1 if !record.valid?
invalid_records += 1 unless record.valid?
record.valid?
end.reject do |record|
duplicate_records += 1 if record.duplicate?
Expand Down Expand Up @@ -255,18 +255,9 @@ namespace :pipeline do

desc 'add bold records to database'
task add_bold_records: :environment do
# Job array makes it easy cause then you can use the SAME
# starting boldfile is an ARRAY and then the job array is just the index to get you that item
#
# For each files
#
# cp boldfile.tsv $TMPDIR/bold.tsv
# cut -d $'\t' -f1,3,4,5,10,12,14,16,20,22,24,47,48,70,71,72 $TMPDIR/bold.tsv > $TMPDIR/bold.simple.tsv

# STDIN has all the bold records
# markercode is understood to be gene_symbol
# nucleotides is understood to be sequence
csv = CSV.new($stdin, col_sep: "\t", headers: BoldRecord::HEADERS)
if ENV['BOLD_ROOT'].nil? || !File.directory?(ENV['BOLD_ROOT'])
raise "#{ENV['BOLD_ROOT']} set in BOLD_ROOT environment variable is invalid!"
end

kingdoms = {
'Acanthocephala' => 'Animalia',
Expand Down Expand Up @@ -295,7 +286,6 @@ namespace :pipeline do
'Sipuncula' => 'Animalia',
'Tardigrada' => 'Animalia',
'Zygomycota' => 'Fungi',

'Chytridiomycota' => 'Fungi',
'Entoprocta' => 'Animalia',
'Gastrotricha' => 'Animalia',
Expand All @@ -309,81 +299,101 @@ namespace :pipeline do
'Pteridophyta' => 'Plantae'
}

phylums_ignore = Set.new(['Chlorarachniophyta', 'Heterokontophyta', 'Lycopodiophyta', 'Myxomycota', 'Onychophora',
'Pyrrophycophyta'])

fasta_files_updated = Set.new
files = Dir.glob("#{ENV['BOLD_ROOT']}/*.tsv")
totals = Parallel.map(files) do |file|
invalid_records = invalid_taxons = invalid_occurences = output_files = input_records = 0

CSV.foreach(file, col_sep: "\t", headers: BoldRecord::HEADERS) do |line|
# read line by line so we can catch and skip individual malformed rows
begin
input_records += 1
record = BoldRecord.new(**line.to_h)
rescue StandardError => e
invalid_records += 1
puts "#{e.class} #{e.message}"
next
end

loop do
# read line by line so we can catch and skip individual malformed rows
record = nil
begin
row = csv.shift
break unless row
# TODO: every skipped record should be written to a file
unless record.gene_symbol_mapped.present? && record.sequence.present? && record.species.present? && record.species_binomial?
invalid_records += 1
next
end

record = BoldRecord.new(**row.to_h)
rescue StandardError => e
puts "#{e.class} #{e.message}"
next
end
# FIXME: be careful of case issues
# FIXME: cache species to reduce the number of queries?
species = Species.find_by(taxon_species: record.species)
taxons_set = ['phylum', 'class', 'order', 'family', 'genus', 'species'].all? do |t|
record.send(:"taxon_#{t}").present?
end

# TODO: every skipped record should be written to a file
unless record.gene_symbol_mapped.present? && record.sequence.present? && record.species.present? && record.species_binomial?
next
end
unless species || (taxons_set && kingdoms.include?(record.taxon_phylum))
invalid_taxons += 1
next
end

# FIXME: be careful of case issues
# FIXME: cache species to reduce the number of queries?
species = Species.find_by(taxon_species: record.species)
taxons_set = ['phylum', 'class', 'order', 'family', 'genus', 'species'].all? do |t|
record.send(:"taxon_#{t}").present?
end
if species.nil?
species = Species.create(
path: File.join(record.taxon_class, record.taxon_order, record.taxon_family,
record.species.gsub(' ', '-')),
taxon_kingdom: kingdoms[record.taxon_phylum],
taxon_phylum: record.taxon_phylum,
taxon_class: record.taxon_class,
taxon_order: record.taxon_order,
taxon_family: record.taxon_family,
taxon_genus: record.taxon_genus,
taxon_species: record.species,
aligned: false
)
end

next unless species || (taxons_set && kingdoms.include?(record.taxon_phylum))

if species.nil?
species = Species.create(
path: File.join(record.taxon_class, record.taxon_order, record.taxon_family,
record.species.gsub(' ', '-')),
taxon_kingdom: kingdoms[record.taxon_phylum],
taxon_phylum: record.taxon_phylum,
taxon_class: record.taxon_class,
taxon_order: record.taxon_order,
taxon_family: record.taxon_family,
taxon_genus: record.taxon_genus,
taxon_species: record.species,
aligned: false
occurrence = Occurrence.new(
source: :bold,
source_id: record.process_id,
catalog_number: record.catalog_number.presence,
field_number: record.field_number.presence,
accession: record.accession.presence,
lat: record.lat,
lng: record.lng,
species_id: species.id,
genes: record.gene_symbol_mapped
)
end

occurrence = Occurrence.new(
source: :bold,
source_id: record.process_id,
catalog_number: record.catalog_number.presence,
field_number: record.field_number.presence,
accession: record.accession.presence,
lat: record.lat,
lng: record.lng,
species_id: species.id,
genes: record.gene_symbol_mapped
)
unless occurrence.save
invalid_occurences += 1
next
end

next unless occurrence.save
# Reptilia/Squamata/Agamidae/Phrynocephalus-persicus/Phrynocephalus-persicus-COI
fname = "#{record.species}-#{record.gene_symbol_mapped.upcase}".gsub(' ', '-')
fasta_path = "#{Configuration.genbank_root.join(species.path, fname)}.fa"

# Reptilia/Squamata/Agamidae/Phrynocephalus-persicus/Phrynocephalus-persicus-COI
fasta_path = Configuration.genbank_root.join(species.path, "#{record.species}-#{record.gene_symbol_mapped.upcase}").to_s.gsub(
' ', '-'
) + '.fa'
# occurrence saved, now write gene data
FileUtils.mkdir_p(File.dirname(fasta_path))

# occurrence saved, now write gene data
FileUtils.mkdir_p(File.dirname(fasta_path))
File.write(fasta_path, record.fasta_sequence, mode: 'a+')

File.write(fasta_path, record.fasta_sequence, mode: 'a+')
output_files += 1

fasta_files_updated << fasta_path
species.update(aligned: false) if species.aligned
end

species.update(aligned: false) if species.aligned
{
'input_records' => input_records,
'invalid_records' => invalid_records,
'invalid_taxons' => invalid_taxons,
'invalid_occurences' => invalid_occurences,
'output_files' => output_files
}
end.each_with_object({}) do |entry, total|
total['input_records'] = entry['input_records'] + (total['input_records'] || 0)
total['invalid_records'] = entry['invalid_records'] + (total['invalid_records'] || 0)
total['invalid_taxons'] = entry['invalid_taxons'] + (total['invalid_taxons'] || 0)
total['invalid_occurences'] = entry['invalid_occurences'] + (total['invalid_occurences'] || 0)
total['output_files'] = entry['output_files'] + (total['output_files'] || 0)
end

PipelineMetrics.append_record(totals.merge({ 'name' => 'add_bold_records' }))
end

desc 'align fasta files'
Expand Down

0 comments on commit 2dcfb1d

Please sign in to comment.