#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pysam
from itertools import chain
from singlecellmultiomics.molecule import MoleculeIterator, ReadIterator
import singlecellmultiomics
import singlecellmultiomics.molecule
from singlecellmultiomics.molecule.consensus import calculate_consensus
import singlecellmultiomics.fragment
from singlecellmultiomics.bamProcessing.bamFunctions import sorted_bam_file, get_reference_from_pysam_alignmentFile, write_program_tag, MapabilityReader, verify_and_fix_bam,add_blacklisted_region
from singlecellmultiomics.utils import is_main_chromosome
from singlecellmultiomics.utils.submission import submit_job
import singlecellmultiomics.alleleTools
from singlecellmultiomics.universalBamTagger.customreads import CustomAssingmentQueryNameFlagger, BulkFlagger
import singlecellmultiomics.features
from pysamiterators import MatePairIteratorIncludingNonProper, MatePairIterator
from singlecellmultiomics.universalBamTagger.tagging import generate_tasks, prefetch, run_tagging_tasks, write_job_gen_to_bed
from singlecellmultiomics.bamProcessing.bamBinCounts import blacklisted_binning_contigs,get_bins_from_bed_iter
from singlecellmultiomics.utils.binning import bp_chunked
from singlecellmultiomics.bamProcessing import merge_bams, get_contigs_with_reads, sam_to_bam
from singlecellmultiomics.fastaProcessing import CachedFastaNoHandle
from singlecellmultiomics.utils.prefetch import UnitialisedClass
from multiprocessing import Pool
from typing import Generator
import argparse
import uuid
import os
import sys
import colorama
import pkg_resources
import pickle
from datetime import datetime
from time import sleep
# Suppress [E::idx_find_and_load] Could not retrieve index file for, see https://github.com/pysam-developers/pysam/issues/939
pysam.set_verbosity(0)
# ---------------------------------------------------------------------------
# Command-line interface definition
# ---------------------------------------------------------------------------
argparser = argparse.ArgumentParser(
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    description='Assign molecules, set sample tags, set alleles')
argparser.add_argument(
    'bamin',
    type=str,
    help='Input BAM file, SAM files can also be supplied but will be converted to BAM')
argparser.add_argument(
    '-ref',
    type=str,
    default=None,
    help="Path to reference fasta (autodetected if not supplied)")
argparser.add_argument('-o', type=str, help="output bam file", required=True)
argparser.add_argument(
    '-method',
    type=str,
    default=None,
    required=True,
    help="""Protocol to tag, select from: nla, qflag, chic, nla_transcriptome, vasa, cs, cs_feature_counts, nla_taps, chic_taps, nla_no_overhang.
nla (Data with digested by Nla III enzyme)
qflag (Only add basic tags like sample and UMI, no molecule assignment)
chic (Data digested using mnase fusion)
nla_transcriptome (Data with transcriptome and genome digested by Nla III )
vasa (VASA transcriptomic data)
cs (CELseq data, 1 and 2)
cs_feature_counts (Single end, deduplicate using a bam file tagged using featurecounts, deduplicates a umi per gene)
fl_feature_counts (deduplicate using a bam file tagged using featurecounts, deduplicates based on fragment position)
nla_taps (Data with digested by Nla III enzyme and methylation converted by TAPS)
chic_taps (Data with digested by mnase enzyme and methylation converted by TAPS)
nla_tapsp_transcriptome (Add feature annotation to nla_ptaps mode )
nla_taps_transcriptome (Add feature annotation to nla_taps mode )
nla_no_overhang (Data with digested by Nla III enzyme, without the CATG present in the reads)
scartrace (Lineage tracing )
""")
argparser.add_argument(
    '-qflagger',
    type=str,
    default=None,
    help="Query flagging algorithm, use -bulk to set the same sample and umi to all reads")
argparser.add_argument(
    '--ignore_bam_issues',
    action='store_true',
    help='Ignore truncation')
argparser.add_argument('-custom_flags', type=str, default="MI,RX,bi,SM")

# Options that restrict processing to a subset of the genome:
r_select = argparser.add_argument_group('Region selection')
r_select.add_argument('-head', type=int)
r_select.add_argument('-contig', type=str, help='Contig to only process')
r_select.add_argument('-skip_contig', type=str, help='Contigs not to process')
r_select.add_argument('-region_start', type=int, help='Zero based start coordinate of region to process')
r_select.add_argument('-region_end', type=int, help='Zero based end coordinate of region to process')
r_select.add_argument('-blacklist', type=str, help='BED file containing regions to skip')

# Allele resolving options:
allele_gr = argparser.add_argument_group('alleles')
allele_gr.add_argument('-alleles', type=str, help="Phased allele file (VCF)")
allele_gr.add_argument(
    '-allele_samples',
    type=str,
    help="Comma separated samples to extract from the VCF file. For example B6,SPRET")
allele_gr.add_argument(
    '-unphased_alleles',
    type=str,
    help="Unphased allele file (VCF)")
allele_gr.add_argument(
    '--set_allele_resolver_verbose',
    action='store_true',
    help='Makes the allele resolver print more')
allele_gr.add_argument(
    '--haplo_molecule_assignment',
    action='store_true',
    help='Take allele information into account during molecule assignment ')
allele_gr.add_argument(
    '--use_allele_cache',
    action='store_true',
    help='''Write and use a cache file for the allele information. NOTE: THIS IS NOT THREAD SAFE! Meaning you should not use this function on multiple libraries at the same time when the cache files are not yet available.
Once they are available there is not thread safety issue anymore''')

argparser.add_argument(
    '-molecule_iterator_verbosity_interval',
    type=int,
    default=None,
    help='Molecule iterator information interval in seconds')
argparser.add_argument(
    '--molecule_iterator_verbose',
    action='store_true',
    help='Show progress indication on command line')
argparser.add_argument(
    '-stats_file_path',
    type=str,
    default=None,
    help='Path to logging file, ends with ".tsv"')
argparser.add_argument(
    '-jobbed',
    type=str,
    default=None,
    help='Path to location to write multiprocessing region log file')
argparser.add_argument(
    '--multiprocess',
    action='store_true',
    help="Use multiple the CPUs of your system to achieve (much) faster tagging")
argparser.add_argument(
    '--one_contig_per_process',
    action='store_true',
    help="Do not split contigs/chromosomes into chunks for parallel processing. Use this when you want to correctly deduplicate mates which are mapping very far apart. (>50kb)")
