Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ open System.Threading.Tasks
open System.Runtime.ExceptionServices
#if !FABLE_COMPILER
open System.Linq
open System.Threading.Channels
#endif

#nowarn "40" "3218"
Expand Down Expand Up @@ -374,6 +375,17 @@ module AsyncSeq =
member x.MoveNext() = async { return None }
member x.Dispose() = () } }

let emptyAsync<'T> (action : Async<unit>) : AsyncSeq<'T> =
{ new IAsyncEnumerable<'T> with
member x.GetEnumerator() =
{ new IAsyncEnumerator<'T> with
member x.MoveNext() =
async {
do! action
return None
}
member x.Dispose() = () } }

let singleton (v:'T) : AsyncSeq<'T> =
{ new IAsyncEnumerable<'T> with
member x.GetEnumerator() =
Expand Down Expand Up @@ -1946,6 +1958,75 @@ module AsyncSeq =
#endif


#if !FABLE_COMPILER
open System.Threading.Channels

let toChannel (writer : ChannelWriter<'a>) (xs : AsyncSeq<'a>) : Async<unit> =
async {
try
do!
xs
|> iterAsync
(fun x ->
async {
if not (writer.TryWrite(x)) then
let! ct = Async.CancellationToken

do!
writer.WriteAsync(x, ct).AsTask()
|> Async.AwaitTask
})

writer.Complete()
with exn ->
writer.Complete(error = exn)
}

let fromChannel (reader : ChannelReader<'a>) : AsyncSeq<'a> =
asyncSeq {
let mutable keepGoing = true

while keepGoing do
let mutable item = Unchecked.defaultof<'a>

if reader.TryRead(&item) then
yield item
else
let! ct = Async.CancellationToken

let! hasMoreData =
reader.WaitToReadAsync(ct).AsTask()
|> Async.AwaitTask

if not hasMoreData then
keepGoing <- false
}

let prefetch (numberToPrefetch : int) (xs : AsyncSeq<'a>) : AsyncSeq<'a> =
if numberToPrefetch = 0 then
xs
else
if numberToPrefetch < 1 then
invalidArg (nameof numberToPrefetch) "must be at least zero"
asyncSeq {
let opts = BoundedChannelOptions(numberToPrefetch)
opts.SingleWriter <- true
opts.SingleReader <- true

let channel = Channel.CreateBounded(opts)

let! fillChannelTask =
toChannel channel.Writer xs
|> Async.StartChild

yield!
append
(fromChannel channel.Reader)
(emptyAsync fillChannelTask)
}

#endif


[<AutoOpen>]
module AsyncSeqExtensions =
Expand Down
19 changes: 19 additions & 0 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,25 @@ module AsyncSeq =
#endif
#endif

#if !FABLE_COMPILER

open System.Threading.Channels

/// Fills a channel writer with the values from an async seq.
/// The writer will be closed when the async seq completes or raises an error.
val toChannel<'T> : writer: ChannelWriter<'T> -> source: AsyncSeq<'T> -> Async<unit>

/// Creates an async seq from a channel reader.
/// The async seq will read values from the channel reader until it is closed.
/// If the reader raises an error than the sequence will raise it.
val fromChannel<'T> : reader: ChannelReader<'T> -> AsyncSeq<'T>

/// Transforms an async seq to a new one that fetches values ahead of time to improve throughput.
val prefetch<'T> : numberToPrefetch: int -> source: AsyncSeq<'T> -> AsyncSeq<'T>

#endif


/// An automatically-opened module that contains the `asyncSeq` builder and an extension method
[<AutoOpen>]
module AsyncSeqExtensions =
Expand Down
1 change: 1 addition & 0 deletions src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<ItemGroup>
<PackageReference Update="FSharp.Core" Version="4.7.2" />
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="5.0" />
<PackageReference Include="System.Threading.Channels" Version="*" />
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
</ItemGroup>
</Project>
Loading