Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 51 additions & 10 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -734,23 +734,64 @@ module AsyncSeq =
dispose e
| _ -> () } }

// Optimized iterAsync implementation to reduce allocations
type internal OptimizedIterAsyncEnumerator<'T>(enumerator: IAsyncEnumerator<'T>, f: 'T -> Async<unit>) =
let mutable disposed = false

member _.IterateAsync() =
let rec loop() = async {
let! next = enumerator.MoveNext()
match next with
| Some value ->
do! f value
return! loop()
| None -> return ()
}
loop()

interface IDisposable with
member _.Dispose() =
if not disposed then
disposed <- true
enumerator.Dispose()

// Optimized iteriAsync implementation with direct tail recursion
type internal OptimizedIteriAsyncEnumerator<'T>(enumerator: IAsyncEnumerator<'T>, f: int -> 'T -> Async<unit>) =
let mutable disposed = false

member _.IterateAsync() =
let rec loop count = async {
let! next = enumerator.MoveNext()
match next with
| Some value ->
do! f count value
return! loop (count + 1)
| None -> return ()
}
loop 0

interface IDisposable with
member _.Dispose() =
if not disposed then
disposed <- true
enumerator.Dispose()

let iteriAsync f (source : AsyncSeq<_>) =
async {
use ie = source.GetEnumerator()
let count = ref 0
let! move = ie.MoveNext()
let b = ref move
while b.Value.IsSome do
do! f !count b.Value.Value
let! moven = ie.MoveNext()
do incr count
b := moven
let enum = source.GetEnumerator()
use optimizer = new OptimizedIteriAsyncEnumerator<_>(enum, f)
return! optimizer.IterateAsync()
}

let iterAsync (f: 'T -> Async<unit>) (source: AsyncSeq<'T>) =
match source with
| :? AsyncSeqOp<'T> as source -> source.IterAsync f
| _ -> iteriAsync (fun i x -> f x) source
| _ ->
async {
let enum = source.GetEnumerator()
use optimizer = new OptimizedIterAsyncEnumerator<_>(enum, f)
return! optimizer.IterateAsync()
}

let iteri (f: int -> 'T -> unit) (inp: AsyncSeq<'T>) = iteriAsync (fun i x -> async.Return (f i x)) inp

Expand Down