@@ -77,6 +77,8 @@ class ShareImpl {
7777 #cancelled = false ;
7878 #pulling = false ;
7979 #pullWaiters = [ ] ;
80+ #cachedMinCursor = 0 ;
81+ #cachedMinCursorConsumers = 0 ;
8082
8183 constructor ( source , options ) {
8284 this . #source = source ;
@@ -114,6 +116,14 @@ class ShareImpl {
114116 } ;
115117
116118 this . #consumers. add ( state ) ;
119+ if ( this . #consumers. size === 1 ) {
120+ this . #cachedMinCursor = state . cursor ;
121+ this . #cachedMinCursorConsumers = 1 ;
122+ } else if ( state . cursor === this . #cachedMinCursor) {
123+ this . #cachedMinCursorConsumers++ ;
124+ } else {
125+ this . #recomputeMinCursor( ) ;
126+ }
117127 const self = this ;
118128
119129 return {
@@ -139,22 +149,26 @@ class ShareImpl {
139149
140150 if ( self . #cancelled) {
141151 state . detached = true ;
142- self . #consumers . delete ( state ) ;
152+ self . #deleteConsumer ( state ) ;
143153 return { __proto__ : null , done : true , value : undefined } ;
144154 }
145155
146156 // Check if data is available in buffer
147157 const bufferIndex = state . cursor - self . #bufferStart;
148158 if ( bufferIndex < self . #buffer. length ) {
149159 const chunk = self . #buffer. get ( bufferIndex ) ;
160+ const cursor = state . cursor ;
150161 state . cursor ++ ;
151- self . #tryTrimBuffer( ) ;
162+ if ( cursor === self . #cachedMinCursor &&
163+ -- self . #cachedMinCursorConsumers === 0 ) {
164+ self . #tryTrimBuffer( ) ;
165+ }
152166 return { __proto__ : null , done : false , value : chunk } ;
153167 }
154168
155169 if ( self . #sourceExhausted) {
156170 state . detached = true ;
157- self . #consumers . delete ( state ) ;
171+ self . #deleteConsumer ( state ) ;
158172 if ( self . #sourceError) throw self . #sourceError;
159173 return { __proto__ : null , done : true , value : undefined } ;
160174 }
@@ -163,7 +177,7 @@ class ShareImpl {
163177 const canPull = await self . #waitForBufferSpace( ) ;
164178 if ( ! canPull ) {
165179 state . detached = true ;
166- self . #consumers . delete ( state ) ;
180+ self . #deleteConsumer ( state ) ;
167181 if ( self . #sourceError) throw self . #sourceError;
168182 return { __proto__ : null , done : true , value : undefined } ;
169183 }
@@ -176,17 +190,19 @@ class ShareImpl {
176190 state . detached = true ;
177191 state . resolve = null ;
178192 state . reject = null ;
179- self . #consumers. delete ( state ) ;
180- self . #tryTrimBuffer( ) ;
193+ if ( self . #deleteConsumer( state ) ) {
194+ self . #tryTrimBuffer( ) ;
195+ }
181196 return { __proto__ : null , done : true , value : undefined } ;
182197 } ,
183198
184199 async throw ( ) {
185200 state . detached = true ;
186201 state . resolve = null ;
187202 state . reject = null ;
188- self . #consumers. delete ( state ) ;
189- self . #tryTrimBuffer( ) ;
203+ if ( self . #deleteConsumer( state ) ) {
204+ self . #tryTrimBuffer( ) ;
205+ }
190206 return { __proto__ : null , done : true , value : undefined } ;
191207 } ,
192208 } ;
@@ -254,9 +270,11 @@ class ShareImpl {
254270 this . #bufferStart++ ;
255271 for ( const consumer of this . #consumers) {
256272 if ( consumer . cursor < this . #bufferStart) {
273+ this . #deleteConsumerFromMin( consumer ) ;
257274 consumer . cursor = this . #bufferStart;
258275 }
259276 }
277+ this . #recomputeMinCursor( ) ;
260278 return true ;
261279 case 'drop-newest' :
262280 return true ;
@@ -324,18 +342,41 @@ class ShareImpl {
324342 }
325343
326344 #tryTrimBuffer( ) {
327- const minCursor = getMinCursor (
328- this . #consumers, this . #bufferStart + this . #buffer. length ) ;
329- const trimCount = minCursor - this . #bufferStart;
345+ if ( this . #cachedMinCursorConsumers === 0 ) {
346+ this . #recomputeMinCursor( ) ;
347+ }
348+ const trimCount = this . #cachedMinCursor - this . #bufferStart;
330349 if ( trimCount > 0 ) {
331350 this . #buffer. trimFront ( trimCount ) ;
332- this . #bufferStart = minCursor ;
351+ this . #bufferStart = this . #cachedMinCursor ;
333352 for ( let i = 0 ; i < this . #pullWaiters. length ; i ++ ) {
334353 this . #pullWaiters[ i ] ( ) ;
335354 }
336355 this . #pullWaiters = [ ] ;
337356 }
338357 }
358+
359+ #recomputeMinCursor( ) {
360+ const [ minCursor , minCursorConsumers ] = getMinCursor (
361+ this . #consumers, this . #bufferStart + this . #buffer. length ) ;
362+ this . #cachedMinCursor = minCursor ;
363+ this . #cachedMinCursorConsumers = minCursorConsumers ;
364+ }
365+
366+ #deleteConsumerFromMin( consumer ) {
367+ if ( consumer . cursor === this . #cachedMinCursor) {
368+ this . #cachedMinCursorConsumers-- ;
369+ }
370+ }
371+
372+ #deleteConsumer( consumer ) {
373+ if ( this . #consumers. delete ( consumer ) ) {
374+ const wasAtMin = consumer . cursor === this . #cachedMinCursor;
375+ this . #deleteConsumerFromMin( consumer ) ;
376+ return wasAtMin && this . #cachedMinCursorConsumers === 0 ;
377+ }
378+ return false ;
379+ }
339380}
340381
341382// =============================================================================
@@ -352,6 +393,8 @@ class SyncShareImpl {
352393 #sourceExhausted = false ;
353394 #sourceError = null ;
354395 #cancelled = false ;
396+ #cachedMinCursor = 0 ;
397+ #cachedMinCursorConsumers = 0 ;
355398
356399 constructor ( source , options ) {
357400 this . #source = source ;
@@ -383,6 +426,14 @@ class SyncShareImpl {
383426 } ;
384427
385428 this . #consumers. add ( state ) ;
429+ if ( this . #consumers. size === 1 ) {
430+ this . #cachedMinCursor = state . cursor ;
431+ this . #cachedMinCursorConsumers = 1 ;
432+ } else if ( state . cursor === this . #cachedMinCursor) {
433+ this . #cachedMinCursorConsumers++ ;
434+ } else {
435+ this . #recomputeMinCursor( ) ;
436+ }
386437 const self = this ;
387438
388439 return {
@@ -396,26 +447,30 @@ class SyncShareImpl {
396447 }
397448 if ( self . #sourceError) {
398449 state . detached = true ;
399- self . #consumers . delete ( state ) ;
450+ self . #deleteConsumer ( state ) ;
400451 throw self . #sourceError;
401452 }
402453 if ( self . #cancelled) {
403454 state . detached = true ;
404- self . #consumers . delete ( state ) ;
455+ self . #deleteConsumer ( state ) ;
405456 return { __proto__ : null , done : true , value : undefined } ;
406457 }
407458
408459 const bufferIndex = state . cursor - self . #bufferStart;
409460 if ( bufferIndex < self . #buffer. length ) {
410461 const chunk = self . #buffer. get ( bufferIndex ) ;
462+ const cursor = state . cursor ;
411463 state . cursor ++ ;
412- self . #tryTrimBuffer( ) ;
464+ if ( cursor === self . #cachedMinCursor &&
465+ -- self . #cachedMinCursorConsumers === 0 ) {
466+ self . #tryTrimBuffer( ) ;
467+ }
413468 return { __proto__ : null , done : false , value : chunk } ;
414469 }
415470
416471 if ( self . #sourceExhausted) {
417472 state . detached = true ;
418- self . #consumers . delete ( state ) ;
473+ self . #deleteConsumer ( state ) ;
419474 return { __proto__ : null , done : true , value : undefined } ;
420475 }
421476
@@ -436,13 +491,15 @@ class SyncShareImpl {
436491 self . #bufferStart++ ;
437492 for ( const consumer of self . #consumers) {
438493 if ( consumer . cursor < self . #bufferStart) {
494+ self . #deleteConsumerFromMin( consumer ) ;
439495 consumer . cursor = self . #bufferStart;
440496 }
441497 }
498+ self . #recomputeMinCursor( ) ;
442499 break ;
443500 case 'drop-newest' :
444501 state . detached = true ;
445- self . #consumers . delete ( state ) ;
502+ self . #deleteConsumer ( state ) ;
446503 return { __proto__ : null , done : true , value : undefined } ;
447504 }
448505 }
@@ -451,21 +508,25 @@ class SyncShareImpl {
451508
452509 if ( self . #sourceError) {
453510 state . detached = true ;
454- self . #consumers . delete ( state ) ;
511+ self . #deleteConsumer ( state ) ;
455512 throw self . #sourceError;
456513 }
457514
458515 const newBufferIndex = state . cursor - self . #bufferStart;
459516 if ( newBufferIndex < self . #buffer. length ) {
460517 const chunk = self . #buffer. get ( newBufferIndex ) ;
518+ const cursor = state . cursor ;
461519 state . cursor ++ ;
462- self . #tryTrimBuffer( ) ;
520+ if ( cursor === self . #cachedMinCursor &&
521+ -- self . #cachedMinCursorConsumers === 0 ) {
522+ self . #tryTrimBuffer( ) ;
523+ }
463524 return { __proto__ : null , done : false , value : chunk } ;
464525 }
465526
466527 if ( self . #sourceExhausted) {
467528 state . detached = true ;
468- self . #consumers . delete ( state ) ;
529+ self . #deleteConsumer ( state ) ;
469530 return { __proto__ : null , done : true , value : undefined } ;
470531 }
471532
@@ -474,15 +535,17 @@ class SyncShareImpl {
474535
475536 return ( ) {
476537 state . detached = true ;
477- self . #consumers. delete ( state ) ;
478- self . #tryTrimBuffer( ) ;
538+ if ( self . #deleteConsumer( state ) ) {
539+ self . #tryTrimBuffer( ) ;
540+ }
479541 return { __proto__ : null , done : true , value : undefined } ;
480542 } ,
481543
482544 throw ( ) {
483545 state . detached = true ;
484- self . #consumers. delete ( state ) ;
485- self . #tryTrimBuffer( ) ;
546+ if ( self . #deleteConsumer( state ) ) {
547+ self . #tryTrimBuffer( ) ;
548+ }
486549 return { __proto__ : null , done : true , value : undefined } ;
487550 } ,
488551 } ;
@@ -532,13 +595,36 @@ class SyncShareImpl {
532595 }
533596
534597 #tryTrimBuffer( ) {
535- const minCursor = getMinCursor (
536- this . #consumers, this . #bufferStart + this . #buffer. length ) ;
537- const trimCount = minCursor - this . #bufferStart;
598+ if ( this . #cachedMinCursorConsumers === 0 ) {
599+ this . #recomputeMinCursor( ) ;
600+ }
601+ const trimCount = this . #cachedMinCursor - this . #bufferStart;
538602 if ( trimCount > 0 ) {
539603 this . #buffer. trimFront ( trimCount ) ;
540- this . #bufferStart = minCursor ;
604+ this . #bufferStart = this . #cachedMinCursor;
605+ }
606+ }
607+
608+ #recomputeMinCursor( ) {
609+ const [ minCursor , minCursorConsumers ] = getMinCursor (
610+ this . #consumers, this . #bufferStart + this . #buffer. length ) ;
611+ this . #cachedMinCursor = minCursor ;
612+ this . #cachedMinCursorConsumers = minCursorConsumers ;
613+ }
614+
615+ #deleteConsumerFromMin( consumer ) {
616+ if ( consumer . cursor === this . #cachedMinCursor) {
617+ this . #cachedMinCursorConsumers-- ;
618+ }
619+ }
620+
621+ #deleteConsumer( consumer ) {
622+ if ( this . #consumers. delete ( consumer ) ) {
623+ const wasAtMin = consumer . cursor === this . #cachedMinCursor;
624+ this . #deleteConsumerFromMin( consumer ) ;
625+ return wasAtMin && this . #cachedMinCursorConsumers === 0 ;
541626 }
627+ return false ;
542628 }
543629}
544630
0 commit comments