Skip to content
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pip install superstream
| ------------------- | ------------- | ---------------------------------------------------------------------------------------- |
| filter | filter | |
| map | map | |
| - | starmap | map 的 \* 参数解包版本 |
| - | star_map | map 的 \* 参数解包版本 |
| flatMap | flat_map | |
| collect | collect | |
| - | collects | collect 对应的中间操作,collects(collector) 相当于 Java 中的 collect(collector).stream() |
Expand All @@ -64,9 +64,11 @@ pip install superstream
| collect(toMap) | to_map | to_dict 的别名,保持与 Java 的一致 |
| distinct | distinct | |
| sorted | sorted | |
| peek | - | peek 在 java stream 多为调试功能, python stream 将不会实现,可用 map 并返回元素本身代替 |
| peek | peek | |
| limit | limit | |
| skip | skip | |
| takeWhile | take_while | |
| dropWhile | drop_while | |
| forEach | for_each | |
| reduce | reduce | |
| count | count | |
Expand Down
60 changes: 54 additions & 6 deletions src/superstream/stream.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from functools import reduce
from typing import TypeVar, Callable, List, Set, Generic, Dict, Iterable, Optional, Any
from itertools import islice, chain, count, starmap
from itertools import islice, chain, count, starmap, takewhile, dropwhile
from collections import deque

T = TypeVar('T')
Expand All @@ -10,6 +10,23 @@

_initial_missing = object()

def unique_everseen(iterable, key=None):
    """
    Yield the elements of ``iterable`` in order, skipping repeats.

    :param iterable: source of elements
    :param key: optional function computing the value used to decide whether
        an element was already seen; the element itself is used when omitted
    :return: generator over the first occurrence of each distinct element
    """
    hashable_seen = set()
    # Unhashable markers cannot live in a set; they are tracked in a list.
    unhashable_seen = []
    for item in iterable:
        marker = item if key is None else key(item)
        try:
            if marker in hashable_seen:
                continue
            hashable_seen.add(marker)
            yield item
        except TypeError:
            # Raised by the set membership test when the marker is unhashable.
            if marker in unhashable_seen:
                continue
            unhashable_seen.append(marker)
            yield item

class Stream(Generic[T]):
def __init__(self, stream: Iterable[T]):
self._stream = iter(stream)
Expand All @@ -21,10 +38,19 @@ def __iter__(self):
def of(*args: T) -> 'Stream[T]':
return Stream(args)

def map(self, func: Callable[[T], R]) -> 'Stream[R]':
return Stream(map(func, self._stream))
def map(self, func: Callable[[T], R], pool_map=None) -> 'Stream[R]':
    """
    Apply ``func`` to every element and wrap the results in a new Stream.

    :param func: function applied to each element
    :param pool_map: optional replacement for the builtin ``map``, called as
        ``pool_map(func, iterable)``. Its return value is passed directly to
        ``Stream``, so it must be iterable.
        for example:
            multiprocessing.Pool.map
            multiprocessing.dummy.Pool.imap
            pathos.multiprocessing.ProcessPool.uimap
        ``map_async``-style methods return an ``AsyncResult`` rather than an
        iterable; wrap them, e.g.
        ``lambda f, it: pool.map_async(f, it).get()``.
    :return: Stream over the mapped results
    """
    # Explicit None check: only an omitted pool_map falls back to builtin map.
    map_method = map if pool_map is None else pool_map
    return Stream(map_method(func, self._stream))

def starmap(self, func: Callable[..., R]) -> 'Stream[R]':
def star_map(self, func: Callable[..., R]) -> 'Stream[R]':
    """
    Call ``func(*element)`` for every element and stream the results.

    :param func: function receiving each element unpacked as positional arguments
    :return: Stream over the results
    """
    unpacked_results = starmap(func, self._stream)
    return Stream(unpacked_results)

def flat_map(self, func: Callable[[T], 'Stream[R]']) -> 'Stream[R]':
Expand All @@ -34,11 +60,15 @@ def filter(self, func: Callable[[T], bool]) -> 'Stream[T]':
return Stream(filter(func, self._stream))

def for_each(self, func: Callable[[T], None]) -> None:
    """
    Call ``func`` on every remaining element, discarding the results.

    :param func: function invoked once per element
    """
    for item in self._stream:
        func(item)

def peek(self, func: Callable[[T], None]) -> 'Stream[T]':
    """
    Run ``func`` on each element as it passes through, leaving it unchanged.

    ``func`` is called lazily, when the returned Stream is consumed.

    :param func: function invoked once per element
    :return: Stream over the same elements
    """
    def _peeked():
        for item in self._stream:
            func(item)
            yield item

    # Wrap the generator so the result is a Stream, as annotated, and
    # further stream operations can be chained onto it.
    return Stream(_peeked())

