Source code for singlecellmultiomics.barcodeFileParser.barcodeFileParser

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import glob
import logging
from colorama import Fore
from colorama import Back
from colorama import Style
import os
import collections
import itertools
import gzip

#logging.basicConfig(level=logging.DEBUG)
# http://codereview.stackexchange.com/questions/88912/create-a-list-of-all-strings-within-hamming-distance-of-a-reference-string-with
[docs]def hamming_circle(s, n, alphabet): for positions in itertools.combinations(range(len(s)), n): for replacements in itertools.product( range(len(alphabet) - 1), repeat=n): cousin = list(s) for p, r in zip(positions, replacements): if cousin[p] == alphabet[r]: cousin[p] = alphabet[-1] else: cousin[p] = alphabet[r] yield ''.join(cousin)
[docs]class BarcodeParser():
[docs] def path_to_barcode_alias(self, path): barcodeFileAlias = os.path.splitext( os.path.basename(path))[0].replace('.gz','').replace('.bc','') return barcodeFileAlias
[docs] def parse_pending_barcode_file_of_alias(self, alias): if not alias in self.pending_files: raise ValueError(f"trying to load {alias}, but barcode file not found") logging.info(f"Loading promised barcode file {alias}") self.parse_barcode_file( self.pending_files[alias] ) logging.info(f"Performing hamming extension for {alias}") self.expand(self.hammingDistanceExpansion, alias=alias) del self.pending_files[alias]
[docs] def parse_barcode_file(self, barcodeFile): barcodeFileAlias = self.path_to_barcode_alias(barcodeFile) # Decide the file type (index first or name first) indexNotFirst = False with gzip.open(barcodeFile,'rt') if barcodeFile.endswith('.gz') else open(barcodeFile) as f : for i, line in enumerate(f): parts = line.strip().split() if len(parts) == 1 and ' ' in line: parts = line.strip().split(' ') if len(parts) == 1: pass elif len(parts) == 2: indexFirst = not all((c in 'ATCGNX') for c in parts[0]) if not indexFirst: indexNotFirst = True # print(parts[1],indexFirst) nospec=False i=0 with gzip.open(barcodeFile,'rt') if barcodeFile.endswith('.gz') else open(barcodeFile) as f : for i, line in enumerate(f): parts = line.strip().split() if len(parts) == 1 and ' ' in line: parts = line.strip().split(' ') if len(parts) == 1: self.addBarcode( barcodeFileAlias, barcode=parts[0], index=i+1) #self.barcodes[barcodeFileAlias][parts[0]] = i if not nospec: # only show this once: logging.info( f"\t{parts[0]}:{i} (No index specified in file)") nospec=True elif len(parts) == 2: if indexNotFirst: barcode, index = parts else: index, barcode = parts #self.barcodes[barcodeFileAlias][barcode] = index # When the index is only digits, convert to integer try: if int(index)==int(str(int(index))): index = int(index) else: pass except Exception as e: pass self.addBarcode( barcodeFileAlias, barcode=barcode, index=index) if not nospec: # only show this once: logging.info( f"\t{barcode}:{index} (index was specified in file, {'index' if indexFirst else 'barcode'} on first column)") nospec=True else: e = f'The barcode file {barcodeFile} contains more than two columns. Failed to parse!' logging.error(e) raise ValueError(e) logging.info(f'done loading {i} barcodes')
def __init__( self, barcodeDirectory='barcodes', hammingDistanceExpansion=0, spaceFill=False, lazyLoad=None # these aliases wiill not be loaded until requested, '*' matches all files ): self.hammingDistanceExpansion = hammingDistanceExpansion barcodeDirectory = os.path.join( os.path.dirname( os.path.realpath(__file__)), barcodeDirectory) barcode_files = list(glob.glob(f'{barcodeDirectory}/*')) self.spaceFill = spaceFill self.hammingDistanceExpansion = hammingDistanceExpansion self.barcodes = collections.defaultdict( dict) # alias -> barcode -> index # alias -> barcode -> (index, hammingDistance) self.extendedBarcodes = collections.defaultdict(dict) self.pending_files = dict() for barcodeFile in barcode_files: barcodeFileAlias = self.path_to_barcode_alias(barcodeFile) if lazyLoad is not None and (barcodeFileAlias in lazyLoad or lazyLoad =='*'): logging.info(f"Lazy loading {barcodeFile}, alias {barcodeFileAlias}") self.pending_files[barcodeFileAlias] = barcodeFile continue logging.info(f"Parsing {barcodeFile}, alias {barcodeFileAlias}") self.parse_barcode_file(barcodeFile) if hammingDistanceExpansion > 0: self.expand(hammingDistanceExpansion, alias=barcodeFileAlias)
[docs] def getTargetCount(self, barcodeFileAlias): return(len(self.barcodes[barcodeFileAlias]), len(self.extendedBarcodes[barcodeFileAlias]))
[docs] def expand( self, hammingDistanceExpansion, alias, reportCollisions=True, spaceFill=None): barcodes = self.barcodes[alias] # hammingBarcode -> ( ( distance,origin) ) hammingSpace = collections.defaultdict(list) for barcode in barcodes: for hammingDistance in range(0, hammingDistanceExpansion + 1): for hammingInstance in hamming_circle( barcode, hammingDistance, 'ACTGN'): hammingSpace[hammingInstance].append( (hammingDistance, barcode)) # Resolve all for hammingBarcode in hammingSpace: # Check if there is a closest origin: sortedDistances = sorted(hammingSpace[hammingBarcode]) if len(sortedDistances) > 1 and ( sortedDistances[0][0] == sortedDistances[1][0]): # We cannot resolve this, two or more origins are at the same distance: #print('Cannot resolve %s' % hammingBarcode) continue hammingDistance, origin = sortedDistances[0] self.addBarcode( alias, barcode=hammingBarcode, index=self.barcodes[alias][origin], hammingDistance=hammingDistance, originBarcode=origin)
[docs] def addBarcode( self, barcodeFileAlias, barcode, index, hammingDistance=0, originBarcode=None): if hammingDistance == 0: self.barcodes[barcodeFileAlias][barcode] = index else: if originBarcode is None: raise ValueError() self.extendedBarcodes[barcodeFileAlias][barcode] = ( index, originBarcode, hammingDistance)
# get index and hamming distance to barcode returns none if not Available
[docs] def getIndexCorrectedBarcodeAndHammingDistance(self, barcode, alias, try_lazy_load_pending=True): # Check if the alias still needs to be loaded: if barcode in self.barcodes[alias]: return (self.barcodes[alias][barcode], barcode, 0) if barcode in self.extendedBarcodes[alias]: return self.extendedBarcodes[alias][barcode] if alias in self.pending_files: if not try_lazy_load_pending: raise RecursionError() self.parse_pending_barcode_file_of_alias(alias) return self.getIndexCorrectedBarcodeAndHammingDistance(barcode, alias, try_lazy_load_pending=False) return (None, None, None)
[docs] def list(self, showBarcodes=5): # showBarcodes=None show all for barcodeAlias, mapping in self.barcodes.items(): print( f'{len(mapping)} barcodes{Style.DIM} obtained from {Style.RESET_ALL}{barcodeAlias}') if len(mapping): for bcId in list(mapping.keys())[:showBarcodes]: try: print(('%s%s%s%s%s%s' % (Fore.GREEN, bcId, Fore.WHITE, '→', mapping[bcId], Style.RESET_ALL))) except Exception as e: print(('%s%s%s%s%s%s' % (Fore.GREEN, bcId, Fore.WHITE, '->', mapping[bcId], Style.RESET_ALL))) if showBarcodes is not None and len(mapping) > showBarcodes: print( f'{Style.DIM} %s more ...\n{Style.RESET_ALL}' % (len(mapping) - showBarcodes))
[docs] def getBarcodeMapping(self): return self.barcodes