Skip to content

Commit 60098c8

Browse files
authored
Merge pull request #50 from PickwickSoft/bugfix/fix-base-stream-knows-subclasses
Add stream converter
2 parents 3c43321 + 7c6b180 commit 60098c8

File tree

9 files changed

+120
-21
lines changed

9 files changed

+120
-21
lines changed

poetry.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pylintrc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,8 @@
33
# Minimum lines number of a similarity.
44
min-similarity-lines=-1
55

6+
[tests/*.py]
7+
disable=missing-function-docstring,no-member,missing-class-docstring,too-few-public-methods,too-many-public-methods,cyclic-import,import-error
8+
69
[MESSAGES CONTROL]
7-
disable=missing-function-docstring,missing-class-docstring,missing-module-docstring,import-error,too-few-public-methods,invalid-name,no-member,too-many-public-methods
10+
disable=invalid-name,missing-module-docstring

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "streams.py"
3-
version = "0.3.1"
3+
version = "0.3.2"
44
authors = ["Stefan Garlonta <stefan@pickwicksoft.org>"]
55
description = "A stream library for Python inspired by Java Stream API"
66
keywords = ["streams", "parallel", "data"]

pystreamapi/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from pystreamapi.__stream import Stream
22
from pystreamapi._streams.error.__levels import ErrorLevel
33

4-
__version__ = "0.3.1"
4+
__version__ = "0.3.2"
55
__all__ = ["Stream", "ErrorLevel"]

pystreamapi/__stream_converter.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from pystreamapi._streams.__base_stream import BaseStream
2+
from pystreamapi._streams.__parallel_stream import ParallelStream
3+
from pystreamapi._streams.__sequential_stream import SequentialStream
4+
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
5+
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
6+
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
7+
8+
9+
class StreamConverter:
10+
"""Class for converting streams to other types of streams."""
11+
12+
@staticmethod
13+
def to_numeric_stream(stream: BaseStream) -> NumericBaseStream:
14+
"""Converts a stream to a numeric stream."""
15+
if isinstance(stream, SequentialStream):
16+
stream.__class__ = SequentialNumericStream
17+
if isinstance(stream, ParallelStream):
18+
stream.__class__ = ParallelNumericStream
19+
return stream
20+
21+
@staticmethod
22+
def to_parallel_stream(stream: BaseStream) -> ParallelStream:
23+
"""Converts a stream to a parallel stream."""
24+
if isinstance(stream, SequentialNumericStream):
25+
stream.__class__ = ParallelNumericStream
26+
elif isinstance(stream, SequentialStream):
27+
stream.__class__ = ParallelStream
28+
return stream
29+
30+
@staticmethod
31+
def to_sequential_stream(stream: BaseStream) -> SequentialStream:
32+
"""Converts a stream to a sequential stream."""
33+
if isinstance(stream, ParallelNumericStream):
34+
stream.__class__ = SequentialNumericStream
35+
elif isinstance(stream, ParallelStream):
36+
stream.__class__ = SequentialStream
37+
return stream

pystreamapi/_streams/__base_stream.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
if TYPE_CHECKING:
1818
from pystreamapi._streams.numeric.__numeric_base_stream import NumericBaseStream
19+
from pystreamapi._streams.__parallel_stream import ParallelStream
20+
from pystreamapi._streams.__sequential_stream import SequentialStream
1921

2022
K = TypeVar('K')
2123
_V = TypeVar('_V')
@@ -237,6 +239,13 @@ def __map_to_str(self):
237239
"""Converts the stream to strings."""
238240
self._map(str)
239241

242+
@_operation
243+
def parallel(self) -> 'ParallelStream[K]':
244+
"""Returns a parallel stream. If the stream is already parallel, it is returned."""
245+
# pylint: disable=import-outside-toplevel
246+
from pystreamapi.__stream_converter import StreamConverter
247+
return StreamConverter.to_parallel_stream(self)
248+
240249
@_operation
241250
def peek(self, action: Callable) -> 'BaseStream[K]':
242251
"""
@@ -269,6 +278,13 @@ def __reversed(self):
269278
except TypeError:
270279
self._source = reversed(list(self._source))
271280

281+
@_operation
282+
def sequential(self) -> SequentialStream[K]:
283+
"""Returns a sequential stream. If the stream is already sequential, it is returned."""
284+
# pylint: disable=import-outside-toplevel
285+
from pystreamapi.__stream_converter import StreamConverter
286+
return StreamConverter.to_sequential_stream(self)
287+
272288
@_operation
273289
def skip(self, n: int) -> 'BaseStream[K]':
274290
"""
@@ -430,6 +446,8 @@ def to_dict(self, key_mapper: Callable[[K], Any]) -> dict:
430446
:param key_mapper:
431447
"""
432448

433-
@abstractmethod
434449
def _to_numeric_stream(self) -> NumericBaseStream:
435-
"""Converts a stream to a numeric stream. To be implemented by subclasses."""
450+
"""Converts a stream to a numeric stream using the stream converter"""
451+
# pylint: disable=import-outside-toplevel
452+
from pystreamapi.__stream_converter import StreamConverter
453+
return StreamConverter.to_numeric_stream(self)

pystreamapi/_streams/__parallel_stream.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,3 @@ def _set_parallelizer_src(self):
9191

9292
def __mapper(self, mapper):
9393
return lambda x: self._one(mapper=mapper, item=x)
94-
95-
def _to_numeric_stream(self):
96-
# pylint: disable=import-outside-toplevel
97-
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
98-
self.__class__ = ParallelNumericStream
99-
return self

pystreamapi/_streams/__sequential_stream.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,3 @@ def reduce(self, predicate: Callable, identity=_identity_missing, depends_on_sta
6262
@stream.terminal
6363
def to_dict(self, key_mapper: Callable[[Any], Any]) -> dict:
6464
return self._group_to_dict(key_mapper)
65-
66-
def _to_numeric_stream(self):
67-
# pylint: disable=import-outside-toplevel
68-
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
69-
self.__class__ = SequentialNumericStream
70-
return self

tests/test_stream_converter.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from unittest import TestCase
2+
3+
from pystreamapi._streams.__parallel_stream import ParallelStream
4+
from pystreamapi._streams.__sequential_stream import SequentialStream
5+
from pystreamapi._streams.numeric.__parallel_numeric_stream import ParallelNumericStream
6+
from pystreamapi._streams.numeric.__sequential_numeric_stream import SequentialNumericStream
7+
8+
9+
class TestStreamConverter(TestCase):
10+
11+
def test_convert_to_numeric_stream_sequential(self):
12+
stream = SequentialStream(["1", "2", "3"]).map_to_int()
13+
self.assertIsInstance(stream, SequentialNumericStream)
14+
15+
def test_convert_to_numeric_stream_parallel(self):
16+
stream = ParallelStream(["1", "2", "3"]).map_to_int()
17+
self.assertIsInstance(stream, ParallelNumericStream)
18+
19+
def test_convert_to_numeric_stream_numeric_parallel(self):
20+
stream = ParallelNumericStream(["1", "2", "3"]).map_to_int()
21+
self.assertIsInstance(stream, ParallelNumericStream)
22+
23+
def test_convert_to_parallel_stream_sequential(self):
24+
stream = SequentialStream(["1", "2", "3"]).parallel()
25+
self.assertIsInstance(stream, ParallelStream)
26+
27+
def test_convert_to_parallel_stream_sequential_numeric(self):
28+
stream = SequentialNumericStream(["1", "2", "3"]).parallel()
29+
self.assertIsInstance(stream, ParallelNumericStream)
30+
31+
def test_convert_to_parallel_stream_parallel(self):
32+
stream = ParallelStream(["1", "2", "3"]).parallel()
33+
self.assertIsInstance(stream, ParallelStream)
34+
35+
def test_convert_to_parallel_stream_parallel_numeric(self):
36+
stream = ParallelNumericStream(["1", "2", "3"]).parallel()
37+
self.assertIsInstance(stream, ParallelNumericStream)
38+
39+
def test_convert_to_sequential_stream_sequential(self):
40+
stream = SequentialStream(["1", "2", "3"]).sequential()
41+
self.assertIsInstance(stream, SequentialStream)
42+
43+
def test_convert_to_sequential_stream_sequential_numeric(self):
44+
stream = SequentialNumericStream(["1", "2", "3"]).sequential()
45+
self.assertIsInstance(stream, SequentialNumericStream)
46+
47+
def test_convert_to_sequential_stream_parallel(self):
48+
stream = ParallelStream(["1", "2", "3"]).sequential()
49+
self.assertIsInstance(stream, SequentialStream)
50+
51+
def test_convert_to_sequential_stream_parallel_numeric(self):
52+
stream = ParallelNumericStream(["1", "2", "3"]).sequential()
53+
self.assertIsInstance(stream, SequentialNumericStream)

0 commit comments

Comments
 (0)