-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProbe_seek_1.10
333 lines (266 loc) · 12.9 KB
/
Probe_seek_1.10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
"""
title: probe_seek (1.10)
author: Alex Chalk
date: Nov 22nd, 2024
description: A script for generating custom DNA probes for strategies like probe hybridisation capture
"""
import os
import sys
import matplotlib.pyplot as plt
from Bio.Blast.Applications import NcbiblastnCommandline
from collections import Counter
from Bio.SeqUtils import MeltingTemp as mt
from Bio.Seq import Seq
import time
# ---- Global Parameters ----
# Input sequence
target_sequence = "paste_sequence_here"
db_path = "/Users/.../hg38" # Path to BLAST database (adjust for your directory)
evalue = 1e-5
word_size = 9
dust = "no"
num_threads = 5 #adjust for your operating system
chunk_size = 28
overlap = 14
output_dir = "/Users/.../results" # Path to BLAST database (adjust for your directory)
os.makedirs(output_dir, exist_ok=True)
# Probe design parameters
tm_range = (78, 88)
max_homopolymer_length = 8
probe_length_default = 100
probe_length_min = 90
probe_length_max = 120
probe_spacing = 1
max_3 = 10 #maximum number of non-unique bases allowed in your probe
# ---- Utility Functions ----
def split_sequence(sequence, chunk_size, overlap):
"""Breaks your target sequence into custom sized chunks with overlap between chunks"""
chunks = []
for i in range(0, len(sequence), chunk_size - overlap):
chunk = sequence[i:i + chunk_size]
if len(chunk) < chunk_size:
break
chunks.append((chunk, i))
return chunks
def save_chunks_to_file(chunks, output_file):
with open(output_file, "w") as f:
for idx, (chunk, start_pos) in enumerate(chunks):
f.write(f">chunk_{idx}_pos_{start_pos}\n{chunk}\n")
def run_blast_local(query_file, results_file):
"""Uses blastn parameters to align each of your chunks against a reference genome"""
blastn_cline = NcbiblastnCommandline(
cmd="blastn",
query=query_file,
db=db_path,
evalue=evalue,
word_size=word_size,
dust=dust,
out=results_file,
outfmt=6,
num_threads=num_threads,
)
stdout, stderr = blastn_cline()
def parse_blast_results_by_chunk(results_file, chunk_labels):
"""Counts number of times each chunk is found throughout reference genome"""
chunk_hit_counts = Counter()
with open(results_file, "r") as file:
for line in file:
chunk_label = line.split("\t")[0].strip()
chunk_hit_counts[chunk_label] += 1
uniqueness_map = []
for chunk_label in chunk_labels:
count = chunk_hit_counts.get(chunk_label, 0)
if count == 0:
#sequence not found anywhere = 0
uniqueness_map.append(0)
elif count == 1:
#sequence found at one site in genome = 1
uniqueness_map.append(1)
elif count <= 3:
#sequence found 2-3 times in genome = 2
uniqueness_map.append(2)
else:
#sequence found more than 3 times across genome = 3
uniqueness_map.append(3)
return uniqueness_map
def calculate_gc_content(seq):
return (sum(1 for base in seq if base in "GCgc") / len(seq)) * 100
def has_homopolymers(seq, max_len):
for base in set(seq):
if base * max_len in seq:
return True
return False
def generate_uniqueness_string(target_sequence, uniqueness_map, chunk_size, overlap):
"""Generate a uniqueness string of the same length as target_sequence."""
uniqueness_string = [None] * len(target_sequence)
step = chunk_size - overlap
for chunk_idx, score in enumerate(uniqueness_map):
start = chunk_idx * step
end = min(start + chunk_size, len(target_sequence))
for pos in range(start, end):
if uniqueness_string[pos] is None:
uniqueness_string[pos] = score
else:
# Take the maximum score for overlapping positions
uniqueness_string[pos] = max(uniqueness_string[pos], score)
# Fill in any None values (should only happen at the very end)
for i in range(len(uniqueness_string)):
if uniqueness_string[i] is None:
uniqueness_string[i] = uniqueness_map[-1]
return uniqueness_string
def satisfies_uniqueness(seq_start, seq_end, orientation, uniqueness_string):
"""
Check if the probe sequence satisfies uniqueness constraints.
Less than 1/4 of probe can contain regions found at 2-3 sites across genome, and the last 30 bp from 3' end must be unique.
"""
relevant_region = uniqueness_string[seq_start:seq_end]
# Reject sequences with any 0's or 3's in the uniqueness region
if any(u in [0] for u in relevant_region):
return False
count_1 = sum(1 for u in relevant_region if u == 1)
count_2 = sum(1 for u in relevant_region if u == 2)
count_3 = sum(1 for u in relevant_region if u == 3)
# Orientation-specific checks.
if orientation == "+":
#Ensures that no 2's or 3' are in the 30 bp 3' end of sequence, that less than 1/4 of the sequence is 2's and that the number of 3's is less than max_3
return relevant_region[-30:].count(2) == 0 and relevant_region[-30:].count(3) == 0 and count_1 >= 30 and count_2 < len(relevant_region)/4 and count_3 < max_3
elif orientation == "-":
return relevant_region[:30].count(2) == 0 and relevant_region[-30:].count(3) == 0 and count_1 >= 30 and count_2 < len(relevant_region)/4 and count_3 < max_3
return False
def adjust_probe_length(probe_start, probe_end, orientation, target_sequence, uniqueness_string):
"""Adjust probe length to satisfy Tm while maintaining uniqueness and GC constraints."""
previous_probe_start = None
previous_probe_end = None
while True:
# Check if probe length is within bounds
probe_length = probe_end - probe_start
if probe_length < probe_length_min or probe_length > probe_length_max:
print(f"Breaking: Probe length out of bounds ({probe_length}).")
break
probe_seq = target_sequence[probe_start:probe_end]
tm = mt.Tm_NN(Seq(probe_seq), nn_table=mt.DNA_NN4, Na=50, Mg=1.5)
# Check if Tm, holopolymer, GC content, and uniqueness constraints are satisfied for a given sequence
if tm_range[0] <= tm <= tm_range[1]:
if has_homopolymers(probe_seq, max_homopolymer_length):
print(f"Rejected at ({probe_start},{probe_end}) due to homopolymers")
return None, None
gc_content = calculate_gc_content(probe_seq)
if not (35 < gc_content < 65): #Modify these number to change GC range
print(f"Rejected at ({probe_start},{probe_end}) do to GC content of {gc_content:.2f}")
return None, None
if satisfies_uniqueness(probe_start, probe_end, orientation, uniqueness_string):
return probe_start, probe_end
else:
print(f"Sequence at ({probe_start},{probe_end}) was not unique enough")
return None, None
# Adjust probe length
else:
print(f"Adjusting: Start={probe_start}, End={probe_end}, Tm={tm:.2f}")
if tm < tm_range[0]:
if orientation == "+":
probe_start = max(0, probe_start - 1)
elif orientation == "-":
probe_end = min(len(target_sequence), probe_end + 1)
elif tm > tm_range[1]:
if orientation == "+":
probe_start = min(probe_end - probe_length_min, probe_start + 1)
elif orientation == "-":
probe_end = max(probe_start + probe_length_min, probe_end - 1)
# Check for lack of progress
if (probe_start == previous_probe_start) and (probe_end == previous_probe_end):
break
previous_probe_start, previous_probe_end = probe_start, probe_end
print("Failed to adjust to valid Tm.")
return None, None
def design_probes(target_sequence, uniqueness_string):
"""Takes all the desired conditions and scans scross target sequence generating candidate probes"""
probes = []
probe_start = 0
probe_end = probe_start + probe_length_default
orientation = "+"
while probe_end < len(target_sequence):
# Starts with candidate probe of the default length. It will adjust this sequence length to match ideal Tm and sequence conditions, then check if the sequence is unique
adjusted_start, adjusted_end = adjust_probe_length(probe_start, probe_end, orientation, target_sequence, uniqueness_string)
# If, after adjusting probe length, a viable probe is found, then information about that probe is stored in a list
if adjusted_start is not None:
probe_seq = target_sequence[adjusted_start:adjusted_end]
# Extract uniqueness scores for the probe
probe_uniqueness = uniqueness_string[adjusted_start:adjusted_end]
# Relevant probe information for export
probe = {
"start": adjusted_start,
"end": adjusted_end,
"orientation": orientation,
"length": len(probe_seq),
"sequence": probe_seq,
"tm": f"{mt.Tm_NN(Seq(probe_seq), nn_table=mt.DNA_NN4, Na=50, Mg=1.5):.2f}",
"gc_content": f"{calculate_gc_content(probe_seq):.2f}",
"uniqueness": probe_uniqueness,
}
probes.append(probe)
print(f"Probe found!: {probe}")
probe_start = probe_end + probe_spacing
probe_end = probe_start + probe_length_default
orientation = "-" if orientation == "+" else "+"
else:
probe_start += 1
probe_end += 1
print("Reached end of sequence, no more candidate probe sites to check")
return probes
def visualize_sequence_uniqueness_by_chunk(chunk_labels, uniqueness_map, output_file, probes):
"""Create a graphical representation of your target sequence with overlayed probes"""
value_to_color = {0: "purple", 1: "green", 2: "orange", 3: "red"}
base_positions = [i * (chunk_size - overlap) for i in range(len(chunk_labels))]
colors = [value_to_color[val] for val in uniqueness_map]
plt.figure(figsize=(12, 3))
plt.bar(base_positions, [1] * len(chunk_labels), color=colors, edgecolor="none", width=(chunk_size - overlap))
plt.xlabel("Base Pair Position")
plt.ylabel("Uniqueness")
plt.title("Probe_seek ")
# Overlay probes as line segments
for idx, probe in enumerate(probes):
color = "black" if probe["orientation"] == "+" else "darkblue"
y_offset = 1.05 + (idx % 2) * 0.05 # Staggering for visibility
plt.hlines(y=y_offset, xmin=probe["start"], xmax=probe["end"], colors=color, linewidth=2)
# Optionally label probes with their index or properties
plt.text((probe["start"] + probe["end"]) / 2, y_offset + 0.02,
f"{idx + 1}", color=color, fontsize=8, ha="center")
plt.tight_layout()
plt.savefig(output_file)
plt.close()
# ---- Main Script ----
if __name__ == "__main__":
start_time = time.time()
# Step 1: Split sequence and create chunks
print("Processing sequence...")
chunks = split_sequence(target_sequence, chunk_size, overlap)
chunk_labels = [f"chunk_{idx}_pos_{start}" for idx, (_, start) in enumerate(chunks)]
base_positions = [start for _, start in chunks]
chunks_file = os.path.join(output_dir, "chunks.txt")
save_chunks_to_file(chunks, chunks_file)
results_file = os.path.join(output_dir, "blast_results.txt")
run_blast_local(chunks_file, results_file)
uniqueness_map = parse_blast_results_by_chunk(results_file, chunk_labels)
#Step 2: Generate the uniqueness string
print("Generating uniqueness string...")
uniqueness_string = generate_uniqueness_string(target_sequence, uniqueness_map, chunk_size, overlap)
# Step 3: Design probes
print("Designing probes...")
sys.stdout = open('/Users/.../results/output.txt', 'w')
probes = design_probes(target_sequence, uniqueness_string)
with open(os.path.join(output_dir, "designed_probes.txt"), "w") as f:
for probe in probes:
probe_seq = probe["sequence"]
if probe["orientation"] == "-":
probe["sequence"] = str(Seq(probe["sequence"]).reverse_complement())
f.write(f"{probe}\n")
# Step 4: Export results
sys.stdout.close()
sys.stdout = sys.__stdout__
print(f"Designed {len(probes)} probes. Results saved.")
chunk_uniqueness_plot = os.path.join(output_dir, "chunk_uniqueness.png")
visualize_sequence_uniqueness_by_chunk(chunk_labels, uniqueness_map, chunk_uniqueness_plot, probes)
end_time = time.time()
print(f"Total time: {(end_time - start_time) / 60:.2f} minutes.")
print("Make sure to validate these probes with trusted sources (Blat, IDT, NEB Tm calulator, etc).")
# Reset to normal