@@ -45,8 +45,6 @@ use chrono::Utc;
4545use cid:: Cid ;
4646use fil_actors_shared:: fvm_ipld_bitfield:: BitField ;
4747use fil_actors_shared:: v10:: runtime:: DomainSeparationTag ;
48- use futures:: stream:: FuturesUnordered ;
49- use futures:: stream:: StreamExt as _;
5048use fvm_ipld_blockstore:: Blockstore ;
5149use ipld_core:: ipld:: Ipld ;
5250use itertools:: Itertools as _;
@@ -68,6 +66,7 @@ use std::{
6866 time:: Duration ,
6967} ;
7068use tokio:: sync:: Semaphore ;
69+ use tokio:: task:: JoinSet ;
7170use tracing:: debug;
7271
7372const COLLECTION_SAMPLE_SIZE : usize = 5 ;
@@ -2932,11 +2931,12 @@ pub(super) async fn run_tests(
29322931 test_criteria_overrides : & [ TestCriteriaOverride ] ,
29332932 report_dir : Option < PathBuf > ,
29342933 report_mode : ReportMode ,
2934+ n_retries : usize ,
29352935) -> anyhow:: Result < ( ) > {
29362936 let forest = Into :: < Arc < rpc:: Client > > :: into ( forest) ;
29372937 let lotus = Into :: < Arc < rpc:: Client > > :: into ( lotus) ;
29382938 let semaphore = Arc :: new ( Semaphore :: new ( max_concurrent_requests) ) ;
2939- let mut futures = FuturesUnordered :: new ( ) ;
2939+ let mut tasks = JoinSet :: new ( ) ;
29402940
29412941 let filter_list = if let Some ( filter_file) = & filter_file {
29422942 FilterList :: new_from_file ( filter_file) ?
@@ -2988,41 +2988,60 @@ pub(super) async fn run_tests(
29882988 }
29892989
29902990 // Acquire a permit from the semaphore before spawning a test
2991- let permit = semaphore. clone ( ) . acquire_owned ( ) . await ? ;
2991+ let semaphore = semaphore. clone ( ) ;
29922992 let forest = forest. clone ( ) ;
29932993 let lotus = lotus. clone ( ) ;
2994- let future = tokio:: spawn ( async move {
2995- let test_result = test. run ( & forest, & lotus) . await ;
2996- drop ( permit) ; // Release the permit after test execution
2997- ( test, test_result)
2994+ let test_criteria_overrides = test_criteria_overrides. to_vec ( ) ;
2995+ tasks. spawn ( async move {
2996+ let mut n_retries_left = n_retries;
2997+ let mut backoff_secs = 2 ;
2998+ loop {
2999+ {
3000+ // Ignore the error since 'An acquire operation can only fail if the semaphore has been closed'
3001+ let _permit = semaphore. acquire ( ) . await ;
3002+ let test_result = test. run ( & forest, & lotus) . await ;
3003+ let success =
3004+ evaluate_test_success ( & test_result, & test, & test_criteria_overrides) ;
3005+ if success || n_retries_left == 0 {
3006+ return ( success, test, test_result) ;
3007+ }
3008+ // Release the semaphore before sleeping
3009+ }
3010+ // Sleep before each retry
3011+ tokio:: time:: sleep ( Duration :: from_secs ( backoff_secs) ) . await ;
3012+ n_retries_left = n_retries_left. saturating_sub ( 1 ) ;
3013+ backoff_secs = backoff_secs. saturating_mul ( 2 ) ;
3014+ }
29983015 } ) ;
2999-
3000- futures. push ( future) ;
30013016 }
30023017
30033018 // If no tests to run after filtering, return early without saving/printing
3004- if futures . is_empty ( ) {
3019+ if tasks . is_empty ( ) {
30053020 return Ok ( ( ) ) ;
30063021 }
30073022
3008- while let Some ( Ok ( ( test, test_result) ) ) = futures. next ( ) . await {
3009- let method_name = test. request . method_name . clone ( ) ;
3010- let success = evaluate_test_success ( & test_result, & test, test_criteria_overrides) ;
3023+ while let Some ( result) = tasks. join_next ( ) . await {
3024+ match result {
3025+ Ok ( ( success, test, test_result) ) => {
3026+ let method_name = test. request . method_name . clone ( ) ;
30113027
3012- report_builder. track_test_result (
3013- method_name. as_ref ( ) ,
3014- success,
3015- & test_result,
3016- & test. request . params ,
3017- ) ;
3028+ report_builder. track_test_result (
3029+ method_name. as_ref ( ) ,
3030+ success,
3031+ & test_result,
3032+ & test. request . params ,
3033+ ) ;
30183034
3019- // Dump test data if configured
3020- if let ( Some ( dump_dir) , Some ( test_dump) ) = ( & dump_dir, & test_result. test_dump ) {
3021- dump_test_data ( dump_dir, success, test_dump) ?;
3022- }
3035+ // Dump test data if configured
3036+ if let ( Some ( dump_dir) , Some ( test_dump) ) = ( & dump_dir, & test_result. test_dump ) {
3037+ dump_test_data ( dump_dir, success, test_dump) ?;
3038+ }
30233039
3024- if !success && fail_fast {
3025- break ;
3040+ if !success && fail_fast {
3041+ break ;
3042+ }
3043+ }
3044+ Err ( e) => tracing:: warn!( "{e}" ) ,
30263045 }
30273046 }
30283047
0 commit comments