argparser.add_argument(
    '-tagthreads',
    type=int,
    help='Amount of processes to use for tagging (--multiprocess needs to be enabled). Uses all available CPUs when not set.'
)
argparser.add_argument(
    '-max_time_per_segment',
    default=None,
    type=float,
    help='Maximum time spent on a single genomic location')
argparser.add_argument(
    '-temp_folder',
    default='./',
    help="Temp folder location")

# Fragment-level behaviour:
fragment_settings = argparser.add_argument_group('Fragment settings')
fragment_settings.add_argument(
    '-read_group_format',
    type=int,
    default=0,
    help="0: Every cell/sequencing unit gets a read group, 1: Every library/sequencing unit gets a read group")
fragment_settings.add_argument(
    '-max_fragment_size',
    type=int,
    default=None,
    help='Reject fragments with a fragment size higher than the specified value')
fragment_settings.add_argument(
    '--resolve_unproperly_paired_reads',
    action='store_true',
    help='When enabled bamtagmultiome will look through the complete bam file in a hunt for the mate, the two mates will always end up in 1 molecule if both present in the bam file. This also works when the is_proper_pair bit is not set. Use this option when you want to find the breakpoints of genomic re-arrangements.')
fragment_settings.add_argument(
    '--allow_cycle_shift',
    action='store_true',
    help='NlaIII: Allow fragments to be shifted slightly around the cut site.')
fragment_settings.add_argument(
    '-assignment_radius',
    type=int,
    help='Molecule assignment radius')
fragment_settings.add_argument(
    '-libname',
    type=str,
    help='Overwrite library name with this value')
fragment_settings.add_argument(
    '--no_rejects',
    action='store_true',
    help='Do not write rejected reads to output file')
fragment_settings.add_argument(
    '--no_overflow',
    action='store_true',
    help='Do not write overflow reads to output file. Overflow reads are reads which are discarded because the molecule reached the maximum capacity of associated fragments')
fragment_settings.add_argument(
    '--no_restriction_motif_check',
    action='store_true',
    help='Do not check for restriction motifs (NLAIII)')

# Molecule-level behaviour:
molecule_settings = argparser.add_argument_group('Molecule settings')
molecule_settings.add_argument(
    '-mapfile',
    type=str,
    help='Path to *.safe.bgzf file, used to decide if molecules are uniquely mappable, generate one using createMapabilityIndex.py ')
molecule_settings.add_argument('-umi_hamming_distance', type=int, default=1)
molecule_settings.add_argument(
    '-annotmethod',
    type=int,
    default=1,
    help="Annotation resolving method. 0: molecule consensus aligned blocks. 1: per read per aligned base")
molecule_settings.add_argument(
    '--feature_container_verbose',
    action='store_true',
    help='Make the feature container print more')

# Cluster (job scheduler) execution:
cluster = argparser.add_argument_group('cluster execution')
cluster.add_argument(
    '--cluster',
    action='store_true',
    help='split by chromosomes and submit the job on cluster')
cluster.add_argument(
    '-mem',
    default=40,
    type=int,
    help='Memory requested per job')
cluster.add_argument(
    '-time',
    default=52,
    type=int,
    help='Time requested per job')
cluster.add_argument(
    '-sched',
    default='slurm',
    type=str,
    help='Scheduler to use: sge, slurm, local')
cluster.add_argument(
    '-clusterdir',
    type=str,
    default=None,
    help='Folder to store cluster files in (scripts and sterr/out, when not specified a "cluster" folder will be made in same directory as -o')

# Protocol-specific settings:
scartrace_settings = argparser.add_argument_group('Scartrace specific settings')
scartrace_settings.add_argument(
    '-scartrace_r1_primers',
    type=str,
    default='CCTTGAACTTCTGGTTGTAG',
    help='Comma separated list of R1 primers used. Only fragments starting with this read are taken into account.')

tr = argparser.add_argument_group('transcriptome specific settings')
tr.add_argument('-exons', type=str, help='Exon GTF file')
tr.add_argument(
    '-introns',
    type=str,
    help='Intron GTF file, use exonGTFtoIntronGTF.py to create this file')

cg = argparser.add_argument_group('molecule consensus specific settings')
cg.add_argument(
    '--consensus',
    action='store_true',
    help='Calculate molecule consensus read, this feature is _VERY_ experimental')
cg.add_argument(
    '--no_source_reads',
    action='store_true',
    help='Do not write original reads, only consensus ')

ma = argparser.add_argument_group('Molecule assignment settings')
ma.add_argument(
    '--every_fragment_as_molecule',
    action='store_true',
    help='Assign every fragment as a molecule, this effectively disables UMI deduplication')
ma.add_argument(
    '--no_umi_cigar_processing',
    action='store_true',
    help='Do not use the alignment during deduplication')
ma.add_argument(
    '-max_associated_fragments',
    type=int,
    default=None,
    help="Limit the maximum amount of reads associated to a single molecule.")
