Source code for singlecellmultiomics.alleleTools.alleleTools

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pysam
import argparse
import collections
import functools
import gzip
import os
from singlecellmultiomics.utils import Prefetcher

[docs]def get_allele_dict(): return collections.defaultdict(nested_set_defaultdict)
[docs]def nested_set_defaultdict(): return collections.defaultdict( set_defaultdict )
[docs]def set_defaultdict(): return collections.defaultdict (set)
[docs]class AlleleResolver(Prefetcher):
[docs] def clean_vcf_name(self, vcffile): # Convert file name to string if it is not if type(vcffile) == pysam.libcbcf.VariantFile: vcffile = vcffile.filename.decode('ascii') if type(vcffile) == bytes: vcffile = vcffile.decode('ascii') return vcffile
def __init__(self, vcffile=None, chrom=None, phased=True, uglyMode=False, lazyLoad=False, select_samples=None, use_cache=False, ignore_conversions=None, verbose=False, region_start=None, region_end=None # When this flag is true a cache file is generated containing # usable SNPs for every chromosome in gzipped format ): """Initialise AlleleResolver Args: vcffile (str): path of vcf file chrom (str): contig/chromosome to prepare variants for phased(bool) : the variants in the vcf file are phased (every sample is one haplotype) uglyMode (bool) : the vcf file is invalid (not indexed) and has to be loaded to memory verbose (bool) : Print debug information lazyLoad (bool) : the vcf file is valid and indexed and does not need to be loaded to memory select_samples (list) : Use only these samples from the VCF file use_cache (bool) : When this flag is true a cache file is generated containing usable SNPs for every chromosome in gzipped format ignore_conversions(set) : conversions to ignore {(ref, alt), ..} , for example set( ('C','T'), ('G','A') ) region_start(int) : only load variants within this range (region_start-region_end) when reading a cached variant file region_end(int) : only load variants within this range (region_start-region_end) when reading a cached variant file """ self.args = locals().copy() del self.args['self'] self.ignore_conversions = ignore_conversions self.phased = phased self.verbose = verbose self.locationToAllele = get_allele_dict() # chrom -> pos-> base -> sample(s) self.select_samples = select_samples self.region_start = region_start self.region_end = region_end self.lazyLoad = lazyLoad if vcffile is None: return self.vcffile = self.clean_vcf_name(vcffile) self.use_cache = use_cache if use_cache: lazyLoad = True try: with pysam.VariantFile(vcffile) as f: pass except Exception as e: print(e) uglyMode = True if self.verbose: print( 'Pysam cannot read the VCF, rolling back to slower homebrew parser') if lazyLoad and uglyMode: if self.verbose: print( 'Lazy loading is not supported for non proper VCFs. Loading all variants to memory.') if self.select_samples is not None: raise NotImplementedError( "Sample selection is not implemented for non proper VCF") lazyLoad = False # collections.defaultdict(set) ) #(chrom, pos)-> base -> sample(s) if uglyMode: foundIt = False # If the target chromosome was found with open(vcffile, 'r') as f: for line in f: if line[0] != '#': chromosome, location, id, ref, alt = line.strip().split() if chrom is None or chromosome == chrom: pos = int(location) foundIt = True self.locationToAllele[chromosome][pos - 1][ref].add("REF") self.locationToAllele[chromosome][pos - 1][alt].add("ALT") elif chrom is not None and foundIt: print('Stopping for speed') break if self.verbose: print('Finished loading dirty vcf file') else: if not lazyLoad: self.fetchChromosome(vcffile, chrom) else: if self.verbose: print('Lazy loading allele file') pass
[docs] def addAlleleInfoOneBased(self, chromosome, location, base, alleleName): self.locationToAllele[chromosome][location][base].add(alleleName)
[docs] def write_cache(self, path, chrom): """Write to cache file, this will make later lookups to the chromosome faster Args: path (str): path of the cache file chrom (str): contig/chromosome to write cache file for (every contig has it's own cache) """ temp_path = path + '.unfinished' with gzip.open(temp_path, 'wt') as f: for position in sorted(list(self.locationToAllele[chrom].keys())): for base in self.locationToAllele[chrom][position]: f.write( f'{position}\t{base}\t{",".join(sorted(list(self.locationToAllele[chrom][position][base])))}\n') os.rename(temp_path, path)
[docs] def read_cached(self, path, chrom): """Read cache file Args: path (str): path of the cache file chrom (str): contig/chromosome """ with gzip.open(path, 'rt') as f: for line in f: position, base, samples = line.strip().split('\t', 3) position = int(position) if self.region_start is not None and position<self.region_start: continue if self.region_end is not None and position>self.region_end: break self.locationToAllele[chrom][position][base] = set(samples.split(','))
[docs] def instance(self, arg_update): if 'self' in self.args: del self.args['self'] clone = AlleleResolver(**self.args) return clone
[docs] def prefetch(self, contig, start, end): clone = self.instance({'region_start':start, 'region_end':end}) #print(f'Prefetching {contig}:{start}-{end}') try: self.fetchChromosome(self.vcffile, contig, True) except ValueError: # This means the chromosome is not available pass return clone
[docs] def fetchChromosome(self, vcffile, chrom, clear=False): if clear: self.locationToAllele = get_allele_dict() # chrom -> pos-> base -> sample(s) vcffile = self.clean_vcf_name(vcffile) # allocate: # Decide if this is an allele we would may be cache? write_cache_file_flag = False if self.use_cache: if (chrom.startswith('KN') or chrom.startswith('KZ') or chrom.startswith( 'chrUn') or chrom.endswith('_random') or 'ERCC' in chrom): write_cache_file_flag = False else: write_cache_file_flag = True if self.use_cache and write_cache_file_flag: allele_dir = f'{os.path.abspath(vcffile)}_allele_cache/' if not os.path.exists(allele_dir): os.makedirs(allele_dir) cache_file_name = f'{allele_dir}/{chrom}' if self.select_samples is not None: sample_list_id = '-'.join(sorted(list(self.select_samples))) cache_file_name = cache_file_name + '_' + sample_list_id cache_file_name += '.tsv.gz' if os.path.exists(cache_file_name): if self.verbose: print(f"Cached file exists at {cache_file_name}") self.read_cached(cache_file_name, chrom) return if self.verbose: print( f"Cache enabled, but file is not available, creating cache file at {cache_file_name}") self.locationToAllele[chrom][-1]['N'].add('Nop') unTrusted = [] added = 0 if self.verbose: print(f'Reading variants for {chrom} ', end='') with pysam.VariantFile(vcffile) as v: try: for rec in v.fetch(chrom, start=self.region_start, stop=self.region_end): used = False bad = False bases_to_alleles = collections.defaultdict( set) # base -> samples if self.phased: # variants are phased, assign a random allele if len(rec.samples)==0: # File without samples bases_to_alleles[rec.ref]=set('r') bases_to_alleles[rec.alts[0]]=set('a') self.locationToAllele[rec.chrom][rec.pos - 1]=bases_to_alleles else: samples_assigned = set() most_assigned_base = 0 monomorphic=False for sample, sampleData in rec.samples.items(): if self.select_samples is not None and sample not in self.select_samples: continue for base in sampleData.alleles: if base is None: # This site is monomorphic: monomorphic=True continue if len(base) == 1: bases_to_alleles[base].add(sample) used = True samples_assigned.add(sample) else: # This location cannot be trusted: bad = True # We can prune this site if all samples are associated # with the same base if self.select_samples is not None and used: if len(samples_assigned) != len( self.select_samples): # The site is not informative bad = True if monomorphic and len(bases_to_alleles)>0: bad=False elif len(bases_to_alleles) < 2: bad = True # The site is not informative else: # not phased if not all( len(allele) == 1 for allele in rec.alleles): # only select SNVs bad = True else: bad = False for allele, base in zip('UVWXYZ', rec.alleles): bases_to_alleles[base].add(allele) used = True if not bad and self.ignore_conversions is not None: # prune conversions which are banned bad = any( ((rec.ref, base) in self.ignore_conversions for base in bases_to_alleles)) if used and not bad: self.locationToAllele[rec.chrom][rec.pos - 1] = bases_to_alleles added += 1 except Exception as e: raise # for t in unTrusted: # if t in self.locationToAllele: # del self.locationToAllele[t[0]][t[1]] #del unTrusted if self.verbose: print(f'{added} variants [OK]') if self.use_cache and write_cache_file_flag: if self.verbose: print("writing cache file") try: self.write_cache(cache_file_name, chrom) except Exception as e: if self.verbose: print(f"Exception writing cache: {e}") pass # @todo
[docs] def getAllele(self, reads): alleles = set() for read in reads: if read is None or read.is_unmapped: continue chrom = read.reference_name for readPos, refPos in read.get_aligned_pairs(matches_only=True): readBase = read.query_sequence[readPos] c = self.getAllelesAt(chrom, refPos, readBase) if c is not None and len(c) == 1: alleles.update(c) return alleles
# @functools.lru_cache(maxsize=1000) not necessary anymore... complete data is already saved in dict
[docs] def has_location(self, chrom ,pos): if self.lazyLoad and chrom not in self.locationToAllele: try: self.fetchChromosome(self.vcffile, chrom, clear=True) except Exception as e: if 'invalid contig' in str(e): # The contig does not exists return True if 'fetch requires an index' in str(e): raise Exception('The variant file used for allele resolving does not have an index file. Use bcftools index, or vcftools index to generate an index') print('ERROR, in has_location (Allele Resolver):', e) pass if chrom not in self.locationToAllele or pos not in self.locationToAllele[chrom]: return False return True
[docs] def getAllelesAt(self, chrom, pos, base): if self.lazyLoad and chrom not in self.locationToAllele: try: self.fetchChromosome(self.vcffile, chrom, clear=True) except Exception as e: if 'fetch requires an index' in str(e): raise Exception('The variant file used for allele resolving does not have an index file. Use bcftools index, or vcftools index to generate an index') print('ERROR, in getAllelesAt:', e) pass if chrom not in self.locationToAllele or pos not in self.locationToAllele[chrom]: return None if base not in self.locationToAllele[chrom][pos]: return None return self.locationToAllele[chrom][pos][base]
# FWD_-13_C REV_-16_C # FWD_-16_C REV_+12_C