Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ This is what has been implemented so far, is planned or skipped:
| ❓ | `sortByAscending` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") |
| ❓ | `sortByDescending` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") |
| ❓ | `sortWith` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") |
| | `splitInto` | `splitInto` | | |
| ❓ | `splitInto` | `splitInto` | | |
| ✅ [#304][] | `sum` | `sum` | | |
| ✅ [#304][] | `sumBy` | `sumBy` | `sumByAsync` | |
| ✅ [#76][] | `tail` | `tail` | | |
Expand Down
1 change: 1 addition & 0 deletions release-notes.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Release notes:

0.6.0
- adds TaskSeq.splitInto
- fixes: async { for item in taskSeq do ... } no longer wraps exceptions in AggregateException, #129
- adds TaskSeq.compareWith and TaskSeq.compareWithAsync
- adds TaskSeq.scan and TaskSeq.scanAsync, #289
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<Compile Include="TaskSeq.Zip.Tests.fs" />
<Compile Include="TaskSeq.CompareWith.Tests.fs" />
<Compile Include="TaskSeq.ChunkBySize.Tests.fs" />
<Compile Include="TaskSeq.SplitInto.Tests.fs" />
<Compile Include="TaskSeq.Windowed.Tests.fs" />
<Compile Include="TaskSeq.Tests.CE.fs" />
<Compile Include="TaskSeq.StateTransitionBug.Tests.CE.fs" />
Expand Down
143 changes: 143 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.SplitInto.Tests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
module TaskSeq.Tests.SplitInto

open Xunit
open FsUnit.Xunit

open FSharp.Control

//
// TaskSeq.splitInto
//

module EmptySeq =
[<Fact>]
let ``TaskSeq-splitInto with null source raises`` () = assertNullArg <| fun () -> TaskSeq.splitInto 1 null

[<Fact>]
let ``TaskSeq-splitInto with zero raises ArgumentException before awaiting`` () =
fun () -> TaskSeq.empty<int> |> TaskSeq.splitInto 0 |> ignore // throws eagerly, before enumeration
|> should throw typeof<System.ArgumentException>

[<Fact>]
let ``TaskSeq-splitInto with negative raises ArgumentException before awaiting`` () =
fun () -> TaskSeq.empty<int> |> TaskSeq.splitInto -1 |> ignore
|> should throw typeof<System.ArgumentException>

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-splitInto on empty sequence yields empty`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.splitInto 1
|> verifyEmpty

[<Theory; ClassData(typeof<TestEmptyVariants>)>]
let ``TaskSeq-splitInto(99) on empty sequence yields empty`` variant =
Gen.getEmptyVariant variant
|> TaskSeq.splitInto 99
|> verifyEmpty

module Immutable =
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto preserves all elements in order`` variant = task {
do!
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 3
|> TaskSeq.collect TaskSeq.ofArray
|> verify1To10
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto(2) splits 10-element sequence into 2 chunks of 5`` variant = task {
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 2
|> TaskSeq.toArrayAsync

chunks |> should equal [| [| 1..5 |]; [| 6..10 |] |]
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto(5) splits 10-element sequence into 5 chunks of 2`` variant = task {
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 5
|> TaskSeq.toArrayAsync

chunks
|> should equal [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |]
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto(10) splits 10-element sequence into 10 singleton chunks`` variant = task {
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 10
|> TaskSeq.toArrayAsync

chunks |> Array.length |> should equal 10

chunks
|> Array.iteri (fun i chunk -> chunk |> should equal [| i + 1 |])
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto(1) returns the whole sequence as one chunk`` variant = task {
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 1
|> TaskSeq.toArrayAsync

chunks |> should equal [| [| 1..10 |] |]
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto(3) distributes remainder across first chunks`` variant = task {
// 10 elements into 3 chunks: 10 / 3 = 3 remainder 1 β†’ [4; 3; 3]
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 3
|> TaskSeq.toArrayAsync

chunks |> Array.length |> should equal 3
chunks.[0] |> should equal [| 1; 2; 3; 4 |]
chunks.[1] |> should equal [| 5; 6; 7 |]
chunks.[2] |> should equal [| 8; 9; 10 |]
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto(4) distributes remainder across first chunks`` variant = task {
// 10 elements into 4 chunks: 10 / 4 = 2 remainder 2 β†’ [3; 3; 2; 2]
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 4
|> TaskSeq.toArrayAsync

chunks |> Array.length |> should equal 4
chunks.[0] |> should equal [| 1; 2; 3 |]
chunks.[1] |> should equal [| 4; 5; 6 |]
chunks.[2] |> should equal [| 7; 8 |]
chunks.[3] |> should equal [| 9; 10 |]
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-splitInto count greater than length returns one element per chunk`` variant = task {
// 10 elements into 20 chunks β†’ 10 singleton chunks
let! chunks =
Gen.getSeqImmutable variant
|> TaskSeq.splitInto 20
|> TaskSeq.toArrayAsync

chunks |> Array.length |> should equal 10

chunks
|> Array.iteri (fun i chunk -> chunk |> should equal [| i + 1 |])
}

module SideEffects =
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-splitInto preserves all side-effectful elements in order`` variant = task {
do!
Gen.getSeqWithSideEffect variant
|> TaskSeq.splitInto 3
|> TaskSeq.collect TaskSeq.ofArray
|> verify1To10
}
1 change: 1 addition & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ type TaskSeq private () =
static member distinctUntilChanged source = Internal.distinctUntilChanged source
static member pairwise source = Internal.pairwise source
static member chunkBySize chunkSize source = Internal.chunkBySize chunkSize source
static member splitInto count source = Internal.splitInto count source
static member windowed windowSize source = Internal.windowed windowSize source

static member forall predicate source = Internal.forall (Predicate predicate) source
Expand Down
21 changes: 21 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,27 @@ type TaskSeq =
/// <exception cref="T:System.ArgumentException">Thrown when <paramref name="chunkSize" /> is not positive.</exception>
static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>

/// <summary>
/// Splits the input task sequence into at most <paramref name="count" /> chunks of roughly equal size.
/// The last chunk may be smaller if the total number of elements does not divide evenly.
/// When the source has fewer elements than <paramref name="count" />, the number of chunks equals
/// the number of elements (each chunk has one element). Returns an empty task sequence when the
/// source is empty.
///
/// Unlike <see cref="TaskSeq.chunkBySize" />, which fixes the chunk size, this function fixes
/// the number of chunks. The whole source sequence must be evaluated before any chunk is yielded.
///
/// If <paramref name="count" /> is not positive, an <see cref="T:System.ArgumentException" /> is raised immediately
/// (before the sequence is evaluated).
/// </summary>
///
/// <param name="count">The maximum number of chunks. Must be positive.</param>
/// <param name="source">The input task sequence.</param>
/// <returns>A task sequence of non-overlapping array chunks.</returns>
/// <exception cref="T:System.ArgumentNullException">Thrown when the input task sequence is null.</exception>
/// <exception cref="T:System.ArgumentException">Thrown when <paramref name="count" /> is not positive.</exception>
static member splitInto: count: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>

/// <summary>
/// Returns a task sequence of sliding windows of a given size over the source sequence.
/// Each window is a fresh array of exactly <paramref name="windowSize" /> consecutive elements.
Expand Down
23 changes: 23 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,29 @@ module internal TaskSeqInternal =
yield buffer.[0 .. count - 1]
}

let splitInto count (source: TaskSeq<'T>) : TaskSeq<'T[]> =
if count < 1 then
invalidArg (nameof count) $"The value must be positive, but was %i{count}."

checkNonNull (nameof source) source

taskSeq {
let! ra = toResizeArrayAsync source
let arr = ra.ToArray()
let n = arr.Length

if n > 0 then
// Split into at most `count` chunks (fewer chunks if n < count).
let actual = min count n
let k = n / actual
let m = n % actual // first m chunks get one extra element

for i in 0 .. actual - 1 do
let start = i * k + min i m
let len = k + (if i < m then 1 else 0)
yield arr.[start .. start + len - 1]
}

let windowed windowSize (source: TaskSeq<_>) =
if windowSize <= 0 then
invalidArg (nameof windowSize) $"The value must be positive, but was %i{windowSize}."
Expand Down
Loading