diff --git a/lib/index.js b/lib/index.js index 314f937..2fb0827 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2167,6 +2167,44 @@ addMethod('doto', function (f) { Stream.prototype.tap = Stream.prototype.doto; _.tap = _.doto; +/** + * Applies a function, which must return a (possibly empty) highland stream, to each value from the source, and re-emits the + * source value when the function return ends. + * + * @id flatTap + * @section Transforms + * @name Stream.flatTap(f) + * @param {Function} f - the function to apply + * @api public + * + * const httpLog = _(new Promnise(res => { + * setTimeout(() => { + * console.log('Log'); + * res('Success') + * }, 3000) + * })) + * + * _([1, 2, 3]).flatTap(httpLog) // Will emit [1,2,3] + */ + +addMethod('flatTap', function (f) { + return this.flatMap(function (x) { + return f(x).consume(function (err, y, push, next) { + if (err) { + push(err); + push(null, _.nil); + } + else if (y === _.nil) { + push(null, x); + push(null, _.nil); + } + else { + next(); + } + }); + }); +}); + /** * Limits number of values through the stream to a maximum of number of values * per window. Errors are not limited but allowed to pass through as soon as diff --git a/test/test.js b/test/test.js index 467ecf3..7aaa0d5 100755 --- a/test/test.js +++ b/test/test.js @@ -4909,6 +4909,117 @@ exports['tap - doto alias'] = function (test) { test.done(); }; + +exports.flatTap = { + 'flatTap - noValueOnError': noValueOnErrorTest(_.flatTap(function (x) { return _(); })), + 'flatTap - returnsSameStream': returnsSameStreamTest(function (s) { + return s.flatTap(function (x) { return _([2]); }); + }, [1], [1]), + 'flatTap - On a empty stream': function (test) { + test.expect(1); + var s = _([]).flatTap(function (x) { + return _([6]); + }); + + s.pull(valueEquals(test, _.nil)); + test.done(); + }, + 'flatTap - Returns an Empty stream': function (test) { + test.expect(1); + var s = _([3]).flatTap(function (x) { + return _([]); + }); + + s.pull(valueEquals(test, 3)); + test.done(); + }, + 'flatTap - Emits an multiple values': function (test) { + test.expect(1); + var s = _([3]).flatTap(function (x) { + return _([5, 6, 7, 8]); + }); + + s.pull(valueEquals(test, 3)); + test.done(); + }, + 'flatTap - Emits an error values': function (test) { + test.expect(4); + var s = _([1, 2, 3, 4]).flatTap(function (x) { + return _(function (push) { + push({errorMessage: 'error'}); + push(null, _.nil); + }); + }); + + s.pull(anyError(test)); + s.pull(anyError(test)); + s.pull(anyError(test)); + s.pull(anyError(test)); + test.done(); + }, + 'flatTap - argument function throws': function (test) { + test.expect(4); + var err = new Error('error'); + var s = _([1, 2, 3, 4]).flatTap(function (x) { + if (x === 1) { throw err; } + if (x === 2) { _(['hello']); } + if (x === 3) { throw err; } + return _(['world']); + }); + + s.pull(errorEquals(test, 'error')); + s.pull(valueEquals(test, 2)); + s.pull(anyError(test)); + s.pull(valueEquals(test, 4)); + test.done(); + }, + 'flatTap - ArrayStream': function (test) { + var seen = []; + var f = function (x) { + return _(function (push, next) { + setTimeout(function () { + var y = x * 2; + seen.push(y); + push(null, y); + push(null, _.nil); + }, 10); + }); + }; + _([1, 2, 3, 4]).flatTap(f).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.same(seen, [2, 4, 6, 8]); + test.done(); + }); + }, + 'flatTap - GeneratorStream': function (test) { + var seen = []; + var f = function (x) { + return _(function (push, next) { + setTimeout(function () { + var y = x * 2; + seen.push(y); + push(null, y); + push(null, _.nil); + }, 10); + }); + }; + var s = _(function (push, next) { + push(null, 1); + setTimeout(function () { + push(null, 2); + push(null, 3); + push(null, 4); + push(null, _.nil); + }, 10); + }); + s.flatTap(f).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.same(seen, [2, 4, 6, 8]); + test.done(); + }); + } +}; + exports.flatMap = function (test) { var f = function (x) { return _(function (push, next) {