Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions src/Streamly/Internal/Data/Stream/IsStream/Top.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ module Streamly.Internal.Data.Stream.IsStream.Top
-- | These are not exactly set operations because streams are not
-- necessarily sets, they may have duplicated elements.
, intersectBy
, mergeIntersectBy
, intersectBySorted
, differenceBy
, mergeDifferenceBy
, differenceBySorted
, unionBy
, mergeUnionBy
, unionBySorted

-- ** Join operations
, crossJoin
, innerJoin
, mergeInnerJoin
, joinInnerMerge
, hashInnerJoin
, leftJoin
, mergeLeftJoin
Expand Down Expand Up @@ -65,6 +65,7 @@ import Streamly.Internal.Data.Stream.IsStream.Common (concatM)
import Streamly.Internal.Data.Stream.IsStream.Type
(IsStream(..), adapt, foldl', fromList)
import Streamly.Internal.Data.Stream.Serial (SerialT)
--import Streamly.Internal.Data.Stream.StreamD (fromStreamD, toStreamD)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)

import qualified Data.List as List
Expand All @@ -78,6 +79,7 @@ import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as StreamD

import Prelude hiding (filter, zipWith, concatMap, concat)

Expand Down Expand Up @@ -306,10 +308,14 @@ hashInnerJoin = undefined
--
-- Time: O(m + n)
--
-- /Unimplemented/
{-# INLINE mergeInnerJoin #-}
mergeInnerJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
mergeInnerJoin = undefined
-- /Pre-release/
{-# INLINE joinInnerMerge #-}
joinInnerMerge :: (IsStream t, MonadIO m, Eq a, Eq b) =>
(a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
joinInnerMerge eq s1 =
IsStream.fromStreamD
. StreamD.joinInnerMerge eq (IsStream.toStreamD s1)
. IsStream.toStreamD

-- XXX We can do this concurrently.
-- XXX If the second stream is sorted and passed as an Array or a seek capable
Expand Down Expand Up @@ -514,11 +520,14 @@ intersectBy eq s1 s2 =
--
-- Time: O(m+n)
--
-- /Unimplemented/
{-# INLINE mergeIntersectBy #-}
mergeIntersectBy :: -- (IsStream t, Monad m) =>
-- /Pre-release/
{-# INLINE intersectBySorted #-}
intersectBySorted :: (IsStream t, MonadIO m, Eq a) =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeIntersectBy _eq _s1 _s2 = undefined
intersectBySorted eq s1 =
IsStream.fromStreamD
. StreamD.intersectBySorted eq (IsStream.toStreamD s1)
. IsStream.toStreamD

-- Roughly leftJoin s1 s2 = s1 `difference` s2 + s1 `intersection` s2

Expand Down Expand Up @@ -562,11 +571,14 @@ differenceBy eq s1 s2 =
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeDifferenceBy #-}
mergeDifferenceBy :: -- (IsStream t, Monad m) =>
-- /Pre-release/
{-# INLINE differenceBySorted #-}
differenceBySorted :: (IsStream t, MonadIO m) =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeDifferenceBy _eq _s1 _s2 = undefined
differenceBySorted eq s1 =
IsStream.fromStreamD
. StreamD.differenceBySorted eq (IsStream.toStreamD s1)
. IsStream.toStreamD

-- | This is essentially an append operation that appends all the extra
-- occurrences of elements from the second stream that are not already present
Expand Down Expand Up @@ -610,8 +622,11 @@ unionBy eq s1 s2 =
--
-- Space: O(1)
--
-- /Unimplemented/
{-# INLINE mergeUnionBy #-}
mergeUnionBy :: -- (IsStream t, Monad m) =>
-- /Pre-release/
{-# INLINE unionBySorted #-}
unionBySorted :: (IsStream t, MonadAsync m, Ord a) =>
(a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeUnionBy _eq _s1 _s2 = undefined
unionBySorted cmp s1 =
IsStream.fromStreamD
. StreamD.unionBySorted cmp (IsStream.toStreamD s1)
. IsStream.toStreamD
Loading