diff --git a/README.md b/README.md index 872ab14..6e517f6 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ pip install superstream | ------------------- | ------------- | ---------------------------------------------------------------------------------------- | | filter | filter | | | map | map | | -| - | starmap | map 的 \* 参数解包版本 | +| - | star_map | map 的 \* 参数解包版本 | | flatMap | flat_map | | | collect | collect | | | - | collects | collect 对应的中间操作,collects(collector) 相当于 Java 中的 collect(collector).stream() | @@ -64,9 +64,11 @@ pip install superstream | collect(toMap) | to_map | to_dict 的别名,保持与 Java 的一致 | | distinct | distinct | | | sorted | sorted | | -| peek | - | peek 在 java stream 多为调试功能, python stream 将不会实现,可用 map 并返回元素本身代替 | +| peek | peek | | | limit | limit | | | skip | skip | | +| takeWhile | take_while | | +| dropWhile | drop_while | | | forEach | for_each | | | reduce | reduce | | | count | count | | diff --git a/src/superstream/stream.py b/src/superstream/stream.py index e59e749..8d6280d 100644 --- a/src/superstream/stream.py +++ b/src/superstream/stream.py @@ -1,6 +1,6 @@ from functools import reduce from typing import TypeVar, Callable, List, Set, Generic, Dict, Iterable, Optional, Any -from itertools import islice, chain, count, starmap +from itertools import islice, chain, count, starmap, takewhile, dropwhile from collections import deque T = TypeVar('T') @@ -10,6 +10,23 @@ _initial_missing = object() +def unique_everseen(iterable, key=None): + seenset = set() + seenset_add = seenset.add + seenlist = [] + seenlist_add = seenlist.append + use_key = key is not None + for element in iterable: + k = key(element) if use_key else element + try: + if k not in seenset: + seenset_add(k) + yield element + except TypeError: + if k not in seenlist: + seenlist_add(k) + yield element + class Stream(Generic[T]): def __init__(self, stream: Iterable[T]): self._stream = iter(stream) @@ -21,10 +38,19 @@ def __iter__(self): def of(*args: T) -> 'Stream[T]': return Stream(args) - def map(self, func: Callable[[T], R]) -> 'Stream[R]': - return Stream(map(func, self._stream)) + def map(self, func: Callable[[T], R], pool_map=None) -> 'Stream[R]': + """ + :param pool_map: a map method of a multiple processing pool + for example: + multiprocessing.Pool.map + multiprocessing.dummy.Pool.imap + multiprocess.Pool.map_async + pathos.multiprocessing.ProcessPool.uimap + """ + map_method = pool_map or map + return Stream(map_method(func, self._stream)) - def starmap(self, func: Callable[..., R]) -> 'Stream[R]': + def star_map(self, func: Callable[..., R]) -> 'Stream[R]': return Stream(starmap(func, self._stream)) def flat_map(self, func: Callable[[T], 'Stream[R]']) -> 'Stream[R]': @@ -34,11 +60,15 @@ def filter(self, func: Callable[[T], bool]) -> 'Stream[T]': return Stream(filter(func, self._stream)) def for_each(self, func: Callable[[T], None]) -> None: + deque(map(func, self._stream), maxlen=0) + + def peek(self, func: Callable[[T], None]) -> 'Stream[T]': for i in self._stream: func(i) + yield i - def distinct(self): - return Stream(list(dict.fromkeys(self._stream))) + def distinct(self, key: Callable[[T], K] = None) -> 'Stream[T]': + return Stream(unique_everseen(self._stream, key)) def sorted(self, key=None, reverse=False) -> 'Stream[T]': return Stream(sorted(self._stream, key=key, reverse=reverse)) @@ -75,6 +105,12 @@ def limit(self, max_size: int) -> 'Stream[T]': def skip(self, n: int) -> 'Stream[T]': return Stream(islice(self._stream, n, None)) + def take_while(self, pred: Callable[[T], bool]) -> 'Stream[T]': + return Stream(takewhile(pred, self._stream)) + + def drop_while(self, pred: Callable[[T], bool]) -> 'Stream[T]': + return Stream(dropwhile(pred, self._stream)) + def min(self, key: Callable[[T], Any] = None, default: T = None) -> Optional[T]: """ :param default: use default value when stream is empty @@ -131,3 +167,15 @@ def collect(self, func: Callable[[Iterable[T]], R]) -> R: def collects(self, func: Callable[[Iterable[T]], Iterable[R]]) -> 'Stream[R]': return Stream(func(self._stream)) + + def zip_by(self, *iterables: Iterable[Any]) -> 'Stream[(T, *Any)]': + return Stream(zip(*iterables, self._stream)) + + def zip_with(self, *iterables: Iterable[Any]) -> 'Stream[(T, *Any)]': + return Stream(zip(self._stream, *iterables)) + + def concat_by(self, iterables: Iterable[T]) -> 'Stream[T]': + return Stream(chain(iterables, self._stream)) + + def concat_with(self, iterables: Iterable[T]) -> 'Stream[T]': + return Stream(chain(self._stream, iterables)) diff --git a/tests/test_stream.py b/tests/test_stream.py index b67b6a0..d23ae7d 100644 --- a/tests/test_stream.py +++ b/tests/test_stream.py @@ -10,9 +10,9 @@ def test_map(self): b = Stream(a).map(lambda x: x * 2).to_list() assert b == [2, 4, 6] - def test_starmap(self): + def test_star_map(self): a = [(2,5), (3,2), (10,3)] - b = Stream(a).starmap(pow).to_list() + b = Stream(a).star_map(pow).to_list() assert b == [32, 9, 1000] def test_flat_map(self): @@ -82,6 +82,16 @@ def test_skip(self): b = Stream(a).skip(1).to_list() assert b == [2, 3] + def test_take_while(self): + a = [1, 4, 6, 4, 1] + b = Stream(a).take_while(lambda x: x < 5).to_list() + assert b == [1, 4] + + def test_drop_while(self): + a = [1, 4, 6, 4, 1] + b = Stream(a).drop_while(lambda x: x < 5).to_list() + assert b == [6, 4, 1] + def test_min(self): a = [{'a': 1}, {'a': 2}] b = Stream(a).min(key=lambda x: x['a'])