Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
### 4.10.1

* Added `AsyncSeq.sortByAsync` β€” sorts an async sequence by an asynchronous key-generating function, computing each key exactly once and returning a sorted array. Mirrors the `*Async` pattern of `minByAsync`, `maxByAsync`, and `countByAsync`.
* Added `AsyncSeq.sortByDescendingAsync` β€” same as `sortByAsync` but orders descending.

### 4.10.0

* Added `AsyncSeq.withCancellation` β€” returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).
Expand Down
14 changes: 14 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2084,6 +2084,20 @@
let sortWith (comparer:'T -> 'T -> int) (source:AsyncSeq<'T>) : array<'T> =
toSortedSeq (Array.sortWith comparer) source

let sortByAsync (projection:'T -> Async<'Key>) (source:AsyncSeq<'T>) : Async<array<'T>> when 'Key : comparison = async {
let! pairs =
source
|> mapAsync (fun x -> async { let! k = projection x in return (k, x) })
|> toArrayAsync
return pairs |> Array.sortBy fst |> Array.map snd }

let sortByDescendingAsync (projection:'T -> Async<'Key>) (source:AsyncSeq<'T>) : Async<array<'T>> when 'Key : comparison = async {
let! pairs =
source
|> mapAsync (fun x -> async { let! k = projection x in return (k, x) })
|> toArrayAsync
return pairs |> Array.sortByDescending fst |> Array.map snd }

let rev (source: AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
let! arr = toArrayAsync source
for i in arr.Length - 1 .. -1 .. 0 do
Expand Down Expand Up @@ -2450,7 +2464,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2467 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2467 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
12 changes: 12 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,18 @@ module AsyncSeq =
/// large or infinite sequences.
val sortByDescending : projection:('T -> 'Key) -> source:AsyncSeq<'T> -> array<'T> when 'Key : comparison

/// Applies an asynchronous key-generating function to each element of an AsyncSeq and returns
/// an array ordered by the computed keys. Each key is computed exactly once per element.
/// This function digests the whole initial sequence before returning. As a result this
/// function should not be used with large or infinite sequences.
val sortByAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> Async<array<'T>> when 'Key : comparison

/// Applies an asynchronous key-generating function to each element of an AsyncSeq and returns
/// an array ordered descending by the computed keys. Each key is computed exactly once per element.
/// This function digests the whole initial sequence before returning. As a result this
/// function should not be used with large or infinite sequences.
val sortByDescendingAsync : projection:('T -> Async<'Key>) -> source:AsyncSeq<'T> -> Async<array<'T>> when 'Key : comparison

/// Sorts the given async sequence using the given comparison function and returns an array.
/// This function returns an array that digests the whole initial sequence as soon as
/// that sequence is iterated. As a result this function should not be used with
Expand Down
48 changes: 48 additions & 0 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2000,7 +2000,7 @@
let actual =
ls
|> AsyncSeq.ofSeq
|> AsyncSeq.groupBy p

Check warning on line 2003 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand All @@ -2009,7 +2009,7 @@
let expected = asyncSeq { raise (exn("test")) }
let actual =
asyncSeq { raise (exn("test")) }
|> AsyncSeq.groupBy (fun i -> i % 3)

Check warning on line 2012 in tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
|> AsyncSeq.mapAsyncParallel (snd >> AsyncSeq.toListAsync)
Assert.AreEqual(expected, actual)

Expand Down Expand Up @@ -3409,8 +3409,56 @@
let result = AsyncSeq.sortWith compare AsyncSeq.empty<int>
Assert.AreEqual([||], result)

// ── AsyncSeq.sortByAsync / sortByDescendingAsync ──────────────────────────────

[<Test>]
let ``AsyncSeq.sortByAsync sorts ascending by async projection`` () =
let source = asyncSeq { yield "banana"; yield "apple"; yield "cherry" }
let result =
source
|> AsyncSeq.sortByAsync (fun s -> async { return s.Length })
|> Async.RunSynchronously
Assert.AreEqual([| "apple"; "banana"; "cherry" |], result)

[<Test>]
let ``AsyncSeq.sortByAsync returns empty array for empty sequence`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.sortByAsync (fun x -> async { return x })
|> Async.RunSynchronously
Assert.AreEqual([||], result)

[<Test>]
let ``AsyncSeq.sortByAsync computes projection exactly once per element`` () =
let callCount = ref 0
let source = asyncSeq { yield 3; yield 1; yield 2 }
let result =
source
|> AsyncSeq.sortByAsync (fun x -> async { incr callCount; return x })
|> Async.RunSynchronously
Assert.AreEqual([| 1; 2; 3 |], result)
Assert.AreEqual(3, !callCount)

[<Test>]
let ``AsyncSeq.sortByDescendingAsync sorts descending by async projection`` () =
let source = asyncSeq { yield "hi"; yield "apple"; yield "cherry" }
let result =
source
|> AsyncSeq.sortByDescendingAsync (fun s -> async { return s.Length })
|> Async.RunSynchronously
Assert.AreEqual([| "cherry"; "apple"; "hi" |], result)

[<Test>]
let ``AsyncSeq.sortByDescendingAsync returns empty array for empty sequence`` () =
let result =
AsyncSeq.empty<int>
|> AsyncSeq.sortByDescendingAsync (fun x -> async { return x })
|> Async.RunSynchronously
Assert.AreEqual([||], result)

// ── AsyncSeq.mapFold ──────────────────────────────────────────────────────────


[<Test>]
let ``AsyncSeq.mapFold maps elements and accumulates state`` () =
let source = asyncSeq { yield 1; yield 2; yield 3 }
Expand Down
Loading