Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions Framework/MainDriverApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ def send_dom_variables():
res = RequestFormatter.request("post",
RequestFormatter.form_uri("node_ai_contents/"),
data=json.dumps(data),
verify=False
timeout=600
)
if res.status_code == 500:
CommonUtil.ExecLog(sModuleInfo, res.json()["info"], 2)
Expand Down Expand Up @@ -1465,7 +1465,7 @@ def upload_step_report(run_id: str, tc_id: str, step_seq: int, step_id: int, exe
"execution_detail": execution_detail,
})
},
verify=False
timeout=600
)
duration = round(res.elapsed.total_seconds(), 2)
# if res.status_code == 200:
Expand Down Expand Up @@ -1506,17 +1506,18 @@ def upload_reports_and_zips(temp_ini_file, run_id):
"post",
RequestFormatter.form_uri("create_report_log_api/"),
data={"execution_report": json.dumps(tc_report)},
verify=False
timeout=600
)
else:
res = RequestFormatter.request("post",
RequestFormatter.form_uri("create_report_log_api/"),
data={
"execution_report": json.dumps(tc_report),
"processed_tc_id":processed_tc_id
},
files=[("file",perf_report_html)],
verify=False)
RequestFormatter.form_uri("create_report_log_api/"),
data={
"execution_report": json.dumps(tc_report),
"processed_tc_id": processed_tc_id
},
files=[("file", perf_report_html)],
timeout=600
)

if res.status_code == 200:
CommonUtil.ExecLog(sModuleInfo, f"Successfully uploaded the execution report of run_id {run_id}", 1)
Expand Down Expand Up @@ -1582,10 +1583,11 @@ def upload_reports_and_zips(temp_ini_file, run_id):
for zips in opened_zips:
files_list.append(("file",zips))
res = RequestFormatter.request("post",
RequestFormatter.form_uri("save_log_and_attachment_api/"),
files=files_list,
data={"machine_name": Userid},
verify=False)
RequestFormatter.form_uri("save_log_and_attachment_api/"),
files=files_list,
data={"machine_name": Userid},
timeout=600
)
if res.status_code == 200:
try:
res_json = res.json()
Expand Down Expand Up @@ -1629,19 +1631,21 @@ def retry_failed_report_upload():
report_json_path = failed_report_dir / folder / 'report.json'
report_json = json.load(open(report_json_path))
if not report_json.get('perf_filepath'):
res = RequestFormatter.request("post",
res = RequestFormatter.request(
"post",
RequestFormatter.form_uri("create_report_log_api/"),
data={"execution_report": report_json.get('execution_report')},
verify=False)
timeout=600
)
else:
res = RequestFormatter.request("post",
RequestFormatter.form_uri("create_report_log_api/"),
data={"execution_report": report_json.get('execution_report'),
"processed_tc_id":report_json.get('processed_tc_id')

},
},
files=[("file",open(failed_report_dir / folder / 'files' /report_json.get('perf_filepath'),'rb'))],
verify=False)
timeout=600
)

if res.status_code == 200:
CommonUtil.ExecLog(sModuleInfo, f"Successfully uploaded the execution report of run_id {report_json.get('run_id')}", 1)
Expand Down Expand Up @@ -1705,7 +1709,7 @@ def download_attachment(attachment_info: Dict[str, Any]):
file_name = url[file_name_start_pos:]
file_path = attachment_info["download_dir"] / file_name

r = RequestFormatter.request("get", url, stream=True)
r = RequestFormatter.request("get", url, stream=True, timeout=600)
if r.status_code == requests.codes.ok:
with open(file_path, 'wb') as f:
for data in r.iter_content(chunk_size=512*1024):
Expand Down
26 changes: 18 additions & 8 deletions Framework/Utilities/RequestFormatter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -- coding: utf-8 --
# -- coding: cp1252 --

import asyncio
from . import ConfigModule
import os
import requests
Expand Down Expand Up @@ -169,13 +169,27 @@ def request(*args, **kwargs):
"""
request() is a wrapper for requests.request which handles automatic session
management.
Default values:
verify = False
timeout = 70 sec
"""
renew_token_with_expiry_check()
if "verify" not in kwargs:
kwargs["verify"] = False
if "timeout" not in kwargs:
kwargs["timeout"] = 70


return session.request(*args, **kwargs)

# async wrapper
async def async_request(*args, **kwargs):
"""
Runs the blocking request() in a worker thread
so the event loop is not blocked.
"""
return await asyncio.to_thread(request, *args, **kwargs)


def Post(resource_path, payload=None, **kwargs):
renew_token_with_expiry_check()
Expand Down Expand Up @@ -210,16 +224,12 @@ def Get(resource_path, payload=None, **kwargs):
**kwargs
).json()

