#!/usr/bin/env nextflow
nextflow.enable.dsl = 2
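
// hash_files and fastp are each imported twice under aliases so the same
// module can be invoked both before and after downsampling.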
include { hash_files as hash_fastq_input } from './modules/hash_files.nf'
include { hash_files as hash_fastq_output } from './modules/hash_files.nf'
include { fastp as fastp_input } from './modules/downsample_reads.nf'
include { downsample } from './modules/downsample_reads.nf'
include { fastp as fastp_output } from './modules/downsample_reads.nf'
include { pipeline_provenance } from './modules/provenance.nf'
include { collect_provenance } from './modules/provenance.nf'
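
// Entry workflow: hash the input fastq files, run fastp before and after
// downsampling each sample to its target coverage, and collect per-sample
// provenance records.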
workflow {
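
    // Run-level metadata, used by pipeline_provenance to generate the
    // pipeline-level provenance record.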
    ch_workflow_metadata = Channel.value([
        workflow.sessionId,
        workflow.runName,
        workflow.manifest.name,
        workflow.manifest.version,
        workflow.start,
    ])
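
    // Two input modes: an explicit samplesheet (ID, R1, R2, COVERAGE, GENOME_SIZE
    // columns), or a fastq search path with a single coverage and genome size
    // applied to every sample found. In either mode:
    //   ch_fastq:     [sample_id, [r1, r2]]
    //   ch_coverages: [sample_id, coverage, genome_size]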
    if (params.samplesheet_input != 'NO_FILE') {
        ch_fastq = Channel.fromPath(params.samplesheet_input)
            .splitCsv(header: true)
            .map{ it -> [it['ID'], [it['R1'], it['R2']]] }
        ch_coverages = Channel.fromPath(params.samplesheet_input)
            .splitCsv(header: true)
            .map{ it -> [it['ID'], it['COVERAGE'], it['GENOME_SIZE']] }
    } else {
        ch_fastq = Channel.fromFilePairs(params.fastq_search_path, flat: true)
            .map{ it -> [it[0].split('_')[0], [it[1], it[2]]] }
            .unique{ it -> it[0] }
        ch_coverages = ch_fastq.map{ it -> [it[0], params.coverage, params.genome_size] }
    }

    main:
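
    // Hash the input fastq files. The join/map reshapes each element to
    // [sample_id, coverage, [r1, r2]] before tagging it "fastq-input".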
    hash_fastq_input(ch_fastq.join(ch_coverages).map({ it -> [it[0], it[2], it[1]] }).combine(Channel.of("fastq-input")))
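
    // Run fastp on the original (pre-downsampling) reads, tagged "original".
    // ch_coverages is reduced to [sample_id, genome_size] before the join.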
    ch_fastp_input = ch_fastq.join(ch_coverages.map({ it -> [it[0], it[2]] }))
    fastp_input(ch_fastp_input.combine(Channel.of("original")))
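
    // Downsample each sample's reads to its target coverage, given its genome size.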
    downsample(ch_fastq.join(ch_coverages))
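
    // Hash the downsampled fastq files and run fastp on them for the
    // post-downsampling summary.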
    hash_fastq_output(downsample.out.reads.map{ it -> [it[0], it[3], it[1]] }.combine(Channel.of("fastq-output")))
    fastp_output(downsample.out.reads)
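
    // Merge the per-sample fastp summaries into a single CSV, keeping the header
    // from the first file and skipping it in the rest.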
    fastp_input.out.csv.concat(fastp_output.out.csv)
        .map{ it -> it[1] }
        .collectFile(
            name: params.collected_outputs_prefix + "_downsampling_summary.csv",
            storeDir: params.outdir,
            keepHeader: true,
            skip: 1,
            sort: { it -> it[0] }
        )

    // Collect provenance.
    // The basic idea is to build up a channel with the following structure:
    //   [sample_id, coverage, [provenance_file_1.yml, provenance_file_2.yml, provenance_file_3.yml, ...]]
    // At each step we add another provenance file to the list using the << operator,
    // then concatenate them all together in the 'collect_provenance' process.
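    // For example, a fully assembled element might look like this (hypothetical
    // sample ID and filenames):
    //   ['sample-01', '30', [pipeline_provenance.yml, fastq_input_hashes.yml,
    //                        fastp_provenance.yml, downsample_provenance.yml,
    //                        fastq_output_hashes.yml]]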
    ch_sample_ids_with_coverages = ch_fastq.map({ it -> it[0] }).join(ch_coverages.map({ it -> [it[0], it[1]] }))
    ch_provenance = ch_sample_ids_with_coverages
    ch_pipeline_provenance = pipeline_provenance(ch_workflow_metadata)
    ch_provenance = ch_provenance.combine(ch_pipeline_provenance).map({ it -> [it[0], it[1], [it[2]]] })
    ch_provenance = ch_provenance.join(hash_fastq_input.out.provenance, by: [0, 1]).map{ it -> [it[0], it[1], it[2] << it[3]] }
    ch_provenance = ch_provenance.join(fastp_input.out.provenance).map{ it -> [it[0], it[1], it[2] << it[4]] }
    ch_provenance = ch_provenance.join(downsample.out.provenance, by: [0, 1]).map{ it -> [it[0], it[1], it[2] << it[3]] }
    ch_provenance = ch_provenance.join(hash_fastq_output.out.provenance, by: [0, 1]).map{ it -> [it[0], it[1], it[2] << it[3]] }
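    // minus(null) drops any null entries (e.g. from a step that emitted no
    // provenance file) before handing the list to collect_provenance.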
    collect_provenance(ch_provenance.map{ it -> [it[0], it[1], it[2].minus(null)] })
}