Skip to content

Commit 3a9ab7f

Browse files
author
ladeak
committed
- Segments cache
- Initial tests for Http3DeframingPipeReader
1 parent 3e066d8 commit 3a9ab7f

5 files changed

Lines changed: 186 additions & 80 deletions

File tree

src/CHttpServer/CHttpServer/Http3/Http3DeframingPipeReader.cs

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
using System.IO.Pipelines;
33
using CHttpServer.Http3;
44

5-
// TODO buffer segments
6-
75
internal sealed class Http3DeframingPipeReader : PipeReader
86
{
97
private enum StreamReadingStatus
@@ -15,17 +13,17 @@ private enum StreamReadingStatus
1513

1614
private class Segment : ReadOnlySequenceSegment<byte>
1715
{
18-
public Segment(ReadOnlyMemory<byte> data, object? source, int sourceOffset, long framePayloadRemaining)
16+
public void Initialize(ReadOnlyMemory<byte> data, object? source, int sourceOffset, long framePayloadRemaining)
1917
{
2018
base.Memory = data;
2119
Source = source;
2220
SourceOffset = sourceOffset;
2321
FramePayloadRemaining = framePayloadRemaining;
2422
}
2523

26-
public object? Source { get; }
27-
public int SourceOffset { get; }
28-
public long FramePayloadRemaining { get; }
24+
public object? Source { get; private set; }
25+
public int SourceOffset { get; private set; }
26+
public long FramePayloadRemaining { get; private set; }
2927

3028
public Segment SetNext(Segment s)
3129
{
@@ -54,6 +52,15 @@ public Segment Slice(int start)
5452
throw new InvalidOperationException("Not a Segment");
5553
}
5654

55+
public void Reset()
56+
{
57+
Memory = ReadOnlyMemory<byte>.Empty;
58+
Source = null;
59+
SourceOffset = 0; FramePayloadRemaining = 0;
60+
RunningIndex = 0;
61+
Next = null;
62+
}
63+
5764
public SequencePosition End => new SequencePosition(this, Memory.Length);
5865
}
5966

@@ -62,10 +69,12 @@ public Segment Slice(int start)
6269
private long _payloadRemainingLength = 0;
6370
private Segment? _head;
6471
private Segment? _tail;
72+
private Stack<Segment> _segmentsBuffer;
6573

6674
public Http3DeframingPipeReader(PipeReader pipeReader)
6775
{
6876
_pipeReader = pipeReader;
77+
_segmentsBuffer = new Stack<Segment>(capacity: 10);
6978
}
7079

7180
public void Reset(PipeReader pipeReader)
@@ -77,7 +86,7 @@ public void Reset(PipeReader pipeReader)
7786
_streamReadingState = StreamReadingStatus.ReadingFrameHeader;
7887
}
7988

