From b4aee98bea6e5b769b16991e686db840bd72e85a Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Tue, 1 Nov 2022 10:49:26 +0300 Subject: [PATCH 1/9] feat: add take_while and drop_while --- README.md | 2 ++ src/superstream/stream.py | 8 +++++++- tests/test_stream.py | 10 ++++++++++ 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 872ab14..130cad2 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,8 @@ pip install superstream | peek | - | peek 在 java stream 多为调试功能, python stream 将不会实现,可用 map 并返回元素本身代替 | | limit | limit | | | skip | skip | | +| takeWhile | take_while | | +| dropWhile | drop_while | | | forEach | for_each | | | reduce | reduce | | | count | count | | diff --git a/src/superstream/stream.py b/src/superstream/stream.py index e59e749..240c59d 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -1,6 +1,6 @@ from functools import reduce from typing import TypeVar, Callable, List, Set, Generic, Dict, Iterable, Optional, Any -from itertools import islice, chain, count, starmap +from itertools import islice, chain, count, starmap, takewhile, dropwhile from collections import deque T = TypeVar('T') @@ -75,6 +75,12 @@ def limit(self, max_size: int) -> 'Stream[T]': def skip(self, n: int) -> 'Stream[T]': return Stream(islice(self._stream, n, None)) + def take_while(self, pred: Callable[[T], bool]) -> 'Stream[T]': + return Stream(takewhile(pred, self._stream)) + + def drop_while(self, pred: Callable[[T], bool]) -> 'Stream[T]': + return Stream(dropwhile(pred, self._stream)) + def min(self, key: Callable[[T], Any] = None, default: T = None) -> Optional[T]: """ :param default: use default value when stream is empty diff --git a/tests/test_stream.py b/tests/test_stream.py index b67b6a0..4fe6573 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -82,6 +82,16 @@ def test_skip(self): b = Stream(a).skip(1).to_list() assert b == [2, 3] + def test_take_while(self): + a = [1, 4, 6, 4, 1] + b = Stream(a).take_while(lambda x: x < 5).to_list() + assert b == [1, 4] + + def test_drop_while(self): + a = [1, 4, 6, 4, 1] + b = Stream(a).drop_while(lambda x: x < 5).to_list() + assert b == [6, 4, 1] + def test_min(self): a = [{'a': 1}, {'a': 2}] b = Stream(a).min(key=lambda x: x['a']) From 48057fb5ec20172997c248e227d2f200331c287c Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 14:01:29 +0300 Subject: [PATCH 2/9] fix: rename starmap to star_map, to keep the naming style consistent --- README.md | 2 +- src/superstream/stream.py | 2 +- tests/test_stream.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 130cad2..b3713b4 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ pip install superstream | ------------------- | ------------- | ---------------------------------------------------------------------------------------- | | filter | filter | | | map | map | | -| - | starmap | map 的 \* 参数解包版本 | +| - | star_map | map 的 \* 参数解包版本 | | flatMap | flat_map | | | collect | collect | | | - | collects | collect 对应的中间操作,collects(collector) 相当于 Java 中的 collect(collector).stream() | diff --git a/src/superstream/stream.py b/src/superstream/stream.py index 240c59d..11241fe 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -24,7 +24,7 @@ def of(*args: T) -> 'Stream[T]': def map(self, func: Callable[[T], R]) -> 'Stream[R]': return Stream(map(func, self._stream)) - def starmap(self, func: Callable[..., R]) -> 'Stream[R]': + def star_map(self, func: Callable[..., R]) -> 'Stream[R]': return Stream(starmap(func, self._stream)) def flat_map(self, func: Callable[[T], 'Stream[R]']) -> 'Stream[R]': diff --git a/tests/test_stream.py b/tests/test_stream.py index 4fe6573..d23ae7d 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -10,9 +10,9 @@ def test_map(self): b = Stream(a).map(lambda x: x * 2).to_list() assert b == [2, 4, 6] - def test_starmap(self): + def test_star_map(self): a = [(2,5), (3,2), (10,3)] - b = Stream(a).starmap(pow).to_list() + b = Stream(a).star_map(pow).to_list() assert b == [32, 9, 1000] def test_flat_map(self): From af01182be27564a43df14dda4eda76107e3bcb11 Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 14:06:49 +0300 Subject: [PATCH 3/9] feat: add peek, implemented with laziness --- README.md | 2 +- src/superstream/stream.py | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index b3713b4..6e517f6 100644 --- a/README.md +++ b/README.md @@ -64,7 +64,7 @@ pip install superstream | collect(toMap) | to_map | to_dict 的别名,保持与 Java 的一致 | | distinct | distinct | | | sorted | sorted | | -| peek | - | peek 在 java stream 多为调试功能, python stream 将不会实现,可用 map 并返回元素本身代替 | +| peek | peek | | | limit | limit | | | skip | skip | | | takeWhile | take_while | | diff --git a/src/superstream/stream.py b/src/superstream/stream.py index 11241fe..9f228d7 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -37,6 +37,11 @@ def for_each(self, func: Callable[[T], None]) -> None: for i in self._stream: func(i) + def peek(self, func: Callable[[T], None]) -> 'Stream[T]': + for i in self._stream: + func(i) + yield i + def distinct(self): return Stream(list(dict.fromkeys(self._stream))) From 7e4fb98c6f84445d437d28e089188d0fd6acaa9c Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 14:11:15 +0300 Subject: [PATCH 4/9] refactor: speed up for_each, by using deque to avoid explicit for loop --- src/superstream/stream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/superstream/stream.py b/src/superstream/stream.py index 9f228d7..50a811a 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -34,8 +34,7 @@ def filter(self, func: Callable[[T], bool]) -> 'Stream[T]': return Stream(filter(func, self._stream)) def for_each(self, func: Callable[[T], None]) -> None: - for i in self._stream: - func(i) + deque(map(func, self._stream), maxlen=0) def peek(self, func: Callable[[T], None]) -> 'Stream[T]': for i in self._stream: From 59b3e3631896050fef0bcd6f2f2065da1aaa244e Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 14:14:25 +0300 Subject: [PATCH 5/9] refactor: speed up group_by, by using deque to avoid explicit for loop --- src/superstream/stream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/superstream/stream.py b/src/superstream/stream.py index 50a811a..cd28623 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -57,8 +57,9 @@ def sum(self, start: T = 0) -> T: def group_by(self, classifier: Callable[[T], K]) -> Dict[K, List[T]]: groups = {} - for i in self._stream: + def _classify(i): groups.setdefault(classifier(i), []).append(i) + deque(map(_classify, self._stream), maxlen=0) return groups def reduce(self, func: Callable[[T, T], T], initial: T = _initial_missing) -> Optional[T]: From 90ee5727b8bb8bfc0b3661be0a2b25e732ab928d Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 15:18:17 +0300 Subject: [PATCH 6/9] Revert "refactor: speed up group_by, by using deque to avoid explicit for loop" This reverts commit ca21311f0c9c0f41598ad5679fb56bd05ae20e11. --- src/superstream/stream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/superstream/stream.py b/src/superstream/stream.py index cd28623..50a811a 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -57,9 +57,8 @@ def sum(self, start: T = 0) -> T: def group_by(self, classifier: Callable[[T], K]) -> Dict[K, List[T]]: groups = {} - def _classify(i): + for i in self._stream: groups.setdefault(classifier(i), []).append(i) - deque(map(_classify, self._stream), maxlen=0) return groups def reduce(self, func: Callable[[T, T], T], initial: T = _initial_missing) -> Optional[T]: From 6b85df1f169a9b6cf5c8b521df7390086f96a0c4 Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 15:33:53 +0300 Subject: [PATCH 7/9] feat: make distinct lazy, and compatible with seq of unhashable items --- src/superstream/stream.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/superstream/stream.py b/src/superstream/stream.py index 50a811a..f568f23 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -10,6 +10,23 @@ _initial_missing = object() +def unique_everseen(iterable, key=None): + seenset = set() + seenset_add = seenset.add + seenlist = [] + seenlist_add = seenlist.append + use_key = key is not None + for element in iterable: + k = key(element) if use_key else element + try: + if k not in seenset: + seenset_add(k) + yield element + except TypeError: + if k not in seenlist: + seenlist_add(k) + yield element + class Stream(Generic[T]): def __init__(self, stream: Iterable[T]): self._stream = iter(stream) @@ -41,8 +58,8 @@ def peek(self, func: Callable[[T], None]) -> 'Stream[T]': func(i) yield i - def distinct(self): - return Stream(list(dict.fromkeys(self._stream))) + def distinct(self, key: Callable[[T], K] = None) -> 'Stream[T]': + return Stream(unique_everseen(self._stream, key)) def sorted(self, key=None, reverse=False) -> 'Stream[T]': return Stream(sorted(self._stream, key=key, reverse=reverse)) From 066bcef161f23f5dd643f4a247b8c2552acc2b4a Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 15:38:02 +0300 Subject: [PATCH 8/9] feat: add optional parameter for map, to support multiple processing --- src/superstream/stream.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/superstream/stream.py b/src/superstream/stream.py index f568f23..1e60455 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -38,8 +38,17 @@ def __iter__(self): def of(*args: T) -> 'Stream[T]': return Stream(args) - def map(self, func: Callable[[T], R]) -> 'Stream[R]': - return Stream(map(func, self._stream)) + def map(self, func: Callable[[T], R], pool_map=None) -> 'Stream[R]': + """ + :param pool_map: a map method of a multiple processing pool + for example: + multiprocessing.Pool.map + multiprocessing.dummy.Pool.imap + multiprocess.Pool.map_async + pathos.multiprocessing.ProcessPool.uimap + """ + map_method = pool_map or map + return Stream(map_method(func, self._stream)) def star_map(self, func: Callable[..., R]) -> 'Stream[R]': return Stream(starmap(func, self._stream)) From c6d7dd8a992758b5d7d40c69db72b88a48387cd7 Mon Sep 17 00:00:00 2001 From: Qing Zhan <9190969+zq46@users.noreply.github.com> Date: Wed, 2 Nov 2022 15:45:11 +0300 Subject: [PATCH 9/9] feat: add zip_by, zip_with, concat_by, concat_with for convenience --- src/superstream/stream.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/superstream/stream.py b/src/superstream/stream.py index 1e60455..8d6280d 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -167,3 +167,15 @@ def collect(self, func: Callable[[Iterable[T]], R]) -> R: def collects(self, func: Callable[[Iterable[T]], Iterable[R]]) -> 'Stream[R]': return Stream(func(self._stream)) + + def zip_by(self, *iterables: Iterable[Any]) -> 'Stream[(T, *Any)]': + return Stream(zip(*iterables, self._stream)) + + def zip_with(self, *iterables: Iterable[Any]) -> 'Stream[(T, *Any)]': + return Stream(zip(self._stream, *iterables)) + + def concat_by(self, iterables: Iterable[T]) -> 'Stream[T]': + return Stream(chain(iterables, self._stream)) + + def concat_with(self, iterables: Iterable[T]) -> 'Stream[T]': + return Stream(chain(self._stream, iterables))