Skip to content

Commit d609fee

Browse files
author
ladeak
committed
Request pipe based data transfer to the application in H/3.
1 parent 5e63b09 commit d609fee

2 files changed

Lines changed: 71 additions & 44 deletions

File tree

src/CHttpServer/CHttpServer/Http3/Http3Stream.QPackHeaderHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ internal sealed partial class Http3Stream : IQPackHeaderHandler, IHttpRequestFea
1414

1515
private string _hostDecoded;
1616
private byte[] _hostEncoded;
17+
private Pipe _requestDataToAppPipe;
18+
1719

1820
public string Protocol { get => "HTTP/3"; set => throw new PlatformNotSupportedException(); }
1921
public string PathBase { get => string.Empty; set => throw new PlatformNotSupportedException(); }
@@ -27,7 +29,7 @@ internal sealed partial class Http3Stream : IQPackHeaderHandler, IHttpRequestFea
2729
IHeaderDictionary IHttpRequestFeature.Headers { get => _requestHeaders; set => throw new PlatformNotSupportedException(); }
2830
#pragma warning restore CS9266 // Property accessor should use 'field' because the other accessor is using it.
2931

30-
public PipeReader Reader => PipeReader.Create(Stream.Null);
32+
public PipeReader Reader => _requestDataToAppPipe.Reader;
3133

3234
private readonly Http3RequestHeaderCollection _requestHeaders;
3335

src/CHttpServer/CHttpServer/Http3/Http3Stream.cs

