Source code for singlecellmultiomics.fragment.fragment

import itertools
from singlecellmultiomics.utils.sequtils import hamming_distance, get_consensus_dictionaries, pick_best_base_call
import pysamiterators.iterators
import singlecellmultiomics.modularDemultiplexer.baseDemultiplexMethods
from singlecellmultiomics.utils import style_str
from singlecellmultiomics.bamProcessing import get_read_group_from_read
from singlecellmultiomics.features import FeatureAnnotatedObject

complement = str.maketrans('ATCGN', 'TAGCN')


[docs]class Fragment(): """ This class holds 1 or more reads which are derived from the same cluster Example: Generate a Fragment with a single associated read:: >>> from singlecellmultiomics.molecule import Molecule >>> from singlecellmultiomics.fragment import Fragment >>> import pysam >>> read = pysam.AlignedSegment() >>> read.reference_start = 30 >>> read.query_name = 'R1' >>> read.mapping_quality = 30 >>> read.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag >>> read.set_tag('RX','CAT') # The UMI is extracted from the RX tag >>> read.query_sequence = "CATGTATCCGGGCTTAA" >>> read.query_qualities = [30] * len(read.query_sequence) >>> read.cigarstring = f'{len(read.query_sequence)}M' >>> Fragment([read]) Fragment: sample:CELL_1 umi:CAT span:None 30-47 strand:+ has R1: yes has R2: no randomer trimmed: no Warning: Make sure the RX and SM tags of the read are set! If these are encoded in the read name, use singlecellmultiomics.universalBamTagger.customreads for conversion. """ def __init__(self, reads, assignment_radius: int = 0, umi_hamming_distance: int = 1, R1_primer_length: int = 0, R2_primer_length: int = 6, tag_definitions: list = None, max_fragment_size: int = None, mapping_dir=(False, True), max_NUC_stretch: int = None, read_group_format: int = 0, # R1 forward, R2 reverse library_name: str = None, # Overwrites the library name single_end: bool = False ): """ Initialise Fragment Args: reads( list ): list containing pysam AlignedSegment reads assignment_radius( int ): Assignment radius for molecule resolver umi_hamming_distance( int ): umi_hamming_distance distance threshold for associating multiple fragments to a molecule R1_primer_length(int): length of R1 primer, these bases are not taken into account when calculating a consensus R2_primer_length(int): length of R2 primer, these bases are not taken into account when calculating a consensus, this length is auto-detected/ overwritten when the rS tag is set tag_definitions(list): sam TagDefinitions max_fragment_size (int): When specified, fragments with a fragment size larger than the specified value are flagged as qcfail mapping_dir( tuple ): expected mapping direction of mates, True for reverse, False for forward. This parameter is used in dovetail detection read_group_format(int) : see get_read_group() Returns: html(string) : html representation of the fragment """ self.R1_primer_length = R1_primer_length self.R2_primer_length = R2_primer_length if tag_definitions is None: tag_definitions = singlecellmultiomics.modularDemultiplexer.baseDemultiplexMethods.TagDefinitions self.tag_definitions = tag_definitions self.assignment_radius = assignment_radius self.umi_hamming_distance = umi_hamming_distance self.mapping_dir = mapping_dir self.reads = reads self.strand = None self.meta = {} # dictionary of meta data self.is_mapped = None # Check for multimapping self.is_multimapped = True self.mapping_quality = 0 self.match_hash = None self.safe_span = None # wether the span of the fragment could be determined self.unsafe_trimmed = False # wether primers have been trimmed off self.random_primer_sequence = None self.max_fragment_size = max_fragment_size self.read_group_format = read_group_format self.max_NUC_stretch = max_NUC_stretch self.qcfail = False self.single_end = single_end # Span:\ self.span = [None, None, None] self.umi = None # Force R1=read1 R2=read2: for i, read in enumerate(self.reads): if read is None: continue if self.max_NUC_stretch is not None and ( self.max_NUC_stretch*'A' in read.seq or self.max_NUC_stretch*'G' in read.seq or self.max_NUC_stretch*'T' in read.seq or self.max_NUC_stretch*'C' in read.seq): self.set_rejection_reason('HomoPolymer', set_qcfail=True) self.qcfail=True break if read.is_qcfail: self.qcfail = True if read.has_tag('rS'): self.random_primer_sequence = read.get_tag('rS') self.unsafe_trimmed = True self.R2_primer_length = 0 # Set R2 primer length to zero, as the primer has been removed if not read.is_unmapped: self.is_mapped = True if read.mapping_quality > 0: self.is_multimapped = False self.mapping_quality = max( self.mapping_quality, read.mapping_quality) if i == 0: if read.is_read2: if not read.is_qcfail: raise ValueError('Supply first R1 then R2') read.is_read1 = True read.is_read2 = False elif i == 1: read.is_read1 = False read.is_read2 = True self.set_sample(library_name=library_name) self.update_umi() if self.qcfail: return self.set_strand(self.identify_strand()) self.update_span()
[docs] def write_tensor(self, chromosome=None, span_start=None, span_end=None, height=30, index_start=0, base_content_table=None, base_mismatches_table=None, base_indel_table=None, base_qual_table=None, base_clip_table=None, mask_reference_bases=None, # set with reference bases to mask (converted to N ) # (chrom,pos) reference=None, skip_missing_reads=False ): """ Write tensor representation of the fragment to supplied 2d arrays Args: chromosome( str ): chromosome to view span_start( int ): first base to show span_end( int ): last base to show height (int) : height of the tensor (reads) index_start(int): start writing tensor at this row base_content_table(np.array) : 2d array to write base contents to base_indel_table(np.array) : 2d array to write indel information to base_content_table(np.array) : 2d array to write base contents to base_content_table(np.array) : 2d array to write base contents to mask_reference_bases(set): mask reference bases in this set with a N set( (chrom,pos), ... ) reference(pysam.FastaFile) : Handle to reference file to use instead of MD tag. If None: MD tag is used. skip_missing_reads (bool) : when enabled only existing (non None) reads are added to the tensor. Use this option when mapping single-end Returns: None """ self.base_mapper = {} for i, (strand, base) in enumerate( itertools.product((True, False), 'NACTG-')): self.base_mapper[(strand, base)] = i + 2 if chromosome is None and span_start is None and span_end is None: chromosome, span_start, span_end = self.get_span() span_len = span_end - span_start reads = self last_ref = None used_reads = 0 for ri, read in enumerate(reads): if read is None: continue alignment_operations = list(itertools.chain( *([operation] * l for operation, l in read.cigartuples if operation != 2))) print(alignment_operations) # Obtain the amount of operations before the alignment start operations_before_alignment_start = 0 for query_pos, ref_pos in read.get_aligned_pairs(): if ref_pos is None: operations_before_alignment_start += 1 else: break initial_base_offset = read.reference_start - operations_before_alignment_start # Obtain row index in the tensor if skip_missing_reads: row_index = (used_reads + index_start) % height else: row_index = (ri + index_start) % height # Where in the reference are we? ref_pointer = read.reference_start alignment_started = False for operation, (cycle, query_pos, ref_pos, ref_base) in zip( alignment_operations, pysamiterators.iterators.ReadCycleIterator( read, with_seq=True, reference=reference)): # Mask locations from mask_reference_bases if mask_reference_bases is not None and ref_pos is not None and ( chromosome, ref_pos) in mask_reference_bases: ref_base = 'N' if ref_pos is None: if not alignment_started: ref_pointer = initial_base_offset + i else: ref_pointer += 1 if operation == 4: try: query_base = read.seq[query_pos] base_clip_table[row_index, ref_pointer - span_start] = self.base_mapper[(read.is_reverse, query_base)] except IndexError: pass continue ref_pointer = ref_pos if (ref_pos - span_start) < 0 or (ref_pos - span_start) > (span_len - 1): continue ref_base = ref_base.upper() if query_pos is None: query_base = '-' qual = 0 base_indel_table[row_index, ref_pos - span_start] = 1 else: query_base = read.seq[query_pos] qual = ord(read.qual[query_pos]) if ref_pos is not None: base_qual_table[row_index, ref_pos - span_start] = qual if query_base != ref_base: base_content_table[row_index, ref_pos - span_start] = self.base_mapper[(read.is_reverse, query_base)] else: base_mismatches_table[row_index, ref_pos - span_start] = 0.2 base_content_table[row_index, ref_pos - span_start] = self.base_mapper[(read.is_reverse, query_base)] used_reads += 1 if skip_missing_reads: return used_reads + index_start + 1 else: return ri + index_start + 1
[docs] def get_read_group(self, with_attr_dict=False) -> str: r = None for read in self.reads: if read is None: continue r = get_read_group_from_read(read, format=self.read_group_format, with_attr_dict=with_attr_dict) break return r
[docs] def set_duplicate(self, is_duplicate): """Define this fragment as duplicate, sets the corresponding bam bit flag Args: value(bool) : is_duplicate """ for read in self.reads: if read is not None: read.is_duplicate = is_duplicate
[docs] def get_fragment_size(self): return abs(self.span[2] - self.span[1])
[docs] def write_tags(self): self.set_meta('MQ', self.mapping_quality) self.set_meta('MM', self.is_multimapped) self.set_meta('RG', self.get_read_group()) if self.has_valid_span(): # Write fragment size: if self.safe_span: self.set_meta('fS', self.get_fragment_size()) self.set_meta('fe', self.span[1]) self.set_meta('fs', self.span[2]) else: self.remove_meta('fS') else: self.set_rejection_reason('FS', set_qcfail=True) # Set qcfail bit when the fragment is not valid if not self.is_valid(): for read in self: if read is not None: read.is_qcfail = True
[docs] def write_pysam(self, pysam_handle): """Write all associated reads to the target file Args: target_file (pysam.AlignmentFile) : Target file """ self.write_tags() for read in self: if read is not None: pysam_handle.write(read)
[docs] def identify_strand(self): # If R2 is rev complement: if self.get_R1() is not None: # verify if the read has been mapped: if self.get_R1().is_unmapped: return None return self.get_R1().is_reverse elif self.get_R2() is not None: # verify if the read has been mapped: if self.get_R2().is_unmapped: return None return not self.get_R2().is_reverse return None
[docs] def get_random_primer_hash(self): """Obtain hash describing the random primer this assumes the random primer is on the end of R2 and has a length of self.R2_primer_length When the rS tag is set, the value of this tag is used as random primer sequence Returns None,None when the random primer cannot be described Returns: reference_name (str) or None reference_start (int) : Int or None sequence (str) : Int or None """ R2 = self.get_R2() if R2 is None or R2.query_sequence is None: return None, None, None if self.R2_primer_length == 0 and self.random_primer_sequence is None: return None, None, None # The read was not mapped if R2.is_unmapped: # Guess the orientation does not matter if self.random_primer_sequence is not None: return None, None, self.random_primer_sequence return None, None, R2.query_sequence[:self.R2_primer_length] if R2.is_reverse: global complement if self.random_primer_sequence is not None: return R2.reference_name, R2.reference_end, self.random_primer_sequence return(R2.reference_name, R2.reference_end, R2.query_sequence[-self.R2_primer_length:][::-1].translate(complement)) else: if self.random_primer_sequence is not None: return R2.reference_name, R2.reference_start, self.random_primer_sequence return(R2.reference_name, R2.reference_start, R2.query_sequence[:self.R2_primer_length]) raise ValueError()
[docs] def remove_meta(self,key): for read in self: if read is not None: if read.has_tag(key): read.set_tag(key,None)
[docs] def set_meta(self, key, value, as_set=False): self.meta[key] = value for read in self: if read is not None: if as_set and read.has_tag(key): data = set(read.get_tag(key).split(',')) data.add(value) read.set_tag(key, ','.join(sorted(list(data)))) else: read.set_tag(key, value)
[docs] def get_R1(self): """ Obtain the AlignedSegment of read 1 of the fragment Returns: R1 (pysam.AlignedSegment) : Read 1 of the fragment, returns None when R1 is not mapped """ if len(self.reads) == 0: raise IndexError('The fragment has no associated reads') return self.reads[0]
[docs] def get_R2(self): """ Obtain the AlignedSegment of read 2 of the fragment Returns: R1 (pysam.AlignedSegment) : Read 2 of the fragment, returns None when R2 is not mapped """ if len(self.reads) < 2: raise IndexError('The fragment has no associated R2') return self.reads[1]
[docs] def has_R1(self): """ Check if the fragment has an associated read 1 Returns: has_r1 (bool) """ if len(self.reads) == 0: return False return self.reads[0] is not None
[docs] def has_R2(self): """ Check if the fragment has an associated read 2 Returns: has_r2 (bool) """ if len(self.reads) < 2: return False return self.reads[1] is not None
@property def R1(self): return self.reads[0] @property def R2(self): return self.reads[1]
[docs] def get_consensus(self, only_include_refbase: str = None, dove_safe: bool = False, **get_consensus_dictionaries_kwargs) -> dict: """ a dictionary of (reference_pos) : (qbase, quality, reference_base) tuples Args: only_include_refbase(str) : Only report bases aligned to this reference base, uppercase only dove_safe(bool) : Only report bases supported within R1 and R2 start and end coordinates Returns: consensus(dict) : {reference_position: (qbase, quality) """ r1_consensus, r2_consensus = get_consensus_dictionaries( self.R1, self.R2, only_include_refbase=only_include_refbase, dove_safe=dove_safe, **get_consensus_dictionaries_kwargs) return { ref_pos:pick_best_base_call( r1_consensus.get(ref_pos) , r2_consensus.get(ref_pos) ) for ref_pos in set(r1_consensus.keys()).union(set(r2_consensus.keys())) }
@property def estimated_length(self) -> int: """ Obtain the estimated size of the fragment, returns None when estimation is not possible Takes into account removed bases (R2_primer_length attribute) Assumes inwards sequencing orientation, except when self.single_end is set """ if self.single_end: if self[0] is None: return None return self[0].reference_end - self[0].reference_start if self.has_R1() and self.has_R2(): contig = self.R1.reference_name if self.R1.is_reverse and not self.R2.is_reverse: ## rp |----- R2 ----> <--- R1 ----| ## |>----------DISTANCE------------<| start, end = self.R2.reference_start - self.R2_primer_length, self.R1.reference_end if start<end: return end - start else: return None elif not self.R1.is_reverse and self.R2.is_reverse: ## |----- R1 ----> <--- R2 ----| rp ## |>----------DISTANCE------------<| start, end = self.R1.reference_start, self.R2.reference_end + self.R2_primer_length if start<end: return end - start else: return None else: return None return None
[docs] def update_span(self): """ Update the span (the location the fragment maps to) stored in Fragment The span is a single stretch of coordinates on a single contig. The contig is determined by the reference_name assocated to the first mapped read in self.reads This calculation assumes the reads are sequenced inwards and dove-tails of the molecule cannot be trusted """ contig = None start = None end = None if self.has_R1() and self.has_R2() and \ self.R1.reference_start is not None and self.R1.reference_end is not None and \ self.R2.reference_start is not None and self.R2.reference_end is not None : contig = self.R1.reference_name if self.R1.is_reverse and not self.R2.is_reverse: start, end = self.R2.reference_start, self.R1.reference_end self.safe_span = True elif not self.R1.is_reverse and self.R2.is_reverse: start, end = self.R1.reference_start, self.R2.reference_end self.safe_span = True else: start = min(self.R1.reference_start, self.R2.reference_start) end = max(self.R1.reference_start, self.R2.reference_start) self.safe_span = False elif self.has_R1() and self.R1.reference_start is not None and self.R1.reference_end is not None : contig = self.R1.reference_name start, end = self.R1.reference_start, self.R1.reference_end self.safe_span = False elif self.has_R2() and self.R2.reference_start is not None and self.R2.reference_end is not None : contig = self.R2.reference_name start, end = self.R2.reference_start, self.R2.reference_end self.safe_span = False else: # Its Sometimes possible that no cigar is set for the alignment, only a start coordinate for read in self: if read is None: continue if len(read.cigar)!=0: raise NotImplementedError("Non implemented span") if read.reference_start is not None: start,end = read.reference_start, read.reference_start contig = read.reference_name else: raise NotImplementedError("Non implemented span, undefined alignment, and not start coordinate") self.span = (contig, start, end)
@property def aligned_length(self): return self.span[2]-self.span[1]
[docs] def get_span(self) -> tuple: return self.span
[docs] def has_valid_span(self) -> bool: # Check if the span could be calculated: defined_span = not any([x is None for x in self.get_span()]) if not defined_span: return False if self.max_fragment_size is not None and self.get_fragment_size()>self.max_fragment_size: return False return True
[docs] def set_sample(self, sample: str = None, library_name: str = None): """Force sample name or obtain sample name from associated reads""" if sample is not None: self.sample = sample else: for read in self.reads: if read is not None and read.has_tag('SM'): self.sample = read.get_tag('SM') break if library_name is not None: sample = self.sample.split('_', 1)[-1] self.sample = library_name+'_'+sample for read in self.reads: if read is not None: read.set_tag('SM', self.sample) read.set_tag('LY', library_name)
[docs] def get_sample(self) -> str: """ Obtain the sample name associated with the fragment The sample name is extracted from the SM tag of any of the associated reads. Returns: sample name (str) """ return self.sample
[docs] def __len__(self): """Obtain the amount of associated reads to the fragment Returns: assocoiated reads (int) """ return(sum(read is not None for read in self.reads))
[docs] def set_strand(self, strand: bool): """Set mapping strand Args: strand (bool) : False for Forward, True for reverse """ self.strand = strand
[docs] def get_strand(self): """Obtain strand Returns: strand (bool) : False for Forward, True for reverse """ return self.strand
[docs] def update_umi(self): """ Extract umi from 'RX' tag and store the UMI in the Fragment object """ for read in self.reads: if read is not None and read.has_tag('RX'): self.umi = read.get_tag('RX')
[docs] def get_umi(self): """ Get the UMI sequence associated to this fragment Returns: umi(str) """ return self.umi
def __iter__(self): return iter(self.reads)
[docs] def umi_eq(self, other): """ Hamming distance measurement to another Fragment or Molecule, Returns : is_close (bool) : returns True when the hamming distance between the two objects <= umi_hamming_distance Example: >>> from singlecellmultiomics.molecule import Molecule >>> from singlecellmultiomics.fragment import Fragment >>> import pysam >>> # Create reads (read_A, and read_B), they both belong to the same >>> # cell and have 1 hamming distance between their UMI's >>> read_A = pysam.AlignedSegment() >>> read_A.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag >>> read_A.set_tag('RX','CAT') # The UMI is extracted from the RX tag >>> read_B = pysam.AlignedSegment() >>> read_B.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag >>> read_B.set_tag('RX','CAG') # The UMI is extracted from the RX tag >>> # Create fragment objects for read_A and B: >>> frag_A = Fragment([read_A],umi_hamming_distance=0) >>> frag_B = Fragment([read_B],umi_hamming_distance=0) >>> frag_A.umi_eq(frag_B) # This returns False, the distance is 1, which is higher than 0 (umi_hamming_distance) False >>> frag_A = Fragment([read_A],umi_hamming_distance=1) >>> frag_B = Fragment([read_B],umi_hamming_distance=1) >>> frag_A.umi_eq(frag_B) # This returns True, the distance is 1, which is the (umi_hamming_distance) True """ if self.umi == other.umi: return True if self.umi_hamming_distance == 0: return False else: if len(self.umi)!=len(other.umi): return False return hamming_distance( self.umi, other.umi) <= self.umi_hamming_distance
[docs] def __eq__(self, other): # other can also be a Molecule! # Make sure fragments map to the same strand, cheap comparisons """ Check equivalence between two Fragments or Fragment and Molecule. Args: other (Fragment or Molecule) : object to compare against Returns is_eq (bool) : True when the other object is (likely) derived from the same molecule Example: >>> from singlecellmultiomics.molecule import Molecule >>> from singlecellmultiomics.fragment import Fragment >>> import pysam >>> # Create sam file to write some reads to: >>> test_sam = pysam.AlignmentFile('test.sam','w',reference_names=['chr1','chr2'],reference_lengths=[1000,1000]) >>> read_A = pysam.AlignedSegment(test_sam.header) >>> read_A.set_tag('SM','CELL_1') # The sample to which the sample belongs is extracted from the SM tag >>> read_A.set_tag('RX','CAT') # The UMI is extracted from the RX tag >>> # By default the molecule assignment is done based on the mapping location of read 1: >>> read_A.reference_name = 'chr1' >>> read_A.reference_start = 100 >>> read_A.query_sequence = 'ATCGGG' >>> read_A.cigarstring = '6M' >>> read_B = pysam.AlignedSegment(test_sam.header) >>> read_B.set_tag('SM','CELL_1') >>> read_B.set_tag('RX','CAT') >>> read_B.reference_start = 100 >>> read_B.query_sequence = 'ATCGG' >>> read_B.cigarstring = '5M' >>> frag_A = Fragment([read_A],umi_hamming_distance=0) >>> frag_A Fragment: sample:CELL_1 umi:CAT span:chr1 100-106 strand:+ has R1: yes has R2: no randomer trimmed: no >>> frag_B = Fragment([read_B],umi_hamming_distance=0) >>> frag_A == frag_B True # Fragment A and fragment B belong to the same molecule, # the UMI is identical, the starting position of R1 is identical and # the sample name matches When we move one of the reads, the Fragments are not equivalent any more Example: >>> read_B.reference_start = 150 >>> frag_B = Fragment([read_B],umi_hamming_distance=0) >>> frag_A == frag_B False Except if the difference <= the assignment_radius Example: >>> read_B.reference_start = 150 >>> read_A.reference_start = 100 >>> frag_B = Fragment([read_B],assignment_radius=300) >>> frag_A = Fragment([read_A],assignment_radius=300) >>> frag_A == frag_B True When the UMI's are too far apart, the eq function returns `False` Example: >>> read_B.reference_start = 100 >>> read_A.reference_start = 100 >>> read_A.set_tag('RX','GGG') >>> frag_B = Fragment([read_B]) >>> frag_A = Fragment([read_A]) >>> frag_A == frag_B False When the sample of the Fragments are not identical, the eq function returns `False` Example: >>> read_B.reference_start = 100 >>> read_A.reference_start = 100 >>> read_A.set_tag('RX','AAA') >>> read_B.set_tag('RX','AAA') >>> read_B.set_tag('SM', 'CELL_2' ) >>> frag_B = Fragment([read_B]) >>> frag_A = Fragment([read_A]) >>> frag_A == frag_B False """ if self.sample != other.sample: return False if self.strand != other.strand: return False if not self.has_valid_span() or not other.has_valid_span(): return False if min(abs(self.span[1] - other.span[1]), abs(self.span[2] - other.span[2])) > self.assignment_radius: return False # Make sure UMI's are similar enough, more expensive hamming distance # calculation return self.umi_eq(other)
[docs] def __getitem__(self, index): """ Get a read from the fragment Args: index( int ): 0 : Read 1 1: Read 2 .. """ return self.reads[index]
[docs] def is_valid(self): if self.qcfail: return False return self.has_valid_span()
[docs] def get_strand_repr(self): s = self.get_strand() if s is None: return '?' if s: return '-' else: return '+'
def __repr__(self): rep = f"""Fragment: sample:{self.get_sample()} umi:{self.get_umi()} span:{('%s %s-%s' % self.get_span() if self.has_valid_span() else 'no span')} strand:{self.get_strand_repr()} has R1: {"yes" if self.has_R1() else "no"} has R2: {"yes" if self.has_R2() else "no"} randomer trimmed: {"yes" if self.unsafe_trimmed else "no"} """ try: rep += '\n\t'.join([f'{key}:{str(value)}' for key, value in self.meta.items()]) except Exception as e: print(self.meta) raise return rep
[docs] def get_html( self, chromosome=None, span_start=None, span_end=None, show_read1=None, show_read2=None): """ Get HTML representation of the fragment Args: chromosome( str ): chromosome to view span_start( int ): first base to show span_end( int ): last base to show show_read1(bool): show read1 show_read2(bool): show read2 Returns: html(string) : html representation of the fragment """ if chromosome is None and span_start is None and span_end is None: chromosome, span_start, span_end = self.get_span() span_len = abs(span_end - span_start) visualized_frag = ['.'] * span_len if show_read1 is None and show_read2 is None: reads = self else: reads = [] if show_read1: reads.append(self.get_R1()) if show_read2: reads.append(self.get_R2()) for read in reads: if read is None: continue # for cycle, query_pos, ref_pos, ref_base in # pysamiterators.iterators.ReadCycleIterator(read,with_seq=True): for query_pos, ref_pos, ref_base in read.get_aligned_pairs( with_seq=True, matches_only=True): if ref_pos is None or ref_pos < span_start or ref_pos > span_end: continue ref_base = ref_base.upper() if query_pos is None: query_base = '-' else: query_base = read.seq[query_pos] if ref_pos is not None: if query_base != ref_base: v = style_str(query_base, color='red', weight=500) else: v = query_base try: visualized_frag[ref_pos - span_start] = v except IndexError: pass # the alignment is outside the requested view return ''.join(visualized_frag)
[docs] def set_rejection_reason(self, reason, set_qcfail=False): self.set_meta('RR', reason, as_set=True) if set_qcfail: for read in self: if read is not None: read.is_qcfail = True
[docs] def set_recognized_sequence(self, seq): self.set_meta('RZ', seq)
[docs]class SingleEndTranscriptFragment(Fragment, FeatureAnnotatedObject): def __init__(self, reads, features, assignment_radius=0, stranded=None, capture_locations=False, auto_set_intron_exon_features=True,**kwargs): Fragment.__init__(self, reads, assignment_radius=assignment_radius, **kwargs) FeatureAnnotatedObject.__init__(self, features=features, stranded=stranded, capture_locations=capture_locations, auto_set_intron_exon_features=auto_set_intron_exon_features ) self.match_hash = (self.sample, self.span[0], self.strand)
[docs] def write_tags(self): # First decide on what values to write FeatureAnnotatedObject.write_tags(self) # Additional fragment data Fragment.write_tags(self)
[docs] def is_valid(self): if len(self.genes)==0: return False return True
[docs] def annotate(self): for read in self: if read is None: continue read_strand = read.is_reverse if self.stranded is not None and self.stranded==False: read_strand=not read_strand strand = ('-' if read_strand else '+') read.set_tag('mr',strand) for start, end in read.get_blocks(): for hit in self.features.findFeaturesBetween( chromosome=read.reference_name, sampleStart=start, sampleEnd=end, strand=(None if self.stranded is None else strand)) : hit_start, hit_end, hit_id, hit_strand, hit_ids = hit self.hits[hit_ids].add( (read.reference_name, (hit_start, hit_end))) if self.capture_locations: if not hit_id in self.feature_locations: self.feature_locations[hit_id] = [] self.feature_locations[hit_id].append( (hit_start, hit_end, hit_strand))
[docs] def get_site_location(self): return self.span[0], self.start
def __eq__(self, other): # Check if the other fragment/molecule is aligned to the same gene if self.match_hash != other.match_hash: return False if len(self.genes.intersection(other.genes)): return self.umi_eq(other) return False
[docs]class FeatureCountsSingleEndFragment(Fragment): """ Class for fragments annotated with featureCounts Extracts annotated gene from the XT tag. Deduplicates using XT tag and UMI Reads without XT tag are flagged as invalid """ def __init__(self, reads, R1_primer_length=4, R2_primer_length=6, assignment_radius=100_000, umi_hamming_distance=1, invert_strand=False, **kwargs ): Fragment.__init__(self, reads, assignment_radius=assignment_radius, R1_primer_length=R1_primer_length, R2_primer_length=R2_primer_length, umi_hamming_distance=umi_hamming_distance,**kwargs) self.strand = None self.gene = None self.identify_gene() if self.is_valid(): self.match_hash = (self.strand, self.gene, self.sample) else: self.match_hash = None def __eq__(self, other): # Make sure fragments map to the same strand, cheap comparisons if self.match_hash != other.match_hash: return False # Make sure UMI's are similar enough, more expensive hamming distance # calculation return self.umi_eq(other)
[docs] def is_valid(self): return self.gene is not None
[docs] def identify_gene(self): self.valid = False for read in self.reads: if read is not None and read.has_tag('XT'): self.gene = read.get_tag('XT') self.valid = True return self.gene
[docs]class FeatureCountsFullLengthFragment(FeatureCountsSingleEndFragment): """ Class for fragments annotated with featureCounts, with multiple reads covering a gene Extracts annotated gene from the XT tag. Deduplicates using XT tag and UMI Reads without XT tag are flagged as invalid """ def __init__(self, reads, R1_primer_length=4, R2_primer_length=6, assignment_radius=10_000, umi_hamming_distance=1, invert_strand=False, **kwargs ): FeatureCountsSingleEndFragment.__init__(self, reads, assignment_radius=assignment_radius, R1_primer_length=R1_primer_length, R2_primer_length=R2_primer_length, umi_hamming_distance=umi_hamming_distance, **kwargs ) def __eq__(self, other): # Make sure fragments map to the same strand, cheap comparisons if self.match_hash != other.match_hash: return False #Calculate distance if min(abs(self.span[1] - other.span[1]), abs(self.span[2] - other.span[2])) > self.assignment_radius: return False # Make sure UMI's are similar enough, more expensive hamming distance # calculation return self.umi_eq(other)
[docs]class FragmentWithoutPosition(Fragment): """ Fragment without a specific location on a contig"""
[docs] def get_site_location(self): if self.has_valid_span(): return self.span[0], 0 else: return '*', 0
[docs]class FragmentStartPosition(Fragment): """ Fragment without a specific location on a contig"""
[docs] def get_site_location(self): for read in self: if read is not None and not read.is_unmapped: return read.reference_name, read.reference_start return None
[docs]class FragmentWithoutUMI(Fragment): """ Use this class when no UMI information is available """ def __init__(self, reads, **kwargs): Fragment.__init__(self, reads, **kwargs) # remove the set_umi function
[docs] def set_umi(self, **kwargs): pass
# Replace the equality function def __eq__(self, other): # other can also be a Molecule! # Make sure fragments map to the same strand, cheap comparisons if self.sample != other.sample: return False if self.strand != other.strand: return False if min(abs(self.span[1] - other.span[1]), abs(self.span[2] - other.span[2])) > self.assignment_radius: return False # Sample matches and starting position is within the defined span # radius return True