#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import glob
import logging
from colorama import Fore
from colorama import Back
from colorama import Style
import os
import collections
import itertools
# http://codereview.stackexchange.com/questions/88912/create-a-list-of-all-strings-within-hamming-distance-of-a-reference-string-with
[docs]def hamming_circle(s, n, alphabet):
for positions in itertools.combinations(range(len(s)), n):
for replacements in itertools.product(
range(len(alphabet) - 1), repeat=n):
cousin = list(s)
for p, r in zip(positions, replacements):
if cousin[p] == alphabet[r]:
cousin[p] = alphabet[-1]
else:
cousin[p] = alphabet[r]
yield ''.join(cousin)
[docs]class BarcodeParser():
def __init__(
self,
barcodeDirectory='barcodes',
hammingDistanceExpansion=0,
spaceFill=False):
barcodeDirectory = os.path.join(
os.path.dirname(
os.path.realpath(__file__)),
barcodeDirectory)
barcode_files = list(glob.glob(f'{barcodeDirectory}/*'))
self.spaceFill = spaceFill
self.hammingDistanceExpansion = hammingDistanceExpansion
self.barcodes = collections.defaultdict(
dict) # alias -> barcode -> index
# alias -> barcode -> (index, hammingDistance)
self.extendedBarcodes = collections.defaultdict(dict)
for barcodeFile in barcode_files:
barcodeFileAlias = os.path.splitext(
os.path.basename(barcodeFile))[0]
logging.info(f"Parsing {barcodeFile}, alias {barcodeFileAlias}")
# Decide the file type (index first or name first)
indexNotFirst = False
with open(barcodeFile) as f:
for i, line in enumerate(f):
parts = line.strip().split()
if len(parts) == 1 and ' ' in line:
parts = line.strip().split(' ')
if len(parts) == 1:
pass
elif len(parts) == 2:
indexFirst = not all((c in 'ATCGNX') for c in parts[0])
if not indexFirst:
indexNotFirst = True
# print(parts[1],indexFirst)
with open(barcodeFile) as f:
for i, line in enumerate(f):
parts = line.strip().split()
if len(parts) == 1 and ' ' in line:
parts = line.strip().split(' ')
if len(parts) == 1:
self.addBarcode(
barcodeFileAlias, barcode=parts[0], index=i)
#self.barcodes[barcodeFileAlias][parts[0]] = i
logging.info(
f"\t{parts[0]}:{i} (No index specified in file)")
elif len(parts) == 2:
if indexNotFirst:
barcode, index = parts
else:
index, barcode = parts
#self.barcodes[barcodeFileAlias][barcode] = index
self.addBarcode(
barcodeFileAlias, barcode=barcode, index=index)
logging.info(
f"\t{barcode}:{index} (index was specified in file, {'index' if indexFirst else 'barcode'} on first column)")
else:
e = f'The barcode file {barcodeFile} contains more than two columns. Failed to parse!'
logging.error(e)
raise ValueError(e)
if hammingDistanceExpansion > 0:
for alias in list(self.barcodes.keys()):
# print(alias)
self.expand(hammingDistanceExpansion, alias=alias)
[docs] def getTargetCount(self, barcodeFileAlias):
return(len(self.barcodes[barcodeFileAlias]), len(self.extendedBarcodes[barcodeFileAlias]))
[docs] def expand(
self,
hammingDistanceExpansion,
alias,
reportCollisions=True,
spaceFill=None):
barcodes = self.barcodes[alias]
# hammingBarcode -> ( ( distance,origin) )
hammingSpace = collections.defaultdict(list)
for barcode in barcodes:
for hammingDistance in range(0, hammingDistanceExpansion + 1):
for hammingInstance in hamming_circle(
barcode, hammingDistance, 'ACTGN'):
hammingSpace[hammingInstance].append(
(hammingDistance, barcode))
# Resolve all
for hammingBarcode in hammingSpace:
# Check if there is a closest origin:
sortedDistances = sorted(hammingSpace[hammingBarcode])
if len(sortedDistances) > 1 and (
sortedDistances[0][0] == sortedDistances[1][0]):
# We cannot resolve this, two or more origins are at the same distance:
#print('Cannot resolve %s' % hammingBarcode)
continue
hammingDistance, origin = sortedDistances[0]
self.addBarcode(
alias,
barcode=hammingBarcode,
index=self.barcodes[alias][origin],
hammingDistance=hammingDistance,
originBarcode=origin)
# Space fill fills all hamming instances, even if they are not resolvable
[docs] def expand_old(
self,
hammingDistanceExpansion,
alias,
reportCollisions=True,
spaceFill=None):
if spaceFill is None:
spaceFill = self.spaceFill
#print("Expanding Hamming distance %s" % alias)
hammingMatrix = {}
collisions = collections.Counter()
collisionsPerBarcode = {}
barcodes = self.barcodes[alias]
# Iterate all barcodes
for barcode in barcodes:
k = barcodes[barcode]
# Perform iteration per hamming distance
for hammingDistance in range(0, hammingDistanceExpansion + 1):
for hammingInstance in hamming_circle(
barcode, hammingDistance, 'ACTGN'):
if spaceFill:
self.addBarcode(
alias,
barcode=hammingInstance,
index=k,
hammingDistance=hammingDistance,
originBarcode=barcode)
continue
# The hamming instance is a hammingDistance mutation of the
# current barcode
if hammingInstance not in hammingMatrix:
# The hamming Matrix contains [ mutatedBarcode ] = [
# targetIndex, distance, [(originBarcode, targetIndex),
# ... ], originBarcode]
hammingMatrix[hammingInstance] = [
k, hammingDistance, [(k, barcode)], barcode]
else:
# The means another barcode was already assigned to
# this mutated version
if k != hammingMatrix[hammingInstance][0]:
collisions[hammingInstance] += 1
hammingMatrix[hammingInstance][2].append(
(k, barcode))
else: # There is no collision
raise ValueError('This should never happen')
continue
# Getting here means there is a collision
if hammingMatrix[hammingInstance][1] == hammingDistance: # Collision
# set collision
hammingMatrix[hammingInstance][0] = None
# Maybe there is a collision, but one barcode is
# further away than the other:
elif hammingMatrix[hammingInstance][1] > hammingDistance:
hammingMatrix[hammingInstance][0] = k
hammingMatrix[hammingInstance][1] = hammingDistance
hammingMatrix[hammingInstance][3] = barcode
if sum(collisions.values()) > 0 and reportCollisions:
print("%s%s collisions for %s in Hamming space: %s" %
(Fore.RED, sum(collisions.values()), alias, Style.RESET_ALL))
# for hammingDistance in range(0, args.hd+1):
# print("%s %s collisions" % (hammingDistance,collisions[hammingDistance]))
showCollisions = 20
shown = 0
totalCollisions = 0
for idx, hammingInstance in enumerate(hammingMatrix):
assignedSample, hammingDistance, collidingSamples, origin = hammingMatrix[
hammingInstance]
if len(collidingSamples) > 1:
if shown < showCollisions:
print(
"%s\t%sat%s %s %sdistance%s %s" %
(",".join(
[
"%s%s[%s]%s" %
(Fore.GREEN if x[0] is assignedSample else Fore.RED,
x[0],
x[1],
Style.RESET_ALL) for x in collidingSamples]),
Style.DIM,
Style.RESET_ALL,
hammingInstance,
Style.DIM,
Style.RESET_ALL,
hammingDistance))
shown += 1
totalCollisions += 1
if totalCollisions > showCollisions:
print(('%s more ...\n' % (totalCollisions - shown)))
if not spaceFill:
mapping = {} # @todo: we don't need this variable anymore
for sequence in hammingMatrix:
if hammingMatrix[sequence][0] is not None:
mapping[sequence] = hammingMatrix[sequence][0]
self.addBarcode(
alias,
barcode=sequence,
index=hammingMatrix[sequence][0],
hammingDistance=hammingMatrix[sequence][1],
originBarcode=hammingMatrix[sequence][3])
# Show a couple of barcodes
if False:
print(('\n%s hamming extended barcodes will be used for demultiplexing strategy %s' % (
len(mapping), alias)))
if len(mapping):
showBarcodes = 7
for bcId in list(mapping.keys())[:showBarcodes]:
print(('%s%s%s → %s%s' % (Fore.GREEN, bcId,
Fore.WHITE, mapping[bcId], Style.RESET_ALL)))
if len(mapping) > showBarcodes:
print(('%s more ...\n' % (len(mapping) - showBarcodes)))
[docs] def addBarcode(
self,
barcodeFileAlias,
barcode,
index,
hammingDistance=0,
originBarcode=None):
if hammingDistance == 0:
self.barcodes[barcodeFileAlias][barcode] = index
else:
if originBarcode is None:
raise ValueError()
self.extendedBarcodes[barcodeFileAlias][barcode] = (
index, originBarcode, hammingDistance)
# get index and hamming distance to barcode returns none if not Available
[docs] def getIndexCorrectedBarcodeAndHammingDistance(self, barcode, alias):
if barcode in self.barcodes[alias]:
return (self.barcodes[alias][barcode], barcode, 0)
if barcode in self.extendedBarcodes[alias]:
return self.extendedBarcodes[alias][barcode]
return (None, None, None)
[docs] def list(self, showBarcodes=5): # showBarcodes=None show all
for barcodeAlias, mapping in self.barcodes.items():
print(
f'{len(mapping)} barcodes{Style.DIM} obtained from {Style.RESET_ALL}{barcodeAlias}')
if len(mapping):
for bcId in list(mapping.keys())[:showBarcodes]:
try:
print(('%s%s%s%s%s%s' % (Fore.GREEN, bcId,
Fore.WHITE, '→', mapping[bcId], Style.RESET_ALL)))
except Exception as e:
print(('%s%s%s%s%s%s' % (Fore.GREEN, bcId,
Fore.WHITE, '->', mapping[bcId], Style.RESET_ALL)))
if showBarcodes is not None and len(mapping) > showBarcodes:
print(
f'{Style.DIM} %s more ...\n{Style.RESET_ALL}' %
(len(mapping) - showBarcodes))
[docs] def getBarcodeMapping(self):
return self.barcodes