Lines changed: 68 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ public Http3Stream(FeatureCollection features)
3535
Id = 0;
3636
_quicStream = null;
3737
_dataReader = PipeReader.Create(new ReadOnlySequence<byte>());
38+
_requestDataToAppPipe = new(new PipeOptions(pauseWriterThreshold: 1024 * 1024));
39+
_requestDataToAppPipe.Writer.Complete();
40+
_requestDataToAppPipe.Reader.Complete();
3841
_responseDataWriter = new(Stream.Null, frameType: 0);
3942
_responseHeaderWriter = new(Stream.Null, frameType: 1);
4043
_qpackDecoder = new QPackDecoder();
@@ -75,13 +78,13 @@ public void Initialize(Http3Connection? connection, QuicStream quicStream)
7578
_connection = connection;
7679
_quicStream = quicStream;
7780
_dataReader = PipeReader.Create(quicStream);
81+
_requestDataToAppPipe.Reset();
7882
_responseDataWriter.Reset(quicStream, StartAsync);
7983
_responseHeaderWriter.Reset(quicStream);
8084
Scheme = string.Empty;
8185
Method = string.Empty;
8286
QueryString = string.Empty;
8387
_isPathSet = false; // The actual Path is not reset.
84-
_isHostSet = false; // The actual Host is not reset.
8588
_features.ResetCheckpoint();
8689
_requestHeaders.ResetHeaderCollection();
8790
_responseHeaders.ResetHeaderCollection();
@@ -115,7 +118,15 @@ public async Task ProcessStreamAsync<TContext>(IHttpApplication<TContext> applic
115118

116119
while (!buffer.IsEmpty)
117120
{
118-
long bufferConsumed = ProcessStreamAsync(application, ref payloadRemainingLength, ref applicationProcessing, ref streamReadingState, buffer, token);
121+
long bufferConsumed = 0;
122+
if (streamReadingState == StreamReadingStatus.ReadingFrameHeader)
123+
bufferConsumed = ReadFrameHeader(ref payloadRemainingLength, ref streamReadingState, applicationProcessing, buffer);
124+
else if (streamReadingState == StreamReadingStatus.ReadingPayloadData)
125+
(bufferConsumed, payloadRemainingLength, streamReadingState) = await ProcessDataFrameAsync(payloadRemainingLength, streamReadingState, buffer);
126+
else if (streamReadingState == StreamReadingStatus.ReadingPayloadHeader)
127+
bufferConsumed = ReadHeaderFrame(application, ref payloadRemainingLength, ref applicationProcessing, ref streamReadingState, buffer, token);
128+
else if (streamReadingState == StreamReadingStatus.ReadingPayloadReserved)
129+
bufferConsumed = ReadReservedFrame(ref payloadRemainingLength, ref streamReadingState, buffer);
119130

120131
// Could not further process. Break the inner loop to read more data
121132
if (bufferConsumed == 0)
@@ -148,56 +159,75 @@ public async Task ProcessStreamAsync<TContext>(IHttpApplication<TContext> applic
148159
}
149160
}
150161

151-
private long ProcessStreamAsync<TContext>(IHttpApplication<TContext> application,
162+
private long ReadFrameHeader(
152163
ref long payloadRemainingLength,
153-
ref Task? applicationProcessing,
154164
ref StreamReadingStatus streamReadingState,
155-
ReadOnlySequence<byte> buffer,
156-
CancellationToken token) where TContext : notnull
165+
Task? applicationProcessing,
166+
ReadOnlySequence<byte> buffer)
157167
{
158-
long bufferConsumed = 0;
168+
if (!VariableLenghtIntegerDecoder.TryRead(buffer.FirstSpan, out var frameType, out int bytesReadFrameType))
169+
return 0; // Not enough data to read payload length.
159170

160-
if (streamReadingState == StreamReadingStatus.ReadingFrameHeader)
161-
{
162-
// FrameType is a single byte.
163-
if (!VariableLenghtIntegerDecoder.TryRead(buffer.FirstSpan, out var frameType, out int bytesReadFrameType))
164-
{
165-
// Not enough data to read payload length.
166-
return 0;
167-
}
168-
buffer = buffer.Slice(bytesReadFrameType);
169-
if (!VariableLenghtIntegerDecoder.TryRead(buffer, out var payloadLength, out int bytesReadPayloadLength))
170-
{
171-
// Not enough data to read payload length.
172-
return 0;
173-
}
174-
payloadRemainingLength = checked((long)payloadLength);
175-
streamReadingState = NextRequestReadingState(applicationProcessing, frameType);
176-
buffer = buffer.Slice(bytesReadPayloadLength);
177-
bufferConsumed += bytesReadFrameType + bytesReadPayloadLength;
178-
return bufferConsumed;
179-
}
171+
buffer = buffer.Slice(bytesReadFrameType);
172+
if (!VariableLenghtIntegerDecoder.TryRead(buffer, out var payloadLength, out int bytesReadPayloadLength))
173+
return 0; // Not enough data to read payload length.
180174

175+
payloadRemainingLength = checked((long)payloadLength);
176+
streamReadingState = NextRequestReadingState(applicationProcessing, frameType);
177+
buffer = buffer.Slice(bytesReadPayloadLength);
178+
var bufferConsumed = bytesReadFrameType + bytesReadPayloadLength;
179+
return bufferConsumed;
180+
}
181+
182+
private long ReadReservedFrame(
183+
ref long payloadRemainingLength,
184+
ref StreamReadingStatus streamReadingState,
185+
ReadOnlySequence<byte> buffer)
186+
{
181187
if (payloadRemainingLength < buffer.Length)
182188
buffer = buffer.Slice(0, payloadRemainingLength);
189+
var bufferConsumed = buffer.Length; // Read the complete reserved frame.
190+
payloadRemainingLength -= bufferConsumed;
191+
if (payloadRemainingLength == 0)
192+
streamReadingState = StreamReadingStatus.ReadingFrameHeader;
193+
return bufferConsumed;
194+
}
183195

184-
if (streamReadingState == StreamReadingStatus.ReadingPayloadHeader)
196+
private long ReadHeaderFrame<TContext>(IHttpApplication<TContext> application,
197+
ref long payloadRemainingLength,
198+
ref Task? applicationProcessing,
199+
ref StreamReadingStatus streamReadingState,
200+
ReadOnlySequence<byte> buffer,
201+
CancellationToken token) where TContext : notnull
202+
{
203+
if (payloadRemainingLength < buffer.Length)
204+
buffer = buffer.Slice(0, payloadRemainingLength);
205+
var bufferConsumed = ProcessHeaderFrame(buffer);
206+
payloadRemainingLength -= bufferConsumed;
207+
if (payloadRemainingLength == 0)
185208
{
186-
bufferConsumed = ProcessHeaderFrame(buffer);
187-
if (payloadRemainingLength == bufferConsumed)
188-
applicationProcessing = Task.Run(() => StartApplicationProcessing(application, token), token);
209+
streamReadingState = StreamReadingStatus.ReadingFrameHeader;
210+
applicationProcessing = Task.Run(() => StartApplicationProcessing(application, token), token);
189211
}
212+
return bufferConsumed;
213+
}
190214

191-
if (streamReadingState == StreamReadingStatus.ReadingPayloadData)
192-
bufferConsumed = ProcessDataFrame(buffer);
215+
private async Task<(long Consumed, long PayloadRemaining, StreamReadingStatus Status)> ProcessDataFrameAsync(
216+
long payloadRemainingLength,
217+
StreamReadingStatus streamReadingState,
218+
ReadOnlySequence<byte> buffer)
219+
{
220+
if (payloadRemainingLength < buffer.Length)
221+
buffer = buffer.Slice(0, payloadRemainingLength);
193222

194-
if (streamReadingState == StreamReadingStatus.ReadingPayloadReserved)
195-
bufferConsumed = buffer.Length; // Read the complete reserved frame.
223+
foreach (var segment in buffer)
224+
await _requestDataToAppPipe.Writer.WriteAsync(segment);
225+
var bufferConsumed = buffer.Length;
196226

197227
payloadRemainingLength -= bufferConsumed;
198228
if (payloadRemainingLength == 0)
199229
streamReadingState = StreamReadingStatus.ReadingFrameHeader;
200-
return bufferConsumed;
230+
return (bufferConsumed, payloadRemainingLength, streamReadingState);
201231
}
202232

203233
private StreamReadingStatus NextRequestReadingState(Task? applicationProcessing, ulong frameType)
@@ -227,6 +257,8 @@ private StreamReadingStatus NextRequestReadingState(Task? applicationProcessing,
227257

228258
private async Task CloseStreamAsync()
229259
{
260+
await _requestDataToAppPipe.Writer.CompleteAsync();
261+
await _requestDataToAppPipe.Reader.CompleteAsync();
230262
await _dataReader.CompleteAsync();
231263
await _responseHeaderWriter.CompleteAsync();
232264
await _responseDataWriter.CompleteAsync();
@@ -235,13 +267,6 @@ private async Task CloseStreamAsync()
235267
_connection?.StreamClosed(this);
236268
}
237269

238-
private long ProcessDataFrame(ReadOnlySequence<byte> buffer)
239-
{
240-
// 'Copy' to Body
241-
return buffer.Length;
242-
243-
}
244-
245270
private long ProcessHeaderFrame(ReadOnlySequence<byte> buffer)
246271
{
247272
_qpackDecoder.DecodeHeader(buffer, this, out long consumed);

0 commit comments

Comments
 (0)