Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ module AsyncSeq =
yield invalidOp "" }


let DEFAULT_TIMEOUT_MS = 2000
let DEFAULT_TIMEOUT_MS = 500

let randomDelayMs (minMs:int) (maxMs:int) (s:AsyncSeq<'a>) =
let rand = new Random(int DateTime.Now.Ticks)
let randSleep = async { do! Async.Sleep(rand.Next(minMs, maxMs)) }
AsyncSeq.zipWith (fun _ a -> a) (AsyncSeq.replicateInfiniteAsync randSleep) s

let randomDelayDefault (s:AsyncSeq<'a>) =
randomDelayMs 0 50 s
randomDelayMs 0 10 s

let randomDelayMax m (s:AsyncSeq<'a>) =
randomDelayMs 0 m s
Expand Down Expand Up @@ -66,7 +66,7 @@ let EQ (a:AsyncSeq<'a>) (b:AsyncSeq<'a>) =
let runTimeout (timeoutMs:int) (a:Async<'a>) : 'a =
Async.RunSynchronously (a, timeoutMs)

let runTest a = runTimeout 5000 a
let runTest a = runTimeout 1000 a

type Assert with

Expand Down Expand Up @@ -264,7 +264,7 @@ let ``AsyncSeq.cache should work``() =
[<Test>]
let ``AsyncSeq.cache does not slow down late consumers``() =
let src =
AsyncSeq.initInfiniteAsync (fun _ -> Async.Sleep 1000)
AsyncSeq.initInfiniteAsync (fun _ -> Async.Sleep 10)
|> AsyncSeq.cache
let consume initialDelay amount =
async {
Expand All @@ -281,11 +281,11 @@ let ``AsyncSeq.cache does not slow down late consumers``() =
// The first to start will take 10s to consume 10 items
consume 0 10
// The second should take no time to consume 5 items, starting 5s later, as the first five items have already been cached.
consume 5000 5
consume 50 5
]
|> Async.RunSynchronously
Assert.LessOrEqual(abs(times.[0] - 10.0), 2.0f, "Sanity check: lead consumer should take 10s")
Assert.LessOrEqual(times.[1], 2.0, "Test purpose: follower should only read cached items")
Assert.LessOrEqual(abs(times.[0] - 0.1), 0.1f, "Sanity check: lead consumer should take ~100ms")
Assert.LessOrEqual(times.[1], 0.1, "Test purpose: follower should only read cached items")

[<Test>]
let ``AsyncSeq.unfoldAsync``() =
Expand Down Expand Up @@ -577,10 +577,10 @@ let ``AsyncSeq.bufferByCountAndTime should not block`` () =
let op =
asyncSeq {
while true do
do! Async.Sleep 1000
do! Async.Sleep 10
yield 0
}
|> AsyncSeq.bufferByCountAndTime 10 1000
|> AsyncSeq.bufferByCountAndTime 10 100
|> AsyncSeq.take 3
|> AsyncSeq.iter (ignore)

Expand All @@ -591,17 +591,17 @@ let ``AsyncSeq.bufferByCountAndTime should not block`` () =
Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token)
watch.Stop()
cts.Cancel(false)
Assert.Less (watch.ElapsedMilliseconds, 1000L)
Assert.Less (watch.ElapsedMilliseconds, 100L)

[<Test>]
let ``AsyncSeq.bufferByTime should not block`` () =
let op =
asyncSeq {
while true do
do! Async.Sleep 1000
do! Async.Sleep 10
yield 0
}
|> AsyncSeq.bufferByTime 1000
|> AsyncSeq.bufferByTime 100
|> AsyncSeq.take 3
|> AsyncSeq.iter (ignore)

Expand All @@ -612,15 +612,15 @@ let ``AsyncSeq.bufferByTime should not block`` () =
Async.StartWithContinuations(op, ignore, ignore, ignore, cts.Token)
watch.Stop()
cts.Cancel(false)
Assert.Less (watch.ElapsedMilliseconds, 1000L)
Assert.Less (watch.ElapsedMilliseconds, 100L)

// let s = asyncSeq {
// yield 1
// yield 2
// do! Async.Sleep 100
// do! Async.Sleep 10
// yield 3
// yield 4
// do! Async.Sleep 100
// do! Async.Sleep 10
// yield 5
// yield 6
// }
Expand Down Expand Up @@ -900,7 +900,7 @@ let ``AsyncSeq.mergeChoice``() =
[<Test>]
let ``AsyncSeq.merge should be fair``() =
let s1 = asyncSeq {
do! Async.Sleep 10
do! Async.Sleep 1
yield 1
}
let s2 = asyncSeq {
Expand All @@ -916,7 +916,7 @@ let ``AsyncSeq.merge should be fair 2``() =
yield 1
}
let s2 = asyncSeq {
do! Async.Sleep 10
do! Async.Sleep 1
yield 2
}
let actual = AsyncSeq.merge s1 s2
Expand Down Expand Up @@ -963,7 +963,7 @@ let ``AsyncSeq.collect works``() =

[<Test>]
let ``AsyncSeq.initInfinite scales``() =
AsyncSeq.initInfinite string |> AsyncSeq.take 1000 |> AsyncSeq.iter ignore |> Async.RunSynchronously
AsyncSeq.initInfinite string |> AsyncSeq.take 100 |> AsyncSeq.iter ignore |> Async.RunSynchronously

[<Test>]
let ``AsyncSeq.initAsync``() =
Expand Down Expand Up @@ -1037,7 +1037,7 @@ let ``AsyncSeq.distinctUntilChangedWithAsync``() =
[<Test>]
let ``AsyncSeq.takeUntil should complete immediately with completed signal``() =
let s = asyncSeq {
do! Async.Sleep 10
do! Async.Sleep 1
yield 1
yield 2
}
Expand All @@ -1063,7 +1063,7 @@ let ``AsyncSeq.skipUntil should not skip with completed signal``() =
let expected = [1;2;3;4] |> AsyncSeq.ofSeq
let actual =
asyncSeq {
do! Async.Sleep 100
do! Async.Sleep 10
yield! expected
}
|> AsyncSeq.skipUntilSignal AsyncOps.unit
Expand All @@ -1089,7 +1089,7 @@ let ``AsyncSeq.toBlockingSeq should work length 0``() =
[<Test>]
let ``AsyncSeq.toBlockingSeq should work length 2 with sleep``() =
let s = asyncSeq { yield 1
do! Async.Sleep 10
do! Async.Sleep 1
yield 2 } |> AsyncSeq.toBlockingSeq |> Seq.toList
Assert.True((s = [1; 2]))

Expand Down Expand Up @@ -1120,7 +1120,7 @@ let ``AsyncSeq.toBlockingSeq should be cancellable``() =
use! a = Async.OnCancel(fun x -> incr cancelCount)
while true do
yield 1
do! Async.Sleep 10
do! Async.Sleep 1
}

let asSeq = aseq |> AsyncSeq.toBlockingSeq
Expand All @@ -1130,15 +1130,15 @@ let ``AsyncSeq.toBlockingSeq should be cancellable``() =
Assert.AreEqual(canMoveNext, true)
Assert.AreEqual(cancelCount.Value, 0)
enum.Dispose()
System.Threading.Thread.Sleep(1000) // wait for task cancellation to be effective
System.Threading.Thread.Sleep(100) // wait for task cancellation to be effective
Assert.AreEqual(cancelCount.Value, 1)

[<Test>]
let ``AsyncSeq.while should allow do at end``() =
let s1 = asyncSeq {
while false do
yield 1
do! Async.Sleep 10
do! Async.Sleep 1
}
Assert.True(true)

Expand Down Expand Up @@ -1243,17 +1243,17 @@ let ``Async.mergeAll should work``() =
let ``Async.mergeAll should perform well``() =
let mergeTest n =
[ for i in 1 .. n ->
asyncSeq{ do! Async.Sleep 1000;
asyncSeq{ do! Async.Sleep 500;
yield i } ]
|> AsyncSeq.mergeAll
|> AsyncSeq.toListSynchronously

Assert.DoesNotThrow(fun _ -> mergeTest 1000 |> ignore)
Assert.DoesNotThrow(fun _ -> mergeTest 500 |> ignore)

[<Test>]
let ``Async.mergeAll should be fair``() =
let s1 = asyncSeq {
do! Async.Sleep 1000
do! Async.Sleep 300
yield 1
}
let s2 = asyncSeq {
Expand Down Expand Up @@ -1532,11 +1532,11 @@ let ``AsyncSeq.iterAsyncParallel should propagate exception`` () =
let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in parallel with another exception-throwing Async`` () =

let handle x = async {
do! Async.Sleep 50
do! Async.Sleep 5
}

let fakeAsync = async {
do! Async.Sleep 500
do! Async.Sleep 50
return "fakeAsync"
}

Expand All @@ -1546,7 +1546,7 @@ let ``AsyncSeq.iterAsyncParallel should cancel and not block forever when run in
match batch with
| Choice1Of2 batch ->
if (Seq.isEmpty batch) then
do! Async.Sleep 500
do! Async.Sleep 50
yield! loop()
else
yield batch
Expand Down Expand Up @@ -1612,7 +1612,7 @@ let ``AsyncSeq.iterAsyncParallelThrottled should throttle`` () =
let c = Interlocked.Increment count
if c > parallelism then
return failwith "oh no"
do! Async.Sleep 10
do! Async.Sleep 1
Interlocked.Decrement count |> ignore
return () })
|> Async.RunSynchronously
Expand Down Expand Up @@ -2141,12 +2141,12 @@ let ``AsyncSeq.fold with empty sequence should return seed``() =

[<Test>]
let ``AsyncSeq.ofSeq should work with large sequence``() =
let largeSeq = seq { 1 .. 1000 }
let largeSeq = seq { 1 .. 100 }
let asyncSeq = AsyncSeq.ofSeq largeSeq
let result = asyncSeq |> AsyncSeq.toListAsync |> Async.RunSynchronously
Assert.AreEqual(1000, result.Length)
Assert.AreEqual(100, result.Length)
Assert.AreEqual(1, result.[0])
Assert.AreEqual(1000, result.[999])
Assert.AreEqual(100, result.[99])

[<Test>]
let ``AsyncSeq.mapAsync should preserve order with async transformations``() =
Expand Down