Skip to content

Commit

Permalink
Update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jfnavarro committed Jan 8, 2025
1 parent 1bef5bd commit 7c4e98b
Show file tree
Hide file tree
Showing 14 changed files with 653 additions and 800,462 deletions.
8 changes: 4 additions & 4 deletions scripts/merge_fastq.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def run_command(command: List[str], out: Union[int, IO[bytes]] = subprocess.PIPE
Exception: If an error occurs during command execution.
"""
try:
print("Running command: {}".format(" ".join(x for x in command).rstrip()))
print(f"Running command: {" ".join(x for x in command).rstrip()}")
proc = subprocess.Popen(
command,
stdout=out,
Expand All @@ -53,7 +53,7 @@ def main(run_path: str, indexes: List[str], out_path: str) -> int:
try:
run_command(["gunzip", "-f", file])
except Exception as e:
print("Error, gunziping FASTQ file {}, {}".format(file, str(e)))
print(f"Error, gunziping FASTQ file {file}, {str(e)}")
return 1

# Second merge the FASTQ files
Expand All @@ -66,15 +66,15 @@ def main(run_path: str, indexes: List[str], out_path: str) -> int:
with open("{}_R2.fastq".format(index), "w") as file2:
run_command(["cat"] + r2_files, out=file2)
except Exception as e:
print("Error, merging FASTQ files, {}".format(str(e)))
print(f"Error, merging FASTQ files, {str(e)}")
return 1

# Third gzip everything again
for file in glob.glob("*.fastq"):
try:
run_command(["gzip", "-f", file])
except Exception as e:
print("Error, gziping FASTQ file {}, {}".format(file, str(e)))
print(f"Error, gziping FASTQ file {file}, {str(e)}")
return 1

# Move merged FASTQ files to output path
Expand Down
4 changes: 2 additions & 2 deletions scripts/multi_qa.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ def main(counts_table_files: List[str], outdir: str, use_log: bool) -> int:
outdir = os.getcwd()
outdir = os.path.abspath(outdir)

print("Output directory {}".format(outdir))
print("Input datasets {}".format(" ".join(counts_table_files)))
print(f"Output directory {outdir}")
print(f"Input datasets {" ".join(counts_table_files)}")

# Parse datasets and sort them by column
datasets = [(pd.read_table(x,
Expand Down
39 changes: 29 additions & 10 deletions stpipeline/common/fastq_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,36 @@ def trim_quality(
return None, None


def check_umi_template(umi, template):
def check_umi_template(umi: str, template: str) -> bool:
"""
Checks that the UMI (molecular barcode) given as input complies
with the pattern given in template.
Returns True if the UMI complies
:param umi: a molecular barcode
:param template: a reg-based template with the same
distance of the UMI that should tell how the UMI should be formed
:type umi: str
:type template: str
:return: True if the given molecular barcode fits the pattern given
Validates that a UMI (molecular barcode) matches a given template pattern.
Args:
umi: Molecular barcode to validate.
template: Regular expression template describing the expected format of the UMI.
Returns:
True if the UMI matches the template, False otherwise.
"""
p = re.compile(template)
return p.match(umi) is not None

def has_sufficient_content(sequence: str, chars_to_count: str, threshold: float) -> bool:
"""
Checks if the content of specified characters in a sequence meets or exceeds a given threshold.
Args:
sequence: The sequence to evaluate.
chars_to_count: The characters to count (e.g., "AT").
threshold: The content threshold as a percentage (0-100).
Returns:
True if the content of specified characters is greater than or equal to the threshold, False otherwise.
"""
if len(sequence) == 0:
raise ValueError("The sequence cannot be empty.")
if not chars_to_count:
raise ValueError("chars_to_count must not be empty.")
count = sum(sequence.count(char) for char in chars_to_count)
content_percentage = (count / len(sequence)) * 100
return content_percentage >= threshold
43 changes: 20 additions & 23 deletions stpipeline/common/sam_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import os
import math
import pysam
from typing import List, Dict, Union
from typing import List


def split_bam(input_bam: str, temp_dir: str, threads: int) -> List[str]:
Expand All @@ -19,10 +19,10 @@ def split_bam(input_bam: str, temp_dir: str, threads: int) -> List[str]:
threads: Number of CPU cores to use for splitting.
Returns:
List[str]: List of paths to the split BAM files.
List of paths to the split BAM files.
"""
pysam.index(input_bam, os.path.join(temp_dir, f'{input_bam}.bai'))
input_bamfile = pysam.AlignmentFile(input_bam, mode='rb')
pysam.index(input_bam, os.path.join(temp_dir, f"{input_bam}.bai"))
input_bamfile = pysam.AlignmentFile(input_bam, mode="rb")
assert input_bamfile.check_index()

output_file_names = {
Expand All @@ -39,7 +39,6 @@ def split_bam(input_bam: str, temp_dir: str, threads: int) -> List[str]:
reads_per_part = math.ceil(total_read_count / threads)
read_counter = 0
part = 0

for record in input_bamfile.fetch(until_eof=True):
output_bamfiles[part].write(record)
read_counter += 1
Expand All @@ -48,7 +47,7 @@ def split_bam(input_bam: str, temp_dir: str, threads: int) -> List[str]:
read_counter = 0

input_bamfile.close()
return list(output_file_names.values())
return output_file_names.values()


def convert_to_AlignedSegment(
Expand All @@ -59,23 +58,23 @@ def convert_to_AlignedSegment(
barcode information as tags.
Args:
header (str): Header information for the segment.
sequence (str): DNA/RNA sequence.
quality (str): Base calling quality values.
barcode_sequence (str): Barcode sequence.
umi_sequence (str): Unique molecular identifier sequence.
header: Header information for the segment.
sequence: DNA/RNA sequence.
quality: Base calling quality values.
barcode_sequence: Barcode sequence.
umi_sequence: Unique molecular identifier sequence.
Returns:
pysam.AlignedSegment: A new AlignedSegment object with the provided data.
A new AlignedSegment object with the provided data.
"""
aligned_segment = pysam.AlignedSegment()
aligned_segment.query_name = header.split()[0]
aligned_segment.query_sequence = sequence
aligned_segment.query_qualities = pysam.qualitystring_to_array(quality)
aligned_segment.flag |= pysam.FUNMAP
aligned_segment.set_tag('B0', barcode_sequence)
aligned_segment.set_tag('B3', umi_sequence)
aligned_segment.set_tag('RG', '0')
aligned_segment.set_tag("B0", barcode_sequence)
aligned_segment.set_tag("B3", umi_sequence)
aligned_segment.set_tag("RG", "0")
return aligned_segment


Expand All @@ -84,25 +83,23 @@ def merge_bam(merged_file_name: str, files_to_merge: List[str], ubam: bool = Fal
Merges multiple partial BAM files into a single file.
Args:
merged_file_name (str): Path to the output merged BAM file.
files_to_merge (List[str]): List of paths to the partial BAM files.
ubam (bool): Indicates if the files are unaligned BAM (uBAM). Default is False.
merged_file_name: Path to the output merged BAM file.
files_to_merge: List of paths to the partial BAM files.
ubam: Indicates if the files are unaligned BAM (uBAM). Default is False.
Returns:
int: Total number of records in the merged BAM file.
Total number of records in the merged BAM file.
"""
assert files_to_merge, "The list of files to merge cannot be empty."
num_records = 0

with pysam.AlignmentFile(files_to_merge[0], mode='rb', check_sq=not ubam) as input_bamfile:
with pysam.AlignmentFile(files_to_merge[0], mode="rb", check_sq=not ubam) as input_bamfile:
merged_file = pysam.AlignmentFile(merged_file_name, mode="wb", template=input_bamfile)

for file_name in files_to_merge:
with pysam.AlignmentFile(file_name, mode='rb', check_sq=not ubam) as input_file:
with pysam.AlignmentFile(file_name, mode="rb", check_sq=not ubam) as input_file:
for record in input_file.fetch(until_eof=True):
merged_file.write(record)
num_records += 1

merged_file.close()

return num_records
62 changes: 32 additions & 30 deletions stpipeline/common/unique_events_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ def __init__(self, gff_filename: Optional[str]):
"""
self.buffer = {}
self.last_position = 0
self.last_chromosome = 'chrom'
self.last_chromosome = "chrom"
self.gene_end_coordinates = {}
if gff_filename:
self.__compute_gene_end_coordinates(gff_filename)
self.compute_gene_end_coordinates(gff_filename)

def __compute_gene_end_coordinates(self, gff_filename: str) -> None:
def compute_gene_end_coordinates(self, gff_filename: str) -> None:
"""
Reads the end coordinates and chromosomes of all genes present in the GFF file
and saves them in a dictionary with the gene ID as the key.
Args:
gff_filename: Path to the GFF file.
Raises:
ValueError: If the gene_id attribute is missing in the GFF file.
"""
logger.debug(f"Parsing GFF file {gff_filename} to compute gene end coordinates.")

Expand All @@ -55,79 +58,79 @@ def __compute_gene_end_coordinates(self, gff_filename: str) -> None:
gene_id = line.get("gene_id", None)
if not gene_id:
msg = f"The gene_id attribute is missing in the annotation file ({gff_filename})."
loggger.error(msg)
logger.error(msg)
raise ValueError(msg)

if gene_id[0] == '"' and gene_id[-1] == '"':
gene_id = gene_id[1:-1]

if gene_id in gene_end_coordinates:
if end > gene_end_coordinates[gene_id][1]:
gene_end_coordinates[gene_id] = (seqname, end)
else:
gene_end_coordinates[gene_id] = (seqname, end)

gene_end_coordinates['__no_feature'] = (None, -1)
gene_end_coordinates["__no_feature"] = (None, -1)
self.gene_end_coordinates = gene_end_coordinates

def get_gene_end_position(self, gene: str) -> Tuple[Optional[str], int]:
"""
Returns the genomic end coordinate and chromosome of the given gene.
Args:
gene (str): Gene ID.
gene: Gene ID.
Returns:
Tuple[Optional[str], int]: Chromosome and end coordinate of the gene.
Chromosome and end coordinate of the gene.
Raises:
ValueError: If the gene is not found in the annotation file or is ambiguous.
"""
try:
return self.gene_end_coordinates[gene]
except KeyError:
if '__ambiguous[' in gene:
ambiguous_genes = gene[gene.index('[') + 1:gene.index(']')].split('+')
# Handle ambigous genes as defined in htseq
if "__ambiguous[" in gene:
ambiguous_genes = gene[gene.index("[") + 1:gene.index("]")].split("+")
try:
return max(
[self.gene_end_coordinates[amb_gene] for amb_gene in ambiguous_genes],
key=operator.itemgetter(1)
)
except KeyError:
raise ValueError(f"Ambiguous gene {gene} not found in annotation file.")
raise ValueError(f"Gene {gene} not found in annotation file.")
msg = f"Ambiguous gene {gene} not found in annotation file."
logger.error(msg)
raise ValueError(msg)
msg = f"Gene {gene} not found in annotation file."
logger.error(msg)
raise ValueError(msg)

def add_transcript(self, gene: str, spot_coordinates: Tuple[int, int], transcript: Transcript, position: int) -> None:
"""
Adds a transcript to the gene buffer.
Args:
gene (str): Gene name.
spot_coordinates (Tuple[int, int]): Spot coordinates (x, y).
transcript (Transcript): Transcript information.
position (int): Transcript's left-most genomic coordinate.
gene: Gene name.
spot_coordinates: Spot coordinates (x, y).
transcript: Transcript information.
position: Transcript's left-most genomic coordinate.
"""
self.last_position = position
self.last_chromosome = transcript.chrom

self.buffer.setdefault(gene, {}).setdefault(spot_coordinates, []).append(transcript)

def check_and_clear_buffer(self, empty: bool = False) -> Generator[Tuple[str, Dict], None, None]:
"""
Checks and clears the buffer, yielding genes that are outside the current chromosome or position.
Args:
empty (bool): If True, forces clearing the buffer.
empty: If True, forces clearing the buffer.
Yields:
Tuple[str, Dict]: Gene name and its buffer content.
Gene name and its buffer content.
"""
for gene in list(self.buffer.keys()):
if gene == '__no_feature' and not empty:
for gene in self.buffer.keys():
if gene == "__no_feature" and not empty:
continue

chrom, end_position = self.get_gene_end_position(gene)

if empty or self.last_position > end_position or self.last_chromosome != chrom:
yield gene, self.buffer.pop(gene)

Expand All @@ -136,15 +139,14 @@ def parse_unique_events(input_file: str, gff_filename: Optional[str] = None) ->
Parses transcripts from a coordinate-sorted BAM file and organizes them by gene and spot coordinates.
Args:
input_file (str): Path to the input BAM file containing annotated records.
gff_filename (Optional[str]): Path to the GFF file containing gene coordinates.
input_file: Path to the input BAM file containing annotated records.
gff_filename: Path to the GFF file containing gene coordinates.
Yields:
Tuple[str, Dict]: Gene name and a dictionary mapping spot coordinates to transcripts.
Gene name and a dictionary mapping spot coordinates to transcripts.
"""
genes_buffer = GeneBuffer(gff_filename) if gff_filename else None
genes_dict: Dict[str, Dict[Tuple[int, int], List[Transcript]]] = {}

genes_dict = {}
sam_file = pysam.AlignmentFile(input_file, "rb")

for rec in sam_file.fetch(until_eof=True):
Expand All @@ -158,7 +160,7 @@ def parse_unique_events(input_file: str, gff_filename: Optional[str] = None) ->
if strand == "-":
start, end = end, start

x, y, gene, umi = -1, -1, 'None', 'None'
x, y, gene, umi = -1, -1, "None", "None"
for k, v in rec.tags:
if k == "B1":
x = int(v)
Expand Down
Loading

0 comments on commit 7c4e98b

Please sign in to comment.