def tag_multiome_multi_processing(
        input_bam_path: str,
        out_bam_path: str,
        molecule_iterator: Generator = None,  # @todo add custom molecule iterator?
        molecule_iterator_args: dict = None,
        ignore_bam_issues: bool = False,  # @todo add ignore_bam_issues
        head: int = None,  # @todo add head
        no_source_reads: bool = False,  # @todo add no_source_reads
        # One extra parameter is the fragment size:
        fragment_size: int = None,
        # And the blacklist is optional:
        blacklist_path: str = None,
        bp_per_job: int = None,
        bp_per_segment: int = None,
        temp_folder_root: str = '/tmp/scmo',
        max_time_per_segment: int = None,
        use_pool: bool = True,
        one_contig_per_process: bool = False,
        additional_args: dict = None,
        n_threads=None,
        job_bed_file: str = None  # Writes job blocks to a bed file for inspection
        ):
    """Tag a bam file using multiple processes and write the merged result.

    The genome is split into segments which are grouped into jobs; every job
    is tagged in a separate worker process writing a temporary bam file, and
    the temporary files are merged into ``out_bam_path`` at the end.

    Args:
        input_bam_path: path to the input bam file (sorted, indexed)
        out_bam_path: path to write the tagged bam file to
        molecule_iterator: unused placeholder (see @todo)
        molecule_iterator_args: keyword arguments forwarded to MoleculeIterator;
            'start', 'end', 'contig', 'progress_callback_function' and
            'alignments' are pruned before dispatch. Required (must be a dict).
        ignore_bam_issues / head / no_source_reads: see @todo markers
        fragment_size: expected maximum fragment size, used for segment overlap
        blacklist_path: optional BED file with regions to skip
        bp_per_job: amount of basepairs processed per job (required)
        bp_per_segment: amount of basepairs per segment within a job (required)
        temp_folder_root: existing folder to create the scratch directory in
        max_time_per_segment: maximum seconds spent on a single segment;
            segments exceeding this are blacklisted in the output header
        use_pool: when True use a multiprocessing.Pool, otherwise run serially
        one_contig_per_process: process whole contigs instead of segments
        additional_args: extra arguments forwarded to the tagging tasks
        n_threads: amount of worker processes (None: all available CPUs)
        job_bed_file: optional path to write the job blocks to as BED

    Raises:
        ValueError: when temp_folder_root does not exist
        NotImplementedError: when a blacklist is combined with
            one_contig_per_process
    """
    assert bp_per_job is not None
    assert fragment_size is not None
    assert bp_per_segment is not None

    if not os.path.exists(temp_folder_root):
        raise ValueError(f'The path {temp_folder_root} does not exist')

    # Unique scratch directory per invocation to avoid collisions between runs:
    temp_folder = os.path.join(temp_folder_root, f'scmo_{uuid.uuid4()}')
    if not os.path.exists(temp_folder):
        os.makedirs(temp_folder, exist_ok=True)

    if molecule_iterator_args.get('skip_contigs', None) is not None:
        contig_blacklist = molecule_iterator_args.get('skip_contigs')
    else:
        contig_blacklist = []

    if molecule_iterator_args.get('contig', None) is not None:
        assert molecule_iterator_args.get('start') is None, 'regions are not implemented'
        contig_whitelist = [molecule_iterator_args['contig']]
    else:
        contig_whitelist = [contig for contig in get_contigs_with_reads(input_bam_path)
                            if contig not in contig_blacklist]

    # Region selection is handled per task; strip region/progress keys so the
    # remaining args can be forwarded to every worker unchanged:
    for prune in ['start', 'end', 'contig', 'progress_callback_function', 'alignments']:
        if prune in molecule_iterator_args:
            del molecule_iterator_args[prune]

    iteration_args = {
        'molecule_iterator_args': molecule_iterator_args,
        'molecule_iterator_class': MoleculeIterator
    }

    # Define the regions to be processed and group into segments to perform tagging on
    if one_contig_per_process:
        if blacklist_path is not None:
            raise NotImplementedError('A blacklist is currently incompatible with --multiprocessing in single contig mode')
        # The '*' pseudo-contig captures unmapped reads; it is scheduled first.
        job_gen = [[('*', None, None, None, None), ], ] + \
                  [[(contig, None, None, None, None), ]
                   for contig, contig_len in get_contigs_with_reads(input_bam_path, True)
                   if contig != '*']
    else:
        regions = blacklisted_binning_contigs(
            contig_length_resource=input_bam_path,
            bin_size=bp_per_segment,
            fragment_size=fragment_size,
            blacklist_path=blacklist_path,
            contig_whitelist=contig_whitelist
        )
        # Chunk into jobs of roughly equal size: (A single job will process multiple segments)
        job_gen = [[('*', None, None, None, None), ], ] + list(bp_chunked(regions, bp_per_job))

    if job_bed_file is not None:
        assert not one_contig_per_process, 'Cannot write bed file with jobs. Each contig is a separate job, there are no bins.'
        # Convert the job generator to a list to allow it to be traversed multiple times
        job_gen = list(job_gen)
        write_job_gen_to_bed(job_gen, job_bed_file)
        print(f'Wrote job bed file to {job_bed_file}')

    tasks = generate_tasks(input_bam_path=input_bam_path,
                           job_gen=job_gen,
                           iteration_args=iteration_args,
                           temp_folder=temp_folder,
                           additional_args=additional_args,
                           max_time_per_segment=max_time_per_segment)

    # Create header bam:
    temp_header_bam_path = f'{temp_folder}/{uuid.uuid4()}_header.bam'
    with pysam.AlignmentFile(input_bam_path) as input_bam:
        input_header = input_bam.header.as_dict()

    # Write provenance information to BAM header
    write_program_tag(
        input_header,
        program_name='bamtagmultiome',
        command_line=" ".join(sys.argv),
        version=singlecellmultiomics.__version__,
        description=f'SingleCellMultiOmics molecule processing, executed at {datetime.now().strftime("%d/%m/%Y %H:%M:%S")}')

    # Record the supplied blacklist in the header for provenance:
    if blacklist_path is not None:
        for contig, start, end in get_bins_from_bed_iter(blacklist_path):
            add_blacklisted_region(input_header, contig, start, end, source=blacklist_path)

    # Prefetching of genomic resources per region is done automatically by
    # tagging.run_tagging_task, reducing I/O load during processing.
    # @todo : Obtain auto blacklisted regions if applicable
    # @todo : Progress indication
    bam_files_generated = []
    if use_pool:
        workers = Pool(n_threads)
        job_generator = workers.imap_unordered(run_tagging_tasks, tasks)
    else:
        job_generator = (run_tagging_tasks(task) for task in tasks)

    total_processed_molecules = 0
    for bam, meta in job_generator:
        if bam is not None:
            bam_files_generated.append(bam)

        if len(meta):
            # Use .get: a worker may report meta without a molecule count
            total_processed_molecules += meta.get('total_molecules', 0)

            # Segments which hit the time limit are blacklisted in the header:
            timeouts = meta.get('timeout_tasks', [])
            for timeout in timeouts:
                print('blacklisted', timeout['contig'], timeout['start'], timeout['end'])
                add_blacklisted_region(input_header,
                                       contig=timeout['contig'],
                                       start=timeout['start'],
                                       end=timeout['end']
                                       )

        if head is not None and total_processed_molecules > head:
            print('Head was supplied, stopping')
            break

    tagged_bam_generator = [temp_header_bam_path] + bam_files_generated

    # Write a reads-free bam carrying the (possibly updated) header, so the
    # merge below picks up the provenance and blacklist information:
    with pysam.AlignmentFile(temp_header_bam_path, 'wb', header=input_header) as out:
        pass
    pysam.index(temp_header_bam_path)

    # merge the results and clean up:
    print('Merging final bam files')
    merge_bams(list(tagged_bam_generator), out_bam_path)

    if use_pool:
        workers.close()

    # Remove the temp dir (best effort; merge_bams removed the part files):
    sleep(5)
    try:
        os.rmdir(temp_folder)
    except Exception:
        sys.stderr.write(f'Failed to remove {temp_folder}\n')

    write_status(out_bam_path, 'Reached end. All ok!')
