1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15- use std:: collections:: VecDeque ;
15+ use std:: collections:: BinaryHeap ;
1616use std:: future:: Future ;
1717use std:: pin:: Pin ;
1818use std:: task:: { Context , Poll } ;
@@ -87,7 +87,7 @@ impl SearchPermitProvider {
8787 msg_sender : message_sender. downgrade ( ) ,
8888 num_warmup_slots_available : num_download_slots,
8989 total_memory_budget : memory_budget. as_u64 ( ) ,
90- permits_requests : VecDeque :: new ( ) ,
90+ permits_requests : BinaryHeap :: new ( ) ,
9191 total_memory_allocated : 0u64 ,
9292 #[ cfg( test) ]
9393 stopped : state_sender,
@@ -134,11 +134,72 @@ struct SearchPermitActor {
134134 /// When it happens, new permits will not be assigned until the memory is freed.
135135 total_memory_budget : u64 ,
136136 total_memory_allocated : u64 ,
137- permits_requests : VecDeque < ( oneshot :: Sender < SearchPermit > , u64 ) > ,
137+ permits_requests : BinaryHeap < LeafPermitRequest > ,
138138 #[ cfg( test) ]
139139 stopped : watch:: Sender < bool > ,
140140}
141141
142+ struct LeafPermitRequest {
143+ single_split_permit_requests : Vec < ( oneshot:: Sender < SearchPermit > , u64 ) > ,
144+ }
145+
146+ impl Ord for LeafPermitRequest {
147+ fn cmp ( & self , other : & Self ) -> std:: cmp:: Ordering {
148+ // we compare other with self and not the other way arround because we want a min-heap and
149+ // Rust's is a max-heap
150+ other
151+ . single_split_permit_requests
152+ . len ( )
153+ . cmp ( & self . single_split_permit_requests . len ( ) )
154+ }
155+ }
156+
157+ impl PartialOrd for LeafPermitRequest {
158+ fn partial_cmp ( & self , other : & Self ) -> Option < std:: cmp:: Ordering > {
159+ Some ( self . cmp ( other) )
160+ }
161+ }
162+
163+ impl PartialEq for LeafPermitRequest {
164+ fn eq ( & self , other : & Self ) -> bool {
165+ self . cmp ( other) . is_eq ( )
166+ }
167+ }
168+
169+ impl Eq for LeafPermitRequest { }
170+
171+ impl LeafPermitRequest {
172+ fn from_estimated_costs ( permit_sizes : Vec < u64 > ) -> ( Self , Vec < SearchPermitFuture > ) {
173+ let mut permits = Vec :: with_capacity ( permit_sizes. len ( ) ) ;
174+ let mut single_split_permit_requests = Vec :: with_capacity ( permit_sizes. len ( ) ) ;
175+ for permit_size in permit_sizes {
176+ let ( tx, rx) = oneshot:: channel ( ) ;
177+ single_split_permit_requests. push ( ( tx, permit_size) ) ;
178+ permits. push ( SearchPermitFuture ( rx) ) ;
179+ }
180+ // we do this so we can pop instead of using a vecdeque or continually call remove(0)
181+ single_split_permit_requests. reverse ( ) ;
182+ (
183+ LeafPermitRequest {
184+ single_split_permit_requests,
185+ } ,
186+ permits,
187+ )
188+ }
189+
190+ fn pop_if_smaller_than (
191+ & mut self ,
192+ max_size : u64 ,
193+ ) -> Option < ( oneshot:: Sender < SearchPermit > , u64 ) > {
194+ self . single_split_permit_requests
195+ . pop_if ( |( _, permit_size) | * permit_size <= max_size)
196+ }
197+
198+ fn is_empty ( & self ) -> bool {
199+ self . single_split_permit_requests . is_empty ( )
200+ }
201+ }
202+
142203impl SearchPermitActor {
143204 async fn run ( mut self ) {
144205 // Stops when the last clone of SearchPermitProvider is dropped.
@@ -155,12 +216,9 @@ impl SearchPermitActor {
155216 permit_sizes,
156217 permit_sender,
157218 } => {
158- let mut permits = Vec :: with_capacity ( permit_sizes. len ( ) ) ;
159- for permit_size in permit_sizes {
160- let ( tx, rx) = oneshot:: channel ( ) ;
161- self . permits_requests . push_back ( ( tx, permit_size) ) ;
162- permits. push ( SearchPermitFuture ( rx) ) ;
163- }
219+ let ( leaf_permit_request, permits) =
220+ LeafPermitRequest :: from_estimated_costs ( permit_sizes) ;
221+ self . permits_requests . push ( leaf_permit_request) ;
164222 self . assign_available_permits ( ) ;
165223 // The receiver could be dropped in the (unlikely) situation
166224 // where the future requesting these permits is cancelled before
@@ -196,13 +254,23 @@ impl SearchPermitActor {
196254 }
197255
198256 fn pop_next_request_if_serviceable ( & mut self ) -> Option < ( oneshot:: Sender < SearchPermit > , u64 ) > {
199- if self . num_warmup_slots_available == 0 {
257+ if self . num_warmup_slots_available == 0
258+ || self . total_memory_budget <= self . total_memory_allocated
259+ {
200260 return None ;
201261 }
202- if let Some ( ( _, next_permit_size) ) = self . permits_requests . front ( )
203- && self . total_memory_allocated + next_permit_size <= self . total_memory_budget
262+ let mut peeked = self . permits_requests . peek_mut ( ) ?;
263+
264+ if let Some ( permit_request) =
265+ peeked. pop_if_smaller_than ( self . total_memory_budget - self . total_memory_allocated )
204266 {
205- return self . permits_requests . pop_front ( ) ;
267+ if peeked. is_empty ( ) {
268+ drop ( peeked) ;
269+ // our modification can only have made our peeked element "higher priority", so the
270+ // element we'll pop must be the one we just put back
271+ self . permits_requests . pop ( ) ;
272+ }
273+ return Some ( permit_request) ;
206274 }
207275 None
208276 }
@@ -362,6 +430,75 @@ mod tests {
362430 }
363431 }
364432
433+ #[ tokio:: test]
434+ async fn test_search_permit_order_with_concurrent_search ( ) {
435+ let permit_provider = SearchPermitProvider :: new ( 4 , ByteSize :: mb ( 100 ) ) ;
436+ let mut all_futures = Vec :: new ( ) ;
437+ let first_batch_of_permits = permit_provider
438+ . get_permits ( repeat_n ( ByteSize :: mb ( 10 ) , 8 ) )
439+ . await ;
440+ assert_eq ! ( first_batch_of_permits. len( ) , 8 ) ;
441+ all_futures. extend (
442+ first_batch_of_permits
443+ . into_iter ( )
444+ . enumerate ( )
445+ . map ( move |( i, fut) | ( ( 1 , i) , fut) ) ,
446+ ) ;
447+
448+ let second_batch_of_permits = permit_provider
449+ . get_permits ( repeat_n ( ByteSize :: mb ( 10 ) , 2 ) )
450+ . await ;
451+ all_futures. extend (
452+ second_batch_of_permits
453+ . into_iter ( )
454+ . enumerate ( )
455+ . map ( move |( i, fut) | ( ( 2 , i) , fut) ) ,
456+ ) ;
457+
458+ let third_batch_of_permits = permit_provider
459+ . get_permits ( repeat_n ( ByteSize :: mb ( 10 ) , 6 ) )
460+ . await ;
461+ all_futures. extend (
462+ third_batch_of_permits
463+ . into_iter ( )
464+ . enumerate ( )
465+ . map ( move |( i, fut) | ( ( 3 , i) , fut) ) ,
466+ ) ;
467+
468+ // not super useful, considering what join set does, but still a tiny bit more sound.
469+ all_futures. shuffle ( & mut rand:: rng ( ) ) ;
470+
471+ let mut join_set = JoinSet :: new ( ) ;
472+ for ( res, fut) in all_futures {
473+ join_set. spawn ( async move {
474+ let permit = fut. await ;
475+ ( res, permit)
476+ } ) ;
477+ }
478+ let mut ordered_result: Vec < ( usize , usize ) > = Vec :: with_capacity ( 20 ) ;
479+ while let Some ( Ok ( ( ( batch_id, order) , _permit) ) ) = join_set. join_next ( ) . await {
480+ ordered_result. push ( ( batch_id, order) ) ;
481+ }
482+
483+ let mut counters = [ 0 ; 4 ] ;
484+ let expected_result: Vec < ( usize , usize ) > = [
485+ 1 , 1 , 1 , 1 , // initial 4 permits
486+ 2 , 2 , 1 , 1 , 1 , 1 , 3 , 3 , 3 , 3 , 3 , 3 ,
487+ ]
488+ . into_iter ( )
489+ . map ( |batch_id| {
490+ let order = counters[ batch_id] ;
491+ counters[ batch_id] += 1 ;
492+ ( batch_id, order)
493+ } )
494+ . collect ( ) ;
495+
496+ // for the first 4 permits, the order is not well defined as they are all granted at once,
497+ // and we poll futures in a random order. We sort them to fix that artifact
498+ ordered_result[ ..4 ] . sort ( ) ;
499+ assert_eq ! ( ordered_result, expected_result) ;
500+ }
501+
365502 #[ tokio:: test]
366503 async fn test_search_permit_early_drops ( ) {
367504 let permit_provider = SearchPermitProvider :: new ( 1 , ByteSize :: mb ( 100 ) ) ;
0 commit comments