From f2037851b73fdf8be0c96f0a435686ecdc506ab9 Mon Sep 17 00:00:00 2001 From: Daniel Standage Date: Thu, 27 Jul 2017 14:38:06 -0700 Subject: [PATCH 1/4] Refactoring the sample loading code, taking advantage of new bulk loading functions in khmer --- kevlar/count.py | 22 +++++-- kevlar/counting.py | 132 +++++++++++++++++-------------------- kevlar/novel.py | 10 +-- kevlar/tests/test_count.py | 8 ++- 4 files changed, 87 insertions(+), 85 deletions(-) diff --git a/kevlar/count.py b/kevlar/count.py index a947d60..3e8c704 100644 --- a/kevlar/count.py +++ b/kevlar/count.py @@ -15,6 +15,12 @@ import kevlar +def split_infiles_outfiles(filelist): + outfiles = [flist[0] for flist in filelist] + infilelists = [flist[1:] for flist in filelist] + return outfiles, infilelists + + def main(args): if (args.num_bands is None) is not (args.band is None): raise ValueError('Must specify --num-bands and --band together') @@ -24,10 +30,12 @@ def main(args): timer.start('loadctrl') print('[kevlar::count] Loading control samples', file=args.logfile) + outfiles, infilelists = split_infiles_outfiles(args.control) controls = kevlar.counting.load_samples_with_dilution( - args.control, args.ksize, args.memory, memfraction=args.mem_frac, - maxfpr=args.max_fpr, maxabund=args.ctrl_max, masks=None, - numbands=args.num_bands, band=args.band, logfile=args.logfile + infilelists, args.ksize, args.memory, outfiles=outfiles, + memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, + mask=None, numbands=args.num_bands, band=args.band, + logfile=args.logfile ) elapsed = timer.stop('loadctrl') numcontrols = len(controls) @@ -36,10 +44,12 @@ def main(args): print('[kevlar::count] Loading case samples', file=args.logfile) timer.start('loadcase') + outfiles, infilelists = split_infiles_outfiles(args.case) cases = kevlar.counting.load_samples_with_dilution( - args.case, args.ksize, args.memory, memfraction=args.mem_frac, - maxfpr=args.max_fpr, maxabund=args.ctrl_max, masks=controls, - numbands=args.num_bands, band=args.band, logfile=args.logfile + infilelists, args.ksize, args.memory, outfiles=outfiles, + memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, + mask=controls[0], numbands=args.num_bands, band=args.band, + logfile=args.logfile ) elapsed = timer.stop('loadcase') numcases = len(cases) diff --git a/kevlar/counting.py b/kevlar/counting.py index 4cf164c..3088aff 100644 --- a/kevlar/counting.py +++ b/kevlar/counting.py @@ -19,38 +19,44 @@ class KevlarSampleIOError(ValueError): pass +class KevlarOutfileMismatchError(ValueError): + pass + + def load_sample_seqfile(seqfiles, ksize, memory, maxfpr=0.2, - masks=None, maskmaxabund=1, numbands=None, band=None, + mask=None, numbands=None, band=None, outfile=None, logfile=sys.stderr): """ Compute k-mer abundances for the specified sequence input. Expected input is a list of one or more FASTA/FASTQ files corresponding to a single sample. A counttable is created and populated with abundances - of all k-mers observed in the input. + of all k-mers observed in the input. If `mask` is provided, only k-mers not + present in the mask will be loaded. """ message = 'loading sample from ' + ','.join(seqfiles) print('[kevlar::counting] ', message, file=logfile) sketch = khmer.Counttable(ksize, memory / 4, 4) n, nkmers = 0, 0 - for n, read in enumerate(kevlar.multi_file_iter_khmer(seqfiles), 1): - for subseq in kevlar.clean_subseqs(read.sequence, ksize): - for kmer in sketch.get_kmers(subseq): - if numbands: - khash = sketch.hash(kmer) - if khash & (numbands - 1) != band - 1: - continue - if masks: - for mask in masks: - if mask.get(kmer) > maskmaxabund: - break - else: - sketch.add(kmer) - nkmers += 1 - else: - sketch.add(kmer) - nkmers += 1 + + for seqfile in seqfiles: + if mask: + if numbands: + nr, nk = sketch.consume_seqfile_banding_with_mask( + seqfile, numbands, band, mask + ) + else: + nr, nk = sketch.consume_seqfile_with_mask(seqfile, mask) + else: + if numbands: + nr, nk = sketch.consume_seqfile_banding( + seqfile, numbands, band + ) + else: + nr, nk = sketch.consume_seqfile(seqfile) + n += nr + nkmers += nk message = 'done loading reads' if numbands: @@ -62,77 +68,60 @@ def load_sample_seqfile(seqfiles, ksize, memory, maxfpr=0.2, if fpr > maxfpr: message += ' (FPR too high, bailing out!!!)' raise SystemExit(message) - else: - if outfile: - if not outfile.endswith(('.ct', '.counttable')): - outfile += '.counttable' - sketch.save(outfile) - message += '; saved to "{:s}"'.format(outfile) - print('[kevlar::counting] ', message, file=logfile) + if outfile: + if not outfile.endswith(('.ct', '.counttable')): + outfile += '.counttable' + sketch.save(outfile) + message += '; saved to "{:s}"'.format(outfile) + print('[kevlar::counting] ', message, file=logfile) return sketch -def load_samples(samplelists, ksize, memory, maxfpr=0.2, numbands=None, - band=None, logfile=sys.stderr): +def load_samples_with_dilution(samplelists, ksize, memory, mask=None, + memfraction=None, maxfpr=0.2, maxabund=1, + numbands=None, band=None, + outfiles=None, logfile=sys.stderr): """ Load a group of related samples using a memory-efficient strategy. - Samples loaded initially are used as masks for subsequently loaded samples. - The first sample is allocated the full amount of memory, while subsequent - samples require only a fraction since they are first checked against the - mask(s). - """ - numsamples = len(samplelists) - message = 'computing k-mer abundances for {:d} samples'.format(numsamples) - print('[kevlar::counting] ', message, file=logfile) - - sketches = list() - for seqfiles in samplelists: - sketch = load_sample_seqfile( - seqfiles, ksize, memory, maxfpr=maxfpr, numbands=numbands, - band=band, outfile=None, logfile=logfile - ) - sketches.append(sketch) - return sketches - + By default, each sample is loaded into a dedicated counttable, which occupy + `memory` bytes of memory each. Setting `memfraction` to a value between 0.0 + and 1.0 will activate "masked" mode. -def load_samples_with_dilution(samplelists, ksize, memory, memfraction=0.1, - maxfpr=0.2, maxabund=1, masks=None, - numbands=None, band=None, skipsave=False, - logfile=sys.stderr): - """ - Load a group of related samples using a memory-efficient strategy. + If `mask` is provided, it serves as a mask for all other samples. If it is + not provided, the first sample is loaded normally and then serves as a mask + for all subsequent samples. - Samples loaded initially are used as masks for subsequently loaded samples. - The first sample is allocated the full amount of memory, while subsequent - samples require only a fraction since they are first checked against the - mask(s). + In "masked mode", sample uses only `memory * memfraction` bytes of memory, + and any k-mer present in the mask (above a given threshold `maxabund`) is + ignored. In this way, we avoid taking up space storing abundances for + k-mers we know we're not interested in. """ numsamples = len(samplelists) + if outfiles is None: + outfiles = [None] * numsamples + if numsamples != len(outfiles): + message = '# of samples ({:d}) '.format(numsamples) + message += 'does not match # of outfiles ({:d})'.format(len(outfiles)) + raise KevlarOutfileMismatchError(message) message = 'computing k-mer abundances for {:d} samples'.format(numsamples) print('[kevlar::counting] ', message, file=logfile) sketches = list() - for samplelist in samplelists: - if len(samplelist) < 2: - message = 'must specify an output file and at least one input file' - raise KevlarSampleIOError(message) - outfile = samplelist[0] - seqfiles = samplelist[1:] - if masks: - mymasks = masks + for seqfiles, outfile in zip(samplelists, outfiles): + if mask: + mymask = mask sketchmem = memory * memfraction - elif len(sketches) == 0: - mymasks = None + elif len(sketches) == 0 or memfraction is None: + mymask = None sketchmem = memory else: - mymasks = sketches + mymask = sketches[0] sketchmem = memory * memfraction sketch = load_sample_seqfile( - seqfiles, ksize, sketchmem, maxfpr=maxfpr, masks=mymasks, - maskmaxabund=maxabund, numbands=numbands, band=band, - outfile=outfile, logfile=logfile + seqfiles, ksize, sketchmem, maxfpr=maxfpr, mask=mymask, + numbands=numbands, band=band, outfile=outfile, logfile=logfile ) sketches.append(sketch) return sketches @@ -150,7 +139,6 @@ def load_samples_sketchfiles(sketchfiles, maxfpr=0.2, logfile=sys.stderr): if fpr > maxfpr: message += ' (FPR too high, bailing out!!!)' raise SystemExit(message) - else: - print(message, file=logfile) + print(message, file=logfile) sketches.append(sketch) return sketches diff --git a/kevlar/novel.py b/kevlar/novel.py index 86b4df7..7d23ab4 100644 --- a/kevlar/novel.py +++ b/kevlar/novel.py @@ -66,9 +66,10 @@ def main(args): args.control_counts, args.max_fpr, args.logfile ) else: - controls = kevlar.counting.load_samples( + controls = kevlar.counting.load_samples_with_dilution( args.control, args.ksize, args.memory, maxfpr=args.max_fpr, - numbands=args.num_bands, band=args.band, logfile=args.logfile + memfraction=None, numbands=args.num_bands, band=args.band, + logfile=args.logfile ) elapsed = timer.stop('loadctrl') message = 'Control samples loaded in {:.2f} sec'.format(elapsed) @@ -90,9 +91,10 @@ def main(args): args.case_counts, args.max_fpr, args.logfile ) else: - cases = kevlar.counting.load_samples( + cases = kevlar.counting.load_samples_with_dilution( args.case, args.ksize, args.memory, maxfpr=args.max_fpr, - numbands=args.num_bands, band=args.band, logfile=args.logfile + memfraction=None, numbands=args.num_bands, band=args.band, + logfile=args.logfile ) elapsed = timer.stop('loadcases') print('[kevlar::novel] Case samples loaded in {:.2f} sec'.format(elapsed), diff --git a/kevlar/tests/test_count.py b/kevlar/tests/test_count.py index 895b3eb..3d05065 100644 --- a/kevlar/tests/test_count.py +++ b/kevlar/tests/test_count.py @@ -14,12 +14,13 @@ import screed import kevlar from kevlar.tests import data_file, data_glob +import sys @pytest.mark.parametrize('numbands,band,kmers_stored', [ (0, 0, 15600), - (2, 1, 7992), - (16, 7, 1218), + (2, 1, 7663), + (16, 7, 859), ]) def test_count_simple(numbands, band, kmers_stored, capsys): with NamedTemporaryFile(suffix='.counttable') as ctrl1out, \ @@ -32,12 +33,13 @@ def test_count_simple(numbands, band, kmers_stored, capsys): '--case', caseout.name, case, '--control', ctrl1out.name, ctrls[0], '--control', ctrl2out.name, ctrls[1], - '--ksize', '25', '--memory', '5K', '--ctrl-max', '0', + '--ksize', '25', '--memory', '50K', '--ctrl-max', '0', '--num-bands', str(numbands), '--band', str(band), ] args = kevlar.cli.parser().parse_args(arglist) kevlar.count.main(args) out, err = capsys.readouterr() + print(err, file=sys.stderr) assert '600 reads processed' in str(err) assert '{:d} k-mers stored'.format(kmers_stored) in str(err) From 921da74c4c425404e223b1346ac9515e15c41fd5 Mon Sep 17 00:00:00 2001 From: Daniel Standage Date: Thu, 27 Jul 2017 16:33:23 -0700 Subject: [PATCH 2/4] Make memfrac optional; Closes #101 --- kevlar/cli/count.py | 11 ++++++----- kevlar/tests/test_count.py | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/kevlar/cli/count.py b/kevlar/cli/count.py index c28bc48..8153c31 100644 --- a/kevlar/cli/count.py +++ b/kevlar/cli/count.py @@ -60,9 +60,10 @@ def subparser(subparsers): mem_desc = """\ Specify how much memory to allocate for the sketch data structures - used to store k-mer counts. The first control sample will be - allocated the full amount of specifed `--memory`, and all subsequent - samples will be allocated a fraction thereof. + used to store k-mer counts. If `--mem-frac` is not set, all samples will be + allocated `MEM` bytes. If `--mem-frac` is set, then the first control + sample will be allocated `MEM` bytes, and all other samples will be + allocated `MEM * F` bytes. """ mem_desc = textwrap.dedent(mem_desc) memory_args = subparser.add_argument_group('Memory allocation', mem_desc) @@ -72,9 +73,9 @@ def subparser(subparsers): 'the initial control sample; default is 1M' ) memory_args.add_argument( - '-f', '--mem-frac', type=float, default=0.1, metavar='F', + '-f', '--mem-frac', type=float, default=None, metavar='F', help='fraction of the total memory to allocate to subsequent samples; ' - 'default is 0.1' + 'must be between 0.0 and 1.0' ) memory_args.add_argument( '--max-fpr', type=float, default=0.2, metavar='FPR', diff --git a/kevlar/tests/test_count.py b/kevlar/tests/test_count.py index 3d05065..961cbc5 100644 --- a/kevlar/tests/test_count.py +++ b/kevlar/tests/test_count.py @@ -33,7 +33,8 @@ def test_count_simple(numbands, band, kmers_stored, capsys): '--case', caseout.name, case, '--control', ctrl1out.name, ctrls[0], '--control', ctrl2out.name, ctrls[1], - '--ksize', '25', '--memory', '50K', '--ctrl-max', '0', + '--ksize', '25', '--ctrl-max', '0', + '--mem-frac', '0.1', '--memory', '50K', '--num-bands', str(numbands), '--band', str(band), ] args = kevlar.cli.parser().parse_args(arglist) From 7b740bc09cd7d3b71e251977a47af1f4c173efd9 Mon Sep 17 00:00:00 2001 From: Daniel Standage Date: Fri, 28 Jul 2017 11:28:20 -0700 Subject: [PATCH 3/4] Rename load_samples_with_dilution --> load_samples --- kevlar/count.py | 4 ++-- kevlar/counting.py | 7 +++---- kevlar/novel.py | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/kevlar/count.py b/kevlar/count.py index 3e8c704..b5b4670 100644 --- a/kevlar/count.py +++ b/kevlar/count.py @@ -31,7 +31,7 @@ def main(args): timer.start('loadctrl') print('[kevlar::count] Loading control samples', file=args.logfile) outfiles, infilelists = split_infiles_outfiles(args.control) - controls = kevlar.counting.load_samples_with_dilution( + controls = kevlar.counting.load_samples( infilelists, args.ksize, args.memory, outfiles=outfiles, memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, mask=None, numbands=args.num_bands, band=args.band, @@ -45,7 +45,7 @@ def main(args): print('[kevlar::count] Loading case samples', file=args.logfile) timer.start('loadcase') outfiles, infilelists = split_infiles_outfiles(args.case) - cases = kevlar.counting.load_samples_with_dilution( + cases = kevlar.counting.load_samples( infilelists, args.ksize, args.memory, outfiles=outfiles, memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, mask=controls[0], numbands=args.num_bands, band=args.band, diff --git a/kevlar/counting.py b/kevlar/counting.py index 3088aff..ea98177 100644 --- a/kevlar/counting.py +++ b/kevlar/counting.py @@ -78,10 +78,9 @@ def load_sample_seqfile(seqfiles, ksize, memory, maxfpr=0.2, return sketch -def load_samples_with_dilution(samplelists, ksize, memory, mask=None, - memfraction=None, maxfpr=0.2, maxabund=1, - numbands=None, band=None, - outfiles=None, logfile=sys.stderr): +def load_samples(samplelists, ksize, memory, mask=None, memfraction=None, + maxfpr=0.2, maxabund=1, numbands=None, band=None, + outfiles=None, logfile=sys.stderr): """ Load a group of related samples using a memory-efficient strategy. diff --git a/kevlar/novel.py b/kevlar/novel.py index 7d23ab4..e82e64c 100644 --- a/kevlar/novel.py +++ b/kevlar/novel.py @@ -66,7 +66,7 @@ def main(args): args.control_counts, args.max_fpr, args.logfile ) else: - controls = kevlar.counting.load_samples_with_dilution( + controls = kevlar.counting.load_samples( args.control, args.ksize, args.memory, maxfpr=args.max_fpr, memfraction=None, numbands=args.num_bands, band=args.band, logfile=args.logfile @@ -91,7 +91,7 @@ def main(args): args.case_counts, args.max_fpr, args.logfile ) else: - cases = kevlar.counting.load_samples_with_dilution( + cases = kevlar.counting.load_samples( args.case, args.ksize, args.memory, maxfpr=args.max_fpr, memfraction=None, numbands=args.num_bands, band=args.band, logfile=args.logfile From d233c3a67e40cad062b3973719ce2598b0d12613 Mon Sep 17 00:00:00 2001 From: Daniel Standage Date: Fri, 28 Jul 2017 16:49:51 -0700 Subject: [PATCH 4/4] Messy but roughly functional parallel counting --- kevlar/count.py | 31 ++++++++++++++++--------------- kevlar/counting.py | 34 +++++++++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 16 deletions(-) diff --git a/kevlar/count.py b/kevlar/count.py index b5b4670..0e31c91 100644 --- a/kevlar/count.py +++ b/kevlar/count.py @@ -31,8 +31,9 @@ def main(args): timer.start('loadctrl') print('[kevlar::count] Loading control samples', file=args.logfile) outfiles, infilelists = split_infiles_outfiles(args.control) - controls = kevlar.counting.load_samples( - infilelists, args.ksize, args.memory, outfiles=outfiles, + o2, i2 = split_infiles_outfiles(args.case) + kevlar.counting.load_simplex_parallel( + infilelists + i2, args.ksize, args.memory, outfiles=outfiles + o2, memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, mask=None, numbands=args.num_bands, band=args.band, logfile=args.logfile @@ -42,19 +43,19 @@ def main(args): message = '{:d} samples loaded in {:.2f} sec'.format(numcontrols, elapsed) print('[kevlar::count]', message, file=args.logfile) - print('[kevlar::count] Loading case samples', file=args.logfile) - timer.start('loadcase') - outfiles, infilelists = split_infiles_outfiles(args.case) - cases = kevlar.counting.load_samples( - infilelists, args.ksize, args.memory, outfiles=outfiles, - memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, - mask=controls[0], numbands=args.num_bands, band=args.band, - logfile=args.logfile - ) - elapsed = timer.stop('loadcase') - numcases = len(cases) - message = '{:d} sample(s) loaded in {:.2f} sec'.format(numcases, elapsed) - print('[kevlar::count]', message, file=args.logfile) + #print('[kevlar::count] Loading case samples', file=args.logfile) + #timer.start('loadcase') + #outfiles, infilelists = split_infiles_outfiles(args.case) + #cases = kevlar.counting.load_samples( + # infilelists, args.ksize, args.memory, outfiles=outfiles, + # memfraction=args.mem_frac, maxfpr=args.max_fpr, maxabund=args.ctrl_max, + # mask=controls[0], numbands=args.num_bands, band=args.band, + # logfile=args.logfile + #) + #elapsed = timer.stop('loadcase') + #numcases = len(cases) + #message = '{:d} sample(s) loaded in {:.2f} sec'.format(numcases, elapsed) + #print('[kevlar::count]', message, file=args.logfile) total = timer.stop() message = 'Total time: {:.2f} seconds'.format(total) diff --git a/kevlar/counting.py b/kevlar/counting.py index ea98177..9ee8de3 100644 --- a/kevlar/counting.py +++ b/kevlar/counting.py @@ -8,6 +8,7 @@ # ----------------------------------------------------------------------------- from __future__ import print_function +import multiprocessing import re import sys @@ -109,7 +110,7 @@ def load_samples(samplelists, ksize, memory, mask=None, memfraction=None, sketches = list() for seqfiles, outfile in zip(samplelists, outfiles): - if mask: + if mask and memfraction is not None: mymask = mask sketchmem = memory * memfraction elif len(sketches) == 0 or memfraction is None: @@ -126,6 +127,37 @@ def load_samples(samplelists, ksize, memory, mask=None, memfraction=None, return sketches +def load_simplex_parallel(samplelists, ksize, memory, mask=None, + memfraction=None, maxfpr=0.2, maxabund=1, + numbands=None, band=None, outfiles=None, + logfile=sys.stderr): + numsamples = len(samplelists) + if outfiles is None or numsamples != len(outfiles): + message = '# of samples ({:d}) '.format(numsamples) + message += 'does not match # of outfiles ({:d})'.format(len(outfiles)) + raise KevlarOutfileMismatchError(message) + message = 'computing k-mer abundances for {:d} samples'.format(numsamples) + print('[kevlar::counting] ', message, file=logfile) + + procs = list() + mymask = None + sketchmem = memory + for seqfiles, outfile in zip(samplelists, outfiles): + process = multiprocessing.Process( + target=load_sample_seqfile, + args=(seqfiles, ksize, sketchmem), + kwargs={ + 'maxfpr': maxfpr, 'mask': mymask, 'numbands': numbands, + 'band': band, 'outfile': outfile, 'logfile': logfile + }, + ) + process.start() + procs.append(process) + + for process in procs: + process.join() + + def load_samples_sketchfiles(sketchfiles, maxfpr=0.2, logfile=sys.stderr): """Load samples from pre-computed k-mer abundances.""" sketches = list()