def tag_multiome_single_thread(
        input_bam_path,
        out_bam_path,
        molecule_iterator=None,
        molecule_iterator_args=None,
        consensus_model=None,
        consensus_model_args=None,  # Clearly the consensus model class and arguments should be part of molecule
        ignore_bam_issues=False,
        head=None,
        no_source_reads=False
        ):
    """Tag a bam file in a single thread and write a sorted, tagged bam.

    Iterates molecules (unmapped reads first, then the selected region),
    writes molecule tags and read groups, optionally emits consensus reads,
    and records a status file next to the output bam.

    Args:
        input_bam_path: path to the input bam file
        out_bam_path: path to write the tagged bam file to
        molecule_iterator: molecule iterator class to instantiate (required)
        molecule_iterator_args: dict with at least 'contig', 'start' and 'end'
            keys; passed through prefetch() before use (required)
        consensus_model: when not None, calculate a consensus read per molecule
        consensus_model_args: extra keyword arguments for calculate_consensus
        ignore_bam_issues: ignore bam truncation when opening the input
        head: stop after roughly this many molecules (None: no limit)
        no_source_reads: when True only consensus reads are written

    Raises:
        Exception: re-raised after writing a FAIL status file when tagging
            fails midway.
    """
    # Avoid the shared-mutable-default pitfall; {} was the previous default.
    if consensus_model_args is None:
        consensus_model_args = {}

    input_bam = pysam.AlignmentFile(input_bam_path, "rb", ignore_truncation=ignore_bam_issues, threads=4)
    input_header = input_bam.header.as_dict()

    # Write provenance information to BAM header
    write_program_tag(
        input_header,
        program_name='bamtagmultiome',
        command_line=" ".join(sys.argv),
        version=singlecellmultiomics.__version__,
        description=f'SingleCellMultiOmics molecule processing, executed at {datetime.now().strftime("%d/%m/%Y %H:%M:%S")}')

    print(f'Started writing to {out_bam_path}')

    # de-prefetch all:
    # contig, start, end, start, end , args
    molecule_iterator_args = prefetch(molecule_iterator_args['contig'],
                                      molecule_iterator_args['start'],
                                      molecule_iterator_args['end'],
                                      molecule_iterator_args['start'],
                                      molecule_iterator_args['end'],
                                      molecule_iterator_args)

    # The iterator receives the bam handle explicitly, so drop 'alignments':
    molecule_iterator_args_wo_alignment = {k: v for k, v in molecule_iterator_args.items() if k != 'alignments'}
    # First pass over the '*' pseudo-contig picks up unmapped reads:
    molecule_iterator_args_wo_alignment_unmapped = molecule_iterator_args_wo_alignment.copy()
    molecule_iterator_args_wo_alignment_unmapped['contig'] = '*'

    molecule_iterator_exec = chain(
        molecule_iterator(input_bam, **molecule_iterator_args_wo_alignment_unmapped),
        molecule_iterator(input_bam, **molecule_iterator_args_wo_alignment),
    )

    print('Params:', molecule_iterator_args)

    read_groups = dict()  # Store unique read groups in this dict
    with sorted_bam_file(out_bam_path, header=input_header, read_groups=read_groups) as out:
        try:
            for i, molecule in enumerate(molecule_iterator_exec):
                # Stop when enough molecules are processed
                # NOTE(review): (i - 1) >= head stops one molecule later than
                # i >= head would; looks like an off-by-one — confirm intent.
                if head is not None and (i - 1) >= head:
                    break

                # set unique molecule identifier
                molecule.set_meta('mi', f'{molecule.get_a_reference_id()}_{i}')

                # Write tag values
                molecule.write_tags()

                # Update read groups
                for fragment in molecule:
                    rgid = fragment.get_read_group()
                    if rgid not in read_groups:
                        read_groups[rgid] = fragment.get_read_group(True)[1]

                # Calculate molecule consensus
                if consensus_model is not None:
                    calculate_consensus(molecule,
                                        consensus_model,
                                        i,
                                        out,
                                        **consensus_model_args)

                # Write the reads to the output file
                if not no_source_reads:
                    molecule.write_pysam(out)

        except Exception as e:
            write_status(out_bam_path, 'FAIL, The file is not complete')
            raise e

    # Reached the end of the generator
    write_status(out_bam_path, 'Reached end. All ok!')
def write_status(output_path, message):
    """Write a one-line status *message* next to the output bam file.

    The status file path is derived from *output_path* by replacing the
    '.bam' extension with '.status.txt'.

    Args:
        output_path: path of the (future) output bam file
        message: status text to record (a newline is appended)
    """
    if output_path.endswith('.bam'):
        # Only rewrite the extension; a bare str.replace would also hit
        # '.bam' occurrences earlier in the path (e.g. a '.bam' directory).
        status_path = output_path[:-len('.bam')] + '.status.txt'
    else:
        status_path = output_path.replace('.bam', '.status.txt')
    with open(status_path, 'w') as o:
        o.write(message + '\n')
def run_multiome_tagging_cmd(commandline):
    """Parse *commandline* (a list of argument strings) with the module-level
    argparser and run multiome tagging with the resulting namespace.

    Args:
        commandline: list of command line tokens, e.g. sys.argv[1:]
    """
    args = argparser.parse_args(commandline)
    run_multiome_tagging(args)
