@@ -13,17 +13,22 @@ private readonly struct Segment()
1313
1414 public byte [ ] Reference { get ; init ; } = [ ] ;
1515 public Memory < byte > Used { get ; init ; } = Memory < byte > . Empty ;
16+
17+ public bool IsEmpty => Used . IsEmpty ;
18+
19+ public bool IsAllocated => Reference . Length > 0 ;
1620 }
1721
1822 private readonly byte [ ] _buffer = new byte [ 9 ] ;
1923 private readonly Lock _lockObject = new ( ) ;
20- private readonly List < Segment > _segments = new List < Segment > ( 128 ) { new Segment ( ) } ;
24+ private readonly List < Segment > _segments = new List < Segment > ( 128 ) ;
2125 private readonly ArrayPool < byte > _memoryPool = memoryPool ?? ArrayPool < byte > . Shared ;
2226 private Stream _responseStream = responseStream ;
2327 private readonly byte _frameType = frameType ;
2428 private CancellationTokenSource ? _cts ;
2529 private bool _isCompleted = false ;
2630 private long _unflushedBytes ;
31+ private Segment _currentSegment = Segment . Empty ;
2732 private TaskCompletionSource _tcs = new ( TaskCreationOptions . RunContinuationsAsynchronously ) ;
2833 private Func < CancellationToken , Task > ? _onResponseStartingCallback = onResponseStartingCallback ;
2934
@@ -60,12 +65,11 @@ public void Reset(Stream responseStream, Func<CancellationToken, Task>? onRespon
6065 public override void Advance ( int bytes )
6166 {
6267 ThrowIfCompleted ( ) ;
63- var current = _segments [ ^ 1 ] ;
68+ var current = _currentSegment ;
6469 var usedLength = current . Used . Length ;
6570 var available = current . Reference . Length - usedLength ;
6671 ArgumentOutOfRangeException . ThrowIfLessThan ( available , bytes ) ;
67- current = current with { Used = current . Reference . AsMemory ( 0 , usedLength + bytes ) } ;
68- _segments [ ^ 1 ] = current ;
72+ _currentSegment = current with { Used = current . Reference . AsMemory ( 0 , usedLength + bytes ) } ;
6973 _unflushedBytes += bytes ;
7074 }
7175
@@ -111,23 +115,19 @@ public override Memory<byte> GetMemory(int sizeHint = 0)
111115 {
112116 ThrowIfCompleted ( ) ;
113117 ArgumentOutOfRangeException . ThrowIfLessThan ( sizeHint , 0 ) ;
114- var current = _segments [ ^ 1 ] ;
118+ var current = _currentSegment ;
115119 var available = current . Reference . Length - current . Used . Length ;
116120 if ( sizeHint <= available )
117121 return current . Reference . AsMemory ( current . Used . Length ) ;
118122
119- var segment = new Segment ( ) { Reference = _memoryPool . Rent ( sizeHint ) } ; // Minimum size left for ArrayPool.
123+ // Not enough memory left in the current segment.
124+ if ( ! current . IsEmpty )
125+ _segments . Add ( current ) ;
126+ else if ( current . IsAllocated )
127+ _memoryPool . Return ( current . Reference , true ) ;
120128
121- // If it is unused segment, but too small, return the currently rented array and replace the segment.
122- if ( current . Used . Length == 0 )
123- {
124- if ( current . Reference . Length != 0 )
125- _memoryPool . Return ( current . Reference , true ) ;
126- _segments [ ^ 1 ] = segment ;
127- }
128- else
129- _segments . Add ( segment ) ;
130- return segment . Reference . AsMemory ( ) ;
129+ _currentSegment = new Segment ( ) { Reference = _memoryPool . Rent ( sizeHint ) } ; // Minimum size left for ArrayPool.
130+ return _currentSegment . Reference . AsMemory ( ) ;
131131 }
132132
133133 public override Span < byte > GetSpan ( int sizeHint = 0 ) => GetMemory ( sizeHint ) . Span ;
@@ -189,21 +189,20 @@ public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancel
189189 var dataFrameHeaderLength = PrepareFrameHeader ( _unflushedBytes ) ;
190190 await _responseStream . WriteAsync ( _buffer . AsMemory ( 0 , dataFrameHeaderLength ) , localToken ) ;
191191
192- int i = 0 ;
193- var emptySegment = Segment . Empty ;
194- for ( ; i < _segments . Count ; i ++ )
192+ // Shortcut: if flushed after every write, no need to address segments
193+ if ( _segments . Count == 0 )
195194 {
196- var memory = _segments [ i ] ;
197- if ( memory . Reference . Length == 0 )
198- break ;
199- if ( memory . Used . Length > 0 )
200- await _responseStream . WriteAsync ( memory . Used , localToken ) ;
201- _memoryPool . Return ( memory . Reference , true ) ;
202- _segments [ i ] = emptySegment ;
195+ if ( ! _currentSegment . IsEmpty )
196+ await _responseStream . WriteAsync ( _currentSegment . Used , localToken ) ;
197+ if ( _currentSegment . IsAllocated )
198+ _memoryPool . Return ( _currentSegment . Reference , true ) ;
199+ _currentSegment = Segment . Empty ;
200+ _unflushedBytes = 0 ;
201+ await _responseStream . FlushAsync ( ) ;
202+ return new FlushResult ( false , false ) ;
203203 }
204- _unflushedBytes = 0 ;
205- await _responseStream . FlushAsync ( localToken ) ;
206- return new FlushResult ( isCanceled : false , isCompleted : false ) ;
204+
205+ return await FlushAllSegmentsAsync ( localToken ) ;
207206 }
208207 catch ( OperationCanceledException )
209208 {
@@ -233,6 +232,27 @@ public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancel
233232 }
234233 }
235234
235+ private async Task < FlushResult > FlushAllSegmentsAsync ( CancellationToken localToken )
236+ {
237+ if ( ! _currentSegment . IsEmpty )
238+ _segments . Add ( _currentSegment ) ;
239+ else if ( _currentSegment . IsAllocated )
240+ _memoryPool . Return ( _currentSegment . Reference , true ) ;
241+ _currentSegment = Segment . Empty ;
242+
243+ for ( int i = 0 ; i < _segments . Count ; i ++ )
244+ {
245+ var memory = _segments [ i ] ;
246+ if ( ! memory . IsEmpty )
247+ await _responseStream . WriteAsync ( memory . Used , localToken ) ;
248+ _memoryPool . Return ( memory . Reference , true ) ;
249+ }
250+ _segments . Clear ( ) ;
251+ _unflushedBytes = 0 ;
252+ await _responseStream . FlushAsync ( localToken ) ;
253+ return new FlushResult ( isCanceled : false , isCompleted : false ) ;
254+ }
255+
236256 private void Flush ( )
237257 {
238258 if ( _unflushedBytes == 0 )
@@ -245,19 +265,40 @@ private void Flush()
245265 }
246266 var dataFrameHeaderLength = PrepareFrameHeader ( _unflushedBytes ) ;
247267 _responseStream . Write ( _buffer . AsSpan ( 0 , dataFrameHeaderLength ) ) ;
268+
269+ // Shortcut: if flushed after every write, no need to address segments
270+ if ( _segments . Count == 0 )
271+ {
272+ if ( ! _currentSegment . IsEmpty )
273+ _responseStream . Write ( _currentSegment . Used . Span ) ;
274+ if ( _currentSegment . IsAllocated )
275+ _memoryPool . Return ( _currentSegment . Reference , true ) ;
276+ _currentSegment = Segment . Empty ;
277+ _unflushedBytes = 0 ;
278+ _responseStream . Flush ( ) ;
279+ return ;
280+ }
281+
282+ FlushAllSegments ( ) ;
283+ }
284+
285+ private void FlushAllSegments ( )
286+ {
287+ if ( ! _currentSegment . IsEmpty )
288+ _segments . Add ( _currentSegment ) ;
289+ else if ( _currentSegment . IsAllocated )
290+ _memoryPool . Return ( _currentSegment . Reference , true ) ;
291+ _currentSegment = Segment . Empty ;
292+
248293 var source = CollectionsMarshal . AsSpan ( _segments ) ;
249- int i = 0 ;
250- var emptySegment = Segment . Empty ;
251- for ( ; i < _segments . Count ; i ++ )
294+ for ( int i = 0 ; i < _segments . Count ; i ++ )
252295 {
253296 ref var memory = ref source [ i ] ;
254- if ( memory . Reference . Length == 0 )
255- break ;
256297 if ( memory . Used . Length > 0 )
257298 _responseStream . Write ( memory . Used . Span ) ;
258299 _memoryPool . Return ( memory . Reference , true ) ;
259- source [ i ] = emptySegment ;
260300 }
301+ _segments . Clear ( ) ;
261302 _unflushedBytes = 0 ;
262303 _responseStream . Flush ( ) ;
263304 }
@@ -290,7 +331,9 @@ private void ClearSegments(Span<Segment> source)
290331 }
291332 _unflushedBytes = 0 ;
292333 _segments . Clear ( ) ;
293- _segments . Add ( Segment . Empty ) ;
334+ if ( _currentSegment . IsAllocated )
335+ _memoryPool . Return ( _currentSegment . Reference ) ;
336+ _currentSegment = Segment . Empty ;
294337 }
295338
296339 private void ThrowIfCompleted ( )
0 commit comments