def distinct(self):
return Stream(list(dict.fromkeys(self._stream)))
def distinct(self, key: Callable[[T], K] = None) -> 'Stream[T]':
    """
    Drop repeated elements, keeping the first occurrence of each.

    :param key: optional function computing the value compared for uniqueness
    :return: Stream over the distinct elements
    """
    deduplicated = unique_everseen(self._stream, key)
    return Stream(deduplicated)

def sorted(self, key=None, reverse=False) -> 'Stream[T]':
    """
    Sort the elements and stream them in order.

    :param key: optional function extracting the comparison key
    :param reverse: sort in descending order when true
    :return: Stream over the sorted elements
    """
    ordered = list(self._stream)
    ordered.sort(key=key, reverse=reverse)
    return Stream(ordered)
Expand Down Expand Up @@ -75,6 +105,12 @@ def limit(self, max_size: int) -> 'Stream[T]':
def skip(self, n: int) -> 'Stream[T]':
    """
    Discard the first ``n`` elements.

    :param n: number of leading elements to drop
    :return: Stream over the remaining elements
    """
    remaining = islice(self._stream, n, None)
    return Stream(remaining)

def take_while(self, pred: Callable[[T], bool]) -> 'Stream[T]':
    """
    Keep leading elements for as long as ``pred`` holds.

    :param pred: predicate tested against each element
    :return: Stream over the elements before the first one failing ``pred``
    """
    leading = takewhile(pred, self._stream)
    return Stream(leading)

def drop_while(self, pred: Callable[[T], bool]) -> 'Stream[T]':
    """
    Discard leading elements for as long as ``pred`` holds.

    :param pred: predicate tested against each element
    :return: Stream starting at the first element failing ``pred``
    """
    trailing = dropwhile(pred, self._stream)
    return Stream(trailing)

def min(self, key: Callable[[T], Any] = None, default: T = None) -> Optional[T]:
"""
:param default: use default value when stream is empty
Expand Down Expand Up @@ -131,3 +167,15 @@ def collect(self, func: Callable[[Iterable[T]], R]) -> R:

def collects(self, func: Callable[[Iterable[T]], Iterable[R]]) -> 'Stream[R]':
    """
    Pass the underlying iterator to ``func`` and stream whatever it returns.

    :param func: function taking the iterator and returning an iterable
    :return: Stream over the iterable returned by ``func``
    """
    collected = func(self._stream)
    return Stream(collected)

def zip_by(self, *iterables: Iterable[Any]) -> 'Stream[tuple]':
    """
    Zip the given iterables with this stream, stream elements last.

    :param iterables: iterables whose elements come first in each tuple
    :return: Stream of tuples ``(*others, element)``
    """
    return Stream(zip(*iterables, self._stream))

def zip_with(self, *iterables: Iterable[Any]) -> 'Stream[tuple]':
    """
    Zip this stream with the given iterables, stream elements first.

    :param iterables: iterables whose elements follow the stream element
    :return: Stream of tuples ``(element, *others)``
    """
    return Stream(zip(self._stream, *iterables))

def concat_by(self, iterables: Iterable[T]) -> 'Stream[T]':
    """
    Put the elements of ``iterables`` in front of this stream.

    :param iterables: a single iterable of elements to emit first
    :return: Stream over those elements followed by this stream's elements
    """
    joined = chain(iterables, self._stream)
    return Stream(joined)

def concat_with(self, iterables: Iterable[T]) -> 'Stream[T]':
    """
    Append the elements of ``iterables`` after this stream.

    :param iterables: a single iterable of elements to emit last
    :return: Stream over this stream's elements followed by those elements
    """
    joined = chain(self._stream, iterables)
    return Stream(joined)
14 changes: 12 additions & 2 deletions tests/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ def test_map(self):
b = Stream(a).map(lambda x: x * 2).to_list()
assert b == [2, 4, 6]

def test_starmap(self):
def test_star_map(self):
a = [(2,5), (3,2), (10,3)]
b = Stream(a).starmap(pow).to_list()
b = Stream(a).star_map(pow).to_list()
assert b == [32, 9, 1000]

def test_flat_map(self):
Expand Down Expand Up @@ -82,6 +82,16 @@ def test_skip(self):
b = Stream(a).skip(1).to_list()
assert b == [2, 3]

def test_take_while(self):
    """take_while keeps only the prefix that satisfies the predicate."""
    source = [1, 4, 6, 4, 1]
    taken = Stream(source).take_while(lambda x: x < 5).to_list()
    # The trailing 4 and 1 follow the first failing element and are excluded.
    assert taken == [1, 4]

def test_drop_while(self):
    """drop_while discards only the prefix that satisfies the predicate."""
    source = [1, 4, 6, 4, 1]
    kept = Stream(source).drop_while(lambda x: x < 5).to_list()
    # Everything from the first failing element onward is kept.
    assert kept == [6, 4, 1]

def test_min(self):
a = [{'a': 1}, {'a': 2}]
b = Stream(a).min(key=lambda x: x['a'])
Expand Down