import sklearn.ensemble
import numpy as np
import pandas as pd
import time
[docs]def calculate_consensus(molecule, consensus_model, molecular_identifier, out, **model_kwargs ):
"""
Create consensus read for molecule
Args:
molecule (singlecellmultiomics.molecule.Molecule)
consensus_model
molecular_identifier (str) : identier for this molecule, will be suffixed to the reference_id
out(pysam.AlingmentFile) : target bam file
**model_kwargs : arguments passed to the consensus model
"""
try:
consensus_reads = molecule.deduplicate_to_single_CIGAR_spaced(
out,
f'c_{molecule.get_a_reference_id()}_{molecular_identifier}',
consensus_model,
NUC_RADIUS=model_kwargs['consensus_k_rad']
)
for consensus_read in consensus_reads:
consensus_read.set_tag('RG', molecule[0].get_read_group())
consensus_read.set_tag('mi', molecular_identifier)
out.write(consensus_read)
except Exception as e:
molecule.set_rejection_reason('CONSENSUS_FAILED',set_qcfail=True)
molecule.write_pysam(out)
[docs]def base_calling_matrix_to_df(
x,
ref_info=None,
NUC_RADIUS=1,
USE_RT=True):
"""
Convert numpy base calling feature matrix to pandas dataframe with annotated columns
Args:
x(np.array) : feature matrix
ref_info(list) : reference position annotations (will be used as index)
NUC_RADIUS(int) : generate kmer features target nucleotide
USE_RT(bool) : use RT reaction features
Returns:
df (pd.DataFrame)
"""
df = pd.DataFrame(x)
# annotate the columns
BASE_COUNT = 5
RT_INDEX = 7 if USE_RT else None
STRAND_INDEX = 0
PHRED_INDEX = 1
RC_INDEX = 2
MATE_INDEX = 3
CYCLE_INDEX = 4
MQ_INDEX = 5
FS_INDEX = 6
COLUMN_OFFSET = 0
features_per_block = 8 - (not USE_RT)
block_header = ["?"] * features_per_block
block_header[STRAND_INDEX] = 'strand'
block_header[PHRED_INDEX] = 'phred'
block_header[RC_INDEX] = 'read_count'
block_header[MATE_INDEX] = 'mate'
block_header[CYCLE_INDEX] = 'cycle'
block_header[MQ_INDEX] = 'mq'
block_header[FS_INDEX] = 'fragment_size'
block_header[RT_INDEX] = 'rt_reactions'
k_header = []
for k in range(NUC_RADIUS * 2 + 1):
for base in 'ACGTN':
k_header += [(k, b, base) for b in block_header]
try:
df.columns = pd.MultiIndex.from_tuples(k_header)
except ValueError: # the dataframe is a concateenation of multiple molecules
pass
if ref_info is not None:
df.index = pd.MultiIndex.from_tuples(ref_info)
return df
[docs]def get_consensus_training_data(
molecule_iterator,
mask_variants=None,
n_train=100_000, # When None, training data is created until molecule source depletion
skip_already_covered_bases = True,
#yield_results=False, # Yield results instead of returning a matrix
**feature_matrix_args):
"""
Create a tensor/matrix containing alignment and base calling information, which can be used for consensus calling.
This function also creates a vector containing the corresponding reference bases, which can be used for training a consensus model.
Args:
molecule_iterator : generator which generates molecules from which base calling feature matrices are extracted
mask_variants (pysam.VariantFile) : variant locations which should be excluded from the matrix
n_train (int) : amount of rows in the matrix
skip_already_covered_bases(bool) : when True every reference position is at most a single row in the output matrix, this prevents overfitting
**feature_matrix_args : Arguments to pass to the feature matrix function of the molecules.
"""
#if not yield_results:
X = None
y = []
molecules_used = 0
training_set_size = 0
last_end = None
last_chrom = None
try:
for i, molecule in enumerate(molecule_iterator):
# Never train the same genomic location twice
if skip_already_covered_bases:
if last_chrom is not None and last_chrom != molecule.chromosome:
last_end = None
if last_end is not None and molecule.spanStart < last_end:
continue
train_result = molecule.get_base_calling_training_data(
mask_variants, **feature_matrix_args)
if train_result is None:
# Continue when the molecule does not have bases where we can learn from
continue
x, _y = train_result
training_set_size += len(_y)
#if yield_results:
# yield x, _y
#else:
if X is None:
X = np.empty((0, x.shape[1]))
print(
f"Creating feature matrix with {x.shape[1]} dimensions and {n_train} training base-calls")
y += _y
X = np.append(X, x, axis=0)
last_chrom = molecule.chromosome
if training_set_size >= n_train:
break
molecules_used+=1
if molecule.spanEnd is not None:
last_end = molecule.spanEnd
else:
last_end += len(_y)
except KeyboardInterrupt as e:
print("Got keyboard interrupt, stopping to load more data")
if molecules_used > 0:
print(
f'Finished, last genomic coordinate: {molecule.chromosome} {molecule.spanEnd}, training set size is {training_set_size}, used {molecules_used} molecules for training')
#if not yield_results:
return X, y
[docs]def train_consensus_model(
molecule_iterator,
mask_variants=None,
classifier=None,
n_train=100_000,
skip_already_covered_bases = True,
**feature_matrix_args):
if classifier is None: # default to random forest
classifier = sklearn.ensemble.RandomForestClassifier(
n_jobs=-1,
n_estimators=100,
oob_score=True,
max_depth=7,
min_samples_leaf=5
)
X, y = get_consensus_training_data(
molecule_iterator, mask_variants=mask_variants, n_train=n_train,
skip_already_covered_bases=skip_already_covered_bases,**feature_matrix_args)
y = np.array(y)
# remove unkown ref bases from set
X = np.array(X)[y != 'N']
y = y[y != 'N']
classifier.fit(X, y)
if isinstance(classifier, sklearn.ensemble.forest.RandomForestClassifier):
print(f"Model out of bag accuracy: {classifier.oob_score_}")
classifier.n_jobs = 1 # fix amount of jobs to one, otherwise apply will be very slow
return classifier