3939from .autotuner_test_utils import AutoTunerTestUtils , accepted_rc
4040
4141from contextlib import contextmanager
42+ from ray .tune import ExperimentAnalysis
4243
# Directory that contains this test file.
cur_dir = os.path.dirname(os.path.abspath(__file__))
# Root of the ORFS flow tree, resolved relative to this file.
orfs_flow_dir = os.path.join(cur_dir, "../../../flow")

# All limits below are in seconds.
# Maximum time to wait for trials to start producing results.
POLL_TIMEOUT = 300
# Interval between status polls.
POLL_INTERVAL = 15
# Maximum time to wait for the Ray cluster to shut down.
RAY_SHUTDOWN_TIMEOUT = 120
4753
4854
4955@contextmanager
@@ -55,70 +61,160 @@ def managed_process(*args, **kwargs):
5561 try :
5662 yield proc
5763 finally :
58- if proc .poll () is None : # If the process is still running
59- proc .kill () # Forcefully kill it
64+ if proc .poll () is None :
65+ proc .kill ()
66+ proc .wait ()
67+
68+
def get_experiment_status(experiment_dir):
    """
    Inspect a Ray Tune experiment directory and summarize its progress.

    Returns a dict with:
    - state: "not_started", "running", or "finished"
    - num_trials: number of trials found
    - num_completed: number of trials that reported results
    """
    summary = {
        "state": "not_started",
        "num_trials": 0,
        "num_completed": 0,
    }

    # No directory at all: the experiment has not produced anything yet.
    if not os.path.isdir(experiment_dir):
        return summary

    # Ray Tune drops an experiment_state*.json file once the run has begun.
    has_state_file = any(
        name.startswith("experiment_state") and name.endswith(".json")
        for name in os.listdir(experiment_dir)
    )
    if not has_state_file:
        return summary

    try:
        trial_results = ExperimentAnalysis(experiment_dir).results
        summary["num_trials"] = len(trial_results)
        summary["num_completed"] = sum(
            1 for result in trial_results.values() if result is not None
        )
        # Finished only when there is at least one trial and every trial
        # has reported a result; anything else counts as still running.
        all_done = (
            summary["num_trials"] > 0
            and summary["num_completed"] == summary["num_trials"]
        )
        summary["state"] = "finished" if all_done else "running"
    except Exception:
        # The state file exists but is not yet readable/consistent.
        summary["state"] = "running"

    return summary
113+
114+
def stop_ray_cluster(timeout=RAY_SHUTDOWN_TIMEOUT):
    """
    Stop the Ray cluster, retrying until no nodes remain or timeout expires.

    Returns True on success; raises RuntimeError if the cluster is still
    reachable after `timeout` seconds.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        # A non-zero "ray status" return code means no cluster is reachable.
        cluster_down = (
            subprocess.run(
                "ray status", shell=True, capture_output=True, text=True
            ).returncode
            != 0
        )
        stop_result = subprocess.run(
            "ray stop", shell=True, capture_output=True, text=True
        )
        if cluster_down and stop_result.returncode in accepted_rc:
            return True
        time.sleep(5)

    raise RuntimeError(f"Failed to stop Ray cluster within {timeout} seconds")
60136
61137
class ResumeCheck(unittest.TestCase):
    """
    Verify that an interrupted AutoTuner tuning run can be resumed.

    The first run is killed after at least one trial completes; the second
    run uses --resume and must finish cleanly. Only one platform/design
    combination is exercised to keep runtime manageable.
    """

    # Only test 1 platform/design.
    platform = "asap7"
    design = "gcd"
    samples = 5
    iterations = 2
    experiment_name = "test-resume"

    def setUp(self):
        """Build the two tuning command lines: initial run and resumed run."""
        self.config = os.path.join(
            orfs_flow_dir,
            "designs",
            self.platform,
            self.design,
            "autotuner.json",
        )
        # Ray Tune writes experiment state under logs/<platform>/<design>/<name>.
        self.experiment_dir = os.path.join(
            orfs_flow_dir,
            "logs",
            self.platform,
            self.design,
            self.experiment_name,
        )
        self.jobs = self.samples
        # os.cpu_count() may return None (count undeterminable); fall back to 1
        # so the resources_per_trial computation below cannot raise TypeError.
        self.num_cpus = os.cpu_count() or 1

        # Fractional resources_per_trial avoids parallelism issues with Ray.
        res_per_trial = float("{:.1f}".format(self.num_cpus / self.samples))
        options = ["", "--resume"]
        self.executable = AutoTunerTestUtils.get_exec_cmd()
        self.commands = [
            f"{self.executable}"
            f" --design {self.design}"
            f" --platform {self.platform}"
            f" --config {self.config}"
            f" --jobs {self.jobs}"
            f" --experiment {self.experiment_name}"
            f" tune --iterations {self.iterations}"
            f" --samples {self.samples}"
            f" --resources_per_trial {res_per_trial}"
            f" {c}"
            for c in options
        ]

    def test_tune_resume(self):
        """Interrupt a tuning run after one trial completes, then resume it."""
        # shlex.split (rather than str.split) keeps arguments containing
        # spaces intact, e.g. when the flow tree lives under a path with
        # spaces in it.
        import shlex

        # Step 1: Run the first config (without --resume) asynchronously.
        # Wait until at least one trial has completed, then kill it.
        print("Step 1: Starting initial tuning run")
        with managed_process(shlex.split(self.commands[0])) as proc:
            start = time.time()
            while time.time() - start < POLL_TIMEOUT:
                status = get_experiment_status(self.experiment_dir)
                print(
                    f"  Status: {status['state']}, "
                    f"trials: {status['num_trials']}, "
                    f"completed: {status['num_completed']}"
                )
                if status["num_completed"] > 0:
                    print(
                        f"  {status['num_completed']} trial(s) completed, "
                        f"stopping initial run"
                    )
                    break
                time.sleep(POLL_INTERVAL)
            else:
                self.fail(f"No trials completed within {POLL_TIMEOUT} seconds")

        # Step 2: Stop the Ray cluster cleanly so the resumed run can bring
        # up its own cluster.
        print("Step 2: Stopping Ray cluster")
        stop_ray_cluster()
        print("  Ray cluster stopped")

        # Step 3: Run the second config (with --resume) to completion.
        print("Step 3: Resuming tuning run")
        proc = subprocess.run(shlex.split(self.commands[1]))
        successful = proc.returncode in accepted_rc
        self.assertTrue(
            successful,
            f"Resume run failed with return code {proc.returncode}",
        )
122218
123219
124220if __name__ == "__main__" :
0 commit comments