except requests.exceptions.RequestException:
print(
"Exception in UpdateGet: Authentication Failed. Please check your server, username and password. "
"Please include full server name. Example: https://zeuz.zeuz.ai.\n"
"If you are using IP Address: Type in just the IP without http. Example: 12.15.10.6"
)
except requests.exceptions.RequestException as e:
print(e)
return ""

except Exception as e:
print("Get Exception: {}".format(e))
print(e)
return {}


Expand Down
2 changes: 1 addition & 1 deletion Framework/attachment_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def download_attachment(self, url: str):

headers = RequestFormatter.add_api_key_to_headers({})

with RequestFormatter.request("get", url, stream=True, verify=False,**headers) as r:
with RequestFormatter.request("get", url, stream=True, timeout=600,**headers) as r:
r.raise_for_status()
with open(path_to_downloaded_attachment, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
Expand Down
136 changes: 65 additions & 71 deletions Framework/deploy_handler/long_poll_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from colorama import Fore
from pathlib import Path
from urllib.parse import urlparse
import requests

from Framework.Utilities import RequestFormatter, ConfigModule, CommonUtil
from Framework.Utilities.RequestFormatter import REQUEST_TIMEOUT
Expand Down Expand Up @@ -237,7 +238,6 @@ def respond_to_key_request(self, request_id: str, private_key_pem: str) -> None:
"donor_node_id": node_id,
"private_key": private_key_pem
},
verify=False
)

if response.ok:
Expand All @@ -253,80 +253,74 @@ def respond_to_key_request(self, request_id: str, private_key_pem: str) -> None:

async def run(self, host: str) -> None:
reconnect = False
server_online = False
async with httpx.AsyncClient(timeout=httpx.Timeout(70.0), verify=False) as client:
while True:
if STATE.reconnect_with_credentials is not None:
print_online = False
while True:
if STATE.reconnect_with_credentials is not None:
break

if reconnect:
await asyncio.sleep(random.randint(1, 3))

await self.on_connect_callback(reconnect)

try:
reconnect = True
resp = await RequestFormatter.async_request("get", host, timeout=70)
if resp is None:
break

if reconnect:
if server_online:
await asyncio.sleep(0.1)
else:
await asyncio.sleep(random.randint(1, 3))
if resp.content.startswith(self.ERROR_PREFIX):
self.on_error(resp.content)
continue

await self.on_connect_callback(reconnect)
if resp.ok and print_online:
print_online = False
node_id = CommonUtil.MachineInfo().getLocalUser().lower()
print(f"🟢 {node_id} back to online")

try:
reconnect = True
resp = await self.fetch(host, client)
if resp is None:
break

if resp.content.startswith(self.ERROR_PREFIX):
server_online = False
self.on_error(resp.content)
continue

if resp.status_code == httpx.codes.NO_CONTENT:
server_online = False
continue

if not resp.is_success:
server_online = False
print(
"[deploy] facing difficulty communicating with the server, status code:",
resp.status_code,
" | reconnecting",
)
try:
print(Fore.YELLOW + str(resp.content))
except Exception:
pass

# Encountered a server error, retry.
await asyncio.sleep(random.randint(1, 3))
return

should_quit = await self.on_message(resp.content)
if should_quit:
break

reconnect = False
server_online = True
except httpx.ReadTimeout:
pass
except Exception:
traceback.print_exc()
print("[deploy] RETRYING...")

async def fetch(self, host: str, client: httpx.AsyncClient) -> httpx.Response | None:
try:
api_key = ConfigModule.get_config_value("Authentication", "api-key")
headers = {"X-API-KEY": api_key}

while True:
if resp.status_code == httpx.codes.NO_CONTENT:
continue

if resp.status_code == httpx.codes.BAD_GATEWAY:
print_online = True
print(Fore.YELLOW + "Server offline. Retrying after 30s")
await asyncio.sleep(30)
continue

if not resp.ok:
print(
"[deploy] Request Error, status code:",
resp.status_code,
"| reconnecting",
)

# Encountered a server error, retry.
await asyncio.sleep(random.randint(1, 3))
continue

should_quit = await self.on_message(resp.content)
if should_quit:
break

reconnect = False
except (
requests.exceptions.ConnectTimeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectionError,
) as e:
# Nginx down, VM down, network issue, docker-compose stopped
if STATE.reconnect_with_credentials is not None:
return None

try:
resp = await client.get(host, headers=headers)
return resp
except asyncio.CancelledError:
print_online = True
print(e)
print(Fore.YELLOW + "Retrying after 30s")
await asyncio.sleep(30)

except Exception as e:
if STATE.reconnect_with_credentials is not None:
return None
except Exception:
if STATE.reconnect_with_credentials is not None:
return None
await asyncio.sleep(0.1)
except Exception:
return None
print_online = True
print(e)
print(Fore.YELLOW + "Retrying after 30s")
await asyncio.sleep(30)

Loading
Loading