Source code for singlecellmultiomics.alleleTools.alleleTools

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pysam
import argparse
import collections
import functools
import gzip
import os


[docs]class AlleleResolver:
[docs] def clean_vcf_name(self, vcffile): # Convert file name to string if it is not if type(vcffile) == pysam.libcbcf.VariantFile: vcffile = vcffile.filename.decode('ascii') if type(vcffile) == bytes: vcffile = vcffile.decode('ascii') return vcffile
def __init__(self, vcffile=None, chrom=None, phased=True, uglyMode=False, lazyLoad=False, select_samples=None, use_cache=False, ignore_conversions=None # When this flag is true a cache file is generated containing # usable SNPs for every chromosome in gzipped format ): """Initialise AlleleResolver Args: vcffile (str): path of vcf file chrom (str): contig/chromosome to prepare variants for phased(bool) : the variants in the vcf file are phased (every sample is one haplotype) uglyMode (bool) : the vcf file is invalid (not indexed) and has to be loaded to memory lazyLoad (bool) : the vcf file is valid and indexed and does not need to be loaded to memory select_samples (list) : Use only these samples from the VCF file use_cache (bool) : When this flag is true a cache file is generated containing usable SNPs for every chromosome in gzipped format ignore_conversions(set) : conversions to ignore {(ref, alt), ..} , for example set( ('C','T'), ('G','A') ) """ self.ignore_conversions = ignore_conversions self.phased = phased self.verbose = False self.locationToAllele = collections.defaultdict(lambda: collections.defaultdict( lambda: collections.defaultdict(set))) # chrom -> pos-> base -> sample(s) self.select_samples = select_samples self.lazyLoad = lazyLoad if vcffile is None: return self.vcffile = self.clean_vcf_name(vcffile) self.use_cache = use_cache try: with pysam.VariantFile(vcffile) as f: pass except Exception as e: print(e) uglyMode = True if self.verbose: print( 'Pysam cannot read the VCF, rolling back to slower homebrew parser') if lazyLoad and uglyMode: if self.verbose: print( 'Lazy loading is not supported for non proper VCFs. Loading all variants to memory.') if self.select_samples is not None: raise NotImplementedError( "Sample selection is not implemented for non proper VCF") lazyLoad = False # self.locationToAllele = collections.defaultdict( lambda: # collections.defaultdict(set) ) #(chrom, pos)-> base -> sample(s) if uglyMode: foundIt = False # If the target chromosome was found with open(vcffile, 'r') as f: for line in f: if line[0] != '#': chromosome, location, id, ref, alt = line.strip().split() if chrom is None or chromosome == chrom: pos = int(location) foundIt = True self.locationToAllele[chromosome][pos - 1][ref].add("REF") self.locationToAllele[chromosome][pos - 1][alt].add("ALT") elif chrom is not None and foundIt: print('Stopping for speed') break if self.verbose: print('Finished loading dirty vcf file') else: if not lazyLoad: self.fetchChromosome(vcffile, chrom) else: if self.verbose: print('Lazy loading allele file') pass
[docs] def addAlleleInfoOneBased(self, chromosome, location, base, alleleName): self.locationToAllele[chromosome][location][base].add(alleleName)
[docs] def write_cache(self, path, chrom): """Write to cache file, this will make later lookups to the chromosome faster Args: path (str): path of the cache file chrom (str): contig/chromosome to write cache file for (every contig has it's own cache) """ temp_path = path + '.unfinished' with gzip.open(temp_path, 'wt') as f: for position in sorted(list(self.locationToAllele[chrom].keys())): for base in self.locationToAllele[chrom][position]: f.write( f'{position}\t{base}\t{",".join(sorted(list(self.locationToAllele[chrom][position][base])))}\n') os.rename(temp_path, path)
[docs] def read_cached(self, path, chrom): """Read cache file Args: path (str): path of the cache file chrom (str): contig/chromosome """ with gzip.open(path, 'rt') as f: for line in f: position, base, samples = line.strip().split('\t', 3) self.locationToAllele[chrom][int( position)][base] = set(samples.split(','))
[docs] def fetchChromosome(self, vcffile, chrom, clear=False): if clear: self.locationToAllele = collections.defaultdict(lambda: collections.defaultdict( lambda: collections.defaultdict(set))) # chrom -> pos-> base -> sample(s) vcffile = self.clean_vcf_name(vcffile) # allocate: # Decide if this is an allele we would may be cache? write_cache_file_flag = False if self.use_cache: if (chrom.startswith('KN') or chrom.startswith('KZ') or chrom.startswith( 'chrUn') or chrom.endswith('_random') or 'ERCC' in chrom): write_cache_file_flag = False else: write_cache_file_flag = True if self.use_cache and write_cache_file_flag: allele_dir = f'{os.path.abspath(vcffile)}_allele_cache/' if not os.path.exists(allele_dir): os.makedirs(allele_dir) cache_file_name = f'{allele_dir}/{chrom}' if self.select_samples is not None: sample_list_id = '-'.join(sorted(list(self.select_samples))) cache_file_name = cache_file_name + '_' + sample_list_id cache_file_name += '.tsv.gz' if os.path.exists(cache_file_name): if self.verbose: print(f"Cached file exists at {cache_file_name}") self.read_cached(cache_file_name, chrom) return if self.verbose: print( f"Cache enabled, but file is not available, creating cache file at {cache_file_name}") self.locationToAllele[chrom][-1]['N'].add('Nop') unTrusted = [] added = 0 if self.verbose: print(f'Reading variants for {chrom} ', end='') with pysam.VariantFile(vcffile) as v: try: for rec in v.fetch(chrom): used = False bad = False bases_to_alleles = collections.defaultdict( set) # base -> samples if self.phased: # variants are phased, assign a random allele samples_assigned = set() most_assigned_base = 0 for sample, sampleData in rec.samples.items(): if self.select_samples is not None and sample not in self.select_samples: continue for base in sampleData.alleles: if base is None: unTrusted.append((rec.chrom, rec.pos)) continue if len(base) == 1: bases_to_alleles[base].add(sample) used = True samples_assigned.add(sample) else: # This location cannot be trusted: bad = True # We can prune this site if all samples are associated # with the same base if self.select_samples is not None and used: if len(samples_assigned) != len( self.select_samples): # The site is not informative bad = True if len(bases_to_alleles) < 2: bad = True # The site is not informative else: # not phased if not all( len(allele) == 1 for allele in rec.alleles): # only select SNVs bad = True else: bad = False for allele, base in zip('UVWXYZ', rec.alleles): bases_to_alleles[base].add(allele) used = True if not bad and self.ignore_conversions is not None: # prune conversions which are banned bad = any( ((rec.ref, base) in self.ignore_conversions for base in bases_to_alleles)) if used and not bad: self.locationToAllele[rec.chrom][rec.pos - 1] = bases_to_alleles added += 1 except Exception as e: print(e) # for t in unTrusted: # if t in self.locationToAllele: # del self.locationToAllele[t[0]][t[1]] #del unTrusted if self.verbose: print(f'{added} variants [OK]') if self.use_cache and write_cache_file_flag: if self.verbose: print("writing cache file") try: self.write_cache(cache_file_name, chrom) except Exception as e: if self.verbose: print(f"Exception writing cache: {e}") pass # @todo
[docs] def getAllele(self, reads): alleles = set() for read in reads: if read is None or read.is_unmapped: continue chrom = read.reference_name for readPos, refPos in read.get_aligned_pairs(matches_only=True): readBase = read.query_sequence[readPos] c = self.getAllelesAt(chrom, refPos, readBase) if c is not None and len(c) == 1: alleles.update(c) return alleles
# @functools.lru_cache(maxsize=1000) not necessary anymore... complete data is already saved in dict
[docs] def getAllelesAt(self, chrom, pos, base): if self.lazyLoad and chrom not in self.locationToAllele: try: self.fetchChromosome(self.vcffile, chrom, clear=True) except Exception as e: print(e) pass if chrom not in self.locationToAllele or pos not in self.locationToAllele[chrom]: return None if base not in self.locationToAllele[chrom][pos]: return None return self.locationToAllele[chrom][pos][base]
# FWD_-13_C REV_-16_C # FWD_-16_C REV_+12_C