Source code for singlecellmultiomics.statistic.plate

import matplotlib.patheffects as path_effects
import math
import string
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from .statistic import StatisticHistogram
import singlecellmultiomics.pyutils as pyutils
import collections

import matplotlib
matplotlib.rcParams['figure.dpi'] = 160
matplotlib.use('Agg')


[docs]def human_readable(value, targetDigits=2, fp=0): # Float: if value < 1 and value > 0: return('%.2f' % value) if value == 0.0: return('0') baseId = int(math.floor(math.log10(float(value)) / 3.0)) suffix = "" if baseId == 0: sVal = str(round(value, targetDigits)) if len(sVal) > targetDigits and sVal.find('.'): sVal = sVal.split('.')[0] elif baseId > 0: sStrD = max(0, targetDigits - len(str('{:.0f}'.format((value / (math.pow(10, baseId * 3))))))) sVal = ('{:.%sf}' % min(fp, sStrD)).format( (value / (math.pow(10, baseId * 3)))) suffix = 'kMGTYZ'[baseId - 1] else: sStrD = max(0, targetDigits - len(str('{:.0f}'.format((value * (math.pow(10, -baseId * 3))))))) sVal = ('{:.%sf}' % min(fp, sStrD)).format( (value * (math.pow(10, -baseId * 3)))) suffix = 'mnpf'[-baseId - 1] if len(sVal) + 1 > targetDigits: # :( sVal = str(round(value, fp))[1:] suffix = '' return('%s%s' % (sVal, suffix))
# Visualize the following: # PER LIBRARY / DEMUX method # total fragments # total fragments with correct site # unique molecules # 384 well format: well2index = collections.defaultdict(dict) index2well = collections.defaultdict(dict) rows = string.ascii_uppercase[:16] columns = list(range(1, 25)) for ci in range(1, 385): i = ci - 1 rowIndex = math.floor(i / len(columns)) row = rows[rowIndex] column = columns[i % len(columns)] well2index[384][(row, column)] = ci index2well[384][ci] = (row, column) rows96 = string.ascii_uppercase[:8] columns96 = list(range(1, 13)) for ci in range(1, 97): i = ci - 1 rowIndex = math.floor(i / len(columns96)) row = rows96[rowIndex] column = columns96[i % len(columns96)] well2index[96][(row, column)] = ci index2well[96][ci] = (row, column)
[docs]class PlateStatistic(object): def __init__(self, args): self.args = args self.rawFragmentCount = collections.defaultdict( collections.Counter) # (library, mux) -> cell -> counts self.usableCount = collections.defaultdict( collections.Counter) # (library, mux) -> cell -> counts self.moleculeCount = collections.defaultdict( collections.Counter) # (library, mux) -> cell -> counts self.skipReasons = collections.Counter()
[docs] def to_csv(self, path): pd.DataFrame( self.moleculeCount).to_csv( path.replace( '.csv', 'molecules.csv')) pd.DataFrame( self.usableCount).to_csv( path.replace( '.csv', 'usable_reads.csv')) pd.DataFrame( self.rawFragmentCount).to_csv( path.replace( '.csv', 'raw_fragments.csv'))
[docs] def processRead(self, R1,R2): for read in [R1,R2]: if read is None: continue if not read.has_tag('MX'): return self.rawFragmentCount[(read.get_tag('LY'), read.get_tag('MX'))][read.get_tag('SM')] += 1 if read.get_tag('MX').startswith('CS2'): if read.has_tag('XT') or read.has_tag('EX'): if read.is_read1: # We only count reads2 return self.usableCount[(read.get_tag('LY'), read.get_tag('MX'))][read.get_tag('SM')] += 1 if read.has_tag('RC') and read.get_tag('RC') == 1: self.moleculeCount[(read.get_tag('LY'), read.get_tag( 'MX'))][read.get_tag('SM')] += 1 else: if read.has_tag('DS'): if not read.is_read1: self.skipReasons['Not R1'] += 1 return self.usableCount[(read.get_tag('LY'), read.get_tag('MX'))][read.get_tag('SM')] += 1 if not read.is_duplicate: self.moleculeCount[(read.get_tag('LY'), read.get_tag( 'MX'))][read.get_tag('SM')] += 1 else: self.skipReasons['No DS'] += 1 break
def __repr__(self): return 'Plate statistic'
[docs] def cell_counts_to_dataframe(self, cell_counts, mux, name='raw_reads'): df = pd.DataFrame({name: cell_counts}) offset = 0 # Offset is zero for all protocols since 0.1.12 format = 384 if ('384' in mux or mux.startswith('CS2')) else 96 df['col'] = [index2well[format] [(offset + int(x.rsplit('_')[-1]))][1] for x in df.index] df['row'] = [-rows.index(index2well[format] [(offset + int(x.rsplit('_')[-1]))][0]) for x in df.index] df['size'] = (df[name] / np.percentile(df[name], 99) * 200) return df
def __iter__(self): for data, name in [ (self.rawFragmentCount, 'raw_reads'), (self.usableCount, 'usable_reads'), (self.moleculeCount, 'unique_molecules')]: for (library, mux), cellCounts in data.items(): df = self.cell_counts_to_dataframe(cellCounts, mux, name=name) for i, row in df.iterrows(): yield i, row
[docs] def plot(self, target_path, title=None): for data, name in [ (self.rawFragmentCount, 'raw_reads'), (self.usableCount, 'usable_reads'), (self.moleculeCount, 'unique_molecules')]: for (library, mux), cellCounts in data.items(): df = self.cell_counts_to_dataframe(cellCounts, mux, name=name) df.plot.scatter(x='col', y='row', s=df['size'], c=[(0.2, 0.2, 0.5, 0.9)] ) # Annotate the outliers with values: ax = plt.gca() for ii, row in df.iterrows(): if row[name] > 0 and ( row[name] < np.percentile( df[name], 5) or row[name] > np.percentile( df[name], 95)): text = ax.annotate(human_readable(int(row[name])), (row['col'], row['row']), ha='center', va='baseline', color='w', size=7) text.set_path_effects([path_effects.Stroke( linewidth=3, foreground='black'), path_effects.Normal()]) plt.yticks( sorted( df['row'].unique())[ ::-1], sorted(rows), rotation=0) plt.xticks( sorted( df['col'].unique()), sorted(columns), rotation=0) plt.title(fr'{name} with ${mux}$ adapter' + f'\n{library}') # Create legend: #ld = [] # for x in np.linspace(1, max(df[name]), 4): # size = (x/np.percentile(df[name],99))*200 # ld.append( mlines.Line2D([], [], color='blue', marker='.', linestyle='None', # markersize=np.sqrt(size), label=f'{int(x)}:{size}')) # plt.legend(handles=ld) plt.savefig( target_path.replace( '.png', '') + f'{name}_{mux}_{library}.png') plt.close()