@@ -25,6 +25,8 @@ import (
2525 "net/http"
2626 "net/url"
2727 "time"
28+
29+ . "github.com/onsi/gomega"
2830)
2931
3032type RayJobSetup struct {
@@ -54,25 +56,25 @@ type RayClusterClientConfig struct {
5456var _ RayClusterClient = (* rayClusterClient )(nil )
5557
5658type rayClusterClient struct {
57- endpoint url.URL
58- httpClient * http.Client
59- authHeader string
59+ endpoint url.URL
60+ httpClient * http.Client
61+ bearerToken string
6062}
6163
6264type RayClusterClient interface {
6365 CreateJob (job * RayJobSetup ) (* RayJobResponse , error )
6466 GetJobDetails (jobID string ) (* RayJobDetailsResponse , error )
6567 GetJobLogs (jobID string ) (string , error )
66- GetAllJobsData () ([]map [string ]interface {}, error )
67- WaitForJobStatus (jobID string ) ( string , error )
68+ GetJobs () ([]map [string ]interface {}, error )
69+ WaitForJobStatus (test Test , jobID string ) string
6870}
6971
70- func NewRayClusterClient (dashboardEndpoint url.URL , config RayClusterClientConfig , authHeader string ) RayClusterClient {
72+ func NewRayClusterClient (dashboardEndpoint url.URL , config RayClusterClientConfig , bearerToken string ) RayClusterClient {
7173 tr := & http.Transport {
7274 TLSClientConfig : & tls.Config {InsecureSkipVerify : config .SkipTlsVerification },
7375 Proxy : http .ProxyFromEnvironment ,
7476 }
75- return & rayClusterClient {endpoint : dashboardEndpoint , httpClient : & http.Client {Transport : tr }, authHeader : authHeader }
77+ return & rayClusterClient {endpoint : dashboardEndpoint , httpClient : & http.Client {Transport : tr }, bearerToken : bearerToken }
7678}
7779
7880func (client * rayClusterClient ) CreateJob (job * RayJobSetup ) (response * RayJobResponse , err error ) {
@@ -101,15 +103,15 @@ func (client *rayClusterClient) CreateJob(job *RayJobSetup) (response *RayJobRes
101103 return
102104}
103105
104- func (client * rayClusterClient ) GetAllJobsData () ([]map [string ]interface {}, error ) {
106+ func (client * rayClusterClient ) GetJobs () ([]map [string ]interface {}, error ) {
105107 getAllJobsDetailsURL := client .endpoint .String () + "/api/jobs/"
106108
107109 req , err := http .NewRequest (http .MethodGet , getAllJobsDetailsURL , nil )
108110 if err != nil {
109111 return nil , err
110112 }
111- if client .authHeader != "" {
112- req .Header .Set ("Authorization" , "Bearer " + client .authHeader )
113+ if client .bearerToken != "" {
114+ req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
113115 }
114116 resp , err := client .httpClient .Do (req )
115117 if err != nil {
@@ -139,8 +141,8 @@ func (client *rayClusterClient) GetJobDetails(jobID string) (response *RayJobDet
139141 if err != nil {
140142 return nil , err
141143 }
142- if client .authHeader != "" {
143- req .Header .Set ("Authorization" , "Bearer " + client .authHeader )
144+ if client .bearerToken != "" {
145+ req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
144146 }
145147
146148 resp , err := client .httpClient .Do (req )
@@ -167,8 +169,8 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error
167169 if err != nil {
168170 return "" , err
169171 }
170- if client .authHeader != "" {
171- req .Header .Set ("Authorization" , "Bearer " + client .authHeader )
172+ if client .bearerToken != "" {
173+ req .Header .Set ("Authorization" , "Bearer " + client .bearerToken )
172174 }
173175 resp , err := client .httpClient .Do (req )
174176 if err != nil {
@@ -189,32 +191,30 @@ func (client *rayClusterClient) GetJobLogs(jobID string) (logs string, err error
189191 return jobLogs .Logs , err
190192}
191193
192- func (client * rayClusterClient ) WaitForJobStatus (jobID string ) ( string , error ) {
194+ func (client * rayClusterClient ) WaitForJobStatus (test Test , jobID string ) string {
193195 var status string
194- var prevStatus string
195196 fmt .Printf ("Waiting for job to be Succeeded...\n " )
196- var err error
197- var resp * RayJobDetailsResponse
198- for status != "SUCCEEDED" {
199- resp , err = client .GetJobDetails (jobID )
200- if err != nil {
201- time .Sleep (2 * time .Second )
202- continue
203- }
197+
198+ test .Eventually (func () string {
199+ resp , err := client .GetJobDetails (jobID )
200+ test .Expect (err ).ToNot (HaveOccurred ())
204201 statusVal := resp .Status
205202 if statusVal == "SUCCEEDED" || statusVal == "FAILED" {
206203 fmt .Printf ("JobStatus : %s\n " , statusVal )
207- prevStatus = statusVal
208- return prevStatus , err
204+ status = statusVal
205+ return status
209206 }
210- if prevStatus != statusVal && statusVal != "SUCCEEDED" {
207+ if status != statusVal && statusVal != "SUCCEEDED" {
211208 fmt .Printf ("JobStatus : %s...\n " , statusVal )
212- prevStatus = statusVal
209+ status = statusVal
213210 }
214- time .Sleep (3 * time .Second )
215- }
216- if prevStatus != "SUCCEEDED" {
217- err = fmt .Errorf ("Job failed !" )
211+ return status
212+ }, TestTimeoutDouble , 3 * time .Second ).Should (Or (Equal ("SUCCEEDED" ), Equal ("FAILED" )), "Job did not complete within the expected time" )
213+
214+ if status == "SUCCEEDED" {
215+ fmt .Printf ("Job succeeded !\n " )
216+ } else {
217+ fmt .Printf ("Job failed !\n " )
218218 }
219- return prevStatus , err
219+ return status
220220}
0 commit comments