from singlecellmultiomics.fragment import Fragment
from singlecellmultiomics.utils.sequtils import hamming_distance
[docs]class NlaIIIFragment(Fragment):
def __init__(self,
reads,
R1_primer_length=4,
R2_primer_length=6,
assignment_radius=1_000,
umi_hamming_distance=1,
invert_strand=False,
check_motif=True,
no_overhang =False, # CATG is present OUTSIDE the fragment
cut_location_offset=-4,
reference=None, #Reference is required when no_overhang=True
allow_cycle_shift=False,
use_allele_tag = False, # Use existing DA tag for molecule deduplication
no_umi_cigar_processing=False, **kwargs
):
self.invert_strand = invert_strand
self.no_overhang = no_overhang
self.reference = reference
self.allow_cycle_shift = allow_cycle_shift
self.cut_location_offset = cut_location_offset
self.no_umi_cigar_processing= no_umi_cigar_processing
self.use_allele_tag = use_allele_tag
self.check_motif=check_motif
if self.no_overhang and reference is None:
raise ValueError('Supply a reference handle when no_overhang=True')
if self.no_overhang and not self.check_motif:
raise ValueError('no_overhang=True is not compatible with check_motif=False, as there is no way to deduplicate. Consider using method "chic"')
Fragment.__init__(self,
reads,
assignment_radius=assignment_radius,
R1_primer_length=R1_primer_length,
R2_primer_length=R2_primer_length,
umi_hamming_distance=umi_hamming_distance, **kwargs)
# set NLAIII cut site given reads
self.strand = None
self.site_location = None
self.cut_site_strand = None
if self.identify_site():
self.found_valid_site = True
else:
self.found_valid_site = False
if self.is_valid():
if self.use_allele_tag:
allele = 'n'
for read in self.reads:
if read is not None and read.has_tag('DA'):
allele = read.get_tag('DA')
self.match_hash = (
allele,
self.strand,
self.cut_site_strand,
self.site_location[0],
self.site_location[1],
self.sample)
else:
self.match_hash = (
self.strand,
self.cut_site_strand,
self.site_location[0],
self.site_location[1],
self.sample)
else:
self.match_hash = None
[docs] def set_site(self, site_chrom, site_pos, site_strand=None, valid=True):
if not valid :
self.found_valid_site = False
else:
self.found_valid_site = True
self.set_meta('DS', site_pos)
if site_strand is not None:
if self.invert_strand:
self.set_meta('RS', not site_strand)
else:
self.set_meta('RS', site_strand)
self.set_strand(site_strand)
self.site_location = (site_chrom, site_pos)
self.cut_site_strand = site_strand
[docs] def get_safe_span(self, allow_unsafe=True):
# For a mapped read pair it can be important to figure out which bases are actual genomic signal
# Al sequence behind and aligned to the random primer(s) cannot be trusted and should be masked out
# secondly all signal before the starting location of R1 cannot be trusted
# This function returns a lower and higher bound of the locations within the fragment that can be trusted
# ASCII art: (H is primer sequence)
# R1 H------------------------>
# <------------------HH R2
# Output: (E is emitted)
# R1 HEEEEEEE----------------->
# <------------------HH R2
if self.R1_primer_length==0 and self.R2_primer_length==0:
starts = tuple( read.reference_start for read in self if read is not None and not read.is_unmapped )
ends = tuple( read.reference_end for read in self if read is not None and not read.is_unmapped )
return min( min(starts), min(ends) ), max( max(starts), max(ends) )
R1, R2 = self.reads
if (R1 is None or R1.is_unmapped) and allow_unsafe:
return R2.reference_start,R2.reference_end
if (R2 is None or R2.is_unmapped) and allow_unsafe:
return R1.reference_start,R1.reference_end
if (R1 is None and not allow_unsafe) or R2 is None or R1.is_unmapped:
# This is an annoying situation, we cannot determine what bases can be trusted
raise ValueError('Genomic locations cannot be determined')
if R2.is_unmapped:
# This is an annoying situation, we cannot determine what bases can be trusted
raise ValueError('Genomic locations cannot be determined')
if R1.is_reverse==R2.is_reverse:
raise ValueError('Fragment incorrectly mapped') # The fragment is not correctly mapped
if not R1.is_reverse: # R1 is forward, R2 is reverse
# R1 H------------------------>
# <------------------HH R2
start = R1.reference_start+ self.R1_primer_length
end = R2.reference_end - self.R2_primer_length
else:
# R2 HH------------------------>
# <------------------HH R1
start = R2.reference_start+ self.R2_primer_length
end = R1.reference_end - self.R1_primer_length
if start>=end:
raise ValueError('Fragment has no size')
return start,end
[docs] def identify_site(self):
self.found_valid_site = False
R1, R2 = self.reads
""" Valid configs:
CATG######## R1 ########## ^ ########## R2 ##########
############ R2 ########## ^ ########### R1 #####CATG reverse case
!BWA inverts the query sequence if it maps to the negative strand!
or R2.is_unmapped:
if R1.is_unmapped and R2.is_unmapped:
self.set_rejection_reason(R1, 'unmapped R1;R2')
elif R1.is_unmapped:
self.set_rejection_reason(R1, 'unmapped R1')
self.set_rejection_reason(R2, 'unmapped R1')
else:
self.set_rejection_reason(R1, 'unmapped R2')
self.set_rejection_reason(R2, 'unmapped R2')
return(None)
"""
if R1 is None or R1.is_unmapped:
self.set_rejection_reason('unmapped R1')
return None
if self.no_overhang:
# scan 3 bp of sequence for CATG
scan_extra_bp = 3
site_coordinate = None
if R1.is_reverse:
motif = self.reference.fetch(R1.reference_name, R1.reference_end, R1.reference_end-self.cut_location_offset+scan_extra_bp)
if 'CATG' in motif:
offset = motif.find('CATG')
site_coordinate = R1.reference_end + offset
else:
motif = self.reference.fetch(R1.reference_name, R1.reference_start+self.cut_location_offset-scan_extra_bp, R1.reference_start)
if 'CATG' in motif:
offset = motif[::-1].find('GTAC')
site_coordinate = R1.reference_start - offset - 4
if site_coordinate is None:
self.set_rejection_reason('no_CATG_in_ref', set_qcfail=True)
return None
self.set_site(site_strand=R1.is_reverse, site_chrom=R1.reference_name, site_pos=site_coordinate)
self.set_recognized_sequence(motif)
return site_coordinate
else:
forward_motif = R1.seq[:4]
rev_motif = R1.seq[-4:]
r1_start =(R1.reference_end if R1.is_reverse else R1.reference_start)
if not self.no_umi_cigar_processing:
if R1.is_reverse:
if R1.cigartuples[-1][0]==4: # softclipped at end
r1_start+=R1.cigartuples[-1][1]
else:
if R1.cigartuples[0][0]==4: # softclipped at start
r1_start-=R1.cigartuples[0][1]
if (not self.check_motif or forward_motif == 'CATG') and not R1.is_reverse: # Not reverse = forward
rpos = (R1.reference_name, r1_start)
self.set_site(site_strand=R1.is_reverse, site_chrom=rpos[0], site_pos=rpos[1])
self.set_recognized_sequence(forward_motif)
return(rpos)
elif (not self.check_motif or rev_motif == 'CATG') and R1.is_reverse:
rpos = (R1.reference_name, r1_start - 4)
self.set_site(site_strand=R1.is_reverse, site_chrom=rpos[0], site_pos=rpos[1])
self.set_recognized_sequence(rev_motif)
return(rpos)
# Sometimes the cycle is off, this is dangerous though because the cell barcode and UMI might be shifted too!
elif self.allow_cycle_shift and forward_motif.startswith( 'ATG') and not R1.is_reverse:
rpos = (R1.reference_name, R1.reference_start - 1)
self.set_site(site_strand=R1.is_reverse, site_chrom=rpos[0], site_pos=rpos[1])
self.set_recognized_sequence('ATG')
return(rpos)
elif self.allow_cycle_shift and rev_motif.startswith( 'ATG') and R1.is_reverse: # First base was trimmed or lost
rpos = (R1.reference_name, R1.reference_end - 3)
self.set_site(site_strand=R1.is_reverse, site_chrom=rpos[0], site_pos=rpos[1])
self.set_recognized_sequence('CAT')
return(rpos)
else:
if forward_motif == 'CATG' and R1.is_reverse:
self.set_rejection_reason('found CATG R1 REV exp FWD', set_qcfail=True)
elif rev_motif == 'CATG' and not R1.is_reverse:
self.set_rejection_reason('found CATG R1 FWD exp REV', set_qcfail=True)
else:
self.set_rejection_reason('no CATG', set_qcfail=True)
# Every fragment needs to have a site. Otherwise it will get lost. Use R1 start location as anchor
if R1.is_reverse:
rpos = (R1.reference_name, r1_start - 4)
else:
rpos = (R1.reference_name, r1_start)
self.set_site(site_strand=R1.is_reverse, site_chrom=rpos[0], site_pos=rpos[1], valid=False)
return None
[docs] def get_undigested_site_count(self, reference_handle):
"""
Obtain the amount of undigested sites in the span of the fragment
Parameters
----------
reference : pysam.FastaFile or similiar
Returns
-------
undigested_site_count : int
amount of undigested cut sites in the mapping span of the fragment
Raises:
-------
ValueError : when the span of the molecule is not properly defined
"""
if any(e is None for e in self.span):
raise ValueError('Span of the fragment is not well defined')
total = reference_handle.fetch(*self.span).count('CATG')
# ignore 1 CATG if it was Detected:
if self.meta.get('RZ') == 'CATG':
total -= 1
return total
[docs] def is_valid(self):
if self.qcfail:
return False
try:
if self.max_fragment_size is not None and self.get_fragment_size()>self.max_fragment_size:
self.set_rejection_reason('FS', set_qcfail=True)
return False
except TypeError:
pass
return self.found_valid_site
[docs] def get_site_location(self):
if self.site_location is not None:
return self.site_location
else:
# We need some kind of coordinate...
for read in self:
if read is not None and read.reference_name is not None and read.reference_start is not None:
return read.reference_name, read.reference_start
def __repr__(self):
site_loc = self.get_site_location()
if site_loc is None or len(site_loc)==0:
return Fragment.__repr__(
self) + f'\n\tNo restriction site found'
else:
site_def_str = f'\n\tRestriction site:{site_loc[0]}:{site_loc[1]}'
try:
return Fragment.__repr__(
self) + site_def_str
except Exception as e:
print(self.get_site_location())
raise
def __eq__(self, other):
# Make sure fragments map to the same strand, cheap comparisons
if self.match_hash != other.match_hash:
return False
# Make sure UMI's are similar enough, more expensive hamming distance
# calculation
return self.umi_eq(other)