[docs]def run_multiome_tagging(args):
"""
Run multiome tagging adds molecule information
Arguments:
bamin (str) : bam file to process, sam files can also be supplied but will be converted
o(str) : path to output bam file
method(str): Protocol to tag, select from:nla, qflag, chic, nla_transcriptome, vasa, cs, nla_taps ,chic_taps, nla_no_overhang, scartrace
qflagger(str): Query flagging algorithm to use, this algorithm extracts UMI and sample information from your reads. When no query flagging algorithm is specified, the `singlecellmultiomics.universalBamTagger.universalBamTagger.QueryNameFlagger` is used
method(str) : Method name, what kind of molecules need to be extracted. Select from:
nla (Data with digested by Nla III enzyme)
qflag (Only add basic tags like sampple and UMI, no molecule assignment)
chic (Data digested using mnase fusion)
nla_transcriptome (Data with transcriptome and genome digested by Nla III )
vasa (VASA transcriptomic data)
cs (CELseq data, 1 and 2)
cs_feature_counts (Single end, deduplicate using a bam file tagged using featurecounts, deduplicates a umi per gene)
fl_feature_counts (deduplicate using a bam file tagged using featurecounts, deduplicates based on fragment position)
nla_taps (Data with digested by Nla III enzyme and methylation converted by TAPS)
chic_taps (Data with digested by mnase enzyme and methylation converted by TAPS), chic_taps_transcriptome (Same as chic_taps, but includes annotations)
chic_nla
scartrace (lineage tracing protocol)
custom_flags(str): Arguments passed to the query name flagger, comma separated "MI,RX,bi,SM"
ref(str) : Path to reference fasta file, autodected from bam header when not supplied
umi_hamming_distance(int) : Max hamming distance on UMI's
head (int) : Amount of molecules to process
contig (str) : only process this contig
region_start(int) : Zero based start coordinate of single region to process
region_end(int) : Zero based end coordinate of single region to process, None: all contigs when contig is not set, complete contig when contig is set.
alleles (str) : path to allele VCF
allele_samples(str): Comma separated samples to extract from the VCF file. For example B6,SPRET
unphased_alleles(str) : Path to VCF containing unphased germline SNPs
mapfile (str) : 'Path to \*.safe.bgzf file, used to decide if molecules are uniquely mappable, generate one using createMapabilityIndex.py
annotmethod (int) : Annotation resolving method. 0: molecule consensus aligned blocks. 1: per read per aligned base
cluster (bool) : Run contigs in separate cluster jobs
resolve_unproperly_paired_reads(bool) : When enabled bamtagmultiome will look through the complete bam file in a hunt for the mate, the two mates will always end up in 1 molecule if both present in the bam file. This also works when the is_proper_pair bit is not set. Use this option when you want to find the breakpoints of genomic re-arrangements.
no_rejects(bool) : Do not write rejected reads
mem (int) : Amount of gigabytes to request for cluster jobs
time(int) : amount of wall clock hours to request for cluster jobs
exons(str): Path to exon annotation GTF file
introns(str): Path to intron annotation GTF file
consensus(bool) : Calculate molecule consensus read, this feature is _VERY_ experimental
consensus_model(str) : Path to consensus calling model, when none specified, this is learned based on the supplied bam file, ignoring sites supplied by -consensus_mask_variants
consensus_mask_variants(str): Path VCF file masked for training on consensus caller
consensus_n_train(int) : Amount of bases used for training the consensus model
no_source_reads(bool) : Do not write original reads, only consensus
scartrace_r1_primers(str) : comma separated list of R1 primers used in scartrace protocol
"""
MISC_ALT_CONTIGS_SCMO = 'MISC_ALT_CONTIGS_SCMO'
every_fragment_as_molecule = args.every_fragment_as_molecule
skip_contig = set(args.skip_contig.split(',')) if args.skip_contig is not None else set()
if not args.o.endswith('.bam'):
raise ValueError(
"Supply an output which ends in .bam, for example -o output.bam")
write_status(args.o,'unfinished')
# verify wether the input is SAM, if so we need to convert:
if args.bamin.endswith('.sam'):
print('Input file is in SAM format. Performing conversion.')
bam_path = args.bamin.replace('.sam','.bam')
args.bamin, _ = bam_path, sam_to_bam(args.bamin, bam_path)
# Verify wether the input file is indexed and sorted...
if not args.ignore_bam_issues:
verify_and_fix_bam(args.bamin)
for remove_existing_path in [args.o, f'{args.o}.bai']:
if os.path.exists(remove_existing_path):
print(f"Removing existing file {remove_existing_path}")
os.remove(remove_existing_path)
input_bam = pysam.AlignmentFile(args.bamin, "rb", ignore_truncation=args.ignore_bam_issues, threads=4)
# autodetect reference:
reference = None
if args.ref is None:
args.ref = get_reference_from_pysam_alignmentFile(input_bam)
if args.ref is not None:
try:
reference = UnitialisedClass(CachedFastaNoHandle, args.ref)
print(f'Loaded reference from {args.ref}')
except Exception as e:
print("Error when loading the reference file, continuing without a reference")
reference = None
##### Set up consensus
consensus_model_args = {'consensus_mode':None}
if args.consensus:
consensus_model_args = {'consensus_mode': 'majority'}
if args.no_source_reads:
consensus_model_args['no_source_reads'] = True
##### Define fragment and molecule class arguments and instances: ####
query_name_flagger = None
if args.qflagger is not None:
if args.qflagger == 'custom_flags':
query_name_flagger = CustomAssingmentQueryNameFlagger(
args.custom_flags.split(','))
elif args.qflagger == 'bulk':
query_name_flagger = BulkFlagger()
else:
raise ValueError("Select from 'custom_flags, ..' ")
molecule_class_args = {
'umi_hamming_distance': args.umi_hamming_distance,
'reference': reference
}
fragment_class_args = {
'read_group_format' : args.read_group_format
}
yield_invalid = True # if invalid reads should be written
yield_overflow = True # if overflow reads should be written
if args.max_fragment_size is not None:
fragment_class_args['max_fragment_size'] = args.max_fragment_size
if args.no_rejects:
yield_invalid = False
if args.no_overflow:
yield_overflow = False
ignore_conversions = None
if args.method == 'nla_taps' or args.method == 'chic_taps':
ignore_conversions = set([('C', 'T'), ('G', 'A')])
if args.alleles is not None and args.alleles!='none':
molecule_class_args['allele_resolver'] = singlecellmultiomics.alleleTools.AlleleResolver(
args.alleles,
select_samples=args.allele_samples.split(',') if args.allele_samples is not None else None,
lazyLoad=True,
use_cache=args.use_allele_cache,
verbose = args.set_allele_resolver_verbose,
ignore_conversions=ignore_conversions)
if args.mapfile is not None:
assert args.mapfile.endswith('safe.bgzf'), 'the mapfile name should end with safe.bgzf '
molecule_class_args['mapability_reader'] = MapabilityReader(args.mapfile)
### Transcriptome configuration ###
if args.method in ('cs', 'vasa', 'chict') or args.method.endswith('_transcriptome'):
print(
colorama.Style.BRIGHT +
'Running in transcriptome annotation mode' +
colorama.Style.RESET_ALL)
if args.exons is None :
raise ValueError("Supply an exon GTF file")
if args.introns is not None and args.exons is None:
raise ValueError("Please supply both intron and exon GTF files")
transcriptome_features = singlecellmultiomics.features.FeatureContainer(verbose=args.feature_container_verbose)
print("Loading exons", end='\r')
transcriptome_features.preload_GTF(
path=args.exons,
select_feature_type=['exon'],
identifierFields=(
'exon_id',
'gene_id'),
store_all=True,
contig=args.contig,
head=None)
if args.introns is not None:
print("Loading introns", end='\r')
transcriptome_features.preload_GTF(
path=args.introns,
select_feature_type=['intron'],
identifierFields=['transcript_id'],
store_all=True,
contig=args.contig,
head=None)
print("All features loaded")
# Add more molecule class arguments
transcriptome_feature_args = {
'features': transcriptome_features,
'auto_set_intron_exon_features': True,
}
bp_per_job = 10_000_000
pooling_method=1
bp_per_segment = 999_999_999 #@todo make this None or so
fragment_size = 500
one_contig_per_process=False
max_time_per_segment = args.max_time_per_segment
### Method specific configuration ###
if args.method == 'qflag':
molecule_class = singlecellmultiomics.molecule.Molecule
fragment_class = singlecellmultiomics.fragment.FragmentStartPosition
# Write all reads
yield_invalid = True
bp_per_job = 3_000_000
bp_per_segment = 3_000_000
fragment_size = 0
elif args.method == 'chic':
molecule_class = singlecellmultiomics.molecule.CHICMolecule
fragment_class = singlecellmultiomics.fragment.CHICFragment
bp_per_job = 5_000_000
bp_per_segment = 50_000
fragment_size = 1000
if max_time_per_segment is None:
max_time_per_segment = 20*60 #
elif args.method == 'nla' or args.method == 'nla_no_overhang':
molecule_class = singlecellmultiomics.molecule.NlaIIIMolecule
fragment_class = singlecellmultiomics.fragment.NlaIIIFragment
if args.method == 'nla_no_overhang':
assert reference is not None, 'Supply a reference fasta using -ref!'
fragment_class_args.update({
'reference': reference,
'no_overhang': True
})
bp_per_job = 10_000_000
bp_per_segment = 10_000_000
fragment_size = 1000
elif args.method == 'chic_nla':
molecule_class=singlecellmultiomics.molecule.CHICNLAMolecule
fragment_class=singlecellmultiomics.fragment.CHICFragment
assert reference is not None, 'Supply a reference fasta using -ref!'
molecule_class_args.update({
'reference': reference,
})
bp_per_job = 5_000_000
bp_per_segment = 50_000
fragment_size = 1000
elif args.method == 'cs_feature_counts' :
molecule_class = singlecellmultiomics.molecule.Molecule
fragment_class = singlecellmultiomics.fragment.FeatureCountsSingleEndFragment
elif args.method == 'fl_feature_counts':
molecule_class = singlecellmultiomics.molecule.Molecule
fragment_class = singlecellmultiomics.fragment.FeatureCountsFullLengthFragment
elif args.method == 'episeq':
molecule_class = singlecellmultiomics.molecule.Molecule
fragment_class = singlecellmultiomics.fragment.FeatureCountsSingleEndFragment
elif args.method == 'nla_transcriptome':
molecule_class = singlecellmultiomics.molecule.AnnotatedNLAIIIMolecule
fragment_class = singlecellmultiomics.fragment.NlaIIIFragment
molecule_class_args.update(transcriptome_feature_args)
molecule_class_args.update({
'stranded': None # data is not stranded
})
elif args.method == 'chict' :
molecule_class = singlecellmultiomics.molecule.AnnotatedCHICMolecule
fragment_class = singlecellmultiomics.fragment.CHICFragment
bp_per_job = 100_000_000 # One contig at a time to prevent reading the annotations too much
bp_per_segment = 500_000
fragment_size = 50_000
molecule_class_args.update(transcriptome_feature_args)
elif args.method == 'nla_taps':
molecule_class = singlecellmultiomics.molecule.TAPSNlaIIIMolecule
fragment_class = singlecellmultiomics.fragment.NlaIIIFragment
molecule_class_args.update({
'reference': reference,
'taps': singlecellmultiomics.molecule.TAPS()
})
if args.consensus:
bp_per_job = 2_000_000
else:
bp_per_job = 5_000_000
# --- continuation of the per -method configuration chain started above:
# each branch selects the molecule/fragment classes and the multiprocessing
# work-unit sizes (bp_per_job, bp_per_segment, fragment_size).
bp_per_segment = 1_000_000
fragment_size = 1000
elif args.method == 'nla_taps_transcriptome': # Annotates reads in transcriptome
molecule_class = singlecellmultiomics.molecule.AnnotatedTAPSNlaIIIMolecule
fragment_class = singlecellmultiomics.fragment.NlaIIIFragment
molecule_class_args.update(transcriptome_feature_args)
molecule_class_args.update({
'reference': reference,
'taps': singlecellmultiomics.molecule.TAPS()
})
bp_per_job = 5_000_000
bp_per_segment = 5_000_000
fragment_size = 100_000
elif args.method == 'nla_tapsp_transcriptome': # Annotates reads in transcriptome
molecule_class = singlecellmultiomics.molecule.TAPSPTaggedMolecule
fragment_class = singlecellmultiomics.fragment.NlaIIIFragment
molecule_class_args.update(transcriptome_feature_args)
molecule_class_args.update({
'reference': reference,
'taps': singlecellmultiomics.molecule.TAPS()
})
bp_per_job = 5_000_000
bp_per_segment = 5_000_000
fragment_size = 100_000
elif args.method == 'chic_taps_transcriptome':
bp_per_job = 5_000_000
bp_per_segment = 5_000_000
fragment_size = 100_000
molecule_class_args.update({
'reference': reference,
'taps': singlecellmultiomics.molecule.TAPS(),
'taps_strand':'R'
})
molecule_class_args.update(transcriptome_feature_args)
molecule_class = singlecellmultiomics.molecule.AnnotatedTAPSCHICMolecule
fragment_class = singlecellmultiomics.fragment.CHICFragment
elif args.method == 'chic_taps':
bp_per_job = 5_000_000
bp_per_segment = 1_000_000
fragment_size = 500
molecule_class_args.update({
'reference': reference,
'taps': singlecellmultiomics.molecule.TAPS(),
'taps_strand':'R'
})
molecule_class = singlecellmultiomics.molecule.TAPSCHICMolecule
fragment_class = singlecellmultiomics.fragment.CHICFragment
elif args.method == 'vasa' or args.method == 'cs':
# vasa/cs always run with a whole contig per worker process:
one_contig_per_process=True
bp_per_job = 5_000_000
bp_per_segment = 1_000_000
fragment_size = 100_000
molecule_class = singlecellmultiomics.molecule.TranscriptMolecule
fragment_class = singlecellmultiomics.fragment.SingleEndTranscriptFragment
#molecule_class_args.update(transcriptome_feature_args)
# Note: for vasa/cs the transcriptome features go to the FRAGMENT class,
# unlike the *_transcriptome methods above which pass them to the molecule.
fragment_class_args.update(transcriptome_feature_args)
fragment_class_args.update({'stranded': True })
molecule_class_args.update({'cache_size':1000})
elif args.method == 'scartrace':
molecule_class = singlecellmultiomics.molecule.ScarTraceMolecule
fragment_class = singlecellmultiomics.fragment.ScarTraceFragment
# -scartrace_r1_primers is supplied as a comma separated list of sequences:
r1_primers = args.scartrace_r1_primers.split(',')
fragment_class_args.update({
'scartrace_r1_primers': r1_primers,
#'reference': reference
})
else:
raise ValueError("Supply a valid method")
# Allow or disallow cycle shift (only meaningful for NlaIII fragments):
if args.allow_cycle_shift and fragment_class is singlecellmultiomics.fragment.NlaIIIFragment:
fragment_class_args['allow_cycle_shift'] = True
# This disables restriction motif checking
if args.no_restriction_motif_check:
fragment_class_args['check_motif'] = False
# This disables umi_cigar_processing:
if args.no_umi_cigar_processing:
fragment_class_args['no_umi_cigar_processing'] = True
# Cap on the amount of fragments a single molecule may accumulate:
if args.max_associated_fragments is not None:
molecule_class_args['max_associated_fragments'] = args.max_associated_fragments
if args.assignment_radius is not None:
fragment_class_args['assignment_radius'] = args.assignment_radius
if args.multiprocess:
one_contig_per_process=True
#raise NotImplementedError('-assignment_radius is currently incompatible with --multiprocess')
if args.multiprocess:
# Force one contig per process for now.
print("Forcing one contig per process")
one_contig_per_process=True
if args.libname is not None:
fragment_class_args['library_name'] = args.libname
# This decides what molecules we will traverse
# (MISC_ALT_CONTIGS_SCMO is a pseudo-contig meaning "all alternative contigs";
# the actual per-contig iteration for it is set up further below):
if args.contig == MISC_ALT_CONTIGS_SCMO:
contig = None
else:
contig = args.contig
# This decides to only extract a single genomic region:
if args.region_start is not None:
if args.region_end is None:
raise ValueError('When supplying -region_start then also supply -region_end')
region_start = args.region_start
region_end = args.region_end
else:
region_start = None
region_end = None
# Overwrite this flag when it is set:
if args.one_contig_per_process:
one_contig_per_process=True
# Timestamps used by the progress callback below to rate-limit its output:
last_update = datetime.now()
init_time = datetime.now()
# Progress reporting is enabled only when an interval is configured AND
# either verbose printing or a stats file was requested:
if args.molecule_iterator_verbosity_interval is not None and (args.molecule_iterator_verbose or (args.stats_file_path is not None )):
stats_handle = None
# NOTE(review): this handle is not closed anywhere in the visible code;
# it relies on interpreter shutdown — verify this is intentional.
if args.stats_file_path is not None:
stats_handle = open(args.stats_file_path,'w')
def progress_callback_function( iteration, mol_iter, reads ):
nonlocal last_update
nonlocal init_time
nonlocal stats_handle
now = datetime.now()
diff = (datetime.now()-last_update).total_seconds()
if diff>args.molecule_iterator_verbosity_interval:
diff_from_init = (datetime.now()-init_time).total_seconds()
_contig, _pos = None, None
for read in reads:
if read is not None:
_contig, _pos = read.reference_name, read.reference_start
if args.molecule_iterator_verbose:
print( f'{mol_iter.yielded_fragments} fragments written, {mol_iter.deleted_fragments} fragments deleted ({(mol_iter.deleted_fragments/(mol_iter.deleted_fragments + mol_iter.yielded_fragments))*100:.2f} %), current pos: {_contig}, {_pos}, {mol_iter.waiting_fragments} fragments waiting ' , end='\r')
if stats_handle is not None:
stats_handle.write(f'{diff_from_init}\t{mol_iter.waiting_fragments}\t{mol_iter.yielded_fragments}\t{mol_iter.deleted_fragments}\t{_contig}\t{_pos}\n')
stats_handle.flush()
last_update = now
else:
# No progress reporting requested: the iterator gets no callback.
progress_callback_function = None
# Shared keyword arguments for every MoleculeIterator constructed below:
molecule_iterator_args = {
#'alignments': input_bam,
'query_name_flagger': query_name_flagger,
'molecule_class': molecule_class,
'fragment_class': fragment_class,
'molecule_class_args': molecule_class_args,
'fragment_class_args': fragment_class_args,
'yield_invalid': yield_invalid,
'yield_overflow': yield_overflow,
'start': region_start,
'end': region_end,
'contig': contig,
'every_fragment_as_molecule': every_fragment_as_molecule,
'skip_contigs':skip_contig,
'progress_callback_function':progress_callback_function,
'pooling_method' : pooling_method,
# Allele clustering requires both the CLI flag and a configured allele_resolver:
'perform_allele_clustering': args.haplo_molecule_assignment and molecule_class_args.get('allele_resolver', None) is not None
}
if args.resolve_unproperly_paired_reads:
# Also traverse mate pairs that are not flagged as properly paired:
molecule_iterator_args['iterator_class'] = MatePairIteratorIncludingNonProper
if args.contig == MISC_ALT_CONTIGS_SCMO:
# When MISC_ALT_CONTIGS_SCMO is set as argument, all molecules with reads
# mapping to a contig for which is_main_chromosome returns False (the
# alternative contigs) are used
def Misc_contig_molecule_generator(input_bam, **molecule_iterator_args):
    """Yield molecules from every alternative (non-main) contig of input_bam.

    Runs one MoleculeIterator per alternative contig, reusing the shared
    keyword arguments and only swapping the target contig each pass.
    """
    alt_contigs = (ref for ref in input_bam.references if not is_main_chromosome(ref))
    for alt_contig in alt_contigs:
        molecule_iterator_args['contig'] = alt_contig
        yield from MoleculeIterator(input_bam, **molecule_iterator_args)
