import itertools
from singlecellmultiomics.utils.sequtils import hamming_distance, get_consensus_dictionaries, pick_best_base_call
import pysamiterators.iterators
import singlecellmultiomics.modularDemultiplexer.baseDemultiplexMethods
from singlecellmultiomics.utils import style_str
from singlecellmultiomics.bamProcessing import get_read_group_from_read
from singlecellmultiomics.features import FeatureAnnotatedObject
complement = str.maketrans('ATCGN', 'TAGCN')
[docs]class Fragment():
"""
This class holds 1 or more reads which are derived from the same cluster
Example:
Generate a Fragment with a single associated read::
>>> from singlecellmultiomics.molecule import Molecule
>>> from singlecellmultiomics.fragment import Fragment
>>> import pysam
>>> read = pysam.AlignedSegment()
>>> read.reference_start = 30
>>> read.query_name = 'R1'
>>> read.mapping_quality = 30
>>> read.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag
>>> read.set_tag('RX','CAT') # The UMI is extracted from the RX tag
>>> read.query_sequence = "CATGTATCCGGGCTTAA"
>>> read.query_qualities = [30] * len(read.query_sequence)
>>> read.cigarstring = f'{len(read.query_sequence)}M'
>>> Fragment([read])
Fragment:
sample:CELL_1
umi:CAT
span:None 30-47
strand:+
has R1: yes
has R2: no
randomer trimmed: no
Warning:
Make sure the RX and SM tags of the read are set! If these are encoded
in the read name, use singlecellmultiomics.universalBamTagger.customreads
for conversion.
"""
def __init__(self, reads,
assignment_radius: int = 0,
umi_hamming_distance: int = 1,
R1_primer_length: int = 0,
R2_primer_length: int = 6,
tag_definitions: list = None,
max_fragment_size: int = None,
mapping_dir=(False, True),
max_NUC_stretch: int = None,
read_group_format: int = 0, # R1 forward, R2 reverse
library_name: str = None, # Overwrites the library name
single_end: bool = False
):
"""
Initialise Fragment
Args:
reads( list ):
list containing pysam AlignedSegment reads
assignment_radius( int ):
Assignment radius for molecule resolver
umi_hamming_distance( int ):
umi_hamming_distance distance threshold for associating multiple fragments to a molecule
R1_primer_length(int):
length of R1 primer, these bases are not taken into account when calculating a consensus
R2_primer_length(int):
length of R2 primer, these bases are not taken into account when calculating a consensus, this length is auto-detected/ overwritten when the rS tag is set
tag_definitions(list):
sam TagDefinitions
max_fragment_size (int):
When specified, fragments with a fragment size larger than the specified value are flagged as qcfail
mapping_dir( tuple ):
expected mapping direction of mates,
True for reverse, False for forward. This parameter is used in dovetail detection
read_group_format(int) : see get_read_group()
Returns:
html(string) : html representation of the fragment
"""
self.R1_primer_length = R1_primer_length
self.R2_primer_length = R2_primer_length
if tag_definitions is None:
tag_definitions = singlecellmultiomics.modularDemultiplexer.baseDemultiplexMethods.TagDefinitions
self.tag_definitions = tag_definitions
self.assignment_radius = assignment_radius
self.umi_hamming_distance = umi_hamming_distance
self.mapping_dir = mapping_dir
self.reads = reads
self.strand = None
self.meta = {} # dictionary of meta data
self.is_mapped = None
# Check for multimapping
self.is_multimapped = True
self.mapping_quality = 0
self.match_hash = None
self.safe_span = None # wether the span of the fragment could be determined
self.unsafe_trimmed = False # wether primers have been trimmed off
self.random_primer_sequence = None
self.max_fragment_size = max_fragment_size
self.read_group_format = read_group_format
self.max_NUC_stretch = max_NUC_stretch
self.qcfail = False
self.single_end = single_end
# Span:\
self.span = [None, None, None]
self.umi = None
# Force R1=read1 R2=read2:
for i, read in enumerate(self.reads):
if read is None:
continue
if self.max_NUC_stretch is not None and (
self.max_NUC_stretch*'A' in read.seq or
self.max_NUC_stretch*'G' in read.seq or
self.max_NUC_stretch*'T' in read.seq or
self.max_NUC_stretch*'C' in read.seq):
self.set_rejection_reason('HomoPolymer', set_qcfail=True)
self.qcfail=True
break
if read.is_qcfail:
self.qcfail = True
if read.has_tag('rS'):
self.random_primer_sequence = read.get_tag('rS')
self.unsafe_trimmed = True
self.R2_primer_length = 0 # Set R2 primer length to zero, as the primer has been removed
if not read.is_unmapped:
self.is_mapped = True
if read.mapping_quality > 0:
self.is_multimapped = False
self.mapping_quality = max(
self.mapping_quality, read.mapping_quality)
if i == 0:
if read.is_read2:
if not read.is_qcfail:
raise ValueError('Supply first R1 then R2')
read.is_read1 = True
read.is_read2 = False
elif i == 1:
read.is_read1 = False
read.is_read2 = True
self.set_sample(library_name=library_name)
self.update_umi()
if self.qcfail:
return
self.set_strand(self.identify_strand())
self.update_span()
[docs] def write_tensor(self, chromosome=None, span_start=None, span_end=None, height=30, index_start=0,
base_content_table=None,
base_mismatches_table=None,
base_indel_table=None,
base_qual_table=None,
base_clip_table=None,
mask_reference_bases=None,
# set with reference bases to mask (converted to N )
# (chrom,pos)
reference=None,
skip_missing_reads=False
):
"""
Write tensor representation of the fragment to supplied 2d arrays
Args:
chromosome( str ):
chromosome to view
span_start( int ):
first base to show
span_end( int ):
last base to show
height (int) :
height of the tensor (reads)
index_start(int): start writing tensor at this row
base_content_table(np.array) : 2d array to write base contents to
base_indel_table(np.array) : 2d array to write indel information to
base_content_table(np.array) : 2d array to write base contents to
base_content_table(np.array) : 2d array to write base contents to
mask_reference_bases(set): mask reference bases in this set with a N set( (chrom,pos), ... )
reference(pysam.FastaFile) : Handle to reference file to use instead of MD tag. If None: MD tag is used.
skip_missing_reads (bool) : when enabled only existing (non None) reads are added to the tensor. Use this option when mapping single-end
Returns:
None
"""
self.base_mapper = {}
for i, (strand, base) in enumerate(
itertools.product((True, False), 'NACTG-')):
self.base_mapper[(strand, base)] = i + 2
if chromosome is None and span_start is None and span_end is None:
chromosome, span_start, span_end = self.get_span()
span_len = span_end - span_start
reads = self
last_ref = None
used_reads = 0
for ri, read in enumerate(reads):
if read is None:
continue
alignment_operations = list(itertools.chain(
*([operation] * l for operation, l in read.cigartuples if operation != 2)))
print(alignment_operations)
# Obtain the amount of operations before the alignment start
operations_before_alignment_start = 0
for query_pos, ref_pos in read.get_aligned_pairs():
if ref_pos is None:
operations_before_alignment_start += 1
else:
break
initial_base_offset = read.reference_start - operations_before_alignment_start
# Obtain row index in the tensor
if skip_missing_reads:
row_index = (used_reads + index_start) % height
else:
row_index = (ri + index_start) % height
# Where in the reference are we?
ref_pointer = read.reference_start
alignment_started = False
for operation, (cycle, query_pos, ref_pos, ref_base) in zip(
alignment_operations,
pysamiterators.iterators.ReadCycleIterator(
read,
with_seq=True,
reference=reference)):
# Mask locations from mask_reference_bases
if mask_reference_bases is not None and ref_pos is not None and (
chromosome, ref_pos) in mask_reference_bases:
ref_base = 'N'
if ref_pos is None:
if not alignment_started:
ref_pointer = initial_base_offset + i
else:
ref_pointer += 1
if operation == 4:
try:
query_base = read.seq[query_pos]
base_clip_table[row_index, ref_pointer -
span_start] = self.base_mapper[(read.is_reverse, query_base)]
except IndexError:
pass
continue
ref_pointer = ref_pos
if (ref_pos - span_start) < 0 or (ref_pos -
span_start) > (span_len - 1):
continue
ref_base = ref_base.upper()
if query_pos is None:
query_base = '-'
qual = 0
base_indel_table[row_index, ref_pos - span_start] = 1
else:
query_base = read.seq[query_pos]
qual = ord(read.qual[query_pos])
if ref_pos is not None:
base_qual_table[row_index, ref_pos - span_start] = qual
if query_base != ref_base:
base_content_table[row_index, ref_pos -
span_start] = self.base_mapper[(read.is_reverse, query_base)]
else:
base_mismatches_table[row_index,
ref_pos - span_start] = 0.2
base_content_table[row_index, ref_pos -
span_start] = self.base_mapper[(read.is_reverse, query_base)]
used_reads += 1
if skip_missing_reads:
return used_reads + index_start + 1
else:
return ri + index_start + 1
[docs] def get_read_group(self, with_attr_dict=False) -> str:
r = None
for read in self.reads:
if read is None:
continue
r = get_read_group_from_read(read,
format=self.read_group_format,
with_attr_dict=with_attr_dict)
break
return r
[docs] def set_duplicate(self, is_duplicate):
"""Define this fragment as duplicate, sets the corresponding bam bit flag
Args:
value(bool) : is_duplicate
"""
for read in self.reads:
if read is not None:
read.is_duplicate = is_duplicate
[docs] def get_fragment_size(self):
return abs(self.span[2] - self.span[1])
[docs] def write_pysam(self, pysam_handle):
"""Write all associated reads to the target file
Args:
target_file (pysam.AlignmentFile) : Target file
"""
self.write_tags()
for read in self:
if read is not None:
pysam_handle.write(read)
[docs] def identify_strand(self):
# If R2 is rev complement:
if self.get_R1() is not None:
# verify if the read has been mapped:
if self.get_R1().is_unmapped:
return None
return self.get_R1().is_reverse
elif self.get_R2() is not None:
# verify if the read has been mapped:
if self.get_R2().is_unmapped:
return None
return not self.get_R2().is_reverse
return None
[docs] def get_random_primer_hash(self):
"""Obtain hash describing the random primer
this assumes the random primer is on the end of R2 and has a length of self.R2_primer_length
When the rS tag is set, the value of this tag is used as random primer sequence
Returns None,None when the random primer cannot be described
Returns:
reference_name (str) or None
reference_start (int) : Int or None
sequence (str) : Int or None
"""
R2 = self.get_R2()
if R2 is None or R2.query_sequence is None:
return None, None, None
if self.R2_primer_length == 0 and self.random_primer_sequence is None:
return None, None, None
# The read was not mapped
if R2.is_unmapped:
# Guess the orientation does not matter
if self.random_primer_sequence is not None:
return None, None, self.random_primer_sequence
return None, None, R2.query_sequence[:self.R2_primer_length]
if R2.is_reverse:
global complement
if self.random_primer_sequence is not None:
return R2.reference_name, R2.reference_end, self.random_primer_sequence
return(R2.reference_name, R2.reference_end, R2.query_sequence[-self.R2_primer_length:][::-1].translate(complement))
else:
if self.random_primer_sequence is not None:
return R2.reference_name, R2.reference_start, self.random_primer_sequence
return(R2.reference_name, R2.reference_start, R2.query_sequence[:self.R2_primer_length])
raise ValueError()
[docs] def get_R1(self):
"""
Obtain the AlignedSegment of read 1 of the fragment
Returns:
R1 (pysam.AlignedSegment) : Read 1 of the fragment, returns None
when R1 is not mapped
"""
if len(self.reads) == 0:
raise IndexError('The fragment has no associated reads')
return self.reads[0]
[docs] def get_R2(self):
"""
Obtain the AlignedSegment of read 2 of the fragment
Returns:
R1 (pysam.AlignedSegment) : Read 2 of the fragment, returns None
when R2 is not mapped
"""
if len(self.reads) < 2:
raise IndexError('The fragment has no associated R2')
return self.reads[1]
[docs] def has_R1(self):
"""
Check if the fragment has an associated read 1
Returns:
has_r1 (bool)
"""
if len(self.reads) == 0:
return False
return self.reads[0] is not None
[docs] def has_R2(self):
"""
Check if the fragment has an associated read 2
Returns:
has_r2 (bool)
"""
if len(self.reads) < 2:
return False
return self.reads[1] is not None
@property
def R1(self):
return self.reads[0]
@property
def R2(self):
return self.reads[1]
[docs] def get_consensus(self, only_include_refbase: str = None, dove_safe: bool = False, **get_consensus_dictionaries_kwargs) -> dict:
"""
a dictionary of (reference_pos) : (qbase, quality, reference_base) tuples
Args:
only_include_refbase(str) : Only report bases aligned to this reference base, uppercase only
dove_safe(bool) : Only report bases supported within R1 and R2 start and end coordinates
Returns:
consensus(dict) : {reference_position: (qbase, quality)
"""
r1_consensus, r2_consensus = get_consensus_dictionaries(
self.R1,
self.R2,
only_include_refbase=only_include_refbase,
dove_safe=dove_safe,
**get_consensus_dictionaries_kwargs)
return {
ref_pos:pick_best_base_call( r1_consensus.get(ref_pos) , r2_consensus.get(ref_pos) )
for ref_pos in set(r1_consensus.keys()).union(set(r2_consensus.keys()))
}
@property
def estimated_length(self) -> int:
"""
Obtain the estimated size of the fragment,
returns None when estimation is not possible
Takes into account removed bases (R2_primer_length attribute)
Assumes inwards sequencing orientation, except when self.single_end is set
"""
if self.single_end:
if self[0] is None:
return None
return self[0].reference_end - self[0].reference_start
if self.has_R1() and self.has_R2():
contig = self.R1.reference_name
if self.R1.is_reverse and not self.R2.is_reverse:
## rp |----- R2 ----> <--- R1 ----|
## |>----------DISTANCE------------<|
start, end = self.R2.reference_start - self.R2_primer_length, self.R1.reference_end
if start<end:
return end - start
else:
return None
elif not self.R1.is_reverse and self.R2.is_reverse:
## |----- R1 ----> <--- R2 ----| rp
## |>----------DISTANCE------------<|
start, end = self.R1.reference_start, self.R2.reference_end + self.R2_primer_length
if start<end:
return end - start
else:
return None
else:
return None
return None
[docs] def update_span(self):
"""
Update the span (the location the fragment maps to) stored in Fragment
The span is a single stretch of coordinates on a single contig.
The contig is determined by the reference_name assocated to the first
mapped read in self.reads
This calculation assumes the reads are sequenced inwards and
dove-tails of the molecule cannot be trusted
"""
contig = None
start = None
end = None
if self.has_R1() and self.has_R2() and \
self.R1.reference_start is not None and self.R1.reference_end is not None and \
self.R2.reference_start is not None and self.R2.reference_end is not None :
contig = self.R1.reference_name
if self.R1.is_reverse and not self.R2.is_reverse:
start, end = self.R2.reference_start, self.R1.reference_end
self.safe_span = True
elif not self.R1.is_reverse and self.R2.is_reverse:
start, end = self.R1.reference_start, self.R2.reference_end
self.safe_span = True
else:
start = min(self.R1.reference_start, self.R2.reference_start)
end = max(self.R1.reference_start, self.R2.reference_start)
self.safe_span = False
elif self.has_R1() and self.R1.reference_start is not None and self.R1.reference_end is not None :
contig = self.R1.reference_name
start, end = self.R1.reference_start, self.R1.reference_end
self.safe_span = False
elif self.has_R2() and self.R2.reference_start is not None and self.R2.reference_end is not None :
contig = self.R2.reference_name
start, end = self.R2.reference_start, self.R2.reference_end
self.safe_span = False
else:
# Its Sometimes possible that no cigar is set for the alignment, only a start coordinate
for read in self:
if read is None:
continue
if len(read.cigar)!=0:
raise NotImplementedError("Non implemented span")
if read.reference_start is not None:
start,end = read.reference_start, read.reference_start
contig = read.reference_name
else:
raise NotImplementedError("Non implemented span, undefined alignment, and not start coordinate")
self.span = (contig, start, end)
@property
def aligned_length(self):
return self.span[2]-self.span[1]
[docs] def get_span(self) -> tuple:
return self.span
[docs] def has_valid_span(self) -> bool:
# Check if the span could be calculated:
defined_span = not any([x is None for x in self.get_span()])
if not defined_span:
return False
if self.max_fragment_size is not None and self.get_fragment_size()>self.max_fragment_size:
return False
return True
[docs] def set_sample(self, sample: str = None, library_name: str = None):
"""Force sample name or obtain sample name from associated reads"""
if sample is not None:
self.sample = sample
else:
for read in self.reads:
if read is not None and read.has_tag('SM'):
self.sample = read.get_tag('SM')
break
if library_name is not None:
sample = self.sample.split('_', 1)[-1]
self.sample = library_name+'_'+sample
for read in self.reads:
if read is not None:
read.set_tag('SM', self.sample)
read.set_tag('LY', library_name)
[docs] def get_sample(self) -> str:
""" Obtain the sample name associated with the fragment
The sample name is extracted from the SM tag of any of the associated reads.
Returns:
sample name (str)
"""
return self.sample
[docs] def __len__(self):
"""Obtain the amount of associated reads to the fragment
Returns:
assocoiated reads (int)
"""
return(sum(read is not None for read in self.reads))
[docs] def set_strand(self, strand: bool):
"""Set mapping strand
Args:
strand (bool) : False for Forward, True for reverse
"""
self.strand = strand
[docs] def get_strand(self):
"""Obtain strand
Returns:
strand (bool) : False for Forward, True for reverse
"""
return self.strand
[docs] def update_umi(self):
"""
Extract umi from 'RX' tag and store the UMI in the Fragment object
"""
for read in self.reads:
if read is not None and read.has_tag('RX'):
self.umi = read.get_tag('RX')
[docs] def get_umi(self):
"""
Get the UMI sequence associated to this fragment
Returns:
umi(str)
"""
return self.umi
def __iter__(self):
return iter(self.reads)
[docs] def umi_eq(self, other):
"""
Hamming distance measurement to another Fragment or Molecule,
Returns :
is_close (bool) : returns True when the hamming distance between
the two objects <= umi_hamming_distance
Example:
>>> from singlecellmultiomics.molecule import Molecule
>>> from singlecellmultiomics.fragment import Fragment
>>> import pysam
>>> # Create reads (read_A, and read_B), they both belong to the same
>>> # cell and have 1 hamming distance between their UMI's
>>> read_A = pysam.AlignedSegment()
>>> read_A.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag
>>> read_A.set_tag('RX','CAT') # The UMI is extracted from the RX tag
>>> read_B = pysam.AlignedSegment()
>>> read_B.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag
>>> read_B.set_tag('RX','CAG') # The UMI is extracted from the RX tag
>>> # Create fragment objects for read_A and B:
>>> frag_A = Fragment([read_A],umi_hamming_distance=0)
>>> frag_B = Fragment([read_B],umi_hamming_distance=0)
>>> frag_A.umi_eq(frag_B) # This returns False, the distance is 1, which is higher than 0 (umi_hamming_distance)
False
>>> frag_A = Fragment([read_A],umi_hamming_distance=1)
>>> frag_B = Fragment([read_B],umi_hamming_distance=1)
>>> frag_A.umi_eq(frag_B) # This returns True, the distance is 1, which is the (umi_hamming_distance)
True
"""
if self.umi == other.umi:
return True
if self.umi_hamming_distance == 0:
return False
else:
if len(self.umi)!=len(other.umi):
return False
return hamming_distance(
self.umi, other.umi) <= self.umi_hamming_distance
[docs] def __eq__(self, other): # other can also be a Molecule!
# Make sure fragments map to the same strand, cheap comparisons
"""
Check equivalence between two Fragments or Fragment and Molecule.
Args:
other (Fragment or Molecule) : object to compare against
Returns
is_eq (bool) : True when the other object is (likely) derived from the same molecule
Example:
>>> from singlecellmultiomics.molecule import Molecule
>>> from singlecellmultiomics.fragment import Fragment
>>> import pysam
>>> # Create sam file to write some reads to:
>>> test_sam = pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000])
>>> read_A = pysam.AlignedSegment(test_sam.header)
>>> read_A.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag
>>> read_A.set_tag('RX','CAT') # The UMI is extracted from the RX tag
>>> # By default the molecule assignment is done based on the mapping location of read 1:
>>> read_A.reference_name = 'chr1'
>>> read_A.reference_start = 100
>>> read_A.query_sequence = 'ATCGGG'
>>> read_A.cigarstring = '6M'
>>> read_B = pysam.AlignedSegment(test_sam.header)
>>> read_B.set_tag('SM','CELL_1')
>>> read_B.set_tag('RX','CAT')
>>> read_B.reference_start = 100
>>> read_B.query_sequence = 'ATCGG'
>>> read_B.cigarstring = '5M'
>>> frag_A = Fragment([read_A],umi_hamming_distance=0)
>>> frag_A
Fragment:
sample:CELL_1
umi:CAT
span:chr1 100-106
strand:+
has R1: yes
has R2: no
randomer trimmed: no
>>> frag_B = Fragment([read_B],umi_hamming_distance=0)
>>> frag_A == frag_B
True
# Fragment A and fragment B belong to the same molecule,
# the UMI is identical, the starting position of R1 is identical and
# the sample name matches
When we move one of the reads, the Fragments are not equivalent any more
Example:
>>> read_B.reference_start = 150
>>> frag_B = Fragment([read_B],umi_hamming_distance=0)
>>> frag_A == frag_B
False
Except if the difference <= the assignment_radius
Example:
>>> read_B.reference_start = 150
>>> read_A.reference_start = 100
>>> frag_B = Fragment([read_B],assignment_radius=300)
>>> frag_A = Fragment([read_A],assignment_radius=300)
>>> frag_A == frag_B
True
When the UMI's are too far apart, the eq function returns `False`
Example:
>>> read_B.reference_start = 100
>>> read_A.reference_start = 100
>>> read_A.set_tag('RX','GGG')
>>> frag_B = Fragment([read_B])
>>> frag_A = Fragment([read_A])
>>> frag_A == frag_B
False
When the sample of the Fragments are not identical, the eq function returns `False`
Example:
>>> read_B.reference_start = 100
>>> read_A.reference_start = 100
>>> read_A.set_tag('RX','AAA')
>>> read_B.set_tag('RX','AAA')
>>> read_B.set_tag('SM', 'CELL_2' )
>>> frag_B = Fragment([read_B])
>>> frag_A = Fragment([read_A])
>>> frag_A == frag_B
False
"""
if self.sample != other.sample:
return False
if self.strand != other.strand:
return False
if not self.has_valid_span() or not other.has_valid_span():
return False
if min(abs(self.span[1] -
other.span[1]), abs(self.span[2] -
other.span[2])) > self.assignment_radius:
return False
# Make sure UMI's are similar enough, more expensive hamming distance
# calculation
return self.umi_eq(other)
[docs] def __getitem__(self, index):
"""
Get a read from the fragment
Args:
index( int ):
0 : Read 1
1: Read 2
..
"""
return self.reads[index]
[docs] def is_valid(self):
if self.qcfail:
return False
return self.has_valid_span()
[docs] def get_strand_repr(self):
s = self.get_strand()
if s is None:
return '?'
if s:
return '-'
else:
return '+'
def __repr__(self):
rep = f"""Fragment:
sample:{self.get_sample()}
umi:{self.get_umi()}
span:{('%s %s-%s' % self.get_span() if self.has_valid_span() else 'no span')}
strand:{self.get_strand_repr()}
has R1: {"yes" if self.has_R1() else "no"}
has R2: {"yes" if self.has_R2() else "no"}
randomer trimmed: {"yes" if self.unsafe_trimmed else "no"}
"""
try:
rep += '\n\t'.join([f'{key}:{str(value)}' for key, value in self.meta.items()])
except Exception as e:
print(self.meta)
raise
return rep
[docs] def get_html(
self,
chromosome=None,
span_start=None,
span_end=None,
show_read1=None,
show_read2=None):
"""
Get HTML representation of the fragment
Args:
chromosome( str ):
chromosome to view
span_start( int ):
first base to show
span_end( int ):
last base to show
show_read1(bool):
show read1
show_read2(bool):
show read2
Returns:
html(string) : html representation of the fragment
"""
if chromosome is None and span_start is None and span_end is None:
chromosome, span_start, span_end = self.get_span()
span_len = abs(span_end - span_start)
visualized_frag = ['.'] * span_len
if show_read1 is None and show_read2 is None:
reads = self
else:
reads = []
if show_read1:
reads.append(self.get_R1())
if show_read2:
reads.append(self.get_R2())
for read in reads:
if read is None:
continue
# for cycle, query_pos, ref_pos, ref_base in
# pysamiterators.iterators.ReadCycleIterator(read,with_seq=True):
for query_pos, ref_pos, ref_base in read.get_aligned_pairs(
with_seq=True, matches_only=True):
if ref_pos is None or ref_pos < span_start or ref_pos > span_end:
continue
ref_base = ref_base.upper()
if query_pos is None:
query_base = '-'
else:
query_base = read.seq[query_pos]
if ref_pos is not None:
if query_base != ref_base:
v = style_str(query_base, color='red', weight=500)
else:
v = query_base
try:
visualized_frag[ref_pos - span_start] = v
except IndexError:
pass # the alignment is outside the requested view
return ''.join(visualized_frag)
[docs] def set_rejection_reason(self, reason, set_qcfail=False):
self.set_meta('RR', reason, as_set=True)
if set_qcfail:
for read in self:
if read is not None:
read.is_qcfail = True
[docs] def set_recognized_sequence(self, seq):
self.set_meta('RZ', seq)
[docs]class SingleEndTranscriptFragment(Fragment, FeatureAnnotatedObject):
def __init__(self, reads, features, assignment_radius=0, stranded=None, capture_locations=False, auto_set_intron_exon_features=True,**kwargs):
Fragment.__init__(self, reads, assignment_radius=assignment_radius, **kwargs)
FeatureAnnotatedObject.__init__(self,
features=features,
stranded=stranded,
capture_locations=capture_locations,
auto_set_intron_exon_features=auto_set_intron_exon_features )
self.match_hash = (self.sample,
self.span[0],
self.strand)
[docs] def is_valid(self):
if len(self.genes)==0:
return False
return True
[docs] def annotate(self):
for read in self:
if read is None:
continue
read_strand = read.is_reverse
if self.stranded is not None and self.stranded==False:
read_strand=not read_strand
strand = ('-' if read_strand else '+')
read.set_tag('mr',strand)
for start, end in read.get_blocks():
for hit in self.features.findFeaturesBetween(
chromosome=read.reference_name,
sampleStart=start,
sampleEnd=end,
strand=(None if self.stranded is None else strand)) :
hit_start, hit_end, hit_id, hit_strand, hit_ids = hit
self.hits[hit_ids].add(
(read.reference_name, (hit_start, hit_end)))
if self.capture_locations:
if not hit_id in self.feature_locations:
self.feature_locations[hit_id] = []
self.feature_locations[hit_id].append( (hit_start, hit_end, hit_strand))
[docs] def get_site_location(self):
return self.span[0], self.start
def __eq__(self, other):
# Check if the other fragment/molecule is aligned to the same gene
if self.match_hash != other.match_hash:
return False
if len(self.genes.intersection(other.genes)):
return self.umi_eq(other)
return False
[docs]class FeatureCountsSingleEndFragment(Fragment):
""" Class for fragments annotated with featureCounts
Extracts annotated gene from the XT tag.
Deduplicates using XT tag and UMI
Reads without XT tag are flagged as invalid
"""
def __init__(self,
reads,
R1_primer_length=4,
R2_primer_length=6,
assignment_radius=100_000,
umi_hamming_distance=1,
invert_strand=False,
**kwargs
):
Fragment.__init__(self,
reads,
assignment_radius=assignment_radius,
R1_primer_length=R1_primer_length,
R2_primer_length=R2_primer_length,
umi_hamming_distance=umi_hamming_distance,**kwargs)
self.strand = None
self.gene = None
self.identify_gene()
if self.is_valid():
self.match_hash = (self.strand, self.gene, self.sample)
else:
self.match_hash = None
def __eq__(self, other):
# Make sure fragments map to the same strand, cheap comparisons
if self.match_hash != other.match_hash:
return False
# Make sure UMI's are similar enough, more expensive hamming distance
# calculation
return self.umi_eq(other)
[docs] def is_valid(self):
return self.gene is not None
[docs] def identify_gene(self):
self.valid = False
for read in self.reads:
if read is not None and read.has_tag('XT'):
self.gene = read.get_tag('XT')
self.valid = True
return self.gene
[docs]class FeatureCountsFullLengthFragment(FeatureCountsSingleEndFragment):
""" Class for fragments annotated with featureCounts, with multiple reads covering a gene
Extracts annotated gene from the XT tag.
Deduplicates using XT tag and UMI
Reads without XT tag are flagged as invalid
"""
def __init__(self,
reads,
R1_primer_length=4,
R2_primer_length=6,
assignment_radius=10_000,
umi_hamming_distance=1,
invert_strand=False,
**kwargs
):
FeatureCountsSingleEndFragment.__init__(self,
reads,
assignment_radius=assignment_radius,
R1_primer_length=R1_primer_length,
R2_primer_length=R2_primer_length,
umi_hamming_distance=umi_hamming_distance,
**kwargs
)
def __eq__(self, other):
# Make sure fragments map to the same strand, cheap comparisons
if self.match_hash != other.match_hash:
return False
#Calculate distance
if min(abs(self.span[1] -
other.span[1]), abs(self.span[2] -
other.span[2])) > self.assignment_radius:
return False
# Make sure UMI's are similar enough, more expensive hamming distance
# calculation
return self.umi_eq(other)
[docs]class FragmentWithoutPosition(Fragment):
""" Fragment without a specific location on a contig"""
[docs] def get_site_location(self):
if self.has_valid_span():
return self.span[0], 0
else:
return '*', 0
[docs]class FragmentStartPosition(Fragment):
""" Fragment without a specific location on a contig"""
[docs] def get_site_location(self):
for read in self:
if read is not None and not read.is_unmapped:
return read.reference_name, read.reference_start
return None
[docs]class FragmentWithoutUMI(Fragment):
"""
Use this class when no UMI information is available
"""
def __init__(self, reads, **kwargs):
Fragment.__init__(self, reads, **kwargs)
# remove the set_umi function
[docs] def set_umi(self, **kwargs):
pass
# Replace the equality function
def __eq__(self, other): # other can also be a Molecule!
# Make sure fragments map to the same strand, cheap comparisons
if self.sample != other.sample:
return False
if self.strand != other.strand:
return False
if min(abs(self.span[1] -
other.span[1]), abs(self.span[2] -
other.span[2])) > self.assignment_radius:
return False
# Sample matches and starting position is within the defined span
# radius
return True