diff --git a/lib/index.js b/lib/index.js index c9e6deb..60c4cd3 100755 --- a/lib/index.js +++ b/lib/index.js @@ -3554,6 +3554,64 @@ Stream.prototype.debounce = function (ms) { }; exposeMethod('debounce'); + +/** + * Ends a source stream when a given stream ends or emits a value. + * + * @id takeUntil + * @section Higher-order Streams + * @name Stream.takeUntil(stream) + * @param {Stream} stream - another stream which stops the source + * @api public + * + * a = _() + * b = _() + * a.takeUntil(b) + * a.each(_.log) + * a.write(1) + * // => 1 + * a.write(2) + * // => 2 + * b.write(1) + * // nothing + * a.write(3) + * // nothing + */ + +Stream.prototype.takeUntil = function (stream) { + if (!_.isStream(stream)) { + throw new Error('Invalid stream to takeUntil:', stream); + } + var first = true, + done = false; + + return this.consume(function (err, x, push, next) { + if (first) { + stream.pull(function() { + if (!done) { + push(null, _.nil); + done = true; + } + }); + first = false; + } + if (err) { + push(err); + next(); + } + else if (x === _.nil) { + done = true; + push(null, x); + } + else if (!done) { + push(null, x); + next(); + } + }); +}; +exposeMethod('takeUntil'); + + /** * Creates a new Stream, which when read from, only returns the last * seen value from the source. The source stream does not experience diff --git a/test/test.js b/test/test.js index 741f65f..6e344f8 100755 --- a/test/test.js +++ b/test/test.js @@ -1545,6 +1545,130 @@ exports['wrap EventEmitter (or jQuery) on handler with args wrapping by array'] }); }; + +exports['takeUntil'] = { + setUp: function (callback) { + this.clock = sinon.useFakeTimers(); + callback(); + }, + tearDown: function (callback) { + this.clock.restore(); + callback(); + }, + 'invalid stream': function (test) { + test.throws(function () { + _([1,2,3]).takeUntil(10); + }); + test.done(); + }, + 'async generator': function (test) { + function delay(push, ms, x) { + setTimeout(function () { + push(null, x); + }, ms); + } + var source = _(function (push, next) { + delay(push, 10, 1); + delay(push, 20, 2); + delay(push, 30, 3); + // should be stopped + delay(push, 40, 4); + delay(push, 50, 5); + delay(push, 60, _.nil); + }) + var stopStream = _(function (push, next) { + delay(push, 25, 1); + delay(push, 35, _.nil); + }) + var results = []; + source.takeUntil(stopStream).each(function (x) { + results.push(x); + }); + this.clock.tick(10); + test.same(results, [1]); + this.clock.tick(10); + test.same(results, [1, 2]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(20); + test.same(results, [1, 2, 3]); + test.done(); + }, + 'toplevel - async generator': function (test) { + function delay(push, ms, x) { + setTimeout(function () { + push(null, x); + }, ms); + } + var source = _(function (push, next) { + delay(push, 10, 1); + delay(push, 20, 2); + delay(push, 30, 3); + // should be stopped + delay(push, 40, 4); + delay(push, 50, 5); + delay(push, 60, _.nil); + }) + var stopStream = _(function (push, next) { + delay(push, 25, 1); + delay(push, 35, _.nil); + }) + var results = []; + _.takeUntil(stopStream, source).each(function (x) { + results.push(x); + }); + this.clock.tick(10); + test.same(results, [1]); + this.clock.tick(10); + test.same(results, [1, 2]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(20); + test.same(results, [1, 2, 3]); + test.done(); + }, + 'toplevel - partial application, async generator': function (test) { + function delay(push, ms, x) { + setTimeout(function () { + push(null, x); + }, ms); + } + var source = _(function (push, next) { + delay(push, 10, 1); + delay(push, 20, 2); + delay(push, 30, 3); + // should be stopped + delay(push, 40, 4); + delay(push, 50, 5); + delay(push, 60, _.nil); + }) + var stopStream = _(function (push, next) { + delay(push, 25, 1); + delay(push, 35, _.nil); + }) + var results = []; + _.takeUntil(stopStream)(source).each(function (x) { + results.push(x); + }); + this.clock.tick(10); + test.same(results, [1]); + this.clock.tick(10); + test.same(results, [1, 2]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(10); + test.same(results, [1, 2, 3]); + this.clock.tick(20); + test.same(results, [1, 2, 3]); + test.done(); + }, + 'noValueOnError': noValueOnErrorTest(_.takeUntil(_())) +}; + exports['sequence'] = function (test) { _.sequence([[1,2], [3], [[4],5]]).toArray(function (xs) { test.same(xs, [1,2,3,[4],5]);