Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 71 additions & 1 deletion Framework/MainDriverApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,76 @@ def run_test_case(
CommonUtil.CreateJsonReport(TCInfo=after_execution_dict)
return "passed"

# for sending variables without dom after command execution
def send_new_variables():
try:
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
variables = []
max_threshold = 50000
for var_name in shared.shared_variables:
if var_name.startswith("__") and var_name.endswith("__"):
continue
var_value = shared.shared_variables[var_name]
try:
if len(json.dumps(var_value)) > max_threshold:
builder = SchemaBuilder()
builder.add_object(var_value)
schema = builder.to_schema()
if len(json.dumps(schema)) <= max_threshold:
variables.append({
"type": "json_schema",
"variable_name": var_name,
"variable_value": schema,
"description": "",
})
else:
variables.append({
"type": "json_object",
"variable_name": var_name,
"variable_value": var_value,
"description": "",
})
except (json.decoder.JSONDecodeError, TypeError):
try:
dir_ = {}
for attr_name in dir(var_value):
if attr_name.startswith('__'):
continue
try:
attr_value = getattr(var_value, attr_name)
dir_[attr_name] = str(type(attr_value))
except Exception: # ignore getattr errors
pass
variables.append({
"type": f"non_json: {str(var_value)}",
"variable_name": var_name,
"variable_value": dir_,
"description": "",
})
except Exception as e:
CommonUtil.ExecLog(sModuleInfo, str(e), 2)
except Exception as e:
CommonUtil.ExecLog(sModuleInfo, str(e), 2)

dom = None

data = {
"variables": variables,
"dom_web": {"dom": dom},
"node_id": shared.Get_Shared_Variables('node_id').lower()
}
res = RequestFormatter.request("post",
RequestFormatter.form_uri("node_ai_contents/"),
data=json.dumps(data),
verify=False
)
if res.status_code == 500:
CommonUtil.ExecLog(sModuleInfo, res.json()["info"], 2)
elif res.status_code == 404:
CommonUtil.ExecLog(sModuleInfo, 'The chatbot API does not exist, server upgrade needed', 2)
return
except Exception as e:
CommonUtil.ExecLog(sModuleInfo, str(e), 2)

def send_dom_variables():
try:
Expand Down Expand Up @@ -1949,7 +2019,7 @@ def main(device_dict, all_run_id_info):
shared.Set_Shared_Variables("zeuz_enable_variable_logging", "False")

shared.Set_Shared_Variables("run_id", run_id)
shared.Set_Shared_Variables("node_id", CommonUtil.MachineInfo().getLocalUser())
shared.Set_Shared_Variables("node_id", CommonUtil.MachineInfo().getLocalUser(), True) # so node id can't be changed and variable updates are in sync

send_log_file_only_for_fail = ConfigModule.get_config_value("RunDefinition", "upload_log_file_only_for_fail")
send_log_file_only_for_fail = False if send_log_file_only_for_fail.lower() == "false" else True
Expand Down
207 changes: 207 additions & 0 deletions Framework/Utilities/repl_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
import json
import copy
import ssl
import traceback
from threading import Thread
import io
import contextlib
import sys

import websocket
import time

from Framework.Built_In_Automation.Shared_Resources import BuiltInFunctionSharedResources as sr
from Framework.MainDriverApi import send_new_variables


ws = None
connected = False
_stop = False



def _send(msg):
global ws
try:
if ws is None:
return
if not isinstance(msg, str):
msg = json.dumps(msg)
ws.send(msg)
except Exception:
pass


def close():
global ws, connected, _stop
connected = False
_stop = True
if ws is not None:
try:
ws.close(status=1000, reason="Closing REPL")
except Exception:
pass


def on_message(ws, message):
try:
data = json.loads(message)
except Exception:
print(f"[REPL] on_message non-JSON frame ignored: {message[:120]}")
return
if not isinstance(data, dict) or data.get("type") != "command":
return

code = data.get("msg", "")
# snapshot protected values
protected_list = []
protected_snapshot = {}
pre_existing = set()
try:
protected_list = list(getattr(sr, "protected_variables", []) or [])
for name in protected_list:
if name in sr.shared_variables:
protected_snapshot[name] = copy.deepcopy(sr.shared_variables[name])
pre_existing.add(name)
except Exception:
pass
output_text = ""
error_text = None
_preview = code[:200].replace("\n", "\\n")
print("[REPL] received command:", _preview)

buf = io.StringIO()
try:
with contextlib.redirect_stdout(buf):
# Try eval first for expressions
try:
result = None
try:
result = eval(code, sr.shared_variables, sr.shared_variables)
except SyntaxError:
# Not an expression so execute block
exec(code, sr.shared_variables, sr.shared_variables)
except NameError as ne:
ident = code.strip()
# see if single identifier referencing shared variable
if ident.isidentifier():
if ident in sr.shared_variables:
result = sr.shared_variables[ident]
else:
raise
else:
raise
if result is not None:
print(result)
except Exception:
raise
output_text = buf.getvalue().strip()
except Exception:
error_text = traceback.format_exc()
finally:
buf.close()
print("[REPL] execution completed:", output_text)

# restore protected values if tampered
tampered = []
try:
for name in protected_list:
if name in pre_existing:
pre_val = protected_snapshot.get(name, None)
if name not in sr.shared_variables or sr.shared_variables.get(name) != pre_val:
sr.shared_variables[name] = pre_val
tampered.append(name)
else:
# remove if did not exist before
if name in sr.shared_variables:
try:
del sr.shared_variables[name]
except Exception:
sr.shared_variables.pop(name, None)
tampered.append(name)
except Exception:
pass

if error_text:
_send({"type": "error", "msg": error_text})
else:
# add warning line if any protected var was tampered
if tampered:
if output_text:
output_text = output_text + "\n" + "\n".join(
f"(read-only) Reverted attempt to modify {n}" for n in tampered
)
else:
output_text = "\n".join(f"(read-only) Reverted attempt to modify {n}" for n in tampered)
_send({"type": "output", "msg": output_text})

# republish variables back to server so UI can refresh
try:
send_new_variables()
except Exception:
pass

# Signal completion so UI can refresh variables after execution fully finishes
try:
_send({"type": "output", "msg": "__done__"})
except Exception:
pass


def on_error(ws, error):
print(f"[REPL] on_error: {error}")
return


def on_close(ws=None, _a=None, _b=None):
global connected
connected = False
print("[REPL] connection closed")


def on_open(ws):
global connected
connected = True
print("[REPL] on_open: connected, sending status ping")
try:
_send({"type": "output", "msg": "__status__:node_online"})
except Exception:
pass


def _run_loop(url):
global ws, _stop
while not _stop:
try:
ws = websocket.WebSocketApp(
url,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
ws.on_open = on_open

ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, ping_interval=20, ping_timeout=10)

except Exception as e:
print(f"[REPL] exception in run loop: {e}")
if _stop:
break
time.sleep(5)


def connect(url):
global connected, _stop
try:
_stop = False
print(f"[REPL] connect() invoked url={url}")
sys.stdout.flush()
t = Thread(target=_run_loop, args=(url,))
t.daemon = True
t.start()
except Exception as outer:
print(f"[REPL] connect() exception: {outer}")
sys.stdout.flush()

def ping_state():
return {"connected": connected, "ws_is_none": ws is None, "stop": _stop}
17 changes: 17 additions & 0 deletions node_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ def adjust_python_path():
)
from Framework.Utilities import ConfigModule # noqa: E402
from Framework.Utilities import live_log_service # noqa: E402

from Framework.Utilities import repl_service # noqa: E402

from Framework.node_server_state import STATE, LoginCredentials # noqa: E402
from server import main as node_server # noqa: E402

Expand Down Expand Up @@ -382,6 +385,17 @@ def live_log_service_addr():
protocol = "ws"
server_addr = f"{protocol}://{server_url.netloc}"
return f"{server_addr}/faster/v1/ws/live_log/send/{node_id}"

def repl_service_addr():
server_url = urlparse(
ConfigModule.get_config_value("Authentication", "server_address")
)
if server_url.scheme == "https":
protocol = "wss"
else:
protocol = "ws"
server_addr = f"{protocol}://{server_url.netloc}"
return f"{server_addr}/faster/v1/ws/repl/send/{node_id}"

def deploy_srv_addr():
server_url = urlparse(
Expand All @@ -392,6 +406,9 @@ def deploy_srv_addr():
# Connect to the live log service.
live_log_service.connect(live_log_service_addr())

# Connect to the REPL service.
repl_service.connect(repl_service_addr())

# WARNING: For local development only.
# if "localhost" in host:
# deploy_srv_addr = deploy_srv_addr.replace("8000", "8300")
Expand Down