From 9b41102025b5f187004e52c55612cce991f3da36 Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Wed, 13 Jun 2018 22:58:32 +0100 Subject: [PATCH 1/8] Implement flatMap --- lib/index.js | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/lib/index.js b/lib/index.js index 314f937..def2137 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2167,6 +2167,28 @@ addMethod('doto', function (f) { Stream.prototype.tap = Stream.prototype.doto; _.tap = _.doto; +/** + * Applies a function to each value from the source, and re-emits the + * source value + * + * @id flatTap + * @section Transforms + * @name Stream.flatTap(f) + * @param {Function} f - the function to apply + * @api public + * + * _([1, 2, 3]).flatTap(httpLog) + */ + +addMethod('flatTap', function (f) { + return this.flatMap(function (x) { + return f(x) + .flatMap(function () { + return _([x]); + }); + }); +}); + /** * Limits number of values through the stream to a maximum of number of values * per window. Errors are not limited but allowed to pass through as soon as From 2d8a7121cdf23cbf88894f507cfc228d1efa4844 Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Wed, 13 Jun 2018 23:12:34 +0100 Subject: [PATCH 2/8] Tests for flatTap --- test/test.js | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/test.js b/test/test.js index 467ecf3..234be58 100755 --- a/test/test.js +++ b/test/test.js @@ -4909,6 +4909,30 @@ exports['tap - doto alias'] = function (test) { test.done(); }; +exports.flatTap = function (test) { + test.expect(2); + + var seen; + function record(x) { + var y = x * 2; + seen.push(y); + return _([y]); + } + + seen = []; + _.flatTap(record, [1, 2, 3, 4]).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.same(seen, [2, 4, 6, 8]); + }); + test.done(); +}; + +exports['flatTap - noValueOnError'] = noValueOnErrorTest(_.doto(function (x) { return x; })); + +exports['flatTap - returnsSameStream'] = returnsSameStreamTest(function(s) { + return s.doto(function (x) { return x; }); +}); + exports.flatMap = function (test) { var f = function (x) { return _(function (push, next) { From 7172f193d6ba471003d22cb01f64e32866053f7a Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Wed, 13 Jun 2018 23:16:52 +0100 Subject: [PATCH 3/8] Same stream test for flatTap --- test/test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.js b/test/test.js index 234be58..f7257c6 100755 --- a/test/test.js +++ b/test/test.js @@ -4930,7 +4930,7 @@ exports.flatTap = function (test) { exports['flatTap - noValueOnError'] = noValueOnErrorTest(_.doto(function (x) { return x; })); exports['flatTap - returnsSameStream'] = returnsSameStreamTest(function(s) { - return s.doto(function (x) { return x; }); + return s.flatTap(function (x) { return _([x]); }); }); exports.flatMap = function (test) { From 1a2fb233c08aa6861e0a3196635b1bd1d915f51d Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Thu, 14 Jun 2018 11:31:29 +0100 Subject: [PATCH 4/8] flatTap - noValueOnError test --- test/test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test.js b/test/test.js index f7257c6..5fc0ba8 100755 --- a/test/test.js +++ b/test/test.js @@ -4927,7 +4927,7 @@ exports.flatTap = function (test) { test.done(); }; -exports['flatTap - noValueOnError'] = noValueOnErrorTest(_.doto(function (x) { return x; })); +exports['flatTap - noValueOnError'] = noValueOnErrorTest(_.flatTap(function (x) { return _(); })); exports['flatTap - returnsSameStream'] = returnsSameStreamTest(function(s) { return s.flatTap(function (x) { return _([x]); }); From d3ee895aa639a496c816b4e557a7a1b90c0bde20 Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Thu, 14 Jun 2018 11:49:25 +0100 Subject: [PATCH 5/8] flatTap - More tests for each stream source. --- test/test.js | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/test/test.js b/test/test.js index 5fc0ba8..023dc9f 100755 --- a/test/test.js +++ b/test/test.js @@ -4933,6 +4933,62 @@ exports['flatTap - returnsSameStream'] = returnsSameStreamTest(function(s) { return s.flatTap(function (x) { return _([x]); }); }); +exports['flatTap - argument function throws'] = function (test) { + test.expect(4); + var err = new Error('error'); + var s = _([1, 2, 3, 4]).flatTap(function (x) { + if (x === 1) { throw err; } + if (x === 2) { _(['hello']); } + if (x === 3) { throw err; } + return _(['world']); + }); + + s.pull(errorEquals(test, 'error')); + s.pull(valueEquals(test, 2)); + s.pull(anyError(test)); + s.pull(valueEquals(test, 4)); + test.done(); +}; + +exports['flatTap - ArrayStream'] = function (test) { + var f = function (x) { + return _(function (push, next) { + setTimeout(function () { + push(null, x * 2); + push(null, _.nil); + }, 10); + }); + }; + _([1, 2, 3, 4]).flatTap(f).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.done(); + }); +}; + +exports['flatTap - GeneratorStream'] = function (test) { + var f = function (x) { + return _(function (push, next) { + setTimeout(function () { + push(null, x * 2); + push(null, _.nil); + }, 10); + }); + }; + var s = _(function (push, next) { + push(null, 1); + setTimeout(function () { + push(null, 2); + push(null, 3); + push(null, 4); + push(null, _.nil); + }, 10); + }); + s.flatTap(f).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.done(); + }); +}; + exports.flatMap = function (test) { var f = function (x) { return _(function (push, next) { From 768f6c27d390e0452c611e1451d9d69faf3499ea Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Mon, 22 Oct 2018 21:35:34 +0100 Subject: [PATCH 6/8] Update tests & use consume instead --- lib/index.js | 17 +++++-- test/test.js | 123 ++++++++++++++++++++++----------------------------- 2 files changed, 65 insertions(+), 75 deletions(-) diff --git a/lib/index.js b/lib/index.js index def2137..a565ba0 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2182,10 +2182,19 @@ _.tap = _.doto; addMethod('flatTap', function (f) { return this.flatMap(function (x) { - return f(x) - .flatMap(function () { - return _([x]); - }); + return f(x).consume(function (err, y, push, next) { + if (err) { + // next(); + push(null, _.nil); + } + else if (y === _.nil) { + push(null, x); + push(null, _.nil); + } + else { + next(); + } + }); }); }); diff --git a/test/test.js b/test/test.js index 023dc9f..bef8bc0 100755 --- a/test/test.js +++ b/test/test.js @@ -4909,84 +4909,65 @@ exports['tap - doto alias'] = function (test) { test.done(); }; -exports.flatTap = function (test) { - test.expect(2); - - var seen; - function record(x) { - var y = x * 2; - seen.push(y); - return _([y]); - } - - seen = []; - _.flatTap(record, [1, 2, 3, 4]).toArray(function (xs) { - test.same(xs, [1, 2, 3, 4]); - test.same(seen, [2, 4, 6, 8]); - }); - test.done(); -}; - -exports['flatTap - noValueOnError'] = noValueOnErrorTest(_.flatTap(function (x) { return _(); })); - -exports['flatTap - returnsSameStream'] = returnsSameStreamTest(function(s) { - return s.flatTap(function (x) { return _([x]); }); -}); -exports['flatTap - argument function throws'] = function (test) { - test.expect(4); - var err = new Error('error'); - var s = _([1, 2, 3, 4]).flatTap(function (x) { - if (x === 1) { throw err; } - if (x === 2) { _(['hello']); } - if (x === 3) { throw err; } - return _(['world']); - }); - - s.pull(errorEquals(test, 'error')); - s.pull(valueEquals(test, 2)); - s.pull(anyError(test)); - s.pull(valueEquals(test, 4)); - test.done(); -}; - -exports['flatTap - ArrayStream'] = function (test) { - var f = function (x) { - return _(function (push, next) { - setTimeout(function () { - push(null, x * 2); - push(null, _.nil); - }, 10); +exports.flatTap = { + 'flatTap - noValueOnError': noValueOnErrorTest(_.flatTap(function (x) { return _([x]); })), + 'flatTap - returnsSameStream': returnsSameStreamTest(function (s) { + return s.flatTap(function (x) { return _([2]); }); + }, [1], [1]), + 'flatTap - argument function throws': function (test) { + test.expect(4); + var err = new Error('error'); + var s = _([1, 2, 3, 4]).flatTap(function (x) { + if (x === 1) { throw err; } + if (x === 2) { _(['hello']); } + if (x === 3) { throw err; } + return _(['world']); }); - }; - _([1, 2, 3, 4]).flatTap(f).toArray(function (xs) { - test.same(xs, [1, 2, 3, 4]); - test.done(); - }); -}; -exports['flatTap - GeneratorStream'] = function (test) { - var f = function (x) { - return _(function (push, next) { + s.pull(errorEquals(test, 'error')); + s.pull(valueEquals(test, 2)); + s.pull(anyError(test)); + s.pull(valueEquals(test, 4)); + test.done(); + }, + 'flatTap - ArrayStream': function (test) { + var f = function (x) { + return _(function (push, next) { + setTimeout(function () { + push(null, x * 2); + push(null, _.nil); + }, 10); + }); + }; + _([1, 2, 3, 4]).flatTap(f).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.done(); + }); + }, + 'flatTap - GeneratorStream': function (test) { + var f = function (x) { + return _(function (push, next) { + setTimeout(function () { + push(null, x * 2); + push(null, _.nil); + }, 10); + }); + }; + var s = _(function (push, next) { + push(null, 1); setTimeout(function () { - push(null, x * 2); + push(null, 2); + push(null, 3); + push(null, 4); push(null, _.nil); }, 10); }); - }; - var s = _(function (push, next) { - push(null, 1); - setTimeout(function () { - push(null, 2); - push(null, 3); - push(null, 4); - push(null, _.nil); - }, 10); - }); - s.flatTap(f).toArray(function (xs) { - test.same(xs, [1, 2, 3, 4]); - test.done(); - }); + s.flatTap(f).toArray(function (xs) { + test.same(xs, [1, 2, 3, 4]); + test.done(); + }); + } }; exports.flatMap = function (test) { From 18a7add4b152e6ae5d2b333608e48494075a124a Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Wed, 24 Oct 2018 09:36:52 +0100 Subject: [PATCH 7/8] FlatTap - Improve tests & Use consume rather than just FlatMap --- lib/index.js | 13 ++++++++++--- test/test.js | 41 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/lib/index.js b/lib/index.js index a565ba0..9ccfb60 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2168,8 +2168,8 @@ Stream.prototype.tap = Stream.prototype.doto; _.tap = _.doto; /** - * Applies a function to each value from the source, and re-emits the - * source value + * Applies a function, which must return a (possibly empty) highland stream, to each value from the source, and re-emits the + * source value when the function return ends. * * @id flatTap * @section Transforms @@ -2177,7 +2177,14 @@ _.tap = _.doto; * @param {Function} f - the function to apply * @api public * - * _([1, 2, 3]).flatTap(httpLog) + * const httpLog = H(new Promnise(res => { + * setTimeout(() => { + * console.log('Log'); + * res('Success') + * }, 3000) + * })) + * + * _([1, 2, 3]).flatTap(httpLog) // Will emit [1,2,3] */ addMethod('flatTap', function (f) { diff --git a/test/test.js b/test/test.js index bef8bc0..fc2e92a 100755 --- a/test/test.js +++ b/test/test.js @@ -4911,10 +4911,37 @@ exports['tap - doto alias'] = function (test) { exports.flatTap = { - 'flatTap - noValueOnError': noValueOnErrorTest(_.flatTap(function (x) { return _([x]); })), + 'flatTap - noValueOnError': noValueOnErrorTest(_.flatTap(function (x) { return _(); })), 'flatTap - returnsSameStream': returnsSameStreamTest(function (s) { return s.flatTap(function (x) { return _([2]); }); }, [1], [1]), + 'flatTap - On a empty stream': function (test) { + test.expect(1); + var s = _([]).flatTap(function (x) { + return _([6]); + }); + + s.pull(valueEquals(test, _.nil)); + test.done(); + }, + 'flatTap - Returns an Empty stream': function (test) { + test.expect(1); + var s = _([3]).flatTap(function (x) { + return _([]); + }); + + s.pull(valueEquals(test, 3)); + test.done(); + }, + 'flatTap - Emits an multiple values': function (test) { + test.expect(1); + var s = _([3]).flatTap(function (x) { + return _([5, 6, 7, 8]); + }); + + s.pull(valueEquals(test, 3)); + test.done(); + }, 'flatTap - argument function throws': function (test) { test.expect(4); var err = new Error('error'); @@ -4932,24 +4959,31 @@ exports.flatTap = { test.done(); }, 'flatTap - ArrayStream': function (test) { + var seen = []; var f = function (x) { return _(function (push, next) { setTimeout(function () { - push(null, x * 2); + var y = x * 2; + seen.push(y); + push(null, y); push(null, _.nil); }, 10); }); }; _([1, 2, 3, 4]).flatTap(f).toArray(function (xs) { test.same(xs, [1, 2, 3, 4]); + test.same(seen, [2, 4, 6, 8]); test.done(); }); }, 'flatTap - GeneratorStream': function (test) { + var seen = []; var f = function (x) { return _(function (push, next) { setTimeout(function () { - push(null, x * 2); + var y = x * 2; + seen.push(y); + push(null, y); push(null, _.nil); }, 10); }); @@ -4965,6 +4999,7 @@ exports.flatTap = { }); s.flatTap(f).toArray(function (xs) { test.same(xs, [1, 2, 3, 4]); + test.same(seen, [2, 4, 6, 8]); test.done(); }); } From 689c3567328b54557ca2e66b10f166ee7b301077 Mon Sep 17 00:00:00 2001 From: Tom Whale Date: Thu, 25 Oct 2018 14:39:00 +0100 Subject: [PATCH 8/8] FlatTap - Push errors back onto the stream, and respective tests --- lib/index.js | 4 ++-- test/test.js | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/lib/index.js b/lib/index.js index 9ccfb60..2fb0827 100755 --- a/lib/index.js +++ b/lib/index.js @@ -2177,7 +2177,7 @@ _.tap = _.doto; * @param {Function} f - the function to apply * @api public * - * const httpLog = H(new Promnise(res => { + * const httpLog = _(new Promnise(res => { * setTimeout(() => { * console.log('Log'); * res('Success') @@ -2191,7 +2191,7 @@ addMethod('flatTap', function (f) { return this.flatMap(function (x) { return f(x).consume(function (err, y, push, next) { if (err) { - // next(); + push(err); push(null, _.nil); } else if (y === _.nil) { diff --git a/test/test.js b/test/test.js index fc2e92a..7aaa0d5 100755 --- a/test/test.js +++ b/test/test.js @@ -4942,6 +4942,21 @@ exports.flatTap = { s.pull(valueEquals(test, 3)); test.done(); }, + 'flatTap - Emits an error values': function (test) { + test.expect(4); + var s = _([1, 2, 3, 4]).flatTap(function (x) { + return _(function (push) { + push({errorMessage: 'error'}); + push(null, _.nil); + }); + }); + + s.pull(anyError(test)); + s.pull(anyError(test)); + s.pull(anyError(test)); + s.pull(anyError(test)); + test.done(); + }, 'flatTap - argument function throws': function (test) { test.expect(4); var err = new Error('error');