Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,16 @@ public override bool TryComplete(Exception? error)
ChannelUtilities.FailOperations(blockedReadersHead, ChannelUtilities.CreateInvalidCompletionException(error));
ChannelUtilities.SetOrFailOperations(waitingReadersHead, result: false, error: error);

// If we didn't complete above because IsEmpty was false, recheck. A concurrent TryRead
// may have dequeued the last item but not yet called CompleteIfDone, or its CompleteIfDone
// may have seen _doneWriting as null. Since no new items can be enqueued at this point,
// if the queue is now empty, we should complete. ChannelUtilities.Complete is idempotent
// and thread-safe.
if (!completeTask && parent._items.IsEmpty)
{
ChannelUtilities.Complete(parent._completion, error);
}

// Successfully transitioned to completed.
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<Compile Include="TestBase.cs" />
<Compile Include="UnboundedChannelTests.cs" />
<Compile Include="$(CommonTestPath)System\Diagnostics\DebuggerAttributes.cs" Link="Common\System\Diagnostics\DebuggerAttributes.cs" />
<Compile Include="$(CommonTestPath)System\Threading\Tasks\TaskTimeoutExtensions.cs" Link="Common\System\Threading\Tasks\TaskTimeoutExtensions.cs" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFrameworkIdentifier)' == '.NETCoreApp'">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,41 @@ public void AllowSynchronousContinuations_CompletionTask_ContinuationsInvokedAcc
((IAsyncResult)r).AsyncWaitHandle.WaitOne(); // avoid inlining the continuation
r.GetAwaiter().GetResult();
}

[OuterLoop]
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task Completion_CompletesAfterConcurrentTryReadAndTryComplete()
{
for (int iter = 0; iter < 100_000; iter++)
{
Channel<int> channel = CreateChannel();

channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);

channel.Reader.TryRead(out _);

using var barrier = new Barrier(2);
Task t1 = Task.Run(() =>
{
barrier.SignalAndWait();
channel.Reader.TryRead(out _);
});

Task t2 = Task.Run(() =>
{
barrier.SignalAndWait();
channel.Writer.TryComplete();
});

await Task.WhenAll(t1, t2);

channel.Reader.TryRead(out _);

Task completionTask = channel.Reader.Completion;
await completionTask.WaitAsync(TimeSpan.FromSeconds(10));
}
}
}

public abstract class SingleReaderUnboundedChannelTests : UnboundedChannelTests
Expand Down
Loading