11import asyncio
2+ import copy
23import json
34import os
45import re
56import sys
6- from typing import Dict , Optional , Tuple
7+ from typing import Any , Dict , Optional , Tuple
78
89import requests
910import structlog
@@ -21,7 +22,7 @@ def __init__(self):
2122 self .failed_tests = [] # Track failed tests
2223
2324 def call_codegate (
24- self , url : str , headers : dict , data : dict , provider : str
25+ self , url : str , headers : dict , data : dict , provider : str , method : str = "POST"
2526 ) -> Optional [requests .Response ]:
2627 logger .debug (f"Creating requester for provider: { provider } " )
2728 requester = self .requester_factory .create_requester (provider )
@@ -31,12 +32,12 @@ def call_codegate(
3132 logger .debug (f"Headers: { headers } " )
3233 logger .debug (f"Data: { data } " )
3334
34- response = requester .make_request (url , headers , data )
35+ response = requester .make_request (url , headers , data , method = method )
3536
3637 # Enhanced response logging
3738 if response is not None :
3839
39- if response .status_code != 200 :
40+ if response .status_code not in [ 200 , 201 , 204 ] :
4041 logger .debug (f"Response error status: { response .status_code } " )
4142 logger .debug (f"Response error headers: { dict (response .headers )} " )
4243 try :
@@ -174,7 +175,7 @@ async def run_test(self, test: dict, test_headers: dict) -> bool:
174175
175176 async def _get_testcases (
176177 self , testcases_dict : Dict , test_names : Optional [list [str ]] = None
177- ) -> Dict :
178+ ) -> Dict [ str , Dict [ str , str ]] :
178179 testcases : Dict [str , Dict [str , str ]] = testcases_dict ["testcases" ]
179180
180181 # Filter testcases by provider and test names
@@ -192,24 +193,102 @@ async def _get_testcases(
192193 testcases = filtered_testcases
193194 return testcases
194195
196+ async def _setup_muxing (
197+ self , provider : str , muxing_config : Optional [Dict ]
198+ ) -> Optional [Tuple [str , str ]]:
199+ """
200+ Muxing setup. Create the provider endpoints and the muxing rules
201+
202+ Return
203+ """
204+ # The muxing section was not found in the testcases.yaml file. Nothing to do.
205+ if not muxing_config :
206+ return
207+
208+ # Create the provider endpoint
209+ provider_endpoint = muxing_config .get ("provider_endpoint" )
210+ try :
211+ data_with_api_keys = self .replace_env_variables (provider_endpoint ["data" ], os .environ )
212+ response_create_provider = self .call_codegate (
213+ provider = provider ,
214+ url = provider_endpoint ["url" ],
215+ headers = provider_endpoint ["headers" ],
216+ data = json .loads (data_with_api_keys ),
217+ )
218+ created_provider_endpoint = response_create_provider .json ()
219+ except Exception as e :
220+ logger .warning (f"Could not setup provider endpoint for muxing: { e } " )
221+ return
222+ logger .info ("Created provider endpoint for muixing" )
223+
224+ muxes_rules : Dict [str , Any ] = muxing_config .get ("muxes" , {})
225+ try :
226+ # We need to first update all the muxes with the provider_id
227+ for mux in muxes_rules .get ("rules" , []):
228+ mux ["provider_id" ] = created_provider_endpoint ["id" ]
229+
230+ # The endpoint actually takes a list
231+ self .call_codegate (
232+ provider = provider ,
233+ url = muxes_rules ["url" ],
234+ headers = muxes_rules ["headers" ],
235+ data = muxes_rules .get ("rules" , []),
236+ method = "PUT" ,
237+ )
238+ except Exception as e :
239+ logger .warning (f"Could not setup muxing rules: { e } " )
240+ return
241+ logger .info ("Created muxing rules" )
242+
243+ return muxing_config ["mux_url" ], muxing_config ["trimm_from_testcase_url" ]
244+
245+ async def _augment_testcases_with_muxing (
246+ self , testcases : Dict , mux_url : str , trimm_from_testcase_url : str
247+ ) -> Dict :
248+ """
249+ Augment the testcases with the muxing information. Copy the testcases
250+ and execute them through the muxing endpoint.
251+ """
252+ test_cases_with_muxing = copy .deepcopy (testcases )
253+ for test_id , test_data in testcases .items ():
254+ # Replace the provider in the URL with the muxed URL
255+ rest_of_path = test_data ["url" ].replace (trimm_from_testcase_url , "" )
256+ new_url = f"{ mux_url } { rest_of_path } "
257+ new_test_data = copy .deepcopy (test_data )
258+ new_test_data ["url" ] = new_url
259+ new_test_id = f"{ test_id } _muxed"
260+ test_cases_with_muxing [new_test_id ] = new_test_data
261+
262+ logger .info ("Augmented testcases with muxing" )
263+ return test_cases_with_muxing
264+
195265 async def _setup (
196- self , testcases_file : str , test_names : Optional [list [str ]] = None
266+ self , testcases_file : str , provider : str , test_names : Optional [list [str ]] = None
197267 ) -> Tuple [Dict , Dict ]:
198268 with open (testcases_file , "r" ) as f :
199- testcases_dict = yaml .safe_load (f )
269+ testcases_dict : Dict = yaml .safe_load (f )
200270
201271 headers = testcases_dict ["headers" ]
202272 testcases = await self ._get_testcases (testcases_dict , test_names )
203- return headers , testcases
273+ muxing_result = await self ._setup_muxing (provider , testcases_dict .get ("muxing" , {}))
274+ # We don't have any muxing setup, return the headers and testcases
275+ if not muxing_result :
276+ return headers , testcases
277+
278+ mux_url , trimm_from_testcase_url = muxing_result
279+ test_cases_with_muxing = await self ._augment_testcases_with_muxing (
280+ testcases , mux_url , trimm_from_testcase_url
281+ )
282+
283+ return headers , test_cases_with_muxing
204284
205285 async def run_tests (
206286 self ,
207287 testcases_file : str ,
208288 provider : str ,
209289 test_names : Optional [list [str ]] = None ,
210290 ) -> bool :
211- headers , testcases = await self ._setup (testcases_file , test_names )
212-
291+ headers , testcases = await self ._setup (testcases_file , provider , test_names )
213292 if not testcases :
214293 logger .warning (
215294 f"No tests found for provider { provider } in file: { testcases_file } "
0 commit comments