Rework main workflow to tolerate larger datasets #70

Open · wants to merge 36 commits into base: dev

Changes shown from 1 commit (36 commits in total):
e0cdf4d
drop in replace cat_cat with find_concatenate
BioWilko Jan 27, 2025
ed826a0
Fix modules conf
BioWilko Jan 27, 2025
148fbfd
[automated] Fix code linting
nf-core-bot Jan 27, 2025
2da5527
replace gunzip with find/unpigz
BioWilko Jan 29, 2025
b072ad6
No taxid duplication check
BioWilko Jan 29, 2025
7096fdb
Do not require unique tax ids
BioWilko Jan 30, 2025
acb8910
Remove taxid uniquness again, for the final time
BioWilko Jan 30, 2025
5bd7ae6
Fix it
BioWilko Jan 30, 2025
d65039e
collect batches
BioWilko Jan 30, 2025
5de809b
grouptuple before find_cat
BioWilko Jan 30, 2025
b4780be
Remap flattened fastas to metadata with an inner join
BioWilko Jan 30, 2025
0b45f7a
remove taxid uniqueness param entirely
BioWilko Jan 30, 2025
21fbf81
Merge branch 'dev' into find_concatenate_patch
jfy133 Jan 30, 2025
9279716
Merge branch 'find_concatenate_patch' of github.com:nf-core/createtax…
jfy133 Jan 30, 2025
2b53f99
Fixlinting
jfy133 Jan 30, 2025
b660f27
Update find/unpigz remove hack
BioWilko Jan 31, 2025
c0168f0
enforce id uniqueness
BioWilko Jan 31, 2025
02d747f
remove local find unzip module
BioWilko Jan 31, 2025
c033e6b
unhide unzip batch size
BioWilko Jan 31, 2025
2542775
Merge branch 'dev' into find_concatenate_patch
jfy133 Jan 31, 2025
0208693
[automated] Fix code linting
nf-core-bot Jan 31, 2025
4675773
update find/unpigz
BioWilko Feb 3, 2025
daa0820
Patch workflow join logic
BioWilko Feb 3, 2025
24ba8c6
Filter out rows without dna fastas for matching
BioWilko Feb 6, 2025
4a604df
Fix AA batching logic
BioWilko Feb 6, 2025
e3a55b2
Join unbatched aa fastas with their metadata for kaiju
BioWilko Feb 6, 2025
c3a8626
First subworkflow test
BioWilko Feb 6, 2025
ce87fdb
Fix preprocessing subworkflow
BioWilko Feb 10, 2025
aa0a10f
input ungrouped fasta refs to ganon
BioWilko Feb 10, 2025
34c619b
Initialise outputs as empty channels
BioWilko Feb 10, 2025
6fcd794
ganon wants grouped DNA fastas
BioWilko Feb 10, 2025
79cabe6
Fix kaiju outs
BioWilko Feb 10, 2025
718d178
Reduce unzip batch size for test profile
BioWilko Feb 28, 2025
faf9b9a
test unzip batch size to 1
BioWilko Mar 1, 2025
ed8fce9
Merge branch 'dev' into find_concatenate_patch
BioWilko Mar 7, 2025
96ce159
remove commented includes
BioWilko Mar 7, 2025
First subworkflow test
BioWilko committed Feb 6, 2025
commit c3a8626ebe8822e4d022f991e2e053b4ffbad6ab
177 changes: 177 additions & 0 deletions subworkflows/local/preprocessing/main.nf
@@ -0,0 +1,177 @@
include { FIND_UNPIGZ as UNPIGZ_DNA } from '../../../modules/nf-core/find/unpigz/main'
include { FIND_UNPIGZ as UNPIGZ_AA } from '../../../modules/nf-core/find/unpigz/main'
include { FIND_CONCATENATE as FIND_CONCATENATE_DNA } from '../../../modules/nf-core/find/concatenate/main'
include { FIND_CONCATENATE as FIND_CONCATENATE_AA } from '../../../modules/nf-core/find/concatenate/main'
include { FIND_CONCATENATE as FIND_CONCATENATE_AA_KAIJU } from '../../../modules/nf-core/find/concatenate/main'
include { SEQKIT_REPLACE } from '../../../modules/nf-core/seqkit/replace/main'

workflow PREPROCESSING {
take:
ch_samplesheet // channel: samplesheet read in from --input

main:

ch_versions = Channel.empty()
ch_multiqc_files = Channel.empty()

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DATA PREPARATION
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
// PREPARE: Prepare input for single file inputs modules
def malt_build_mode = null
if (params.build_malt) {
malt_build_mode = params.malt_build_options.contains('--sequenceType Protein') ? 'protein' : 'nucleotide'
}
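    // e.g. --malt_build_options '--sequenceType Protein' selects protein mode;
    // any other value (or no MALT options at all) falls back to nucleotide mode.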

if ([(params.build_malt && malt_build_mode == 'nucleotide'), params.build_centrifuge, params.build_kraken2, params.build_bracken, params.build_krakenuniq, params.build_ganon].any()) {
// Pull just DNA sequences

ch_dna_refs_for_singleref = ch_samplesheet
.map { meta, fasta_dna, _fasta_aa -> [meta, fasta_dna] }
.filter { _meta, fasta_dna ->
fasta_dna
}

// Make channel to preserve meta for decompress/compression
ch_dna_refs_for_rematching = ch_samplesheet
.filter { _meta, fasta_dna, _fasta_aa ->
fasta_dna
}
.map { meta, fasta_dna, _fasta_aa ->
[
fasta_dna.getBaseName(fasta_dna.name.endsWith('.gz') ? 1 : 0),
meta,
]
}
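        // The join key is the name each file will have *after* decompression:
        // getBaseName(1) strips a single '.gz' extension when present, so
        // 'ref.fasta.gz' and 'ref.fasta' both key as 'ref.fasta', matching
        // getName() on the unpigz output further down.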

ch_aa_refs_for_rematching = ch_samplesheet
.filter { _meta, _fasta_dna, fasta_aa ->
fasta_aa
}
.map { meta, _fasta_dna, fasta_aa ->
[
fasta_aa.getBaseName(fasta_aa.name.endsWith('.gz') ? 1 : 0),
meta,
]
}

// Separate files for zipping and unzipping
ch_dna_for_unzipping = ch_dna_refs_for_singleref.branch { _meta, fasta ->
zipped: fasta.extension == 'gz'
unzipped: true
}
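        // branch() splits one channel into named sub-channels; 'unzipped: true'
        // is the catch-all for anything the first predicate did not claim.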

// Batch the zipped files for efficient unzipping of multiple files in a single process job
ch_dna_batches_for_unzipping = ch_dna_for_unzipping.zipped
.map { _meta, fasta -> fasta }
.collate(params.unzip_batch_size, true)
.map { batch -> [[id: params.dbname], batch] }
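        // collate(n, true) emits lists of up to n paths and keeps the final
        // partial batch (e.g. 5 files at batch size 2 -> sizes 2, 2, 1), so no
        // reference is dropped when the input count is not a multiple of n.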

// Run the batch unzipping
UNPIGZ_DNA(ch_dna_batches_for_unzipping)
ch_versions = ch_versions.mix(UNPIGZ_DNA.out.versions.first())

// Mix back in the originally unzipped files
ch_prepped_dna_batches = UNPIGZ_DNA.out.file_out.mix(ch_dna_for_unzipping.unzipped)

// Unbatch the unzipped files for rematching with metadata
ch_prepped_dna_fastas_gunzipped = ch_prepped_dna_batches
.flatMap { _meta, batch -> batch }
.map { fasta -> [fasta.getName(), fasta] }

// Match metadata back to the prepped DNA fastas with an inner join
ch_prepped_dna_fastas_ungrouped = ch_prepped_dna_fastas_gunzipped
.join(ch_dna_refs_for_rematching, failOnMismatch: true, failOnDuplicate: true)
.map { _fasta_name, fasta, meta -> [meta, fasta] }
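        // failOnMismatch/failOnDuplicate make this a strict one-to-one inner
        // join: every decompressed fasta must match exactly one samplesheet
        // entry, so batching can never silently scramble file/metadata pairs.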

// Prepare for making the mega file
ch_prepped_dna_fastas = ch_prepped_dna_fastas_ungrouped
.map { _meta, fasta ->
[[id: params.dbname], fasta]
}
.groupTuple()

// Place in single mega file
FIND_CONCATENATE_DNA(ch_prepped_dna_fastas)
ch_versions = ch_versions.mix(FIND_CONCATENATE_DNA.out.versions)
ch_singleref_for_dna = FIND_CONCATENATE_DNA.out.file_out
}

if ([(params.build_malt && malt_build_mode == 'protein'), params.build_kaiju, params.build_diamond].any()) {

ch_aa_refs_for_singleref = ch_samplesheet
.map { meta, _fasta_dna, fasta_aa -> [meta, fasta_aa] }
.filter { _meta, fasta_aa ->
fasta_aa
}

ch_aa_for_unzipping = ch_aa_refs_for_singleref.branch { _meta, fasta ->
zipped: fasta.extension == 'gz'
unzipped: true
}

ch_aa_batches_for_unzipping = ch_aa_for_unzipping.zipped
.map { _meta, aa_fasta -> aa_fasta }
.collate(params.unzip_batch_size, true)
.map { batch -> [[id: params.dbname], batch] }

// Run the batch unzipping
UNPIGZ_AA(ch_aa_batches_for_unzipping)

// Mix back in the originally unzipped files
ch_prepped_aa_batches = UNPIGZ_AA.out.file_out.mix(ch_aa_for_unzipping.unzipped)

// Unbatch the unzipped files for rematching with metadata
ch_prepped_aa_fastas_gunzipped = ch_prepped_aa_batches
.flatMap { _meta, batch -> batch }
.map { fasta -> [fasta.getName(), fasta] }

// Match metadata back to the prepped AA fastas with an inner join
ch_prepped_aa_fastas_ungrouped = ch_prepped_aa_fastas_gunzipped
.join(ch_aa_refs_for_rematching, failOnMismatch: true, failOnDuplicate: true)
.map { _fasta_name, fasta, meta -> [meta, fasta] }

ch_prepped_aa_fastas = ch_prepped_aa_fastas_ungrouped
.map { _meta, fasta -> [[id: params.dbname], fasta] }
.groupTuple()

ch_versions = ch_versions.mix(UNPIGZ_AA.out.versions.first())

if ([(params.build_malt && malt_build_mode == 'protein'), params.build_diamond].any()) {
FIND_CONCATENATE_AA(ch_prepped_aa_fastas)
ch_singleref_for_aa = FIND_CONCATENATE_AA.out.file_out
ch_versions = ch_versions.mix(FIND_CONCATENATE_AA.out.versions.first())
}

if ([params.build_kaiju].any()) {
SEQKIT_REPLACE(ch_prepped_aa_fastas_ungrouped)
ch_versions = ch_versions.mix(SEQKIT_REPLACE.out.versions.first())
ch_prepped_aa_fastas_kaiju = SEQKIT_REPLACE.out.fastx.map { _meta, fasta -> [[id: params.dbname], fasta] }.groupTuple()
FIND_CONCATENATE_AA_KAIJU(ch_prepped_aa_fastas_kaiju)
ch_versions = ch_versions.mix(FIND_CONCATENATE_AA_KAIJU.out.versions.first())
}
}

ch_singleref_for_dna = ch_singleref_for_dna ?: Channel.empty()
ch_singleref_for_aa = ch_singleref_for_aa ?: Channel.empty()
ch_grouped_dna_fastas = ch_prepped_dna_fastas ?: Channel.empty()
ch_grouped_aa_fastas = ch_prepped_aa_fastas ?: Channel.empty()
ch_prepped_dna_fastas_ungrouped = ch_prepped_dna_fastas_ungrouped ?: Channel.empty()
ch_prepped_aa_fastas_ungrouped = ch_prepped_aa_fastas_ungrouped ?: Channel.empty()
ch_kaiju_aa = FIND_CONCATENATE_AA_KAIJU.out.file_out ?: Channel.empty()
malt_build_mode = malt_build_mode ?: ""
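    // The elvis defaults above let branches that never ran fall back to empty
    // channels. Caveat (an assumption from Nextflow semantics, not stated in
    // this diff): accessing FIND_CONCATENATE_AA_KAIJU.out when the process was
    // never invoked raises an error rather than returning null; a later commit
    // in this PR ('Initialise outputs as empty channels') reworks this.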

emit:
singleref_for_dna = ch_singleref_for_dna
singleref_for_aa = ch_singleref_for_aa
grouped_dna_fastas = ch_grouped_dna_fastas
grouped_aa_fastas = ch_grouped_aa_fastas
ungrouped_dna = ch_prepped_dna_fastas_ungrouped
ungrouped_aa = ch_prepped_aa_fastas_ungrouped
kaiju_aa = ch_kaiju_aa
malt_build_mode = malt_build_mode
versions = ch_versions
multiqc_files = ch_multiqc_files
}
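The collate/flatMap/join round-trip above is the core of the rework: references are decompressed in batches for scheduler efficiency, then re-keyed by file name so each file recovers its own metadata. A minimal standalone sketch of the same pattern, with hypothetical file and channel names, independent of the pipeline proper:

// sketch.nf: batch files, unbatch, and rematch metadata by file name
workflow {
    ch_refs = Channel.of(
        [[id: 'a'], file('a.fasta')],
        [[id: 'b'], file('b.fasta')],
        [[id: 'c'], file('c.fasta')]
    )

    // Remember which meta belongs to which file name
    ch_keys = ch_refs.map { meta, fasta -> [fasta.name, meta] }

    ch_refs
        .map { _meta, fasta -> fasta }
        .collate(2, true)                  // batches: [a, b], [c]
        .flatMap { batch -> batch }        // unbatch: a, b, c
        .map { fasta -> [fasta.name, fasta] }
        .join(ch_keys, failOnMismatch: true, failOnDuplicate: true)
        .map { _name, fasta, meta -> [meta, fasta] }
        .view()                            // e.g. [[id:'a'], .../a.fasta]
}

In the subworkflow the step between batching and unbatching is a real process (FIND_UNPIGZ), so the join key is computed from the name each file will have after decompression; the strict join options then guarantee the rematch is exactly one-to-one.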
330 changes: 168 additions & 162 deletions workflows/createtaxdb.nf
@@ -4,30 +4,32 @@
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/

include { MULTIQC } from '../modules/nf-core/multiqc/main'
include { paramsSummaryMap } from 'plugin/nf-schema'
include { paramsSummaryMultiqc } from '../subworkflows/nf-core/utils_nfcore_pipeline'
include { softwareVersionsToYAML } from '../subworkflows/nf-core/utils_nfcore_pipeline'
include { methodsDescriptionText } from '../subworkflows/local/utils_nfcore_createtaxdb_pipeline'


// Preprocessing
include { FIND_UNPIGZ as UNPIGZ_DNA } from '../modules/nf-core/find/unpigz/main'
include { FIND_UNPIGZ as UNPIGZ_AA } from '../modules/nf-core/find/unpigz/main'
include { FIND_CONCATENATE as FIND_CONCATENATE_DNA } from '../modules/nf-core/find/concatenate/main'
include { FIND_CONCATENATE as FIND_CONCATENATE_AA } from '../modules/nf-core/find/concatenate/main'
include { FIND_CONCATENATE as FIND_CONCATENATE_AA_KAIJU } from '../modules/nf-core/find/concatenate/main'
include { SEQKIT_REPLACE } from '../modules/nf-core/seqkit/replace/main'
include { PREPROCESSING } from '../subworkflows/local/preprocessing/main'
// include { FIND_UNPIGZ as UNPIGZ_DNA } from '../modules/nf-core/find/unpigz/main'
// include { FIND_UNPIGZ as UNPIGZ_AA } from '../modules/nf-core/find/unpigz/main'
// include { FIND_CONCATENATE as FIND_CONCATENATE_DNA } from '../modules/nf-core/find/concatenate/main'
// include { FIND_CONCATENATE as FIND_CONCATENATE_AA } from '../modules/nf-core/find/concatenate/main'
// include { FIND_CONCATENATE as FIND_CONCATENATE_AA_KAIJU } from '../modules/nf-core/find/concatenate/main'
// include { SEQKIT_REPLACE } from '../modules/nf-core/seqkit/replace/main'

// Database building (with specific auxiliary modules)
include { CENTRIFUGE_BUILD } from '../modules/nf-core/centrifuge/build/main'
include { DIAMOND_MAKEDB } from '../modules/nf-core/diamond/makedb/main'
include { GANON_BUILDCUSTOM } from '../modules/nf-core/ganon/buildcustom/main'
include { KAIJU_MKFMI } from '../modules/nf-core/kaiju/mkfmi/main'
include { KRAKENUNIQ_BUILD } from '../modules/nf-core/krakenuniq/build/main'
include { UNZIP } from '../modules/nf-core/unzip/main'
include { MALT_BUILD } from '../modules/nf-core/malt/build/main'

include { FASTA_BUILD_ADD_KRAKEN2_BRACKEN } from '../subworkflows/nf-core/fasta_build_add_kraken2_bracken/main'

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -50,145 +52,149 @@ workflow CREATETAXDB {
ch_versions = Channel.empty()
ch_multiqc_files = Channel.empty()

PREPROCESSING(ch_samplesheet)

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
DATA PREPARATION
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
// PREPARE: Prepare input for single file inputs modules
def malt_build_mode = null
if (params.build_malt) {
malt_build_mode = params.malt_build_options.contains('--sequenceType Protein') ? 'protein' : 'nucleotide'
}

if ([(params.build_malt && malt_build_mode == 'nucleotide'), params.build_centrifuge, params.build_kraken2, params.build_bracken, params.build_krakenuniq, params.build_ganon].any()) {
// Pull just DNA sequences

ch_dna_refs_for_singleref = ch_samplesheet
.map { meta, fasta_dna, _fasta_aa -> [meta, fasta_dna] }
.filter { _meta, fasta_dna ->
fasta_dna
}

// Make channel to preserve meta for decompress/compression
ch_dna_refs_for_rematching = ch_samplesheet
.filter { _meta, fasta_dna, _fasta_aa ->
fasta_dna
}
.map { meta, fasta_dna, _fasta_aa ->
[
fasta_dna.getBaseName(fasta_dna.name.endsWith('.gz') ? 1 : 0),
meta,
]
}

ch_aa_refs_for_rematching = ch_samplesheet
.filter { _meta, _fasta_dna, fasta_aa ->
fasta_aa
}
.map { meta, _fasta_dna, fasta_aa ->
[
fasta_aa.getBaseName(fasta_aa.name.endsWith('.gz') ? 1 : 0),
meta,
]
}

// Separate files for zipping and unzipping
ch_dna_for_unzipping = ch_dna_refs_for_singleref.branch { _meta, fasta ->
zipped: fasta.extension == 'gz'
unzipped: true
}

// Batch the zipped files for efficient unzipping of multiple files in a single process job
ch_dna_batches_for_unzipping = ch_dna_for_unzipping.zipped
.map { _meta, fasta -> fasta }
.collate(params.unzip_batch_size, true)
.map { batch -> [[id: params.dbname], batch] }

// Run the batch unzipping
UNPIGZ_DNA(ch_dna_batches_for_unzipping)
ch_versions = ch_versions.mix(UNPIGZ_DNA.out.versions.first())

// Mix back in the originally unzipped files
ch_prepped_dna_batches = UNPIGZ_DNA.out.file_out.mix(ch_dna_for_unzipping.unzipped)

// Unbatch the unzipped files for rematching with metadata
ch_prepped_dna_fastas_gunzipped = ch_prepped_dna_batches
.flatMap { _meta, batch -> batch }
.map { fasta -> [fasta.getName(), fasta] }

// Match metadata back to the prepped DNA fastas with an inner join
ch_prepped_dna_fastas_ungrouped = ch_prepped_dna_fastas_gunzipped
.join(ch_dna_refs_for_rematching, failOnMismatch: true, failOnDuplicate: true)
.map { _fasta_name, fasta, meta -> [meta, fasta] }

// Prepare for making the mega file
ch_prepped_dna_fastas = ch_prepped_dna_fastas_ungrouped
.map { _meta, fasta ->
[[id: params.dbname], fasta]
}
.groupTuple()

// Place in single mega file
FIND_CONCATENATE_DNA(ch_prepped_dna_fastas)
ch_versions = ch_versions.mix(FIND_CONCATENATE_DNA.out.versions)
ch_singleref_for_dna = FIND_CONCATENATE_DNA.out.file_out
}

if ([(params.build_malt && malt_build_mode == 'protein'), params.build_kaiju, params.build_diamond].any()) {

ch_aa_refs_for_singleref = ch_samplesheet
.map { meta, _fasta_dna, fasta_aa -> [meta, fasta_aa] }
.filter { _meta, fasta_aa ->
fasta_aa
}

ch_aa_for_unzipping = ch_aa_refs_for_singleref.branch { _meta, fasta ->
zipped: fasta.extension == 'gz'
unzipped: true
}

ch_aa_batches_for_unzipping = ch_aa_for_unzipping.zipped
.map { _meta, aa_fasta -> aa_fasta }
.collate(params.unzip_batch_size, true)
.map { batch -> [[id: params.dbname], batch] }

// Run the batch unzipping
UNPIGZ_AA(ch_aa_batches_for_unzipping)

// Mix back in the originally unzipped files
ch_prepped_aa_batches = UNPIGZ_AA.out.file_out.mix(ch_aa_for_unzipping.unzipped)

// Unbatch the unzipped files for rematching with metadata
ch_prepped_aa_fastas_gunzipped = ch_prepped_aa_batches
.flatMap { _meta, batch -> batch }
.map { fasta -> [fasta.getName(), fasta] }

// Match metadata back to the prepped DNA fastas with an inner join
ch_prepped_aa_fastas_ungrouped = ch_prepped_aa_fastas_gunzipped
.join(ch_aa_refs_for_rematching, failOnMismatch: true, failOnDuplicate: true)
.map { _fasta_name, fasta, meta -> [meta, fasta] }

ch_prepped_aa_fastas = ch_prepped_aa_fastas_ungrouped
.map { _meta, fasta -> [[id: params.dbname], fasta] }
.groupTuple()

ch_versions = ch_versions.mix(UNPIGZ_AA.out.versions.first())

if ([(params.build_malt && malt_build_mode == 'protein'), params.build_diamond].any()) {
FIND_CONCATENATE_AA(ch_prepped_aa_fastas)
ch_singleref_for_aa = FIND_CONCATENATE_AA.out.file_out
ch_versions = ch_versions.mix(FIND_CONCATENATE_AA.out.versions.first())
}

if ([params.build_kaiju].any()) {
SEQKIT_REPLACE(ch_prepped_aa_fastas_ungrouped)
ch_versions = ch_versions.mix(SEQKIT_REPLACE.out.versions.first())
ch_prepped_aa_fastas_kaiju = SEQKIT_REPLACE.out.fastx.map { _meta, fasta -> [[id: params.dbname], fasta] }.groupTuple()
FIND_CONCATENATE_AA_KAIJU(ch_prepped_aa_fastas_kaiju)
ch_versions = ch_versions.mix(FIND_CONCATENATE_AA_KAIJU.out.versions.first())
}
}
// def malt_build_mode = null
// if (params.build_malt) {
// malt_build_mode = params.malt_build_options.contains('--sequenceType Protein') ? 'protein' : 'nucleotide'
// }

// if ([(params.build_malt && malt_build_mode == 'nucleotide'), params.build_centrifuge, params.build_kraken2, params.build_bracken, params.build_krakenuniq, params.build_ganon].any()) {
// // Pull just DNA sequences

// ch_dna_refs_for_singleref = ch_samplesheet
// .map { meta, fasta_dna, _fasta_aa -> [meta, fasta_dna] }
// .filter { _meta, fasta_dna ->
// fasta_dna
// }

// // Make channel to preserve meta for decompress/compression
// ch_dna_refs_for_rematching = ch_samplesheet
// .filter { _meta, fasta_dna, _fasta_aa ->
// fasta_dna
// }
// .map { meta, fasta_dna, _fasta_aa ->
// [
// fasta_dna.getBaseName(fasta_dna.name.endsWith('.gz') ? 1 : 0),
// meta,
// ]
// }

// ch_aa_refs_for_rematching = ch_samplesheet
// .filter { _meta, _fasta_dna, fasta_aa ->
// fasta_aa
// }
// .map { meta, _fasta_dna, fasta_aa ->
// [
// fasta_aa.getBaseName(fasta_aa.name.endsWith('.gz') ? 1 : 0),
// meta,
// ]
// }

// // Separate files for zipping and unzipping
// ch_dna_for_unzipping = ch_dna_refs_for_singleref.branch { _meta, fasta ->
// zipped: fasta.extension == 'gz'
// unzipped: true
// }

// // Batch the zipped files for efficient unzipping of multiple files in a single process job
// ch_dna_batches_for_unzipping = ch_dna_for_unzipping.zipped
// .map { _meta, fasta -> fasta }
// .collate(params.unzip_batch_size, true)
// .map { batch -> [[id: params.dbname], batch] }

// // Run the batch unzipping
// UNPIGZ_DNA(ch_dna_batches_for_unzipping)
// ch_versions = ch_versions.mix(UNPIGZ_DNA.out.versions.first())

// // Mix back in the originally unzipped files
// ch_prepped_dna_batches = UNPIGZ_DNA.out.file_out.mix(ch_dna_for_unzipping.unzipped)

// // Unbatch the unzipped files for rematching with metadata
// ch_prepped_dna_fastas_gunzipped = ch_prepped_dna_batches
// .flatMap { _meta, batch -> batch }
// .map { fasta -> [fasta.getName(), fasta] }

// // Match metadata back to the prepped DNA fastas with an inner join
// ch_prepped_dna_fastas_ungrouped = ch_prepped_dna_fastas_gunzipped
// .join(ch_dna_refs_for_rematching, failOnMismatch: true, failOnDuplicate: true)
// .map { _fasta_name, fasta, meta -> [meta, fasta] }

// // Prepare for making the mega file
// ch_prepped_dna_fastas = ch_prepped_dna_fastas_ungrouped
// .map { _meta, fasta ->
// [[id: params.dbname], fasta]
// }
// .groupTuple()

// // Place in single mega file
// FIND_CONCATENATE_DNA(ch_prepped_dna_fastas)
// ch_versions = ch_versions.mix(FIND_CONCATENATE_DNA.out.versions)
// ch_singleref_for_dna = FIND_CONCATENATE_DNA.out.file_out
// }

// if ([(params.build_malt && malt_build_mode == 'protein'), params.build_kaiju, params.build_diamond].any()) {

// ch_aa_refs_for_singleref = ch_samplesheet
// .map { meta, _fasta_dna, fasta_aa -> [meta, fasta_aa] }
// .filter { _meta, fasta_aa ->
// fasta_aa
// }

// ch_aa_for_unzipping = ch_aa_refs_for_singleref.branch { _meta, fasta ->
// zipped: fasta.extension == 'gz'
// unzipped: true
// }

// ch_aa_batches_for_unzipping = ch_aa_for_unzipping.zipped
// .map { _meta, aa_fasta -> aa_fasta }
// .collate(params.unzip_batch_size, true)
// .map { batch -> [[id: params.dbname], batch] }

// // Run the batch unzipping
// UNPIGZ_AA(ch_aa_batches_for_unzipping)

// // Mix back in the originally unzipped files
// ch_prepped_aa_batches = UNPIGZ_AA.out.file_out.mix(ch_aa_for_unzipping.unzipped)

// // Unbatch the unzipped files for rematching with metadata
// ch_prepped_aa_fastas_gunzipped = ch_prepped_aa_batches
// .flatMap { _meta, batch -> batch }
// .map { fasta -> [fasta.getName(), fasta] }

// // Match metadata back to the prepped DNA fastas with an inner join
// ch_prepped_aa_fastas_ungrouped = ch_prepped_aa_fastas_gunzipped
// .join(ch_aa_refs_for_rematching, failOnMismatch: true, failOnDuplicate: true)
// .map { _fasta_name, fasta, meta -> [meta, fasta] }

// ch_prepped_aa_fastas = ch_prepped_aa_fastas_ungrouped
// .map { _meta, fasta -> [[id: params.dbname], fasta] }
// .groupTuple()

// ch_versions = ch_versions.mix(UNPIGZ_AA.out.versions.first())

// if ([(params.build_malt && malt_build_mode == 'protein'), params.build_diamond].any()) {
// FIND_CONCATENATE_AA(ch_prepped_aa_fastas)
// ch_singleref_for_aa = FIND_CONCATENATE_AA.out.file_out
// ch_versions = ch_versions.mix(FIND_CONCATENATE_AA.out.versions.first())
// }

// if ([params.build_kaiju].any()) {
// SEQKIT_REPLACE(ch_prepped_aa_fastas_ungrouped)
// ch_versions = ch_versions.mix(SEQKIT_REPLACE.out.versions.first())
// ch_prepped_aa_fastas_kaiju = SEQKIT_REPLACE.out.fastx.map { _meta, fasta -> [[id: params.dbname], fasta] }.groupTuple()
// FIND_CONCATENATE_AA_KAIJU(ch_prepped_aa_fastas_kaiju)
// ch_versions = ch_versions.mix(FIND_CONCATENATE_AA_KAIJU.out.versions.first())
// }
// }

PREPROCESSING(ch_samplesheet)


/*
@@ -200,7 +206,7 @@ workflow CREATETAXDB {
// Module: Run CENTRIFUGE/BUILD

if (params.build_centrifuge) {
CENTRIFUGE_BUILD(ch_singleref_for_dna, file_nucl2taxid, file_taxonomy_nodesdmp, file_taxonomy_namesdmp, [])
CENTRIFUGE_BUILD(PREPROCESSING.out.singleref_for_dna, file_nucl2taxid, file_taxonomy_nodesdmp, file_taxonomy_namesdmp, [])
ch_versions = ch_versions.mix(CENTRIFUGE_BUILD.out.versions.first())
ch_centrifuge_output = CENTRIFUGE_BUILD.out.cf
}
@@ -211,7 +217,7 @@ workflow CREATETAXDB {
// MODULE: Run DIAMOND/MAKEDB

if (params.build_diamond) {
DIAMOND_MAKEDB(ch_singleref_for_aa, file_prot2taxid, file_taxonomy_nodesdmp, file_taxonomy_namesdmp)
DIAMOND_MAKEDB(PREPROCESSING.out.singleref_for_aa, file_prot2taxid, file_taxonomy_nodesdmp, file_taxonomy_namesdmp)
ch_versions = ch_versions.mix(DIAMOND_MAKEDB.out.versions.first())
ch_diamond_output = DIAMOND_MAKEDB.out.db
}
@@ -221,7 +227,7 @@ workflow CREATETAXDB {

if (params.build_ganon) {

ch_ganon_input_tsv = ch_prepped_dna_fastas_ungrouped
ch_ganon_input_tsv = PREPROCESSING.out.ungrouped_dna
.map { meta, fasta ->
// I tried with .name() but it kept giving error of `Unknown method invocation `name` on XPath type... not sure why
def fasta_name = fasta.toString().split('/').last()
@@ -239,7 +245,7 @@ workflow CREATETAXDB {
// Nodes must come first
ch_ganon_tax_files = Channel.fromPath(file_taxonomy_nodesdmp).combine(Channel.fromPath(file_taxonomy_namesdmp))

GANON_BUILDCUSTOM(ch_prepped_dna_fastas, ch_ganon_input_tsv.map { _meta, tsv -> tsv }, ch_ganon_tax_files, [])
GANON_BUILDCUSTOM(PREPROCESSING.out.singleref_for_dna, ch_ganon_input_tsv.map { _meta, tsv -> tsv }, ch_ganon_tax_files, [])
ch_versions = ch_versions.mix(GANON_BUILDCUSTOM.out.versions.first())
ch_ganon_output = GANON_BUILDCUSTOM.out.db
}
@@ -250,7 +256,7 @@ workflow CREATETAXDB {
// MODULE: Run KAIJU/MKFMI

if (params.build_kaiju) {
KAIJU_MKFMI(FIND_CONCATENATE_AA_KAIJU.out.file_out, params.kaiju_keepintermediate)
KAIJU_MKFMI(PREPROCESSING.out.kaiju_aa, params.kaiju_keepintermediate)
ch_versions = ch_versions.mix(KAIJU_MKFMI.out.versions.first())
ch_kaiju_output = KAIJU_MKFMI.out.fmi
}
@@ -263,7 +269,7 @@ workflow CREATETAXDB {
// Condition is inverted because subworkflow asks if you want to 'clean' (true) or not, but pipeline says to 'keep'
if (params.build_kraken2 || params.build_bracken) {
def k2_keepintermediates = params.kraken2_keepintermediate || params.build_bracken ? false : true
FASTA_BUILD_ADD_KRAKEN2_BRACKEN(ch_singleref_for_dna, file_taxonomy_namesdmp, file_taxonomy_nodesdmp, file_accession2taxid, k2_keepintermediates, params.build_bracken)
FASTA_BUILD_ADD_KRAKEN2_BRACKEN(PREPROCESSING.out.singleref_for_dna, file_taxonomy_namesdmp, file_taxonomy_nodesdmp, file_accession2taxid, k2_keepintermediates, params.build_bracken)
ch_versions = ch_versions.mix(FASTA_BUILD_ADD_KRAKEN2_BRACKEN.out.versions.first())
ch_kraken2_bracken_output = FASTA_BUILD_ADD_KRAKEN2_BRACKEN.out.db
}
@@ -280,7 +286,7 @@ workflow CREATETAXDB {
.map { [it] }

Channel.of(file_nucl2taxid)
ch_input_for_krakenuniq = ch_prepped_dna_fastas.combine(ch_taxdmpfiles_for_krakenuniq).map { meta, fastas, taxdump -> [meta, fastas, taxdump, file_nucl2taxid] }
ch_input_for_krakenuniq = PREPROCESSING.out.grouped_dna_fastas.combine(ch_taxdmpfiles_for_krakenuniq).map { meta, fastas, taxdump -> [meta, fastas, taxdump, file_nucl2taxid] }
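// combine() without a 'by' option is a cartesian product: each grouped fasta
// tuple is paired with the single collected taxdump list before the
// nucl2taxid file is appended.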

KRAKENUNIQ_BUILD(ch_input_for_krakenuniq, params.krakenuniq_keepintermediate)
ch_versions = ch_versions.mix(KRAKENUNIQ_BUILD.out.versions.first())
@@ -302,11 +308,11 @@ workflow CREATETAXDB {
ch_malt_mapdb = file(file_malt_mapdb)
}

if (malt_build_mode == 'protein') {
ch_input_for_malt = ch_prepped_aa_fastas.map { _meta, file -> file }
if (PREPROCESSING.out.malt_build_mode == 'protein') {
ch_input_for_malt = PREPROCESSING.out.grouped_aa_fastas.map { _meta, file -> file }
}
else {
ch_input_for_malt = ch_prepped_dna_fastas.map { _meta, file -> file }
ch_input_for_malt = PREPROCESSING.out.grouped_dna_fastas.map { _meta, file -> file }
}

MALT_BUILD(ch_input_for_malt, [], ch_malt_mapdb)