import collections
[docs]class RangeCache():
def __init__(self, maxRange=1200):
self.d = dict() # Chrom -> pos -> value
self.current = 0
self.maxRange = maxRange
self.freedMemory = 0
# Obtain record within a radius (center, Radius) , returns first (closest)
# instance it finds
[docs] def getWithinRange(self, chrom, center, radius):
if chrom not in self.d:
return None
for i in range(0, radius + 1):
r = self.get(chrom, i + center)
if r is not None:
return r
r = self.get(chrom, center - i)
if r is not None:
return r
[docs] def get(self, chrom, pos):
if chrom not in self.d:
return None
if pos not in self.d[chrom]:
return None
return self.d[chrom][pos]
[docs] def set(self, chrom, pos, data):
self.purge(chrom, pos)
if chrom not in self.d:
self.d[chrom] = {}
self.d[chrom][pos] = data
[docs] def purge(self, chrom, pos):
if chrom in self.d:
drop = []
for position in self.d[chrom]:
if abs(position - pos) > self.maxRange:
drop.append(position)
for d in drop:
if d in self.d[chrom]:
del self.d[chrom][d]
self.freedMemory += 1
# Remove all data on other chromosomes:
drop = [x for x in self.d if x != chrom]
for d in drop:
del self.d[d]
self.freedMemory += 1
def __len__(self):
return sum([len(self.d[chrom]) for chrom in self.d])
# A digest function should fullfill the following conditions:
# Accept : a read pair of R1, and R2 optional
# a reference genome handle (optional)
# an allele tool handle (optional)
[docs]class DigestFlagger():
def __init__(
self,
reference=None,
alleleResolver=None,
moleculeRadius=0,
verbose=False,
**kwargs):
self.reference = reference
self.alleleResolver = alleleResolver
self.verbose = verbose
self.siteCoordinateTag = 'DS'
self.oversequencedMoleculeTag = 'RC'
self.recognitionSequenceTag = 'RZ'
self.sourceTypeTag = 'DT' # DataType
self.alleleTag = 'DA'
self.alleleSetTag = 'AA'
self.strandTag = 'RS'
self.umiTag = 'RX'
self.sampleTag = 'SM'
self.ligationTag = 'LI'
self.rejectionTag = 'RR'
self.randomPrimerPosTag = 'rP'
self.randomPrimerPosSeq = 'rS'
self.fragmentSizeTag = 'fS'
self.fragmentEndTag = 'fe'
self.fragmentStartTag = 'fs'
self.cacheWrites = 0
self.cachePurgeEvery = 10_000 # Clean the cache every N writes
self.moleculeRadius = moleculeRadius
# This hold which sites we saw to count oversequencing
# sample -> (chrom,allele,strand,..) -> pos -> seen
self.observedSites = collections.defaultdict(RangeCache)
def __repr__(self):
return f'Tagger with {self.cacheWrites} cache writes, total consumption: {self.getTotalConsumption()} '
[docs] def getTotalConsumption(self):
return sum([len(self.observedSites[sample])
for sample in self.observedSites])
[docs] def setRejectionReason(self, read, reason):
self.appendTag(read, self.rejectionTag, reason)
[docs] def setLigationSite(self, read, site):
read.set_tag(self.ligationTag, site)
[docs] def setSiteCoordinate(self, read, coordinate):
self.appendTag(read, self.siteCoordinateTag, coordinate)
# 1 if first seen 2, second, -1 if None
[docs] def setSiteOversequencing(self, read, moleculeIndex=1):
read.set_tag(self.oversequencedMoleculeTag, -
1 if moleculeIndex is None else moleculeIndex)
# Decribe as string and set tag:
if moleculeIndex > 1:
read.is_duplicate = True
[docs] def setFragmentSize(self, read, size):
read.set_tag(self.fragmentSizeTag, size)
[docs] def setFragmentTrust(self, read, start, end):
read.set_tag(self.fragmentStartTag, start)
read.set_tag(self.fragmentEndTag, end)
[docs] def setAllele(self, read, allele): # 1 if first seen 2, second, -1 if None
read.set_tag(self.alleleTag, allele)
[docs] def setRecognizedSequence(self, read, sequence):
self.appendTag(read, self.recognitionSequenceTag, sequence)
[docs] def setSource(self, read, source):
self.appendTag(read, self.sourceTypeTag, source)
[docs] def setStrand(self, read, strand):
read.set_tag(self.strandTag, strand)
[docs] def setRandomPrimer(self, R1, R2, hstart, hseq):
if R1 is not None:
R1.set_tag(self.randomPrimerPosSeq, hseq)
R1.set_tag(self.randomPrimerPosTag, hstart)
if R2 is not None:
R2.set_tag(self.randomPrimerPosSeq, hseq)
R2.set_tag(self.randomPrimerPosTag, hstart)
[docs] def appendTag(self, read, tag, value):
if read is None:
return
if not read.has_tag(tag):
read.set_tag(tag, value)
else:
read.set_tag(tag, f'{read.get_tag(tag)},{value}')
[docs] def addAlleleInfo(self, reads):
allele = None if self.alleleResolver is None else self.alleleResolver.getAllele(
reads)
if self.verbose:
print(allele, reads)
if allele is not None and len(allele) > 0:
allele = ','.join(sorted(list(allele)))
for read in reads:
if read is not None:
self.setAllele(read, allele)
return allele
# siteInfo describes the site as tuple: (allele, recognizedStrand, umiSequence, ..)
# the site info can have arbitrary length, with as only requirement that
# it should be hashable
[docs] def increaseAndRecordOversequencing(self, sample, chrom, pos, siteInfo=()):
cutSite = (chrom, pos)
self.cacheWrites += 1
if self.cacheWrites > self.cachePurgeEvery:
for sample in self.observedSites:
self.observedSites[sample].purge(*cutSite)
self.cacheWrites = 0
if self.moleculeRadius != 0:
current = self.observedSites[sample].getWithinRange(
*cutSite, radius=self.moleculeRadius)
else:
current = self.observedSites[sample].get(*cutSite)
# There are no matching molecules nearby:
if current is None:
current = collections.Counter()
self.observedSites[sample].set(*cutSite, current)
UMI = siteInfo
current[UMI] += 1
# print(current)
return current[UMI]
# ADD THIS FUNCTION:
# def digest( self, reads )
#