From ec29133e5de16929beff1743c2165307cd522a25 Mon Sep 17 00:00:00 2001 From: gayaldassanayake Date: Wed, 17 Nov 2021 17:25:45 +0530 Subject: [PATCH] Add circularity feature generation --- .gitignore | 1 + constants.py | 22 +++-- helpers.py | 23 +---- pipeline.py | 16 ++++ preprocess/circular.py | 179 +++++++++++++++++++++++++++++++++++++++ preprocess/fragment.py | 3 +- preprocess/kmer.py | 6 +- preprocess/preprocess.py | 22 ++++- progress_bar.py | 8 +- 9 files changed, 247 insertions(+), 33 deletions(-) create mode 100644 preprocess/circular.py diff --git a/.gitignore b/.gitignore index ae1edee..77d6147 100644 --- a/.gitignore +++ b/.gitignore @@ -132,3 +132,4 @@ dmypy.json *results* error.txt log.txt +/temp diff --git a/constants.py b/constants.py index e301357..23734bf 100644 --- a/constants.py +++ b/constants.py @@ -6,17 +6,22 @@ frag_len = 5000 k = 7 max_frag = 10**6 +DIRECTORY_INPUT = 0 +FILE_INPUT = 1 # progress bar descriptions -frag_bar_desc = "fragments " -kmer_bar_desc = "kmers " +frag_bar_desc = "fragments " +kmer_bar_desc = "kmers " +circ_bar_desc = "circularity " # Databases db_path = "./biomarker_dbs" -cmscan_path = "cmscan" -blastn_path = "blastn" -seq2vec_path = "seq2vec" -nucmer_path = "nucmer" + +# biomarker tools +default_cmscan_path = "cmscan" +default_blastn_path = "blastn" +default_seq2vec_path = "seq2vec" +default_nucmer_path = "nucmer" # log files err_file = "error.txt" @@ -25,6 +30,11 @@ # results paths frag_write_path = "results/fragments" kmer_write_path = "results/kmers" +circ_write_path = "results/circular" + +# temp paths +circ_out_path = "temp/nucmer_out" +circ_split_path = "temp/split" # # seq target csv files # seq_plas_target_path = "results/plasmid/seq_target.csv" diff --git a/helpers.py b/helpers.py index 31c81e6..eabf55d 100644 --- a/helpers.py +++ b/helpers.py @@ -40,25 +40,10 @@ def create_fragment_dirs(): def create_kmer_dirs(): create_dir_if_needed(kmer_write_path) -# def create_sequence_dirs(): -# # sequences -# create_dir_if_needed(seq_plas_write_path) -# create_dir_if_needed(seq_chrom_write_path) -# create_dir_if_needed(seq_extra_plasmid_write_path) - -# def create_circ_dirs(): -# # circularity files -# create_dir_if_needed(plas_circ_write_path) -# create_dir_if_needed(chrom_circ_write_path) -# create_dir_if_needed(ex_plas_circ_write_path) - -# create_dir_if_needed(plas_circ_out_path) -# create_dir_if_needed(chrom_circ_out_path) -# create_dir_if_needed(ex_plas_circ_out_path) - -# create_dir_if_needed(plas_frag_split_path) -# create_dir_if_needed(chrom_frag_split_path) -# create_dir_if_needed(ex_plas_frag_split_path) +def create_circ_dirs(): + create_dir_if_needed(circ_write_path) + create_dir_if_needed(circ_out_path) + create_dir_if_needed(circ_split_path) # def create_inc_factor_dirs(): # # inc factor files diff --git a/pipeline.py b/pipeline.py index 8876809..2ef18a9 100644 --- a/pipeline.py +++ b/pipeline.py @@ -29,6 +29,22 @@ def parse_user_arguements(): '-t','--threads', help='number of threads', required=False, type=int, default=default_threads ) + parser.add_argument( + '--cmscan', help='path to cmscan', + required=False, type=str, default=default_cmscan_path + ) + parser.add_argument( + '--blastn', help='path to blastn', + required=False, type=str, default=default_blastn_path + ) + parser.add_argument( + '--seq2vec', help='path to seq2vec', + required=False, type=str, default=default_seq2vec_path + ) + parser.add_argument( + '--nucmer', help='path to nucmer', + required=False, type=str, default=default_nucmer_path + ) return parser.parse_args() diff --git a/preprocess/circular.py b/preprocess/circular.py new file mode 100644 index 0000000..9d844a1 --- /dev/null +++ b/preprocess/circular.py @@ -0,0 +1,179 @@ +import subprocess as sp +import concurrent.futures +from Bio import SeqIO +import os + +from progress_bar import * +from helpers import print_error, print_log +from constants import * +from helpers import create_circ_dirs, find_mean, delete_dir_if_exist + +def circ_helper(nucmer_path, record): + ''' Generates the circularity features for single metagenomic record using nucmer + ''' + record_map = {} + id = record.id + try: + # break the string into 2 parts and compare + seq_mid = int(len(record.seq)/2) + seq_a = str(record.seq[:seq_mid]) + seq_b = str(record.seq[seq_mid:]) + + seq_a_file = os.path.join(circ_split_path, f'{id}_a.fasta') + seq_b_file = os.path.join(circ_split_path, f'{id}_b.fasta') + with open(seq_a_file, mode='w+') as fout: + fout.write(f'>{id}_a\n') + fout.write(seq_a + '\n') + + with open(seq_b_file, mode='w+') as fout: + fout.write(f'>{id}_b\n') + fout.write(seq_b + '\n') + + out_file = os.path.join(circ_out_path, f"{id}_temp") + + cmd = [ + nucmer_path, + '-f', # only forward strand + '-l', '40', # increase min match length to 40 bp + '-p', out_file, + seq_a_file, + seq_b_file + ] + proc = sp.run( + cmd, + stdout=sp.PIPE, + stderr=sp.PIPE, + universal_newlines=True, + ) + alignment_a_sum, alignment_b_sum, mismatches_sum, count = 0, 0, 0, 0 + if(proc.returncode == 0): + has_match = False + with open(f"{out_file}.delta") as fout: + for line in fout: + line = line.rstrip() + if(line[0] == '>'): + has_match = True + elif(has_match): + cols = line.split(' ') + if(len(cols) == 7): + start_b = int(cols[0]) + end_b = int(cols[1]) + start_a = int(cols[2]) + end_a = int(cols[3]) + mismatches = int(cols[4]) + alignment_a = end_a - start_a + 1 + alignment_b = end_b - start_b + 1 + mismatches_sum += mismatches + alignment_a_sum += alignment_a + alignment_b_sum += alignment_b + count+=1 + + alignment_a_mean = find_mean(alignment_a_sum, count) + alignment_b_mean = find_mean(alignment_b_sum, count) + mismatches_mean = find_mean(mismatches_sum, count) + + record_map['circ_alignment_a'] = alignment_a_mean + record_map['circ_alignment_b'] = alignment_b_mean + record_map['circ_mismatches'] = mismatches_mean + record_map['circ_count'] = count + + delete_dir_if_exist(seq_a_file) + delete_dir_if_exist(seq_b_file) + delete_dir_if_exist(f"{out_file}.delta") + delete_dir_if_exist(f"{out_file}.ntref") + delete_dir_if_exist(f"{out_file}.mgaps") + + except Exception as err: + print_error(f"Error computing circularity for record {record}: {err}") + + return id, record_map + +def circ_directory_worker(nucmer_path, filename): + ''' worker for input in directory format with large number of files + ''' + sub_seq_map = {} + record_count = 0 + try: + for record in SeqIO.parse(filename, 'fasta'): + record_count+=1 + id, record_map = circ_helper(nucmer_path, record) + sub_seq_map[id] = record_map + except Exception as err: + print_error(f"Error reading file {filename}: {err}") + return record_count, sub_seq_map + +def circ_file_worker(nucmer_path, filename, rid): + ''' worker for input in file format with one or few files with large number of records in each + ''' + this_rec = None + record_map = None + try: + for record in SeqIO.parse(filename, 'fasta'): + if(rid == record.id): + this_rec = record + break + id, record_map = circ_helper(nucmer_path, this_rec) + + except Exception as err: + print_error(f"Error reading file {filename}: {err}") + return rid, record_map + +def merge_maps(seq_map, sub_maps): + ''' Merges the sequence map with the result maps from different threads + ''' + for map in sub_maps: + for k, v in map.items(): + seq_map[k].update(v) + +def calc_circularity(nucmer_path, threads, files, seq_map, input_mode): + ''' Generates the circularity features for each sequence using nucmer + ''' + print_log("Generating circularity features...") + + create_circ_dirs() + progress_bar = create_visual_progress_bar(len(seq_map), circ_bar_desc) + executor = concurrent.futures.ProcessPoolExecutor(threads) + + if(input_mode == DIRECTORY_INPUT): + # -d type input + # Assumes a directory of larger number of file + # with smaller number of records in each file + futures = [ + executor.submit( + circ_directory_worker, nucmer_path, filename) + for filename in files + ] + sub_maps = [] + for f in concurrent.futures.as_completed(futures): + record_count, sub_map = f.result() + update_progress_bar(progress_bar, record_count) + sub_maps.append(sub_map) + executor.shutdown(wait=True) + merge_maps(seq_map, sub_maps) + + + elif(input_mode == FILE_INPUT): + # -r type input + # Assumes one or few files of input + # with larger number of records in each file + for filename in files: + try: + records = [] + for record in SeqIO.parse(filename, 'fasta'): + records.append(record.id) + futures = [ + executor.submit( + circ_file_worker, nucmer_path, filename, rid) + for rid in records + ] + for f in concurrent.futures.as_completed(futures): + id, record_map = f.result() + seq_map[id].update(record_map) + update_progress_bar(progress_bar, 1) + executor.shutdown(wait=True) + + except Exception as err: + print_error(f"Error reading file {filename}: {err}") + + close_progress_bar(progress_bar) + print_log("Generating circularity features completed.") diff --git a/preprocess/fragment.py b/preprocess/fragment.py index a41c712..3de6a5f 100644 --- a/preprocess/fragment.py +++ b/preprocess/fragment.py @@ -1,5 +1,6 @@ from random import randint from Bio import SeqIO +import os from progress_bar import * from constants import * @@ -35,7 +36,7 @@ def write_frags(seq_id, seq_frags): ''' Write the generated fragments into files ''' try: - with open(f'{frag_write_path}/{seq_id}.fasta', 'w+') as fout: + with open(os.path.join(frag_write_path, f'{seq_id}.fasta'), 'w+') as fout: for frag in seq_frags: fout.write(f'>{frag["n"]} {frag["id"]}\n{frag["seq"]}\n') except Exception as err: diff --git a/preprocess/kmer.py b/preprocess/kmer.py index d3ba316..acec981 100644 --- a/preprocess/kmer.py +++ b/preprocess/kmer.py @@ -5,15 +5,15 @@ from constants import * import os -def count_kmers(threads, frag_count): +def count_kmers(seq2vec_path, threads, frag_count): ''' Generates the kmer counts for each fragment using seq2vec ''' try: print_log("Generating kmer counts...") create_kmer_dirs() - progress_bar = create_progress_bar(kmer_bar_desc) + progress_bar = create_visual_progress_bar(sum(frag_count.values()), kmer_bar_desc) for filename in os.listdir(frag_write_path): - name = os.path.splitext(filename)[0] + name = os.path.splitext(os.path.basename(filename))[0] frag_file = os.path.join(frag_write_path, filename) write_file = os.path.join(kmer_write_path, name) args = [ diff --git a/preprocess/preprocess.py b/preprocess/preprocess.py index 2358a46..f927203 100644 --- a/preprocess/preprocess.py +++ b/preprocess/preprocess.py @@ -1,10 +1,22 @@ import os +from Bio import SeqIO from preprocess.fragment import fragment from preprocess.kmer import count_kmers +from preprocess.circular import calc_circularity from constants import * from helpers import print_error +def generate_seq_map(files): + seq_map = {} + for file in files: + try: + for record in SeqIO.parse(file, 'fasta'): + seq_map[record.id] = {'filepath':file} + except Exception as err: + print_error(f"Error reading fasta file {file}: {err}") + return seq_map + def preprocess(args): ''' Preprocess sequences t0 generate kmer, biomarker features ''' @@ -24,6 +36,14 @@ def preprocess(args): except Exception as err: print_error(f"Error listing input directory files: {err}\n") return -1 + + #kmer count frag_count = fragment(files, args.coverage) - count_kmers(args.threads, frag_count) + count_kmers(args.seq2vec, args.threads, frag_count) + + #biomarkers + seq_map = generate_seq_map(files) + input_mode = DIRECTORY_INPUT if args.directory != None else FILE_INPUT + calc_circularity(args.nucmer, args.threads, files, seq_map, input_mode) + return 0 diff --git a/progress_bar.py b/progress_bar.py index 16065f1..2c7a5d1 100644 --- a/progress_bar.py +++ b/progress_bar.py @@ -2,13 +2,15 @@ from tqdm import tqdm def create_progress_bar(desc): - ''' Create a progress bar with the iteration count and without the moving + ''' Create a progress bar with the iteration count ''' return tqdm(desc = desc) -# def create_progress_bar(end, desc): -# return tqdm(range(end), desc = desc) +def create_visual_progress_bar(length, desc): + ''' Create a progress bar with the moving needle + ''' + return tqdm(range(length), desc = desc) def update_progress_bar(pbar,i): pbar.update(i)