80-
private void AddSegment(Segment s)
89+
private void AppendDataSegment(Segment s)
8190
{
8291
if (_tail == null)
8392
_head = _tail = s;
@@ -110,6 +119,9 @@ public override void AdvanceTo(SequencePosition consumed, SequencePosition exami
110119
sourceObject = segmentExamined.Source;
111120
examined = new SequencePosition(sourceObject, sourceOffset);
112121

122+
while (_head != segmentConsumed && _head != null)
123+
_head = ReturnSegment(_head);
124+
113125
_payloadRemainingLength = segmentConsumed.FramePayloadRemaining - segmentOffset;
114126
if (_payloadRemainingLength == 0)
115127
_streamReadingState = StreamReadingStatus.ReadingFrameHeader;
@@ -155,6 +167,8 @@ public override async ValueTask<ReadResult> ReadAsync(CancellationToken token =
155167
// at the beginning of the segment.
156168
private bool ProcessReadResult(ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> sequence)
157169
{
170+
while (_head != null)
171+
_head = ReturnSegment(_head);
158172
_head = _tail = null;
159173
sequence = ReadOnlySequence<byte>.Empty;
160174
long bufferConsumed = 0;
@@ -173,9 +187,10 @@ private bool ProcessReadResult(ReadOnlySequence<byte> buffer, out ReadOnlySequen
173187
foreach (var s in dataPayload)
174188
{
175189
var position = buffer.GetPosition(currentPosition);
176-
var segment = new Segment(s, position.GetObject(), position.GetInteger(), currentPayloadRemainingLength);
190+
var segment = RentSegment();
191+
segment.Initialize(s, position.GetObject(), position.GetInteger(), currentPayloadRemainingLength);
177192
currentPayloadRemainingLength -= s.Length;
178-
AddSegment(segment);
193+
AppendDataSegment(segment);
179194
currentPosition += s.Length;
180195
}
181196
}
@@ -259,6 +274,21 @@ private static StreamReadingStatus NextRequestReadingState(ulong frameType)
259274
return streamReadingState;
260275
}
261276

277+
private Segment RentSegment()
278+
{
279+
if (!_segmentsBuffer.TryPop(out var segment))
280+
segment = new Segment();
281+
return segment;
282+
}
283+
284+
private Segment? ReturnSegment(Segment segment)
285+
{
286+
var next = segment.Next;
287+
segment.Reset();
288+
_segmentsBuffer.Push(segment);
289+
return next as Segment;
290+
}
291+
262292
public override async Task CopyToAsync(PipeWriter destination, CancellationToken cancellationToken = default)
263293
{
264294
StreamReadingStatus streamState = StreamReadingStatus.ReadingFrameHeader;
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.IO.Pipelines;
2+
using CHttpServer.Http3;
3+
4+
namespace CHttpServer.Tests.Http3;
5+
6+
public class Http3DeframingPipeReaderTests
7+
{
8+
[Fact]
9+
public async Task HeadersFrameThrows()
10+
{
11+
var pipe = new Pipe();
12+
var sut = new Http3DeframingPipeReader(pipe.Reader);
13+
await pipe.Writer.WriteAsync(Http3FrameFixture.GetHeadersFrame(), TestContext.Current.CancellationToken);
14+
await Assert.ThrowsAsync<Http3ConnectionException>(async () => await sut.ReadAsync(TestContext.Current.CancellationToken));
15+
}
16+
17+
[Fact]
18+
public async Task ReservedFrameIgnored()
19+
{
20+
var pipe = new Pipe();
21+
var sut = new Http3DeframingPipeReader(pipe.Reader);
22+
await pipe.Writer.WriteAsync(Http3FrameFixture.GetReservedFrame(100), TestContext.Current.CancellationToken);
23+
pipe.Writer.Complete();
24+
var result = await sut.ReadAsync(TestContext.Current.CancellationToken);
25+
Assert.True(result.IsCompleted);
26+
Assert.True(result.Buffer.IsEmpty);
27+
}
28+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
using System.IO.Pipelines;
2+
using CHttpServer.Http3;
3+
4+
namespace CHttpServer.Tests.Http3;
5+
6+
internal class TestPipeWriter : PipeWriter
7+
{
8+
private byte[] _buffer = new byte[4096];
9+
private int _consumedLength = 0;
10+
11+
public ReadOnlySpan<byte> WrittenData => _buffer.AsSpan(0, _consumedLength);
12+
13+
public override void Advance(int bytes)
14+
{
15+
_consumedLength += bytes;
16+
}
17+
18+
public override void CancelPendingFlush()
19+
{
20+
}
21+
22+
public override void Complete(Exception? exception = null)
23+
{
24+
}
25+
26+
public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
27+
{
28+
return ValueTask.FromResult(new FlushResult());
29+
}
30+
31+
public override Memory<byte> GetMemory(int sizeHint = 0)
32+
{
33+
if (_buffer == null || sizeHint < _buffer.Length - _consumedLength)
34+
Grow(sizeHint);
35+
return _buffer.AsMemory(_consumedLength);
36+
}
37+
38+
public override Span<byte> GetSpan(int sizeHint = 0)
39+
{
40+
if (_buffer == null || sizeHint < _buffer.Length - _consumedLength)
41+
Grow(sizeHint);
42+
return _buffer.AsSpan(_consumedLength);
43+
}
44+
45+
private void Grow(int sizeHint)
46+
{
47+
var newSize = Math.Max(sizeHint + _consumedLength, _buffer.Length * 2);
48+
Array.Resize(ref _buffer, newSize);
49+
}
50+
}
51+
52+
internal class Http3FrameFixture
53+
{
54+
public static byte[] GetHeadersFrame()
55+
{
56+
var encoder = new QPackDecoder();
57+
var headers = new Http3ResponseHeaderCollection
58+
{
59+
{ ":path", "/" },
60+
{ ":authority", "localhost" },
61+
{ ":method", "GET" },
62+
{ ":scheme", "https" }
63+
};
64+
var writer = new TestPipeWriter();
65+
encoder.Encode(headers, writer);
66+
var payloadLength = VariableLenghtIntegerDecoder.Write(writer.WrittenData.Length);
67+
return [1, .. payloadLength.Span, .. writer.WrittenData];
68+
}
69+
70+
public static byte[] GetLargeHeadersFrame()
71+
{
72+
var encoder = new QPackDecoder();
73+
var headers = new Http3ResponseHeaderCollection
74+
{
75+
{ ":path", "/" },
76+
{ ":authority", "localhost" },
77+
{ ":method", "GET" },
78+
{ ":scheme", "https" },
79+
{ "x-custom-header", new string('a', 4096*2) }
80+
};
81+
var writer = new TestPipeWriter();
82+
encoder.Encode(headers, writer);
83+
var payloadLength = VariableLenghtIntegerDecoder.Write(writer.WrittenData.Length);
84+
return [1, .. payloadLength.Span, .. writer.WrittenData];
85+
}
86+
87+
public static byte[] GetData(int length)
88+
{
89+
var payloadLength = VariableLenghtIntegerDecoder.Write(length);
90+
var payload = Enumerable.Sequence(0, length - 1, 1).Select(x => (byte)x);
91+
return [0, .. payloadLength.Span, .. payload];
92+
}
93+
94+
public static byte[] GetReservedFrame(int length, int seed = 2)
95+
{
96+
var frameType = VariableLenghtIntegerDecoder.Write(seed * 0x1f + 0x21);
97+
var payloadLength = VariableLenghtIntegerDecoder.Write(length);
98+
var payload = Enumerable.Sequence(0, length - 1, 1).Select(x => (byte)x);
99+
return [.. frameType.Span, .. payloadLength.Span, .. payload];
100+
}
101+
}

0 commit comments

Comments
 (0)