molecule_iterator = Misc_contig_molecule_generator
else:
# Default: a single MoleculeIterator over the configured contig/region.
molecule_iterator = MoleculeIterator
if args.method == 'qflag':
# qflag mode skips molecule assembly: every read is its own molecule.
molecule_iterator_args['every_fragment_as_molecule'] = True
molecule_iterator_args['iterator_class'] = ReadIterator
#####
consensus_model_path = None
# We needed to check if every argument is properly placed. If so; the jobs
# can be sent to the cluster
if args.cluster:
if args.contig is None:
# Master invocation (no -contig given): submit one job per main contig
# plus a final merge job, then exit without tagging anything locally.
write_status(args.o,'Submitting jobs. If this file remains, a job failed.')
# Create jobs for all chromosomes:
unique_id = str(uuid.uuid4())
temp_prefix = os.path.abspath(os.path.dirname(
args.o)) + '/SCMO_' + unique_id
hold_merge = []
## Create folder to store cluster files:
if args.clusterdir is None:
cluster_file_folder = os.path.abspath(os.path.dirname(
args.o)) + '/cluster'
else:
cluster_file_folder = args.clusterdir
print(f'Writing cluster scripts and standard out and error to {cluster_file_folder}')
if not os.path.exists(cluster_file_folder):
try:
os.makedirs(cluster_file_folder,exist_ok=True)
except Exception as e:
# Best effort: failure to create the folder is reported, not fatal.
print(e)
pass
found_alts = 0
files_to_merge = []
# Iterate every contig plus the MISC_ALT_CONTIGS_SCMO pseudo-contig,
# skipping contigs listed in skip_contig:
for ci,chrom in enumerate([_chrom for _chrom in
(list(input_bam.references) + [MISC_ALT_CONTIGS_SCMO])
if not _chrom in skip_contig]):
if not is_main_chromosome(chrom):
# Alternative contigs get no individual job; they are handled
# collectively by the MISC_ALT_CONTIGS_SCMO job below.
found_alts += 1
continue
if chrom == MISC_ALT_CONTIGS_SCMO and found_alts == 0:
# No alternative contigs exist, so the pseudo-contig job is skipped.
continue
temp_bam_path = f'{temp_prefix}_{chrom}.bam'
if os.path.exists(temp_bam_path):
print(f"Removing existing temporary file {temp_bam_path}")
os.remove(temp_bam_path)
# Re-issue the current command line for a single contig, replacing the
# '-o' output argument with a per-contig temporary BAM path:
arguments = " ".join(
[x for x in sys.argv if not x == args.o and x != '-o']) + f" -contig {chrom} -o {temp_bam_path}"
files_to_merge.append(temp_bam_path)
if consensus_model_path is not None:
arguments += f' -consensus_model {consensus_model_path}'
job = f'SCMULTIOMICS_{ci}_{unique_id}'
write_status(temp_bam_path,'SUBMITTED')
job_id = submit_job(f'{arguments};', job_name=job, target_directory=cluster_file_folder, working_directory=None,
threads_n=1, memory_gb=args.mem, time_h=args.time, scheduler=args.sched, copy_env=True,
email=None, mail_when_finished=False, hold=None,submit=True)
print(f'Job for contig {chrom} submitted with job id: {job_id}')
# Collect job ids so the merge job can be held until all finish:
hold_merge.append(job_id)
hold = hold_merge
job = f'SCMULTIOMICS_MERGE_{unique_id}'
if args.sched == 'local':
# The local scheduler gets no hold list:
hold = None
final_status = args.o.replace('.bam','.status.txt')
# Create list of output files
# Merge job: combine per-contig BAMs, index the result, remove the
# temporary BAM and status files, then write the final status file.
command = f'samtools merge -@ 4 -c {args.o} {" ".join(files_to_merge)} && samtools index {args.o} && rm {temp_prefix}*.ba* && rm {temp_prefix}*.status.txt && echo "All done" > {final_status}'
final_job_id = submit_job(f'{command};', job_name=job, target_directory=cluster_file_folder, working_directory=None,
threads_n=4, memory_gb=10, time_h=args.time, scheduler=args.sched, copy_env=True,
email=None, mail_when_finished=False, hold=hold,submit=True)
print(f'final job id is:{final_job_id}')
# Master invocation ends here; workers do the actual tagging.
exit()
#####
# Load unphased variants to memory
# (disabled legacy implementation, kept for reference as a string literal)
"""
unphased_allele_resolver = None
if args.unphased_alleles is not None:
unphased_allele_resolver = singlecellmultiomics.alleleTools.AlleleResolver(
use_cache=args.use_allele_cache,
phased=False, ignore_conversions=ignore_conversions,verbose = args.set_allele_resolver_verbose)
try:
for i, variant in enumerate(
pysam.VariantFile(
args.unphased_alleles).fetch(
args.contig)):
if 'PASS' not in list(variant.filter):
continue
if not all(
len(allele) == 1 for allele in variant.alleles) or len(
variant.alleles) != 2:
continue
if sum([len(set(variant.samples[sample].alleles))
== 2 for sample in variant.samples]) < 2:
# Not heterozygous
continue
unphased_allele_resolver.locationToAllele[variant.chrom][variant.pos - 1] = {
variant.alleles[0]: {'U'}, variant.alleles[1]: {'V'}}
except Exception as e: # todo catch this more nicely
print(e)
"""
# Dispatch to the actual tagging implementation:
if args.multiprocess:
print("Tagging using multi-processing")
tag_multiome_multi_processing(input_bam_path=args.bamin, out_bam_path=args.o, molecule_iterator=molecule_iterator,
molecule_iterator_args=molecule_iterator_args,ignore_bam_issues=args.ignore_bam_issues,
head=args.head, no_source_reads=args.no_source_reads,
fragment_size=fragment_size, blacklist_path=args.blacklist,bp_per_job=bp_per_job,
bp_per_segment=bp_per_segment, temp_folder_root=args.temp_folder, max_time_per_segment=max_time_per_segment,
additional_args=consensus_model_args, n_threads=args.tagthreads, one_contig_per_process=one_contig_per_process,
job_bed_file=args.jobbed
)
else:
# Consensus calling and blacklists are only supported in multiprocessing mode:
if consensus_model_args.get('consensus_mode') is not None:
raise NotImplementedError('Please use --multiprocess')
# Alignments are passed as pysam handle:
if args.blacklist is not None:
raise NotImplementedError("Blacklist can only be used with --multiprocess")
tag_multiome_single_thread(
args.bamin,
args.o,
molecule_iterator = molecule_iterator,
molecule_iterator_args = molecule_iterator_args,
consensus_model = None ,
consensus_model_args={},
ignore_bam_issues=False,
head=args.head,
no_source_reads=args.no_source_reads
)
if __name__ == '__main__':
# Script entry point: parse the CLI arguments declared on argparser above
# and run the full tagging pipeline.
args = argparser.parse_args()
run_multiome_tagging(args)