diff --git a/add_bold_records.sbatch b/add_bold_records.sbatch
index 1a2e3b8..679ce60 100644
--- a/add_bold_records.sbatch
+++ b/add_bold_records.sbatch
@@ -4,36 +4,37 @@
 #SBATCH -t 01:30:00
 #SBATCH --job-name="add_bold_phylogatr"
 #SBATCH --output=log/%x-%j.out
+#SBATCH --gres=pfsdir

 cd $SLURM_SUBMIT_DIR
 [ -f "$SLURM_SUBMIT_DIR/env" ] && source "$SLURM_SUBMIT_DIR/env"
 set -xe

-cp $WORKDIR/bold.tsv $TMPDIR
-cp $WORKDIR/genes.tar.gz $TMPDIR
-time (cd $TMPDIR; tar xzf genes.tar.gz)
+BOLD_ROOT="$PFSDIR/bold"
+mkdir -p "$BOLD_ROOT"
+
+cp $WORKDIR/bold.tsv $PFSDIR
+cp $WORKDIR/genes.tar.gz $PFSDIR
+time (cd $PFSDIR; tar xzf genes.tar.gz)

 if [[ "$PHYLOGATR_DATABASE_URL" == *sqlite3 ]]; then
   # serial
-  time GENBANK_ROOT=$TMPDIR/genes bin/rake pipeline:add_bold_records < $TMPDIR/bold.tsv 2> $TMPDIR/bold.errors
+  mv "$PFSDIR/bold.tsv" "$BOLD_ROOT/bold.tsv"
+  time BOLD_ROOT="$BOLD_ROOT" GENBANK_ROOT=$PFSDIR/genes bin/rake pipeline:add_bold_records
else
   # parallel
   # NOTE: 14 not 28 because more than that puts too much load on MySQL
   # this only takes 10 minutes anyways
-  split -d -n 14 $TMPDIR/bold.tsv $TMPDIR/x
+  split -d --additional-suffix '.tsv' -n 14 $PFSDIR/bold.tsv $BOLD_ROOT/x

-  for tsv in $TMPDIR/x*
-  do
-    time GENBANK_ROOT=$TMPDIR/genes bin/rake pipeline:add_bold_records < $tsv 2>$tsv.errors &
-  done
-  wait
+  BOLD_ROOT="$BOLD_ROOT" GENBANK_ROOT=$PFSDIR/genes bin/rake pipeline:add_bold_records

   # print out all errors with header per error file
   echo "Errors:"
-  tail -n +1 $TMPDIR/*errors
+  tail -n +1 $PFSDIR/*errors
 fi

-time (cd $TMPDIR; tar czf genes_with_bold.tar.gz genes)
-cp $TMPDIR/genes_with_bold.tar.gz $WORKDIR/genes.tar.gz
+time (cd $PFSDIR; tar czf genes_with_bold.tar.gz genes)
+cp $PFSDIR/genes_with_bold.tar.gz $WORKDIR/genes.tar.gz
diff --git a/lib/tasks/pipeline.rake b/lib/tasks/pipeline.rake
index f551ea7..cbbf716 100644
--- a/lib/tasks/pipeline.rake
+++ b/lib/tasks/pipeline.rake
@@ -216,7 +216,7 @@ namespace :pipeline do

     totals = Parallel.map(files) do |file|
       puts "reading file #{file}"
-      output_records = invalid_records = duplicate_records = 0
+      invalid_records = duplicate_records = 0

       records = CSV.read(file, col_sep: "\t", headers: BoldRecord::HEADERS)
       input_records = records.size
@@ -224,7 +224,7 @@ namespace :pipeline do
       all = records.map do |record|
         BoldRecord.new(record.to_h)
       end.select do |record|
-        invalid_records += 1 if !record.valid?
+        invalid_records += 1 unless record.valid?
         record.valid?
       end.reject do |record|
         duplicate_records += 1 if record.duplicate?
@@ -255,18 +255,9 @@ namespace :pipeline do

   desc 'add bold records to database'
   task add_bold_records: :environment do
-    # Job array makes it easy cause then you can use the SAME
-    # starting boldfile is an ARRAY and then the job array is just the index to get you that item
-    #
-    # For each files
-    #
-    # cp boldfile.tsv $TMPDIR/bold.tsv
-    # cut -d $'\t' -f1,3,4,5,10,12,14,16,20,22,24,47,48,70,71,72 $TMPDIR/bold.tsv > $TMPDIR/bold.simple.tsv
-
-    # STDIN has all the bold records
-    # markercode is understood to be gene_symbol
-    # nucleotides is understood to be sequence
-    csv = CSV.new($stdin, col_sep: "\t", headers: BoldRecord::HEADERS)
+    if ENV['BOLD_ROOT'].nil? || !File.directory?(ENV['BOLD_ROOT'])
+      raise "#{ENV['BOLD_ROOT']} set in BOLD_ROOT environment variable is invalid!"
+    end

     kingdoms = {
       'Acanthocephala' => 'Animalia',
@@ -295,7 +286,6 @@ namespace :pipeline do
       'Sipuncula' => 'Animalia',
       'Tardigrada' => 'Animalia',
       'Zygomycota' => 'Fungi',
-      'Chytridiomycota' => 'Fungi',
       'Entoprocta' => 'Animalia',
       'Gastrotricha' => 'Animalia',
@@ -309,81 +299,101 @@ namespace :pipeline do
       'Pteridophyta' => 'Plantae'
     }

-    phylums_ignore = Set.new(['Chlorarachniophyta', 'Heterokontophyta', 'Lycopodiophyta', 'Myxomycota', 'Onychophora',
-                              'Pyrrophycophyta'])
-
-    fasta_files_updated = Set.new
+    files = Dir.glob("#{ENV['BOLD_ROOT']}/*.tsv")
+    totals = Parallel.map(files) do |file|
+      invalid_records = invalid_taxons = invalid_occurences = output_files = input_records = 0
+
+      CSV.foreach(file, col_sep: "\t", headers: BoldRecord::HEADERS) do |line|
+        # read line by line so we can catch and skip individual malformed rows
+        begin
+          input_records += 1
+          record = BoldRecord.new(**line.to_h)
+        rescue StandardError => e
+          invalid_records += 1
+          puts "#{e.class} #{e.message}"
+          next
+        end

-    loop do
-      # read line by line so we can catch and skip individual malformed rows
-      record = nil
-      begin
-        row = csv.shift
-        break unless row
+        # TODO: every skipped record should be written to a file
+        unless record.gene_symbol_mapped.present? && record.sequence.present? && record.species.present? && record.species_binomial?
+          invalid_records += 1
+          next
+        end

-        record = BoldRecord.new(**row.to_h)
-      rescue StandardError => e
-        puts "#{e.class} #{e.message}"
-        next
-      end
+        # FIXME: be careful of case issues
+        # FIXME: cache species to reduce the number of queries?
+        species = Species.find_by(taxon_species: record.species)
+        taxons_set = ['phylum', 'class', 'order', 'family', 'genus', 'species'].all? do |t|
+          record.send(:"taxon_#{t}").present?
+        end

-      # TODO: every skipped record should be written to a file
-      unless record.gene_symbol_mapped.present? && record.sequence.present? && record.species.present? && record.species_binomial?
-        next
-      end
+        unless species || (taxons_set && kingdoms.include?(record.taxon_phylum))
+          invalid_taxons += 1
+          next
+        end

-      # FIXME: be careful of case issues
-      # FIXME: cache species to reduce the number of queries?
-      species = Species.find_by(taxon_species: record.species)
-      taxons_set = ['phylum', 'class', 'order', 'family', 'genus', 'species'].all? do |t|
-        record.send(:"taxon_#{t}").present?
-      end
+        if species.nil?
+          species = Species.create(
+            path: File.join(record.taxon_class, record.taxon_order, record.taxon_family,
+                            record.species.gsub(' ', '-')),
+            taxon_kingdom: kingdoms[record.taxon_phylum],
+            taxon_phylum: record.taxon_phylum,
+            taxon_class: record.taxon_class,
+            taxon_order: record.taxon_order,
+            taxon_family: record.taxon_family,
+            taxon_genus: record.taxon_genus,
+            taxon_species: record.species,
+            aligned: false
+          )
+        end

-      next unless species || (taxons_set && kingdoms.include?(record.taxon_phylum))
-
-      if species.nil?
-        species = Species.create(
-          path: File.join(record.taxon_class, record.taxon_order, record.taxon_family,
-                          record.species.gsub(' ', '-')),
-          taxon_kingdom: kingdoms[record.taxon_phylum],
-          taxon_phylum: record.taxon_phylum,
-          taxon_class: record.taxon_class,
-          taxon_order: record.taxon_order,
-          taxon_family: record.taxon_family,
-          taxon_genus: record.taxon_genus,
-          taxon_species: record.species,
-          aligned: false
+        occurrence = Occurrence.new(
+          source: :bold,
+          source_id: record.process_id,
+          catalog_number: record.catalog_number.presence,
+          field_number: record.field_number.presence,
+          accession: record.accession.presence,
+          lat: record.lat,
+          lng: record.lng,
+          species_id: species.id,
+          genes: record.gene_symbol_mapped
+        )
-        )
-      end
-
-      occurrence = Occurrence.new(
-        source: :bold,
-        source_id: record.process_id,
-        catalog_number: record.catalog_number.presence,
-        field_number: record.field_number.presence,
-        accession: record.accession.presence,
-        lat: record.lat,
-        lng: record.lng,
-        species_id: species.id,
-        genes: record.gene_symbol_mapped
-      )
+        unless occurrence.save
+          invalid_occurences += 1
+          next
+        end

-      next unless occurrence.save
+        # Reptilia/Squamata/Agamidae/Phrynocephalus-persicus/Phrynocephalus-persicus-COI
+        fname = "#{record.species}-#{record.gene_symbol_mapped.upcase}".gsub(' ', '-')
+        fasta_path = "#{Configuration.genbank_root.join(species.path, fname)}.fa"

-      # Reptilia/Squamata/Agamidae/Phrynocephalus-persicus/Phrynocephalus-persicus-COI
-      fasta_path = Configuration.genbank_root.join(species.path, "#{record.species}-#{record.gene_symbol_mapped.upcase}").to_s.gsub(
-        ' ', '-'
-      ) + '.fa'
+        # occurrence saved, now write gene data
+        FileUtils.mkdir_p(File.dirname(fasta_path))

-      # occurrence saved, now write gene data
-      FileUtils.mkdir_p(File.dirname(fasta_path))
+        File.write(fasta_path, record.fasta_sequence, mode: 'a+')

-      File.write(fasta_path, record.fasta_sequence, mode: 'a+')
+        output_files += 1

-      fasta_files_updated << fasta_path
+        species.update(aligned: false) if species.aligned
+      end

-      species.update(aligned: false) if species.aligned
+      {
+        'input_records' => input_records,
+        'invalid_records' => invalid_records,
+        'invalid_taxons' => invalid_taxons,
+        'invalid_occurences' => invalid_occurences,
+        'output_files' => output_files
+      }
+    end.each_with_object({}) do |entry, total|
+      total['input_records'] = entry['input_records'] + (total['input_records'] || 0)
+      total['invalid_records'] = entry['invalid_records'] + (total['invalid_records'] || 0)
+      total['invalid_taxons'] = entry['invalid_taxons'] + (total['invalid_taxons'] || 0)
+      total['invalid_occurences'] = entry['invalid_occurences'] + (total['invalid_occurences'] || 0)
+      total['output_files'] = entry['output_files'] + (total['output_files'] || 0)
     end
+
+    PipelineMetrics.append_record(totals.merge({ 'name' => 'add_bold_records' }))
   end

   desc 'align fasta files'
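Note for reviewers who want to smoke-test the reworked task outside Slurm: the rake task no longer consumes a TSV on STDIN; it globs every *.tsv under BOLD_ROOT and writes FASTA output under the configured genbank root, mirroring what the sbatch script does. A minimal local run might look like the sketch below; the /tmp paths and chunk count are placeholders, not part of this patch, and GENBANK_ROOT is assumed to feed Configuration.genbank_root as in the existing job scripts.

    mkdir -p /tmp/bold /tmp/genes
    split -d --additional-suffix '.tsv' -n 4 bold.tsv /tmp/bold/x
    BOLD_ROOT=/tmp/bold GENBANK_ROOT=/tmp/genes bin/rake pipeline:add_bold_records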