diff --git a/lib/processes/cluster_live.nf b/lib/processes/cluster_live.nf index e90e7ad..05db2db 100644 --- a/lib/processes/cluster_live.nf +++ b/lib/processes/cluster_live.nf @@ -1,12 +1,12 @@ def consensus_fasta="consensus.fasta" def vsearch_dir="vsearch_clusters" -process CLUSTER { +process CLUSTER_LIVE { publishDir "${params.output}/${sample}/clustering/${type}", pattern: "${consensus_fasta}", mode: 'copy' publishDir "${params.output}/${sample}/clustering/${type}", pattern: "cluster*", mode: 'copy' input: - tuple val( sample ), val( target ), path( detected_umis_fastq ) + tuple val( sample ), val( target ), path( detected_umis_fastq_dir ) val ( type ) output: tuple val( "${sample}" ), val( "${target}" ), path( "${consensus_fasta}" ), optional: true, emit:consensus_fasta @@ -22,7 +22,7 @@ process CLUSTER { --minseqlength ${params.min_length} \ --maxseqlength ${params.max_length} \ --threads ${params.threads} \ - --cluster_fast ${detected_umis_fastq} \ + --cluster_fast ${detected_umis_fastq_dir}/* \ --clusterout_sort \ --gapopen 0E/5I \ --gapext 0E/2I \ diff --git a/lib/workflows/umi-pipeline-live.nf b/lib/workflows/umi-pipeline-live.nf index 30004dd..dac4eee 100644 --- a/lib/workflows/umi-pipeline-live.nf +++ b/lib/workflows/umi-pipeline-live.nf @@ -38,7 +38,8 @@ include {MERGE_CONSENSUS_FASTQ} from '../processes/merge_consensus_fastq.nf' include {MAP_READS; MAP_READS as MAP_CONSENSUS; MAP_READS as MAP_FINAL_CONSENSUS} from '../processes/map_reads.nf' include {SPLIT_READS} from '../processes/split_reads.nf' include {DETECT_UMI_FASTQ; DETECT_UMI_FASTQ as DETECT_UMI_CONSENSUS_FASTQ} from '../processes/detect_umi_fastq.nf' -include {CLUSTER; CLUSTER as CLUSTER_CONSENSUS} from '../processes/cluster.nf' +include {CLUSTER as CLUSTER_CONSENSUS} from '../processes/cluster.nf' +include {CLUSTER_LIVE} from '../processes/cluster_live.nf' include {REFORMAT_FILTER_CLUSTER} from '../processes/reformat_filter_cluster.nf' include {POLISH_CLUSTER} from '../processes/polish_cluster.nf' include {FILTER_CONSENSUS_FASTQ} from '../processes/filter_consensus_fastq.nf' @@ -84,11 +85,13 @@ workflow UMI_PIPELINE_LIVE { DETECT_UMI_FASTQ( splt_reads_filtered, raw, umi_extract ) channel - .watchPath( "${params.output}/*/${params.output_format}_umi/**" ) - .subscribe{ fastq -> println "Fastq file:_$fastq" } + .watchPath( "${params.output}/*/${params.output_format}_umi/raw/*fastq" ) + .map{ fastq -> tuple(fastq.parent.parent.parent.name, "target", fastq.parent) } + .set{ cluster_ch } + cluster_ch.view() - //CLUSTER( cluster_ch, raw ) + CLUSTER_LIVE( cluster_ch, raw ) /* REFORMAT_FILTER_CLUSTER( CLUSTER.out.cluster_fastas, raw, umi_parse_clusters ) diff --git a/lib/workflows/umi-pipeline.nf b/lib/workflows/umi-pipeline.nf index 56079e8..176c58b 100644 --- a/lib/workflows/umi-pipeline.nf +++ b/lib/workflows/umi-pipeline.nf @@ -54,15 +54,11 @@ workflow UMI_PIPELINE { main: COPY_BED( bed ) - println "Here" - channel .fromPath("${params.input}/*", type: 'dir') .filter( ~/.*barcode(([0-9][0-9]))/ ) .map{barcode_path -> tuple(barcode_path.name, barcode_path)} .set { fastq_files_ch } - - fastq_files_ch.view() if( params.subsampling ){ MERGE_FASTQ( fastq_files_ch )